diff --git a/package.json b/package.json index 0dfaac3..0e9870a 100644 --- a/package.json +++ b/package.json @@ -20,6 +20,7 @@ "test:e2e": "jest --config ./test/jest-e2e.json" }, "dependencies": { + "@liaoliaots/nestjs-redis": "^9.0.5", "@nestjs/bull": "^10.0.1", "@nestjs/cache-manager": "^2.1.1", "@nestjs/common": "^10.0.0", @@ -38,6 +39,7 @@ "cache-manager-redis-yet": "^4.1.2", "fp-ts": "^2.16.3", "hbs": "^4.2.0", + "ioredis": "^5.3.2", "minio": "^7.1.3", "open-graph-scraper": "^6.3.0", "prom-client": "^15.0.0", diff --git a/src/app.module.ts b/src/app.module.ts index 5e798a2..f206678 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -24,6 +24,8 @@ import { FileModule } from './file/file.module'; import { FocoLiveModule } from './foco-live/foco-live.module'; import { PowModule } from './pow/pow.module'; import { FocoCoffeeModule } from './fococoffee/fococoffee.module'; +import { JobsModule } from './jobs/jobs.module'; +import { RedisModule, RedisModuleOptions } from '@liaoliaots/nestjs-redis'; @Module({ imports: [ @@ -32,6 +34,20 @@ import { FocoCoffeeModule } from './fococoffee/fococoffee.module'; isGlobal: true, load: [configuration], }), + RedisModule.forRootAsync({ + imports: [ConfigModule], + inject: [ConfigService], + useFactory: async (configService: ConfigService): Promise => { + return { + config: { + host: configService.get('redis.host'), + port: configService.get('redis.port'), + password: configService.get('redis.password'), + db: configService.get('redis.db'), + } + }; + } + }), CacheModule.registerAsync({ isGlobal: true, inject: [ConfigService], @@ -82,6 +98,7 @@ import { FocoCoffeeModule } from './fococoffee/fococoffee.module'; FocoLiveModule, PowModule, FocoCoffeeModule, + JobsModule, ], controllers: [AppController], providers: [AppService], diff --git a/src/jobs/jobs.controller.ts b/src/jobs/jobs.controller.ts new file mode 100644 index 0000000..f02e797 --- /dev/null +++ b/src/jobs/jobs.controller.ts @@ -0,0 +1,83 @@ +import { Body, Controller, Get, Param, Post } from '@nestjs/common'; +import { JobsService } from './jobs.service'; +import { ApiBody, ApiConsumes, ApiParam, ApiProperty, ApiTags } from '@nestjs/swagger'; + + +class ClaimCompleteDto { + @ApiProperty({ + description: 'Identity of the completer', + example: 'my-identity' + }) + completer: string; + + @ApiProperty({ + description: 'Identity of the item to complete', + example: 'my-item' + }) + item: string; + + @ApiProperty({ + description: 'Data to store with the completion', + example: { foo: 'bar' } + }) + data: any; +} + +@Controller('jobs') +@ApiTags('jobs') +export class JobsController { + constructor( + private readonly jobsService: JobsService, + ) { } + + @Get(':jobName/stats') + @ApiParam({ name: 'jobName', required: true }) + async getStats(@Param('jobName') jobName: string) { + return { + todoCount: await this.jobsService.getTodoItemCount(jobName), + claimedCount: await this.jobsService.getClaimedItemCount(jobName), + doneCount: await this.jobsService.getDoneItemCount(jobName), + }; + } + + @Get(':jobName/todo') + @ApiParam({ name: 'jobName', required: true }) + async getTodoItems(@Param('jobName') jobName: string) { + return this.jobsService.getTodoItems(jobName); + } + + @Get(':jobName/leaderboard') + @ApiParam({ name: 'jobName', required: true }) + async getLeaderboard(@Param('jobName') jobName: string) { + return this.jobsService.getLeaderboard(jobName); + } + + @Post(':jobName/add') + @ApiParam({ name: 'jobName', required: true }) + @ApiConsumes('text/plain') + @ApiBody({ type: String, description: "Items to add, one per line separated by newline characters" }) + async addItemsToJob(@Param('jobName') jobName: string, @Body() items: string) { + return this.jobsService.addItemsToJob(jobName, items.split('\n')); + } + + @Post(':jobName/claim') + @ApiParam({ name: 'jobName', required: true }) + @ApiConsumes('text/plain') + @ApiBody({ type: String, description: "Claimer identity string" }) + async claimJobItem(@Param('jobName') jobName: string, @Body() claimer: string) { + return this.jobsService.claimJobItem(jobName, claimer); + } + + @Post(':jobName/complete') + @ApiParam({ name: 'jobName', required: true }) + async completeJobItem(@Param('jobName') jobName: string, @Body() body: ClaimCompleteDto) { + return this.jobsService.completeJobItem(jobName, body.item, body.completer, body.data); + } + + @Post(':jobName/reset-claimed') + @ApiParam({ name: 'jobName', required: true }) + async resetClaimed(@Param('jobName') jobName: string) { + return this.jobsService.resetClaimedItems(jobName); + } + +} diff --git a/src/jobs/jobs.module.ts b/src/jobs/jobs.module.ts new file mode 100644 index 0000000..d9e5887 --- /dev/null +++ b/src/jobs/jobs.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { JobsService } from './jobs.service'; +import { JobsController } from './jobs.controller'; +import { MinioService } from 'src/minio/minio.service'; + +@Module({ + providers: [JobsService, MinioService], + controllers: [JobsController] +}) +export class JobsModule { } diff --git a/src/jobs/jobs.service.ts b/src/jobs/jobs.service.ts new file mode 100644 index 0000000..e2124a9 --- /dev/null +++ b/src/jobs/jobs.service.ts @@ -0,0 +1,156 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { MinioService } from 'src/minio/minio.service'; +import Redis from 'ioredis' +import { InjectRedis } from '@liaoliaots/nestjs-redis'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { Cache } from 'cache-manager'; + +@Injectable() +export class JobsService { + constructor( + private readonly minioService: MinioService, + @InjectRedis() private readonly redis: Redis, + @Inject(CACHE_MANAGER) private readonly cacheManager: Cache, + ) { } + + private jobNameBuilder(jobName: string) { + return `job:${jobName}`; + } + + private todoListNameBuilder(jobName: string) { + return `todo:${jobName}`; + } + + private doneListNameBuilder(jobName: string) { + return `done:${jobName}`; + } + + private claimedListNameBuilder(jobName: string) { + return `claimed:${jobName}`; + } + + private completeCountNameBuilder(jobName: string, claimer: string) { + return `complete:${jobName}:${claimer}`; + } + + private claimerCountNameBuilder(jobName: string, claimer: string) { + return `claim:${jobName}:${claimer}`; + } + + private async getCompleteCounts(jobName: string) { + const keys = await this.redis.keys(`complete:${jobName}:*`); + const counts = await Promise.all(keys.map(async key => { + const count = await this.redis.get(key); + if (!count) { + return null; + } + return { claimer: key.split(':')[2], count: parseInt(count) }; + })); + return counts.reduce((acc: any, val: any) => { + if (!val) { + return acc; + } + acc[val.claimer] = val.count; + return acc; + }, {}) + } + + private async getClaimCounts(jobName: string) { + const keys = await this.redis.keys(`claim:${jobName}:*`); + const counts = await Promise.all(keys.map(async key => { + const count = await this.redis.get(key); + if (!count) { + return null; + } + return { claimer: key.split(':')[2], count: parseInt(count) }; + })); + return counts.reduce((acc: any, val: any) => { + if (!val) { + return acc; + } + acc[val.claimer] = val.count; + return acc; + }, {}); + } + + async getLeaderboard(jobName: string) { + const cachedLeaderboard = await this.cacheManager.get(`leaderboard:${jobName}`); + if (cachedLeaderboard) { + return cachedLeaderboard; + } + const completeCounts = await this.getCompleteCounts(jobName); + const claimCounts = await this.getClaimCounts(jobName); + this.cacheManager.set(`leaderboard:${jobName}`, { completeCounts, claimCounts }, 200); + return { completeCounts, claimCounts }; + } + + + async addItemsToJob(jobName: string, items: string[]) { + await this.redis.rpush(this.todoListNameBuilder(jobName), ...items); + } + + async claimJobItem(jobName: string, claimer: string): Promise { + const jobItem = await this.redis.brpoplpush(this.todoListNameBuilder(jobName), this.claimedListNameBuilder(jobName), 10); + if (jobItem) { + await this.redis.rpush(this.jobNameBuilder(jobName), JSON.stringify({ item: jobItem, client: claimer })); + } + await this.redis.incr(this.claimerCountNameBuilder(jobName, claimer)); + return jobItem; + } + + async completeJobItem(jobName: string, jobItem: string, completer: string, data: any) { + await this.redis.lrem(this.claimedListNameBuilder(jobName), 1, jobItem); + await this.redis.lrem(this.todoListNameBuilder(jobName), 1, JSON.stringify({ item: jobItem, client: completer })); + await this.redis.rpush(this.doneListNameBuilder(jobName), JSON.stringify({ item: jobItem, client: completer, data })); + await this.redis.decr(this.claimerCountNameBuilder(jobName, completer)); + await this.redis.incr(this.completeCountNameBuilder(jobName, completer)); + } + + async getTodoItems(jobName: string) { + return this.redis.lrange(this.todoListNameBuilder(jobName), 0, -1); + } + + async getTodoItemCount(jobName: string) { + return this.redis.llen(this.todoListNameBuilder(jobName)); + } + + async getClaimedItems(jobName: string) { + return this.redis.lrange(this.claimedListNameBuilder(jobName), 0, -1); + } + + async getClaimedItemCount(jobName: string) { + return this.redis.llen(this.claimedListNameBuilder(jobName)); + } + + async getDoneItems(jobName: string) { + return this.redis.lrange(this.doneListNameBuilder(jobName), 0, -1); + } + + async getDoneItemCount(jobName: string) { + return this.redis.llen(this.doneListNameBuilder(jobName)); + } + + async getJobs() { + return this.redis.keys('job:*'); + } + + async registerJob(jobName: string, metadata: any) { + await this.redis.set(this.jobNameBuilder(jobName), JSON.stringify(metadata)); + } + + async getJobMetadata(jobName: string): Promise { + const result = await this.redis.get(this.jobNameBuilder(jobName)) + if (!result) { + return null; + } + return JSON.parse(result) + } + + async resetClaimedItems(jobName: string) { + const claimedItems = await this.getClaimedItems(jobName); + for (const claimedItem of claimedItems) { + await this.redis.rpoplpush(this.claimedListNameBuilder(jobName), this.todoListNameBuilder(jobName)); + } + } + +} diff --git a/yarn.lock b/yarn.lock index 7cfee5d..89c8e0f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -687,6 +687,13 @@ "@jridgewell/resolve-uri" "^3.1.0" "@jridgewell/sourcemap-codec" "^1.4.14" +"@liaoliaots/nestjs-redis@^9.0.5": + version "9.0.5" + resolved "https://registry.yarnpkg.com/@liaoliaots/nestjs-redis/-/nestjs-redis-9.0.5.tgz#8e8b1326792c83599425eca123bb3a93d6cef248" + integrity sha512-nPcGLj0zW4mEsYtQYfWx3o7PmrMjuzFk6+t/g2IRopAeWWUZZ/5nIJ4KTKiz/3DJEUkbX8PZqB+dOhklGF0SVA== + dependencies: + tslib "2.4.1" + "@lukeed/csprng@^1.0.0": version "1.1.0" resolved "https://registry.yarnpkg.com/@lukeed/csprng/-/csprng-1.1.0.tgz#1e3e4bd05c1cc7a0b2ddbd8a03f39f6e4b5e6cfe" @@ -5727,6 +5734,11 @@ tsconfig-paths@4.2.0, tsconfig-paths@^4.1.2, tsconfig-paths@^4.2.0: minimist "^1.2.6" strip-bom "^3.0.0" +tslib@2.4.1: + version "2.4.1" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.4.1.tgz#0d0bfbaac2880b91e22df0768e55be9753a5b17e" + integrity sha512-tGyy4dAjRIEwI7BzsB0lynWgOpfqjUdq91XXAlIWD2OwKBH7oCl/GZG/HT4BOHrTlPMOASlMQ7veyTqpmRcrNA== + tslib@2.6.0: version "2.6.0" resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.0.tgz#b295854684dbda164e181d259a22cd779dcd7bc3"