import { Inject, Injectable } from '@nestjs/common';
import { MinioService } from 'src/minio/minio.service';
import Redis from 'ioredis';
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { Cache } from 'cache-manager';
// NOTE: `aperture` yields overlapping sliding windows, not disjoint chunks, so
// it must not be used for batching. The import is retained only in case other
// code relies on it.
import { aperture } from 'ramda';

/** Metadata stored (as JSON) under the `job:<name>` Redis key. */
export interface JobMetadata {
  name: string;
  description: string;
  tags: string[];
  createdBy: string;
  createdAt: string;
  /** Secret compared against the caller-supplied key in `clearTodoItems`. */
  claimSecret: string;
}

/** Metadata keys that `JobsService.cleanJobMetadata` strips out. */
export const privateMetadataKeys = ['claimSecret'];

/** `JobMetadata` without the private `claimSecret` field. */
export type PublicJobMetadata = Omit<JobMetadata, 'claimSecret'>;

/** Map from claimer identifier to a counter value. */
type ClaimerCounts = { [claimer: string]: number };

type Leaderboard = {
  completeCounts: ClaimerCounts;
  claimCounts: ClaimerCounts;
};

/**
 * Redis-backed work queue.
 *
 * Each job owns three Redis lists (`todo:<job>`, `claimed:<job>`, `done:<job>`),
 * per-claimer counters (`claim:<job>:<claimer>`, `complete:<job>:<claimer>`)
 * and a JSON metadata blob (`job:<job>`).
 */
@Injectable()
export class JobsService {
  constructor(
    private readonly minioService: MinioService,
    @InjectRedis() private readonly redis: Redis,
    @Inject(CACHE_MANAGER) private readonly cacheManager: Cache,
  ) {}

  /**
   * Returns a copy of `metadata` without any key listed in `privateMetadataKeys`.
   *
   * @param metadata Full job metadata.
   * @returns The metadata with private keys removed.
   */
  public static cleanJobMetadata(metadata: JobMetadata): PublicJobMetadata {
    const publicEntries = Object.entries(metadata).filter(
      ([key]) => !privateMetadataKeys.includes(key),
    );
    return Object.fromEntries(publicEntries) as PublicJobMetadata;
  }

  private jobNameBuilder(jobName: string) {
    return `job:${jobName}`;
  }

  private todoListNameBuilder(jobName: string) {
    return `todo:${jobName}`;
  }

  private doneListNameBuilder(jobName: string) {
    return `done:${jobName}`;
  }

  private claimedListNameBuilder(jobName: string) {
    return `claimed:${jobName}`;
  }

  private completeCountNameBuilder(jobName: string, claimer: string) {
    return `complete:${jobName}:${claimer}`;
  }

  private claimerCountNameBuilder(jobName: string, claimer: string) {
    return `claim:${jobName}:${claimer}`;
  }

  private claimerCountWildcardBuilder(jobName: string) {
    return `claim:${jobName}:*`;
  }

  /**
   * Reads every counter whose key starts with `keyPrefix` and maps the
   * remainder of each key (the claimer) to its integer value.
   *
   * The claimer is obtained by stripping the literal prefix rather than by
   * splitting on `:`, so job names and claimers that contain `:` are handled.
   */
  private async collectCounts(keyPrefix: string): Promise<ClaimerCounts> {
    const keys = await this.redis.keys(`${keyPrefix}*`);
    const entries = await Promise.all(
      keys.map(async (key): Promise<[string, number] | null> => {
        // KEYS takes a glob pattern; ignore anything that matched through a
        // glob character in the prefix rather than the literal prefix itself.
        if (!key.startsWith(keyPrefix)) {
          return null;
        }
        const raw = await this.redis.get(key);
        if (!raw) {
          // The key vanished between KEYS and GET, or holds an empty value.
          return null;
        }
        return [key.slice(keyPrefix.length), parseInt(raw, 10)];
      }),
    );

    const counts: ClaimerCounts = {};
    for (const entry of entries) {
      if (entry) {
        const [claimer, count] = entry;
        counts[claimer] = count;
      }
    }
    return counts;
  }

  /** Completed-item count per claimer for `jobName`. */
  private async getCompleteCounts(jobName: string): Promise<{ [claimer: string]: number }> {
    return this.collectCounts(this.completeCountNameBuilder(jobName, ''));
  }

  /** Currently-claimed item count per claimer for `jobName`. */
  private async getClaimCounts(jobName: string): Promise<{ [claimer: string]: number }> {
    return this.collectCounts(this.claimerCountNameBuilder(jobName, ''));
  }

  /**
   * Returns the per-claimer complete and claim counts for a job, served from
   * the cache manager when an entry exists under `leaderboard:<job>`.
   *
   * @param jobName Name of the job.
   * @returns The cached or freshly computed leaderboard.
   */
  async getLeaderboard(jobName: string): Promise<Leaderboard> {
    const cacheKey = `leaderboard:${jobName}`;
    const cached = await this.cacheManager.get<Leaderboard>(cacheKey);
    if (cached) {
      return cached;
    }

    const [completeCounts, claimCounts] = await Promise.all([
      this.getCompleteCounts(jobName),
      this.getClaimCounts(jobName),
    ]);
    const leaderboard: Leaderboard = { completeCounts, claimCounts };

    // Awaited so a failing cache write surfaces here instead of becoming an
    // unhandled rejection. TTL unit depends on the installed cache-manager.
    await this.cacheManager.set(cacheKey, leaderboard, 200);
    return leaderboard;
  }

  /**
   * Appends `items` to the tail of the job's todo list.
   *
   * Items are pushed in disjoint batches of at most 100 so that a single
   * RPUSH never carries an unbounded argument list. An empty `items` array is
   * a no-op (RPUSH requires at least one value).
   *
   * @param jobName Name of the job.
   * @param items Items to enqueue, in order.
   */
  async addItemsToJob(jobName: string, items: string[]) {
    const batchSize = 100;
    const todoKey = this.todoListNameBuilder(jobName);
    for (let start = 0; start < items.length; start += batchSize) {
      await this.redis.rpush(todoKey, ...items.slice(start, start + batchSize));
    }
  }

  /**
   * Moves one item from the job's todo list to its claimed list and bumps the
   * claimer's claim counter.
   *
   * Uses BRPOPLPUSH with a timeout argument of 10, so the call waits for an
   * item before giving up.
   *
   * @param jobName Name of the job.
   * @param claimer Identifier of the claiming client.
   * @returns The claimed item, or `null` when none became available.
   */
  async claimJobItem(jobName: string, claimer: string): Promise<string | null> {
    const jobItem = await this.redis.brpoplpush(
      this.todoListNameBuilder(jobName),
      this.claimedListNameBuilder(jobName),
      10,
    );
    if (!jobItem) {
      return null;
    }
    await this.redis.incr(this.claimerCountNameBuilder(jobName, claimer));
    return jobItem;
  }

  /**
   * Marks a claimed item as done.
   *
   * Removes one occurrence of `jobItem` from the claimed list, appends a JSON
   * record `{ item, client, data }` to the done list, decrements the
   * completer's claim counter and increments their complete counter.
   *
   * @param jobName Name of the job.
   * @param jobItem The item being completed.
   * @param completer Identifier of the completing client.
   * @param data Arbitrary JSON-serialisable result payload.
   * @returns `false` when `jobItem` was not in the claimed list, else `true`.
   */
  async completeJobItem(jobName: string, jobItem: string, completer: string, data: unknown) {
    const removed = await this.redis.lrem(this.claimedListNameBuilder(jobName), 1, jobItem);
    if (removed === 0) {
      return false;
    }
    await this.redis.rpush(
      this.doneListNameBuilder(jobName),
      JSON.stringify({ item: jobItem, client: completer, data }),
    );
    await this.redis.decr(this.claimerCountNameBuilder(jobName, completer));
    await this.redis.incr(this.completeCountNameBuilder(jobName, completer));
    return true;
  }

  /** All items currently in the job's todo list. */
  async getTodoItems(jobName: string) {
    return this.redis.lrange(this.todoListNameBuilder(jobName), 0, -1);
  }

  /** Length of the job's todo list. */
  async getTodoItemCount(jobName: string) {
    return this.redis.llen(this.todoListNameBuilder(jobName));
  }

  /** All items currently in the job's claimed list. */
  async getClaimedItems(jobName: string) {
    return this.redis.lrange(this.claimedListNameBuilder(jobName), 0, -1);
  }

  /** Length of the job's claimed list. */
  async getClaimedItemCount(jobName: string) {
    return this.redis.llen(this.claimedListNameBuilder(jobName));
  }

  /** All JSON records currently in the job's done list. */
  async getDoneItems(jobName: string) {
    return this.redis.lrange(this.doneListNameBuilder(jobName), 0, -1);
  }

  /** Length of the job's done list. */
  async getDoneItemCount(jobName: string) {
    return this.redis.llen(this.doneListNameBuilder(jobName));
  }

  /** All Redis keys matching `job:*` (the keys include the `job:` prefix). */
  async getJobs() {
    return this.redis.keys('job:*');
  }

  /** Result of EXISTS on the job's metadata key (non-zero when registered). */
  async isJobRegistered(jobName: string) {
    return this.redis.exists(this.jobNameBuilder(jobName));
  }

  /**
   * Stores `metadata` for a job that is not registered yet.
   *
   * NOTE(review): the existence check and the SET are two separate commands,
   * so concurrent registrations of the same name are not mutually exclusive.
   *
   * @returns `false` when the job already exists, else `true`.
   */
  async registerJob(jobName: string, metadata: JobMetadata) {
    if (await this.isJobRegistered(jobName)) {
      return false;
    }
    await this.redis.set(this.jobNameBuilder(jobName), JSON.stringify(metadata));
    return true;
  }

  /**
   * Loads and parses the job's stored metadata.
   *
   * @returns The parsed metadata, or `null` when the job key has no value.
   */
  async getJobMetadata(jobName: string): Promise<JobMetadata | null> {
    const stored = await this.redis.get(this.jobNameBuilder(jobName));
    if (!stored) {
      return null;
    }
    // The stored value is trusted to be what `registerJob` wrote.
    return JSON.parse(stored) as JobMetadata;
  }

  /**
   * Loads the job's metadata with private keys removed.
   *
   * @returns The public metadata, or `null` when the job has no metadata.
   */
  async getPublicJobMetadata(jobName: string): Promise<PublicJobMetadata | null> {
    const metadata = await this.getJobMetadata(jobName);
    if (!metadata) {
      return null;
    }
    return JobsService.cleanJobMetadata(metadata);
  }

  /** Deletes every `claim:<job>:*` counter key for the job. */
  async clearClaimerCounts(jobName: string) {
    const keys = await this.redis.keys(this.claimerCountWildcardBuilder(jobName));
    await Promise.all(keys.map((key) => this.redis.del(key)));
  }

  /**
   * Moves every claimed item back to the todo list, then deletes the job's
   * claim counters.
   */
  async resetClaimedItems(jobName: string) {
    const claimedKey = this.claimedListNameBuilder(jobName);
    const todoKey = this.todoListNameBuilder(jobName);
    // One RPOPLPUSH per item that was in the claimed list when we started.
    const claimedCount = await this.redis.llen(claimedKey);
    for (let moved = 0; moved < claimedCount; moved++) {
      await this.redis.rpoplpush(claimedKey, todoKey);
    }
    await this.clearClaimerCounts(jobName);
  }

  /**
   * Deletes the job's todo list when `claimKey` matches the job's
   * `claimSecret`.
   *
   * @param jobName Name of the job.
   * @param claimKey Secret supplied by the caller.
   * @returns `false` when the job is unknown or the secret does not match,
   *   `true` once the todo list has been deleted.
   */
  async clearTodoItems(jobName: string, claimKey: string): Promise<boolean> {
    const metadata = await this.getJobMetadata(jobName);
    if (metadata === null || metadata.claimSecret !== claimKey) {
      return false;
    }
    await this.redis.del(this.todoListNameBuilder(jobName));
    return true;
  }
}