Add initial jobs implementation

This commit is contained in:
2024-04-11 22:28:56 -06:00
parent 73c91a7c63
commit aa1277fafd
6 changed files with 280 additions and 0 deletions

View File

@@ -24,6 +24,8 @@ import { FileModule } from './file/file.module';
import { FocoLiveModule } from './foco-live/foco-live.module';
import { PowModule } from './pow/pow.module';
import { FocoCoffeeModule } from './fococoffee/fococoffee.module';
import { JobsModule } from './jobs/jobs.module';
import { RedisModule, RedisModuleOptions } from '@liaoliaots/nestjs-redis';
@Module({
imports: [
@@ -32,6 +34,20 @@ import { FocoCoffeeModule } from './fococoffee/fococoffee.module';
isGlobal: true,
load: [configuration],
}),
RedisModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: async (configService: ConfigService): Promise<RedisModuleOptions> => {
return {
config: {
host: configService.get<string>('redis.host'),
port: configService.get<number>('redis.port'),
password: configService.get<string>('redis.password'),
db: configService.get<number>('redis.db'),
}
};
}
}),
CacheModule.registerAsync<RedisClientOptions>({
isGlobal: true,
inject: [ConfigService],
@@ -82,6 +98,7 @@ import { FocoCoffeeModule } from './fococoffee/fococoffee.module';
FocoLiveModule,
PowModule,
FocoCoffeeModule,
JobsModule,
],
controllers: [AppController],
providers: [AppService],

View File

@@ -0,0 +1,83 @@
import { Body, Controller, Get, Param, Post } from '@nestjs/common';
import { JobsService } from './jobs.service';
import { ApiBody, ApiConsumes, ApiParam, ApiProperty, ApiTags } from '@nestjs/swagger';
class ClaimCompleteDto {
@ApiProperty({
description: 'Identity of the completer',
example: 'my-identity'
})
completer: string;
@ApiProperty({
description: 'Identity of the item to complete',
example: 'my-item'
})
item: string;
@ApiProperty({
description: 'Data to store with the completion',
example: { foo: 'bar' }
})
data: any;
}
@Controller('jobs')
@ApiTags('jobs')
export class JobsController {
constructor(
private readonly jobsService: JobsService,
) { }
@Get(':jobName/stats')
@ApiParam({ name: 'jobName', required: true })
async getStats(@Param('jobName') jobName: string) {
return {
todoCount: await this.jobsService.getTodoItemCount(jobName),
claimedCount: await this.jobsService.getClaimedItemCount(jobName),
doneCount: await this.jobsService.getDoneItemCount(jobName),
};
}
@Get(':jobName/todo')
@ApiParam({ name: 'jobName', required: true })
async getTodoItems(@Param('jobName') jobName: string) {
return this.jobsService.getTodoItems(jobName);
}
@Get(':jobName/leaderboard')
@ApiParam({ name: 'jobName', required: true })
async getLeaderboard(@Param('jobName') jobName: string) {
return this.jobsService.getLeaderboard(jobName);
}
@Post(':jobName/add')
@ApiParam({ name: 'jobName', required: true })
@ApiConsumes('text/plain')
@ApiBody({ type: String, description: "Items to add, one per line separated by newline characters" })
async addItemsToJob(@Param('jobName') jobName: string, @Body() items: string) {
return this.jobsService.addItemsToJob(jobName, items.split('\n'));
}
@Post(':jobName/claim')
@ApiParam({ name: 'jobName', required: true })
@ApiConsumes('text/plain')
@ApiBody({ type: String, description: "Claimer identity string" })
async claimJobItem(@Param('jobName') jobName: string, @Body() claimer: string) {
return this.jobsService.claimJobItem(jobName, claimer);
}
@Post(':jobName/complete')
@ApiParam({ name: 'jobName', required: true })
async completeJobItem(@Param('jobName') jobName: string, @Body() body: ClaimCompleteDto) {
return this.jobsService.completeJobItem(jobName, body.item, body.completer, body.data);
}
@Post(':jobName/reset-claimed')
@ApiParam({ name: 'jobName', required: true })
async resetClaimed(@Param('jobName') jobName: string) {
return this.jobsService.resetClaimedItems(jobName);
}
}

10
src/jobs/jobs.module.ts Normal file
View File

@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { JobsService } from './jobs.service';
import { JobsController } from './jobs.controller';
import { MinioService } from 'src/minio/minio.service';
@Module({
providers: [JobsService, MinioService],
controllers: [JobsController]
})
export class JobsModule { }

156
src/jobs/jobs.service.ts Normal file
View File

@@ -0,0 +1,156 @@
import { Inject, Injectable } from '@nestjs/common';
import { MinioService } from 'src/minio/minio.service';
import Redis from 'ioredis'
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { Cache } from 'cache-manager';
@Injectable()
export class JobsService {
constructor(
private readonly minioService: MinioService,
@InjectRedis() private readonly redis: Redis,
@Inject(CACHE_MANAGER) private readonly cacheManager: Cache,
) { }
private jobNameBuilder(jobName: string) {
return `job:${jobName}`;
}
private todoListNameBuilder(jobName: string) {
return `todo:${jobName}`;
}
private doneListNameBuilder(jobName: string) {
return `done:${jobName}`;
}
private claimedListNameBuilder(jobName: string) {
return `claimed:${jobName}`;
}
private completeCountNameBuilder(jobName: string, claimer: string) {
return `complete:${jobName}:${claimer}`;
}
private claimerCountNameBuilder(jobName: string, claimer: string) {
return `claim:${jobName}:${claimer}`;
}
private async getCompleteCounts(jobName: string) {
const keys = await this.redis.keys(`complete:${jobName}:*`);
const counts = await Promise.all(keys.map(async key => {
const count = await this.redis.get(key);
if (!count) {
return null;
}
return { claimer: key.split(':')[2], count: parseInt(count) };
}));
return counts.reduce((acc: any, val: any) => {
if (!val) {
return acc;
}
acc[val.claimer] = val.count;
return acc;
}, {})
}
private async getClaimCounts(jobName: string) {
const keys = await this.redis.keys(`claim:${jobName}:*`);
const counts = await Promise.all(keys.map(async key => {
const count = await this.redis.get(key);
if (!count) {
return null;
}
return { claimer: key.split(':')[2], count: parseInt(count) };
}));
return counts.reduce((acc: any, val: any) => {
if (!val) {
return acc;
}
acc[val.claimer] = val.count;
return acc;
}, {});
}
async getLeaderboard(jobName: string) {
const cachedLeaderboard = await this.cacheManager.get(`leaderboard:${jobName}`);
if (cachedLeaderboard) {
return cachedLeaderboard;
}
const completeCounts = await this.getCompleteCounts(jobName);
const claimCounts = await this.getClaimCounts(jobName);
this.cacheManager.set(`leaderboard:${jobName}`, { completeCounts, claimCounts }, 200);
return { completeCounts, claimCounts };
}
async addItemsToJob(jobName: string, items: string[]) {
await this.redis.rpush(this.todoListNameBuilder(jobName), ...items);
}
async claimJobItem(jobName: string, claimer: string): Promise<string | null> {
const jobItem = await this.redis.brpoplpush(this.todoListNameBuilder(jobName), this.claimedListNameBuilder(jobName), 10);
if (jobItem) {
await this.redis.rpush(this.jobNameBuilder(jobName), JSON.stringify({ item: jobItem, client: claimer }));
}
await this.redis.incr(this.claimerCountNameBuilder(jobName, claimer));
return jobItem;
}
async completeJobItem(jobName: string, jobItem: string, completer: string, data: any) {
await this.redis.lrem(this.claimedListNameBuilder(jobName), 1, jobItem);
await this.redis.lrem(this.todoListNameBuilder(jobName), 1, JSON.stringify({ item: jobItem, client: completer }));
await this.redis.rpush(this.doneListNameBuilder(jobName), JSON.stringify({ item: jobItem, client: completer, data }));
await this.redis.decr(this.claimerCountNameBuilder(jobName, completer));
await this.redis.incr(this.completeCountNameBuilder(jobName, completer));
}
async getTodoItems(jobName: string) {
return this.redis.lrange(this.todoListNameBuilder(jobName), 0, -1);
}
async getTodoItemCount(jobName: string) {
return this.redis.llen(this.todoListNameBuilder(jobName));
}
async getClaimedItems(jobName: string) {
return this.redis.lrange(this.claimedListNameBuilder(jobName), 0, -1);
}
async getClaimedItemCount(jobName: string) {
return this.redis.llen(this.claimedListNameBuilder(jobName));
}
async getDoneItems(jobName: string) {
return this.redis.lrange(this.doneListNameBuilder(jobName), 0, -1);
}
async getDoneItemCount(jobName: string) {
return this.redis.llen(this.doneListNameBuilder(jobName));
}
async getJobs() {
return this.redis.keys('job:*');
}
async registerJob(jobName: string, metadata: any) {
await this.redis.set(this.jobNameBuilder(jobName), JSON.stringify(metadata));
}
async getJobMetadata(jobName: string): Promise<any | null> {
const result = await this.redis.get(this.jobNameBuilder(jobName))
if (!result) {
return null;
}
return JSON.parse(result)
}
async resetClaimedItems(jobName: string) {
const claimedItems = await this.getClaimedItems(jobName);
for (const claimedItem of claimedItems) {
await this.redis.rpoplpush(this.claimedListNameBuilder(jobName), this.todoListNameBuilder(jobName));
}
}
}