diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index ca7ef7f7e5..88f5894cd7 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -47,10 +47,7 @@ import { JobsEventService as FallbackJobsEventService } from './fallback/jobs-ev providers: [ ...(!process.env['NC_WORKER_CONTAINER'] ? [JobsGateway] : []), ...(process.env['NC_REDIS_URL'] - ? [ - JobsRedisService, - ...(process.env['NC_WORKER_CONTAINER'] ? [JobsEventService] : []), - ] + ? [JobsRedisService, JobsEventService] : [FallbackQueueService, FallbackJobsEventService]), { provide: 'JobsService', diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts index e6c98253bf..4892c07502 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-event.service.ts @@ -6,21 +6,31 @@ import { } from '@nestjs/bull'; import { Job } from 'bull'; import boxen from 'boxen'; -import { OnEvent } from '@nestjs/event-emitter'; +import { EventEmitter2, OnEvent } from '@nestjs/event-emitter'; import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs'; import { JobsRedisService } from './jobs-redis.service'; @Processor(JOBS_QUEUE) export class JobsEventService { - constructor(private jobsRedisService: JobsRedisService) {} + constructor( + private jobsRedisService: JobsRedisService, + private eventEmitter: EventEmitter2, + ) {} @OnQueueActive() onActive(job: Job) { - this.jobsRedisService.publish(`jobs-${job.id.toString()}`, { - cmd: JobEvents.STATUS, - id: job.id.toString(), - status: JobStatus.ACTIVE, - }); + if (process.env['NC_WORKER_CONTAINER']) { + this.jobsRedisService.publish(`jobs-${job.id.toString()}`, { + cmd: JobEvents.STATUS, + id: job.id.toString(), + status: JobStatus.ACTIVE, + }); + } else { + this.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.ACTIVE, + }); + } } @OnQueueFailed() @@ -36,36 +46,62 @@ export class JobsEventService { ), ); - this.jobsRedisService.publish(`jobs-${job.id.toString()}`, { - cmd: JobEvents.STATUS, - id: job.id.toString(), - status: JobStatus.FAILED, - data: { - error: { - message: error?.message, + if (process.env['NC_WORKER_CONTAINER']) { + this.jobsRedisService.publish(`jobs-${job.id.toString()}`, { + cmd: JobEvents.STATUS, + id: job.id.toString(), + status: JobStatus.FAILED, + data: { + error: { + message: error?.message, + }, }, - }, - }); + }); + } else { + this.jobsRedisService.unsubscribe(`jobs-${job.id.toString()}`); + this.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.FAILED, + data: { + error: { + message: error?.message, + }, + }, + }); + } } @OnQueueCompleted() onCompleted(job: Job, data: any) { - this.jobsRedisService.publish(`jobs-${job.id.toString()}`, { - cmd: JobEvents.STATUS, - id: job.id.toString(), - status: JobStatus.COMPLETED, - data: { - result: data, - }, - }); + if (process.env['NC_WORKER_CONTAINER']) { + this.jobsRedisService.publish(`jobs-${job.id.toString()}`, { + cmd: JobEvents.STATUS, + id: job.id.toString(), + status: JobStatus.COMPLETED, + data: { + result: data, + }, + }); + } else { + this.jobsRedisService.unsubscribe(`jobs-${job.id.toString()}`); + this.eventEmitter.emit(JobEvents.STATUS, { + id: job.id.toString(), + status: JobStatus.COMPLETED, + data: { + result: data, + }, + }); + } } @OnEvent(JobEvents.LOG) onLog(data: { id: string; data: { message: string } }) { - this.jobsRedisService.publish(`jobs-${data.id}`, { - cmd: JobEvents.LOG, - id: data.id, - data: data.data, - }); + if (process.env['NC_WORKER_CONTAINER']) { + this.jobsRedisService.publish(`jobs-${data.id}`, { + cmd: JobEvents.LOG, + id: data.id, + data: data.data, + }); + } } } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 74810a6bb0..d78e5d4a83 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -4,21 +4,39 @@ import { Queue } from 'bull'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { JobEvents, JOBS_QUEUE, JobStatus } from '../../../interface/Jobs'; import { JobsRedisService } from './jobs-redis.service'; +import type { OnModuleInit } from '@nestjs/common'; @Injectable() -export class JobsService { +export class JobsService implements OnModuleInit { constructor( @InjectQueue(JOBS_QUEUE) private readonly jobsQueue: Queue, private jobsRedisService: JobsRedisService, private eventEmitter: EventEmitter2, - ) { + ) {} + + // pause primary instance queue + async onModuleInit() { if (!process.env['NC_WORKER_CONTAINER']) { - this.jobsQueue.pause(true); + await this.jobsQueue.pause(true); } } async add(name: string, data: any) { + // resume primary instance queue if there is no worker + const workerCount = (await this.jobsQueue.getWorkers()).length; + const localWorkerPaused = await this.jobsQueue.isPaused(true); + + // if there is no worker and primary instance queue is paused, resume it + // if there is any worker and primary instance queue is not paused, pause it + if (workerCount < 1 && localWorkerPaused) { + await this.jobsQueue.resume(true); + } else if (workerCount > 0 && !localWorkerPaused) { + await this.jobsQueue.pause(true); + } + const job = await this.jobsQueue.add(name, data); + + // subscribe to job events this.jobsRedisService.subscribe(`jobs-${job.id.toString()}`, (data) => { const cmd = data.cmd; delete data.cmd; @@ -34,6 +52,7 @@ export class JobsService { break; } }); + return job; }