Browse Source

fix: move worker health check to jobs (#8497)

pull/8500/head
Mert E 6 months ago committed by GitHub
parent
commit
31a5350edb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      packages/nocodb/src/interface/Jobs.ts
  2. 28
      packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts
  3. 37
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts

1
packages/nocodb/src/interface/Jobs.ts

@ -11,6 +11,7 @@ export enum JobTypes {
UpdateModelStat = 'update-model-stat', UpdateModelStat = 'update-model-stat',
UpdateWsStat = 'update-ws-stats', UpdateWsStat = 'update-ws-stats',
UpdateSrcStat = 'update-source-stat', UpdateSrcStat = 'update-source-stat',
HealthCheck = 'health-check',
} }
export enum JobStatus { export enum JobStatus {

28
packages/nocodb/src/modules/jobs/jobs/health-check.processor.ts

@ -0,0 +1,28 @@
import { Process, Processor } from '@nestjs/bull';
import { Inject, Logger } from '@nestjs/common';
import type { Queue } from 'bull';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
@Processor(JOBS_QUEUE)
export class HealthCheckProcessor {
private logger = new Logger(HealthCheckProcessor.name);
constructor(@Inject('JobsService') protected readonly jobsService) {}
@Process(JobTypes.HealthCheck)
async healthCheck() {
const queue = this.jobsService.jobsQueue as Queue;
if (queue) {
queue
.getJobCounts()
.then((stats) => {
// log stats periodically
this.logger.log({ stats });
})
.catch((err) => {
this.logger.error(err);
});
}
}
}

37
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -21,25 +21,24 @@ export class JobsService implements OnModuleInit {
// pause primary instance queue // pause primary instance queue
async onModuleInit() { async onModuleInit() {
if (process.env.NC_WORKER_CONTAINER !== 'true') { await this.toggleQueue();
await this.jobsQueue.pause(true);
} else { this.jobsRedisService.workerCallbacks[InstanceCommands.RESUME_LOCAL] =
this.jobsRedisService.workerCallbacks[InstanceCommands.RESUME_LOCAL] = async () => {
async () => { this.logger.log('Resuming local queue');
this.logger.log('Resuming local queue'); await this.jobsQueue.resume(true);
await this.jobsQueue.resume(true); };
}; this.jobsRedisService.workerCallbacks[InstanceCommands.PAUSE_LOCAL] =
this.jobsRedisService.workerCallbacks[InstanceCommands.PAUSE_LOCAL] = async () => {
async () => { this.logger.log('Pausing local queue');
this.logger.log('Pausing local queue'); await this.jobsQueue.pause(true);
await this.jobsQueue.pause(true); };
};
}
} }
async add(name: string, data: any) { async toggleQueue() {
// if NC_WORKER_CONTAINER is false, then skip dynamic queue pause/resume if (process.env.NC_WORKER_CONTAINER === 'false') {
if (process.env.NC_WORKER_CONTAINER !== 'false') { await this.jobsQueue.pause(true);
} else if (process.env.NC_WORKER_CONTAINER !== 'true') {
// resume primary instance queue if there is no worker // resume primary instance queue if there is no worker
const workerCount = await this.jobsRedisService.workerCount(); const workerCount = await this.jobsRedisService.workerCount();
const localWorkerPaused = await this.jobsQueue.isPaused(true); const localWorkerPaused = await this.jobsQueue.isPaused(true);
@ -52,6 +51,10 @@ export class JobsService implements OnModuleInit {
await this.jobsQueue.pause(true); await this.jobsQueue.pause(true);
} }
} }
}
async add(name: string, data: any) {
await this.toggleQueue();
const job = await this.jobsQueue.add(name, data); const job = await this.jobsQueue.add(name, data);

Loading…
Cancel
Save