Browse Source

feat: resume primary instance queue if no worker is available

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5711/head
mertmit 1 year ago
parent
commit
7d66251466
  1. 5
      packages/nocodb/src/modules/jobs/jobs.module.ts
  2. 40
      packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts
  3. 25
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts

5
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -47,10 +47,7 @@ import { JobsEventService as FallbackJobsEventService } from './fallback/jobs-ev
providers: [
...(!process.env['NC_WORKER_CONTAINER'] ? [JobsGateway] : []),
...(process.env['NC_REDIS_URL']
? [
JobsRedisService,
...(process.env['NC_WORKER_CONTAINER'] ? [JobsEventService] : []),
]
? [JobsRedisService, JobsEventService]
: [FallbackQueueService, FallbackJobsEventService]),
{
provide: 'JobsService',

40
packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts

@ -6,21 +6,31 @@ import {
} from '@nestjs/bull';
import { Job } from 'bull';
import boxen from 'boxen';
import { OnEvent } from '@nestjs/event-emitter';
import { EventEmitter2, OnEvent } from '@nestjs/event-emitter';
import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs';
import { JobsRedisService } from './jobs-redis.service';
@Processor(JOBS_QUEUE)
export class JobsEventService {
constructor(private jobsRedisService: JobsRedisService) {}
constructor(
private jobsRedisService: JobsRedisService,
private eventEmitter: EventEmitter2,
) {}
@OnQueueActive()
onActive(job: Job) {
if (process.env['NC_WORKER_CONTAINER']) {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
} else {
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.ACTIVE,
});
}
}
@OnQueueFailed()
@ -36,6 +46,7 @@ export class JobsEventService {
),
);
if (process.env['NC_WORKER_CONTAINER']) {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
@ -46,10 +57,23 @@ export class JobsEventService {
},
},
});
} else {
this.jobsRedisService.unsubscribe(`jobs-${job.id.toString()}`);
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.FAILED,
data: {
error: {
message: error?.message,
},
},
});
}
}
@OnQueueCompleted()
onCompleted(job: Job, data: any) {
if (process.env['NC_WORKER_CONTAINER']) {
this.jobsRedisService.publish(`jobs-${job.id.toString()}`, {
cmd: JobEvents.STATUS,
id: job.id.toString(),
@ -58,14 +82,26 @@ export class JobsEventService {
result: data,
},
});
} else {
this.jobsRedisService.unsubscribe(`jobs-${job.id.toString()}`);
this.eventEmitter.emit(JobEvents.STATUS, {
id: job.id.toString(),
status: JobStatus.COMPLETED,
data: {
result: data,
},
});
}
}
@OnEvent(JobEvents.LOG)
onLog(data: { id: string; data: { message: string } }) {
if (process.env['NC_WORKER_CONTAINER']) {
this.jobsRedisService.publish(`jobs-${data.id}`, {
cmd: JobEvents.LOG,
id: data.id,
data: data.data,
});
}
}
}

25
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -4,21 +4,39 @@ import { Queue } from 'bull';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs';
import { JobsRedisService } from './jobs-redis.service';
import type { OnModuleInit } from '@nestjs/common';
@Injectable()
export class JobsService {
export class JobsService implements OnModuleInit {
constructor(
@InjectQueue(JOBS_QUEUE) private readonly jobsQueue: Queue,
private jobsRedisService: JobsRedisService,
private eventEmitter: EventEmitter2,
) {
) {}
// pause primary instance queue
async onModuleInit() {
if (!process.env['NC_WORKER_CONTAINER']) {
this.jobsQueue.pause(true);
await this.jobsQueue.pause(true);
}
}
async add(name: string, data: any) {
// resume primary instance queue if there is no worker
const workerCount = (await this.jobsQueue.getWorkers()).length;
const localWorkerPaused = await this.jobsQueue.isPaused(true);
// if there is no worker and primary instance queue is paused, resume it
// if there is any worker and primary instance queue is not paused, pause it
if (workerCount < 1 && localWorkerPaused) {
await this.jobsQueue.resume(true);
} else if (workerCount > 0 && !localWorkerPaused) {
await this.jobsQueue.pause(true);
}
const job = await this.jobsQueue.add(name, data);
// subscribe to job events
this.jobsRedisService.subscribe(`jobs-${job.id.toString()}`, (data) => {
const cmd = data.cmd;
delete data.cmd;
@ -34,6 +52,7 @@ export class JobsService {
break;
}
});
return job;
}

Loading…
Cancel
Save