From cd267cad7881f7c5695b075021411365f0a9db73 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:56 +0000 Subject: [PATCH 1/9] feat: prepare worker cluster --- .../src/modules/jobs/fallback/jobs.service.ts | 8 ++ .../src/modules/jobs/jobs.controller.ts | 23 +++++ .../modules/jobs/redis/jobs-redis.service.ts | 36 ++++++++ .../src/modules/jobs/redis/jobs.service.ts | 38 +++++++-- packages/nocodb/src/run/testDocker.ts | 84 ++++++++++--------- 5 files changed, 144 insertions(+), 45 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts index 978f3d3d03..594e806a8e 100644 --- a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts @@ -48,4 +48,12 @@ export class JobsService { return job; } + + async resumeQueue() { + await this.fallbackQueueService.queue.start(); + } + + async pauseQueue() { + await this.fallbackQueueService.queue.pause(); + } } diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index f5a2cdeb4c..f036eba6cd 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -11,6 +11,7 @@ import { import { OnEvent } from '@nestjs/event-emitter'; import { customAlphabet } from 'nanoid'; import { ModuleRef } from '@nestjs/core'; +import { AuthGuard } from '@nestjs/passport'; import { JobsRedisService } from './redis/jobs-redis.service'; import type { OnModuleInit } from '@nestjs/common'; import { JobStatus } from '~/interface/Jobs'; @@ -174,6 +175,28 @@ export class JobsController implements OnModuleInit { return res; } + // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume + @Post('/internal/workers/resume') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async resumeWorkers(@Body() body: { global?: boolean }) { + if (body.global === true) { + await this.jobsService.resumeQueue(); + } else { + await this.jobsRedisService.publish('workers', 'resumeLocal'); + } + } + + // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause + @Post('/internal/workers/pause') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async pauseWorkers(@Body() body: { global?: boolean }) { + if (body.global === true) { + await this.jobsService.pauseQueue(); + } else { + await this.jobsRedisService.publish('workers', 'pauseLocal'); + } + } + @OnEvent(JobEvents.STATUS) sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void { let response; diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts index 5d50947b9e..9dd92c0578 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts @@ -7,9 +7,29 @@ export class JobsRedisService { private redisSubscriber: Redis; private unsubscribeCallbacks: { [key: string]: () => void } = {}; + public workerCallbacks: { [key: string]: () => void } = {}; + public instanceCallbacks: { [key: string]: () => void } = {}; + constructor() { this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); + + if (process.env.NC_WORKER_CONTAINER === 'true') { + this.redisSubscriber.subscribe('workers'); + } else { + this.redisSubscriber.subscribe('instances'); + } + + const onMessage = (channel, message) => { + console.log('onMessage', channel, message); + if (channel === 'workers') { + this.workerCallbacks[message] && this.workerCallbacks[message](); + } else if (channel === 'instances') { + this.instanceCallbacks[message] && this.instanceCallbacks[message](); + } + }; + + this.redisSubscriber.on('message', onMessage); } publish(channel: string, message: string | any) { @@ -47,4 +67,20 @@ export class JobsRedisService { delete this.unsubscribeCallbacks[channel]; } } + + workerCount(): Promise { + return new Promise((resolve, reject) => { + this.redisClient.publish( + 'workers', + 'count', + (error, numberOfSubscribers) => { + if (error) { + reject(0); + } else { + resolve(numberOfSubscribers); + } + }, + ); + }); + } } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 5e3dcb14a2..4202c6f8dc 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -3,30 +3,48 @@ import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; +import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; @Injectable() export class JobsService implements OnModuleInit { - constructor(@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue) {} + constructor( + @InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue, + protected readonly jobsRedisService: JobsRedisService, + ) {} // pause primary instance queue async onModuleInit() { if (process.env.NC_WORKER_CONTAINER !== 'true') { - // await this.jobsQueue.pause(true); + await this.jobsQueue.pause(true); + this.jobsRedisService.publish('workers', 'pause'); + } else { + this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => { + await this.jobsQueue.resume(true); + }; + this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => { + await this.jobsQueue.pause(true); + }; + this.jobsRedisService.workerCallbacks['resume'] = async () => { + await this.jobsQueue.resume(); + }; + this.jobsRedisService.workerCallbacks['pause'] = async () => { + await this.jobsQueue.pause(); + }; } } async add(name: string, data: any) { // resume primary instance queue if there is no worker - /* const workerCount = (await this.jobsQueue.getWorkers()).length; + const workerCount = await this.jobsRedisService.workerCount(); const localWorkerPaused = await this.jobsQueue.isPaused(true); // if there is no worker and primary instance queue is paused, resume it // if there is any worker and primary instance queue is not paused, pause it - if (workerCount === 1 && localWorkerPaused) { + if (workerCount === 0 && localWorkerPaused) { await this.jobsQueue.resume(true); - } else if (workerCount > 1 && !localWorkerPaused) { + } else if (workerCount > 0 && !localWorkerPaused) { await this.jobsQueue.pause(true); - } */ + } const job = await this.jobsQueue.add(name, data); @@ -72,4 +90,12 @@ export class JobsService implements OnModuleInit { return job; } + + async resumeQueue() { + await this.jobsQueue.resume(); + } + + async pauseQueue() { + await this.jobsQueue.pause(); + } } diff --git a/packages/nocodb/src/run/testDocker.ts b/packages/nocodb/src/run/testDocker.ts index 6391c75dec..68c3f78817 100644 --- a/packages/nocodb/src/run/testDocker.ts +++ b/packages/nocodb/src/run/testDocker.ts @@ -26,57 +26,63 @@ process.env[`DEBUG`] = 'xc*'; process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true'; (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); - let admin_response; - if (!(await User.getByEmail('user@nocodb.com'))) { - admin_response = await axios.post( - `http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/signup`, - { - email: 'user@nocodb.com', - password: 'Password123.', - }, - ); - console.log(admin_response.data); - } - - for (let i = 0; i < 4; i++) { - if (!(await User.getByEmail(`user-${i}@nocodb.com`))) { - const response = await axios.post( + let admin_response; + if (!(await User.getByEmail('user@nocodb.com'))) { + admin_response = await axios.post( `http://localhost:${ process.env.PORT || 8080 }/api/v1/auth/user/signup`, { - email: `user-${i}@nocodb.com`, + email: 'user@nocodb.com', password: 'Password123.', }, ); - console.log(response.data); + console.log(admin_response.data); + } - const user = await axios.get( - `http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/me`, - { - headers: { - 'xc-auth': response.data.token, + for (let i = 0; i < 4; i++) { + if (!(await User.getByEmail(`user-${i}@nocodb.com`))) { + const response = await axios.post( + `http://localhost:${ + process.env.PORT || 8080 + }/api/v1/auth/user/signup`, + { + email: `user-${i}@nocodb.com`, + password: 'Password123.', }, - }, - ); + ); + console.log(response.data); - const response2 = await axios.patch( - `http://localhost:${process.env.PORT || 8080}/api/v1/users/${ - user.data.id - }`, - { roles: 'org-level-creator' }, - { - headers: { - 'xc-auth': admin_response.data.token, + const user = await axios.get( + `http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/me`, + { + headers: { + 'xc-auth': response.data.token, + }, }, - }, - ); + ); + + const response2 = await axios.patch( + `http://localhost:${process.env.PORT || 8080}/api/v1/users/${ + user.data.id + }`, + { roles: 'org-level-creator' }, + { + headers: { + 'xc-auth': admin_response.data.token, + }, + }, + ); - console.log(response2.data); + console.log(response2.data); + } } - } - }); + }); + } })().catch((e) => console.log(e)); From a75d1b9e9015f737524e5a5c20677d0ebe30d0da Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:56 +0000 Subject: [PATCH 2/9] feat: logging for worker operations --- .../nocodb/src/modules/jobs/redis/jobs.service.ts | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 4202c6f8dc..8dd29c02a4 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -1,5 +1,5 @@ import { InjectQueue } from '@nestjs/bull'; -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; @@ -7,6 +7,8 @@ import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; @Injectable() export class JobsService implements OnModuleInit { + private logger = new Logger(JobsService.name); + constructor( @InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue, protected readonly jobsRedisService: JobsRedisService, @@ -16,20 +18,15 @@ export class JobsService implements OnModuleInit { async onModuleInit() { if (process.env.NC_WORKER_CONTAINER !== 'true') { await this.jobsQueue.pause(true); - this.jobsRedisService.publish('workers', 'pause'); } else { this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => { + this.logger.log('Resuming local queue'); await this.jobsQueue.resume(true); }; this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => { + this.logger.log('Pausing local queue'); await this.jobsQueue.pause(true); }; - this.jobsRedisService.workerCallbacks['resume'] = async () => { - await this.jobsQueue.resume(); - }; - this.jobsRedisService.workerCallbacks['pause'] = async () => { - await this.jobsQueue.pause(); - }; } } @@ -92,10 +89,12 @@ export class JobsService implements OnModuleInit { } async resumeQueue() { + this.logger.log('Resuming global queue'); await this.jobsQueue.resume(); } async pauseQueue() { + this.logger.log('Pausing global queue'); await this.jobsQueue.pause(); } } From 6f4af359168a91faa78800cb7f8654c3ab778c5e Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:56 +0000 Subject: [PATCH 3/9] refactor: globals for workers --- packages/nocodb/src/interface/Jobs.ts | 10 ++++++++++ .../src/modules/jobs/jobs.controller.ts | 12 ++++++++--- .../modules/jobs/redis/jobs-redis.service.ts | 15 +++++++------- .../src/modules/jobs/redis/jobs.service.ts | 20 ++++++++++--------- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index 93a1127af0..7c2d121ac3 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -26,3 +26,13 @@ export enum JobEvents { STATUS = 'job.status', LOG = 'job.log', } + +export enum InstanceTypes { + PRIMARY = 'primary', + WORKER = 'worker', +} + +export enum WorkerCommands { + RESUME_LOCAL = 'resumeLocal', + PAUSE_LOCAL = 'pauseLocal', +} diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index f036eba6cd..d8140e12e9 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -14,7 +14,7 @@ import { ModuleRef } from '@nestjs/core'; import { AuthGuard } from '@nestjs/passport'; import { JobsRedisService } from './redis/jobs-redis.service'; import type { OnModuleInit } from '@nestjs/common'; -import { JobStatus } from '~/interface/Jobs'; +import { InstanceTypes, JobStatus, WorkerCommands } from '~/interface/Jobs'; import { JobEvents } from '~/interface/Jobs'; import { GlobalGuard } from '~/guards/global/global.guard'; import NocoCache from '~/cache/NocoCache'; @@ -182,7 +182,10 @@ export class JobsController implements OnModuleInit { if (body.global === true) { await this.jobsService.resumeQueue(); } else { - await this.jobsRedisService.publish('workers', 'resumeLocal'); + await this.jobsRedisService.publish( + InstanceTypes.WORKER, + WorkerCommands.RESUME_LOCAL, + ); } } @@ -193,7 +196,10 @@ export class JobsController implements OnModuleInit { if (body.global === true) { await this.jobsService.pauseQueue(); } else { - await this.jobsRedisService.publish('workers', 'pauseLocal'); + await this.jobsRedisService.publish( + InstanceTypes.WORKER, + WorkerCommands.PAUSE_LOCAL, + ); } } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts index 9dd92c0578..8a77a3e618 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts @@ -1,5 +1,6 @@ import { Injectable } from '@nestjs/common'; import Redis from 'ioredis'; +import { InstanceTypes } from '~/interface/Jobs'; @Injectable() export class JobsRedisService { @@ -7,25 +8,25 @@ export class JobsRedisService { private redisSubscriber: Redis; private unsubscribeCallbacks: { [key: string]: () => void } = {}; + public primaryCallbacks: { [key: string]: () => void } = {}; public workerCallbacks: { [key: string]: () => void } = {}; - public instanceCallbacks: { [key: string]: () => void } = {}; constructor() { this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); if (process.env.NC_WORKER_CONTAINER === 'true') { - this.redisSubscriber.subscribe('workers'); + this.redisSubscriber.subscribe(InstanceTypes.WORKER); } else { - this.redisSubscriber.subscribe('instances'); + this.redisSubscriber.subscribe(InstanceTypes.PRIMARY); } const onMessage = (channel, message) => { console.log('onMessage', channel, message); - if (channel === 'workers') { + if (channel === InstanceTypes.WORKER) { this.workerCallbacks[message] && this.workerCallbacks[message](); - } else if (channel === 'instances') { - this.instanceCallbacks[message] && this.instanceCallbacks[message](); + } else if (channel === InstanceTypes.PRIMARY) { + this.primaryCallbacks[message] && this.primaryCallbacks[message](); } }; @@ -71,7 +72,7 @@ export class JobsRedisService { workerCount(): Promise { return new Promise((resolve, reject) => { this.redisClient.publish( - 'workers', + InstanceTypes.WORKER, 'count', (error, numberOfSubscribers) => { if (error) { diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 8dd29c02a4..03ba96bbdf 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -2,7 +2,7 @@ import { InjectQueue } from '@nestjs/bull'; import { Injectable, Logger } from '@nestjs/common'; import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; -import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; +import { JOBS_QUEUE, JobStatus, WorkerCommands } from '~/interface/Jobs'; import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; @Injectable() @@ -19,14 +19,16 @@ export class JobsService implements OnModuleInit { if (process.env.NC_WORKER_CONTAINER !== 'true') { await this.jobsQueue.pause(true); } else { - this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => { - this.logger.log('Resuming local queue'); - await this.jobsQueue.resume(true); - }; - this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => { - this.logger.log('Pausing local queue'); - await this.jobsQueue.pause(true); - }; + this.jobsRedisService.workerCallbacks[WorkerCommands.RESUME_LOCAL] = + async () => { + this.logger.log('Resuming local queue'); + await this.jobsQueue.resume(true); + }; + this.jobsRedisService.workerCallbacks[WorkerCommands.PAUSE_LOCAL] = + async () => { + this.logger.log('Pausing local queue'); + await this.jobsQueue.pause(true); + }; } } From 60d41b99f663f5fceb7c52f624e66300dd19f7f6 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:56 +0000 Subject: [PATCH 4/9] feat: enable health check api for worker --- packages/nocodb/src/modules/metas/metas.module.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/nocodb/src/modules/metas/metas.module.ts b/packages/nocodb/src/modules/metas/metas.module.ts index 31db12c9d1..228ed7e4bb 100644 --- a/packages/nocodb/src/modules/metas/metas.module.ts +++ b/packages/nocodb/src/modules/metas/metas.module.ts @@ -123,7 +123,7 @@ export const metaModuleMetadata = { SharedBasesController, NotificationsController, ] - : []), + : [UtilsController]), ], providers: [ /** Services */ From d9c721ed9755e42f47adc5647ef8ccc47f649553 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:56 +0000 Subject: [PATCH 5/9] feat: worker controller --- packages/nocodb/src/Noco.ts | 38 ++++---- .../src/modules/jobs/jobs.controller.ts | 31 +------ .../nocodb/src/modules/jobs/jobs.module.ts | 2 + .../src/modules/jobs/redis/jobs.service.ts | 2 +- .../jobs/worker/worker.controller.spec.ts | 21 +++++ .../modules/jobs/worker/worker.controller.ts | 90 +++++++++++++++++++ .../nocodb/src/modules/metas/metas.module.ts | 3 +- packages/nocodb/src/run/cloud.ts | 12 +-- packages/nocodb/src/run/docker.ts | 10 +-- packages/nocodb/src/run/dockerEntry.ts | 10 +-- packages/nocodb/src/run/dockerRunMysql.ts | 10 +-- packages/nocodb/src/run/dockerRunPG.ts | 10 +-- .../nocodb/src/run/dockerRunPG_CyQuick.ts | 10 +-- packages/nocodb/src/run/local.ts | 12 +-- packages/nocodb/src/run/testDocker.ts | 4 +- 15 files changed, 161 insertions(+), 104 deletions(-) create mode 100644 packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts create mode 100644 packages/nocodb/src/modules/jobs/worker/worker.controller.ts diff --git a/packages/nocodb/src/Noco.ts b/packages/nocodb/src/Noco.ts index 56ab6032b2..b93af0c52d 100644 --- a/packages/nocodb/src/Noco.ts +++ b/packages/nocodb/src/Noco.ts @@ -107,35 +107,33 @@ export default class Noco { throw new Error('NC_REDIS_URL is required'); } process.env.NC_DISABLE_TELE = 'true'; + } - nestApp.init(); - } else { - nestApp.useWebSocketAdapter(new IoAdapter(httpServer)); - - this._httpServer = nestApp.getHttpAdapter().getInstance(); - this._server = server; + nestApp.useWebSocketAdapter(new IoAdapter(httpServer)); - nestApp.use(requestIp.mw()); - nestApp.use(cookieParser()); + this._httpServer = nestApp.getHttpAdapter().getInstance(); + this._server = server; - nestApp.useWebSocketAdapter(new IoAdapter(httpServer)); + nestApp.use(requestIp.mw()); + nestApp.use(cookieParser()); - nestApp.use( - express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }), - ); + nestApp.useWebSocketAdapter(new IoAdapter(httpServer)); - await nestApp.init(); + nestApp.use( + express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }), + ); - const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard'; - server.use(NcToolGui.expressMiddleware(dashboardPath)); - server.use(express.static(path.join(__dirname, 'public'))); + await nestApp.init(); - if (dashboardPath !== '/' && dashboardPath !== '') { - server.get('/', (_req, res) => res.redirect(dashboardPath)); - } + const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard'; + server.use(NcToolGui.expressMiddleware(dashboardPath)); + server.use(express.static(path.join(__dirname, 'public'))); - return nestApp.getHttpAdapter().getInstance(); + if (dashboardPath !== '/' && dashboardPath !== '') { + server.get('/', (_req, res) => res.redirect(dashboardPath)); } + + return nestApp.getHttpAdapter().getInstance(); } public static get httpServer(): http.Server { diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index d8140e12e9..f5a2cdeb4c 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -11,10 +11,9 @@ import { import { OnEvent } from '@nestjs/event-emitter'; import { customAlphabet } from 'nanoid'; import { ModuleRef } from '@nestjs/core'; -import { AuthGuard } from '@nestjs/passport'; import { JobsRedisService } from './redis/jobs-redis.service'; import type { OnModuleInit } from '@nestjs/common'; -import { InstanceTypes, JobStatus, WorkerCommands } from '~/interface/Jobs'; +import { JobStatus } from '~/interface/Jobs'; import { JobEvents } from '~/interface/Jobs'; import { GlobalGuard } from '~/guards/global/global.guard'; import NocoCache from '~/cache/NocoCache'; @@ -175,34 +174,6 @@ export class JobsController implements OnModuleInit { return res; } - // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume - @Post('/internal/workers/resume') - @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) - async resumeWorkers(@Body() body: { global?: boolean }) { - if (body.global === true) { - await this.jobsService.resumeQueue(); - } else { - await this.jobsRedisService.publish( - InstanceTypes.WORKER, - WorkerCommands.RESUME_LOCAL, - ); - } - } - - // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause - @Post('/internal/workers/pause') - @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) - async pauseWorkers(@Body() body: { global?: boolean }) { - if (body.global === true) { - await this.jobsService.pauseQueue(); - } else { - await this.jobsRedisService.publish( - InstanceTypes.WORKER, - WorkerCommands.PAUSE_LOCAL, - ); - } - } - @OnEvent(JobEvents.STATUS) sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void { let response; diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index 87a61d7b42..2489335386 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -31,6 +31,7 @@ import { JOBS_QUEUE } from '~/interface/Jobs'; import { MetasModule } from '~/modules/metas/metas.module'; import { DatasModule } from '~/modules/datas/datas.module'; import { GlobalModule } from '~/modules/global/global.module'; +import { WorkerController } from '~/modules/jobs/worker/worker.controller'; export const JobsModuleMetadata = { imports: [ @@ -50,6 +51,7 @@ export const JobsModuleMetadata = { ], controllers: [ JobsController, + WorkerController, ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [ DuplicateController, diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 03ba96bbdf..a2a70995aa 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -10,7 +10,7 @@ export class JobsService implements OnModuleInit { private logger = new Logger(JobsService.name); constructor( - @InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue, + @InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue, protected readonly jobsRedisService: JobsRedisService, ) {} diff --git a/packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts b/packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts new file mode 100644 index 0000000000..d1ebafd953 --- /dev/null +++ b/packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts @@ -0,0 +1,21 @@ +import { Test } from '@nestjs/testing'; +import { UtilsService } from '../../../services/utils.service'; +import { WorkerController } from './worker.controller'; +import type { TestingModule } from '@nestjs/testing'; + +describe('WorkerController', () => { + let controller: WorkerController; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + controllers: [WorkerController], + providers: [UtilsService], + }).compile(); + + controller = module.get(WorkerController); + }); + + it('should be defined', () => { + expect(controller).toBeDefined(); + }); +}); diff --git a/packages/nocodb/src/modules/jobs/worker/worker.controller.ts b/packages/nocodb/src/modules/jobs/worker/worker.controller.ts new file mode 100644 index 0000000000..ddb2df1bab --- /dev/null +++ b/packages/nocodb/src/modules/jobs/worker/worker.controller.ts @@ -0,0 +1,90 @@ +import { Body, Controller, Get, Inject, Post, UseGuards } from '@nestjs/common'; +import { AuthGuard } from '@nestjs/passport'; +import { ModuleRef } from '@nestjs/core'; +import type { OnModuleInit } from '@nestjs/common'; +import type { Queue } from 'bull'; +import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; +import { UtilsService } from '~/services/utils.service'; +import { InstanceTypes, WorkerCommands } from '~/interface/Jobs'; +import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; + +@Controller() +export class WorkerController implements OnModuleInit { + jobsRedisService: JobsRedisService; + + constructor( + protected readonly utilsService: UtilsService, + @Inject('JobsService') private readonly jobsService, + private moduleRef: ModuleRef, + ) {} + + onModuleInit() { + if (process.env.NC_REDIS_JOB_URL) { + this.jobsRedisService = this.moduleRef.get(JobsRedisService); + } + } + + @Get('/api/v1/health') + async appHealth() { + return await this.utilsService.appHealth(); + } + + // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume + @Post('/internal/workers/resume') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async resumeWorkers(@Body() body: { global?: boolean }) { + if (body.global === true) { + await this.jobsService.resumeQueue(); + } else { + await this.jobsRedisService.publish( + InstanceTypes.WORKER, + WorkerCommands.RESUME_LOCAL, + ); + } + } + + // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause + @Post('/internal/workers/pause') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async pauseWorkers(@Body() body: { global?: boolean }) { + if (body.global === true) { + await this.jobsService.pauseQueue(); + } else { + await this.jobsRedisService.publish( + InstanceTypes.WORKER, + WorkerCommands.PAUSE_LOCAL, + ); + } + } + + @Get('/internal/workers/status') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async workerStatus() { + const queue = this.jobsService.jobsQueue as Queue; + const status = (await queue.isPaused(true)) ? 'paused' : 'active'; + const hasRunningJobs = await new Promise((resolve) => { + queue.whenCurrentJobsFinished().then(() => { + resolve(false); + }); + setTimeout(() => { + resolve(true); + }, 2000); + }); + + return { + status, + hasRunningJobs, + }; + } + + @Get('/internal/workers/metrics') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async workerMetrics() { + const queue = this.jobsService.jobsQueue as Queue; + return { + queueStatus: (await queue.isPaused()) ? 'paused' : 'active', + jobCounts: await queue.getJobCounts(), + timestamp: Date.now(), + }; + } +} diff --git a/packages/nocodb/src/modules/metas/metas.module.ts b/packages/nocodb/src/modules/metas/metas.module.ts index 228ed7e4bb..d11e812fea 100644 --- a/packages/nocodb/src/modules/metas/metas.module.ts +++ b/packages/nocodb/src/modules/metas/metas.module.ts @@ -123,7 +123,7 @@ export const metaModuleMetadata = { SharedBasesController, NotificationsController, ] - : [UtilsController]), + : []), ], providers: [ /** Services */ @@ -184,6 +184,7 @@ export const metaModuleMetadata = { MetaDiffsService, BasesService, SourcesService, + UtilsService, ], }; diff --git a/packages/nocodb/src/run/cloud.ts b/packages/nocodb/src/run/cloud.ts index 8f428139c4..bfcc4174a4 100644 --- a/packages/nocodb/src/run/cloud.ts +++ b/packages/nocodb/src/run/cloud.ts @@ -18,12 +18,8 @@ server.use( server.set('view engine', 'ejs'); (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/docker.ts b/packages/nocodb/src/run/docker.ts index f80a313eb2..b413d1efa8 100644 --- a/packages/nocodb/src/run/docker.ts +++ b/packages/nocodb/src/run/docker.ts @@ -28,11 +28,7 @@ process.env[`DEBUG`] = 'xc*'; // })().catch((e) => console.log(e)); (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerEntry.ts b/packages/nocodb/src/run/dockerEntry.ts index 9ac361d24d..c0f4cbaa0e 100644 --- a/packages/nocodb/src/run/dockerEntry.ts +++ b/packages/nocodb/src/run/dockerEntry.ts @@ -13,11 +13,7 @@ server.use(cors()); server.set('view engine', 'ejs'); (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerRunMysql.ts b/packages/nocodb/src/run/dockerRunMysql.ts index d1b8cd2954..91ee249bf2 100644 --- a/packages/nocodb/src/run/dockerRunMysql.ts +++ b/packages/nocodb/src/run/dockerRunMysql.ts @@ -31,11 +31,7 @@ process.env[`NC_DB`] = `mysql2://localhost:3306?u=root&p=password&d=${metaDb}`; // process.env[`DEBUG`] = 'xc*'; (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerRunPG.ts b/packages/nocodb/src/run/dockerRunPG.ts index e47e73af9c..e18b7d66af 100644 --- a/packages/nocodb/src/run/dockerRunPG.ts +++ b/packages/nocodb/src/run/dockerRunPG.ts @@ -30,11 +30,7 @@ process.env[`NC_DB`] = `pg://localhost:5432?u=postgres&p=password&d=${metaDb}`; // process.env[`DEBUG`] = 'xc*'; (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerRunPG_CyQuick.ts b/packages/nocodb/src/run/dockerRunPG_CyQuick.ts index f0585c8b2a..f48965ce6f 100644 --- a/packages/nocodb/src/run/dockerRunPG_CyQuick.ts +++ b/packages/nocodb/src/run/dockerRunPG_CyQuick.ts @@ -24,11 +24,7 @@ process.env[ //process.env[`DEBUG`] = 'xc*'; (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/local.ts b/packages/nocodb/src/run/local.ts index 660c932d25..049f64b5d8 100644 --- a/packages/nocodb/src/run/local.ts +++ b/packages/nocodb/src/run/local.ts @@ -17,12 +17,8 @@ server.use( server.set('view engine', 'ejs'); (async () => { - if (process.env.NC_WORKER_CONTAINER === 'true') { - await Noco.init({}, null, null); - } else { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); - server.use(await Noco.init({}, httpServer, server)); - }); - } + const httpServer = server.listen(process.env.PORT || 8080, async () => { + console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); + server.use(await Noco.init({}, httpServer, server)); + }); })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/testDocker.ts b/packages/nocodb/src/run/testDocker.ts index 68c3f78817..bd9f2bc36c 100644 --- a/packages/nocodb/src/run/testDocker.ts +++ b/packages/nocodb/src/run/testDocker.ts @@ -27,7 +27,9 @@ process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true'; (async () => { if (process.env.NC_WORKER_CONTAINER === 'true') { - await await Noco.init({}, null, null); + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); } else { const httpServer = server.listen(process.env.PORT || 8080, async () => { server.use(await Noco.init({}, httpServer, server)); From b6ba7d8cbcd3602a0d2cfa913d969b3d72d9fbf5 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:57 +0000 Subject: [PATCH 6/9] refactor: unnecessary log --- packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts index 8a77a3e618..7673206d16 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts @@ -22,7 +22,6 @@ export class JobsRedisService { } const onMessage = (channel, message) => { - console.log('onMessage', channel, message); if (channel === InstanceTypes.WORKER) { this.workerCallbacks[message] && this.workerCallbacks[message](); } else if (channel === InstanceTypes.PRIMARY) { From 09a4459e895b9116323169db27a07e13c5cc303a Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:57 +0000 Subject: [PATCH 7/9] refactor: move worker controller to ee --- .../nocodb/src/modules/jobs/jobs.module.ts | 2 - .../jobs/worker/worker.controller.spec.ts | 21 ----- .../modules/jobs/worker/worker.controller.ts | 90 ------------------- 3 files changed, 113 deletions(-) delete mode 100644 packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts delete mode 100644 packages/nocodb/src/modules/jobs/worker/worker.controller.ts diff --git a/packages/nocodb/src/modules/jobs/jobs.module.ts b/packages/nocodb/src/modules/jobs/jobs.module.ts index 2489335386..87a61d7b42 100644 --- a/packages/nocodb/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb/src/modules/jobs/jobs.module.ts @@ -31,7 +31,6 @@ import { JOBS_QUEUE } from '~/interface/Jobs'; import { MetasModule } from '~/modules/metas/metas.module'; import { DatasModule } from '~/modules/datas/datas.module'; import { GlobalModule } from '~/modules/global/global.module'; -import { WorkerController } from '~/modules/jobs/worker/worker.controller'; export const JobsModuleMetadata = { imports: [ @@ -51,7 +50,6 @@ export const JobsModuleMetadata = { ], controllers: [ JobsController, - WorkerController, ...(process.env.NC_WORKER_CONTAINER !== 'true' ? [ DuplicateController, diff --git a/packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts b/packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts deleted file mode 100644 index d1ebafd953..0000000000 --- a/packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { Test } from '@nestjs/testing'; -import { UtilsService } from '../../../services/utils.service'; -import { WorkerController } from './worker.controller'; -import type { TestingModule } from '@nestjs/testing'; - -describe('WorkerController', () => { - let controller: WorkerController; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - controllers: [WorkerController], - providers: [UtilsService], - }).compile(); - - controller = module.get(WorkerController); - }); - - it('should be defined', () => { - expect(controller).toBeDefined(); - }); -}); diff --git a/packages/nocodb/src/modules/jobs/worker/worker.controller.ts b/packages/nocodb/src/modules/jobs/worker/worker.controller.ts deleted file mode 100644 index ddb2df1bab..0000000000 --- a/packages/nocodb/src/modules/jobs/worker/worker.controller.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { Body, Controller, Get, Inject, Post, UseGuards } from '@nestjs/common'; -import { AuthGuard } from '@nestjs/passport'; -import { ModuleRef } from '@nestjs/core'; -import type { OnModuleInit } from '@nestjs/common'; -import type { Queue } from 'bull'; -import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; -import { UtilsService } from '~/services/utils.service'; -import { InstanceTypes, WorkerCommands } from '~/interface/Jobs'; -import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; - -@Controller() -export class WorkerController implements OnModuleInit { - jobsRedisService: JobsRedisService; - - constructor( - protected readonly utilsService: UtilsService, - @Inject('JobsService') private readonly jobsService, - private moduleRef: ModuleRef, - ) {} - - onModuleInit() { - if (process.env.NC_REDIS_JOB_URL) { - this.jobsRedisService = this.moduleRef.get(JobsRedisService); - } - } - - @Get('/api/v1/health') - async appHealth() { - return await this.utilsService.appHealth(); - } - - // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume - @Post('/internal/workers/resume') - @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) - async resumeWorkers(@Body() body: { global?: boolean }) { - if (body.global === true) { - await this.jobsService.resumeQueue(); - } else { - await this.jobsRedisService.publish( - InstanceTypes.WORKER, - WorkerCommands.RESUME_LOCAL, - ); - } - } - - // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause - @Post('/internal/workers/pause') - @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) - async pauseWorkers(@Body() body: { global?: boolean }) { - if (body.global === true) { - await this.jobsService.pauseQueue(); - } else { - await this.jobsRedisService.publish( - InstanceTypes.WORKER, - WorkerCommands.PAUSE_LOCAL, - ); - } - } - - @Get('/internal/workers/status') - @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) - async workerStatus() { - const queue = this.jobsService.jobsQueue as Queue; - const status = (await queue.isPaused(true)) ? 'paused' : 'active'; - const hasRunningJobs = await new Promise((resolve) => { - queue.whenCurrentJobsFinished().then(() => { - resolve(false); - }); - setTimeout(() => { - resolve(true); - }, 2000); - }); - - return { - status, - hasRunningJobs, - }; - } - - @Get('/internal/workers/metrics') - @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) - async workerMetrics() { - const queue = this.jobsService.jobsQueue as Queue; - return { - queueStatus: (await queue.isPaused()) ? 'paused' : 'active', - jobCounts: await queue.getJobCounts(), - timestamp: Date.now(), - }; - } -} From 3601c5fb097cc2d8a33aa44709ec7b9bfae49047 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:57 +0000 Subject: [PATCH 8/9] feat: worker reset --- packages/nocodb/src/interface/Jobs.ts | 1 + packages/nocodb/src/modules/jobs/redis/jobs.service.ts | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/nocodb/src/interface/Jobs.ts b/packages/nocodb/src/interface/Jobs.ts index 7c2d121ac3..9bda48a9c5 100644 --- a/packages/nocodb/src/interface/Jobs.ts +++ b/packages/nocodb/src/interface/Jobs.ts @@ -35,4 +35,5 @@ export enum InstanceTypes { export enum WorkerCommands { RESUME_LOCAL = 'resumeLocal', PAUSE_LOCAL = 'pauseLocal', + RESET = 'reset', } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index a2a70995aa..0c4a971caf 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -7,7 +7,7 @@ import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; @Injectable() export class JobsService implements OnModuleInit { - private logger = new Logger(JobsService.name); + protected logger = new Logger(JobsService.name); constructor( @InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue, From 9ef5c0d14193a9abb09b39856f58477d1f4b0d77 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 7 Nov 2023 06:26:57 +0000 Subject: [PATCH 9/9] fix: allow disabling dynamic queue --- .../src/modules/jobs/redis/jobs.service.ts | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 0c4a971caf..d13bb165ac 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -33,16 +33,19 @@ export class JobsService implements OnModuleInit { } async add(name: string, data: any) { - // resume primary instance queue if there is no worker - const workerCount = await this.jobsRedisService.workerCount(); - const localWorkerPaused = await this.jobsQueue.isPaused(true); + // if NC_WORKER_CONTAINER is false, then skip dynamic queue pause/resume + if (process.env.NC_WORKER_CONTAINER !== 'false') { + // resume primary instance queue if there is no worker + const workerCount = await this.jobsRedisService.workerCount(); + const localWorkerPaused = await this.jobsQueue.isPaused(true); - // if there is no worker and primary instance queue is paused, resume it - // if there is any worker and primary instance queue is not paused, pause it - if (workerCount === 0 && localWorkerPaused) { - await this.jobsQueue.resume(true); - } else if (workerCount > 0 && !localWorkerPaused) { - await this.jobsQueue.pause(true); + // if there is no worker and primary instance queue is paused, resume it + // if there is any worker and primary instance queue is not paused, pause it + if (workerCount === 0 && localWorkerPaused) { + await this.jobsQueue.resume(true); + } else if (workerCount > 0 && !localWorkerPaused) { + await this.jobsQueue.pause(true); + } } const job = await this.jobsQueue.add(name, data);