diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index ee6d4e0409..b1ee06ca94 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -11,6 +11,7 @@ export enum JobTypes { UpdateModelStat = 'update-model-stat', UpdateWsStat = 'update-ws-stats', UpdateSrcStat = 'update-source-stat', + HealthCheck = 'health-check', } export enum JobStatus { diff --git a/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts b/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts new file mode 100644 index 0000000000..d3c839a558 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts @@ -0,0 +1,28 @@ +import { Process, Processor } from '@nestjs/bull'; +import { Inject, Logger } from '@nestjs/common'; +import type { Queue } from 'bull'; +import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs'; + +@Processor(JOBS_QUEUE) +export class HealthCheckProcessor { + private logger = new Logger(HealthCheckProcessor.name); + + constructor(@Inject('JobsService') protected readonly jobsService) {} + + @Process(JobTypes.HealthCheck) + async healthCheck() { + const queue = this.jobsService.jobsQueue as Queue; + + if (queue) { + queue + .getJobCounts() + .then((stats) => { + // log stats periodically + this.logger.log({ stats }); + }) + .catch((err) => { + this.logger.error(err); + }); + } + } +} diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index be40e8ca0e..23add1c064 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -21,25 +21,24 @@ export class JobsService implements OnModuleInit { // pause primary instance queue async onModuleInit() { - if (process.env.NC_WORKER_CONTAINER !== 'true') { - await this.jobsQueue.pause(true); - } else { - this.jobsRedisService.workerCallbacks[InstanceCommands.RESUME_LOCAL] = - async () => { - this.logger.log('Resuming local queue'); - await this.jobsQueue.resume(true); - }; - this.jobsRedisService.workerCallbacks[InstanceCommands.PAUSE_LOCAL] = - async () => { - this.logger.log('Pausing local queue'); - await this.jobsQueue.pause(true); - }; - } + await this.toggleQueue(); + + this.jobsRedisService.workerCallbacks[InstanceCommands.RESUME_LOCAL] = + async () => { + this.logger.log('Resuming local queue'); + await this.jobsQueue.resume(true); + }; + this.jobsRedisService.workerCallbacks[InstanceCommands.PAUSE_LOCAL] = + async () => { + this.logger.log('Pausing local queue'); + await this.jobsQueue.pause(true); + }; } - async add(name: string, data: any) { - // if NC_WORKER_CONTAINER is false, then skip dynamic queue pause/resume - if (process.env.NC_WORKER_CONTAINER !== 'false') { + async toggleQueue() { + if (process.env.NC_WORKER_CONTAINER === 'false') { + await this.jobsQueue.pause(true); + } else if (process.env.NC_WORKER_CONTAINER !== 'true') { // resume primary instance queue if there is no worker const workerCount = await this.jobsRedisService.workerCount(); const localWorkerPaused = await this.jobsQueue.isPaused(true); @@ -52,6 +51,10 @@ export class JobsService implements OnModuleInit { await this.jobsQueue.pause(true); } } + } + + async add(name: string, data: any) { + await this.toggleQueue(); const job = await this.jobsQueue.add(name, data);