Browse Source

refactor: globals for workers

pull/6899/head
mertmit 1 year ago
parent
commit
6f4af35916
  1. 10
      packages/nocodb/src/interface/Jobs.ts
  2. 12
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  3. 15
      packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts
  4. 20
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts

10
packages/nocodb/src/interface/Jobs.ts

@ -26,3 +26,13 @@ export enum JobEvents {
STATUS = 'job.status',
LOG = 'job.log',
}
export enum InstanceTypes {
PRIMARY = 'primary',
WORKER = 'worker',
}
export enum WorkerCommands {
RESUME_LOCAL = 'resumeLocal',
PAUSE_LOCAL = 'pauseLocal',
}

12
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -14,7 +14,7 @@ import { ModuleRef } from '@nestjs/core';
import { AuthGuard } from '@nestjs/passport';
import { JobsRedisService } from './redis/jobs-redis.service';
import type { OnModuleInit } from '@nestjs/common';
import { JobStatus } from '~/interface/Jobs';
import { InstanceTypes, JobStatus, WorkerCommands } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
import NocoCache from '~/cache/NocoCache';
@ -182,7 +182,10 @@ export class JobsController implements OnModuleInit {
if (body.global === true) {
await this.jobsService.resumeQueue();
} else {
await this.jobsRedisService.publish('workers', 'resumeLocal');
await this.jobsRedisService.publish(
InstanceTypes.WORKER,
WorkerCommands.RESUME_LOCAL,
);
}
}
@ -193,7 +196,10 @@ export class JobsController implements OnModuleInit {
if (body.global === true) {
await this.jobsService.pauseQueue();
} else {
await this.jobsRedisService.publish('workers', 'pauseLocal');
await this.jobsRedisService.publish(
InstanceTypes.WORKER,
WorkerCommands.PAUSE_LOCAL,
);
}
}

15
packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts

@ -1,5 +1,6 @@
import { Injectable } from '@nestjs/common';
import Redis from 'ioredis';
import { InstanceTypes } from '~/interface/Jobs';
@Injectable()
export class JobsRedisService {
@ -7,25 +8,25 @@ export class JobsRedisService {
private redisSubscriber: Redis;
private unsubscribeCallbacks: { [key: string]: () => void } = {};
public primaryCallbacks: { [key: string]: () => void } = {};
public workerCallbacks: { [key: string]: () => void } = {};
public instanceCallbacks: { [key: string]: () => void } = {};
constructor() {
this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.redisSubscriber.subscribe('workers');
this.redisSubscriber.subscribe(InstanceTypes.WORKER);
} else {
this.redisSubscriber.subscribe('instances');
this.redisSubscriber.subscribe(InstanceTypes.PRIMARY);
}
const onMessage = (channel, message) => {
console.log('onMessage', channel, message);
if (channel === 'workers') {
if (channel === InstanceTypes.WORKER) {
this.workerCallbacks[message] && this.workerCallbacks[message]();
} else if (channel === 'instances') {
this.instanceCallbacks[message] && this.instanceCallbacks[message]();
} else if (channel === InstanceTypes.PRIMARY) {
this.primaryCallbacks[message] && this.primaryCallbacks[message]();
}
};
@ -71,7 +72,7 @@ export class JobsRedisService {
workerCount(): Promise<number> {
return new Promise((resolve, reject) => {
this.redisClient.publish(
'workers',
InstanceTypes.WORKER,
'count',
(error, numberOfSubscribers) => {
if (error) {

20
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -2,7 +2,7 @@ import { InjectQueue } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common';
import { Queue } from 'bull';
import type { OnModuleInit } from '@nestjs/common';
import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { JOBS_QUEUE, JobStatus, WorkerCommands } from '~/interface/Jobs';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
@Injectable()
@ -19,14 +19,16 @@ export class JobsService implements OnModuleInit {
if (process.env.NC_WORKER_CONTAINER !== 'true') {
await this.jobsQueue.pause(true);
} else {
this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => {
this.logger.log('Resuming local queue');
await this.jobsQueue.resume(true);
};
this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => {
this.logger.log('Pausing local queue');
await this.jobsQueue.pause(true);
};
this.jobsRedisService.workerCallbacks[WorkerCommands.RESUME_LOCAL] =
async () => {
this.logger.log('Resuming local queue');
await this.jobsQueue.resume(true);
};
this.jobsRedisService.workerCallbacks[WorkerCommands.PAUSE_LOCAL] =
async () => {
this.logger.log('Pausing local queue');
await this.jobsQueue.pause(true);
};
}
}

Loading…
Cancel
Save