From 6f4af359168a91faa78800cb7f8654c3ab778c5e Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:56 +0000 Subject: [PATCH] refactor: globals for workers --- packages/nocodb/src/interface/Jobs.ts | 10 ++++++++++ .../src/modules/jobs/jobs.controller.ts | 12 ++++++++--- .../modules/jobs/redis/jobs-redis.service.ts | 15 +++++++------- .../src/modules/jobs/redis/jobs.service.ts | 20 ++++++++++--------- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index 93a1127af0..7c2d121ac3 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -26,3 +26,13 @@ export enum JobEvents { STATUS = 'job.status', LOG = 'job.log', } + +export enum InstanceTypes { + PRIMARY = 'primary', + WORKER = 'worker', +} + +export enum WorkerCommands { + RESUME_LOCAL = 'resumeLocal', + PAUSE_LOCAL = 'pauseLocal', +} diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index f036eba6cd..d8140e12e9 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -14,7 +14,7 @@ import { ModuleRef } from '@nestjs/core'; import { AuthGuard } from '@nestjs/passport'; import { JobsRedisService } from './redis/jobs-redis.service'; import type { OnModuleInit } from '@nestjs/common'; -import { JobStatus } from '~/interface/Jobs'; +import { InstanceTypes, JobStatus, WorkerCommands } from '~/interface/Jobs'; import { JobEvents } from '~/interface/Jobs'; import { GlobalGuard } from '~/guards/global/global.guard'; import NocoCache from '~/cache/NocoCache'; @@ -182,7 +182,10 @@ export class JobsController implements OnModuleInit { if (body.global === true) { await this.jobsService.resumeQueue(); } else { - await this.jobsRedisService.publish('workers', 'resumeLocal'); + await this.jobsRedisService.publish( + InstanceTypes.WORKER, + WorkerCommands.RESUME_LOCAL, + ); } } @@ -193,7 +196,10 @@ export class JobsController implements OnModuleInit { if (body.global === true) { await this.jobsService.pauseQueue(); } else { - await this.jobsRedisService.publish('workers', 'pauseLocal'); + await this.jobsRedisService.publish( + InstanceTypes.WORKER, + WorkerCommands.PAUSE_LOCAL, + ); } } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts index 9dd92c0578..8a77a3e618 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts @@ -1,5 +1,6 @@ import { Injectable } from '@nestjs/common'; import Redis from 'ioredis'; +import { InstanceTypes } from '~/interface/Jobs'; @Injectable() export class JobsRedisService { @@ -7,25 +8,25 @@ export class JobsRedisService { private redisSubscriber: Redis; private unsubscribeCallbacks: { [key: string]: () => void } = {}; + public primaryCallbacks: { [key: string]: () => void } = {}; public workerCallbacks: { [key: string]: () => void } = {}; - public instanceCallbacks: { [key: string]: () => void } = {}; constructor() { this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); if (process.env.NC_WORKER_CONTAINER === 'true') { - this.redisSubscriber.subscribe('workers'); + this.redisSubscriber.subscribe(InstanceTypes.WORKER); } else { - this.redisSubscriber.subscribe('instances'); + this.redisSubscriber.subscribe(InstanceTypes.PRIMARY); } const onMessage = (channel, message) => { console.log('onMessage', channel, message); - if (channel === 'workers') { + if (channel === InstanceTypes.WORKER) { this.workerCallbacks[message] && this.workerCallbacks[message](); - } else if (channel === 'instances') { - this.instanceCallbacks[message] && this.instanceCallbacks[message](); + } else if (channel === InstanceTypes.PRIMARY) { + this.primaryCallbacks[message] && this.primaryCallbacks[message](); } }; @@ -71,7 +72,7 @@ export class JobsRedisService { workerCount(): Promise { return new Promise((resolve, reject) => { this.redisClient.publish( - 'workers', + InstanceTypes.WORKER, 'count', (error, numberOfSubscribers) => { if (error) { diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 8dd29c02a4..03ba96bbdf 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -2,7 +2,7 @@ import { InjectQueue } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; -import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; +import { JOBS_QUEUE, JobStatus, WorkerCommands } from '~/interface/Jobs'; import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; @Injectable() @@ -19,14 +19,16 @@ export class JobsService implements OnModuleInit { if (process.env.NC_WORKER_CONTAINER !== 'true') { await this.jobsQueue.pause(true); } else { - this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => { - this.logger.log('Resuming local queue'); - await this.jobsQueue.resume(true); - }; - this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => { - this.logger.log('Pausing local queue'); - await this.jobsQueue.pause(true); - }; + this.jobsRedisService.workerCallbacks[WorkerCommands.RESUME_LOCAL] = + async () => { + this.logger.log('Resuming local queue'); + await this.jobsQueue.resume(true); + }; + this.jobsRedisService.workerCallbacks[WorkerCommands.PAUSE_LOCAL] = + async () => { + this.logger.log('Pausing local queue'); + await this.jobsQueue.pause(true); + }; } }