diff --git a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts index 978f3d3d03..594e806a8e 100644 --- a/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback/jobs.service.ts @@ -48,4 +48,12 @@ export class JobsService { return job; } + + async resumeQueue() { + await this.fallbackQueueService.queue.start(); + } + + async pauseQueue() { + await this.fallbackQueueService.queue.pause(); + } } diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index f5a2cdeb4c..f036eba6cd 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -11,6 +11,7 @@ import { import { OnEvent } from '@nestjs/event-emitter'; import { customAlphabet } from 'nanoid'; import { ModuleRef } from '@nestjs/core'; +import { AuthGuard } from '@nestjs/passport'; import { JobsRedisService } from './redis/jobs-redis.service'; import type { OnModuleInit } from '@nestjs/common'; import { JobStatus } from '~/interface/Jobs'; @@ -174,6 +175,28 @@ export class JobsController implements OnModuleInit { return res; } + // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume + @Post('/internal/workers/resume') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async resumeWorkers(@Body() body: { global?: boolean }) { + if (body.global === true) { + await this.jobsService.resumeQueue(); + } else { + await this.jobsRedisService.publish('workers', 'resumeLocal'); + } + } + + // reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause + @Post('/internal/workers/pause') + @UseGuards(MetaApiLimiterGuard, AuthGuard('basic')) + async pauseWorkers(@Body() body: { global?: boolean }) { + if (body.global === true) { + await this.jobsService.pauseQueue(); + } else { + await this.jobsRedisService.publish('workers', 'pauseLocal'); + } + } + @OnEvent(JobEvents.STATUS) sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void { let response; diff --git a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts index 5d50947b9e..9dd92c0578 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts @@ -7,9 +7,29 @@ export class JobsRedisService { private redisSubscriber: Redis; private unsubscribeCallbacks: { [key: string]: () => void } = {}; + public workerCallbacks: { [key: string]: () => void } = {}; + public instanceCallbacks: { [key: string]: () => void } = {}; + constructor() { this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); + + if (process.env.NC_WORKER_CONTAINER === 'true') { + this.redisSubscriber.subscribe('workers'); + } else { + this.redisSubscriber.subscribe('instances'); + } + + const onMessage = (channel, message) => { + console.log('onMessage', channel, message); + if (channel === 'workers') { + this.workerCallbacks[message] && this.workerCallbacks[message](); + } else if (channel === 'instances') { + this.instanceCallbacks[message] && this.instanceCallbacks[message](); + } + }; + + this.redisSubscriber.on('message', onMessage); } publish(channel: string, message: string | any) { @@ -47,4 +67,20 @@ export class JobsRedisService { delete this.unsubscribeCallbacks[channel]; } } + + workerCount(): Promise { + return new Promise((resolve, reject) => { + this.redisClient.publish( + 'workers', + 'count', + (error, numberOfSubscribers) => { + if (error) { + reject(0); + } else { + resolve(numberOfSubscribers); + } + }, + ); + }); + } } diff --git a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts index 5e3dcb14a2..4202c6f8dc 100644 --- a/packages/nocodb/src/modules/jobs/redis/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/redis/jobs.service.ts @@ -3,30 +3,48 @@ import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; import type { OnModuleInit } from '@nestjs/common'; import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; +import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service'; @Injectable() export class JobsService implements OnModuleInit { - constructor(@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue) {} + constructor( + @InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue, + protected readonly jobsRedisService: JobsRedisService, + ) {} // pause primary instance queue async onModuleInit() { if (process.env.NC_WORKER_CONTAINER !== 'true') { - // await this.jobsQueue.pause(true); + await this.jobsQueue.pause(true); + this.jobsRedisService.publish('workers', 'pause'); + } else { + this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => { + await this.jobsQueue.resume(true); + }; + this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => { + await this.jobsQueue.pause(true); + }; + this.jobsRedisService.workerCallbacks['resume'] = async () => { + await this.jobsQueue.resume(); + }; + this.jobsRedisService.workerCallbacks['pause'] = async () => { + await this.jobsQueue.pause(); + }; } } async add(name: string, data: any) { // resume primary instance queue if there is no worker - /* const workerCount = (await this.jobsQueue.getWorkers()).length; + const workerCount = await this.jobsRedisService.workerCount(); const localWorkerPaused = await this.jobsQueue.isPaused(true); // if there is no worker and primary instance queue is paused, resume it // if there is any worker and primary instance queue is not paused, pause it - if (workerCount === 1 && localWorkerPaused) { + if (workerCount === 0 && localWorkerPaused) { await this.jobsQueue.resume(true); - } else if (workerCount > 1 && !localWorkerPaused) { + } else if (workerCount > 0 && !localWorkerPaused) { await this.jobsQueue.pause(true); - } */ + } const job = await this.jobsQueue.add(name, data); @@ -72,4 +90,12 @@ export class JobsService implements OnModuleInit { return job; } + + async resumeQueue() { + await this.jobsQueue.resume(); + } + + async pauseQueue() { + await this.jobsQueue.pause(); + } } diff --git a/packages/nocodb/src/run/testDocker.ts b/packages/nocodb/src/run/testDocker.ts index 6391c75dec..68c3f78817 100644 --- a/packages/nocodb/src/run/testDocker.ts +++ b/packages/nocodb/src/run/testDocker.ts @@ -26,57 +26,63 @@ process.env[`DEBUG`] = 'xc*'; process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true'; (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); - let admin_response; - if (!(await User.getByEmail('user@nocodb.com'))) { - admin_response = await axios.post( - `http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/signup`, - { - email: 'user@nocodb.com', - password: 'Password123.', - }, - ); - console.log(admin_response.data); - } - - for (let i = 0; i < 4; i++) { - if (!(await User.getByEmail(`user-${i}@nocodb.com`))) { - const response = await axios.post( + let admin_response; + if (!(await User.getByEmail('user@nocodb.com'))) { + admin_response = await axios.post( `http://localhost:${ process.env.PORT || 8080 }/api/v1/auth/user/signup`, { - email: `user-${i}@nocodb.com`, + email: 'user@nocodb.com', password: 'Password123.', }, ); - console.log(response.data); + console.log(admin_response.data); + } - const user = await axios.get( - `http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/me`, - { - headers: { - 'xc-auth': response.data.token, + for (let i = 0; i < 4; i++) { + if (!(await User.getByEmail(`user-${i}@nocodb.com`))) { + const response = await axios.post( + `http://localhost:${ + process.env.PORT || 8080 + }/api/v1/auth/user/signup`, + { + email: `user-${i}@nocodb.com`, + password: 'Password123.', }, - }, - ); + ); + console.log(response.data); - const response2 = await axios.patch( - `http://localhost:${process.env.PORT || 8080}/api/v1/users/${ - user.data.id - }`, - { roles: 'org-level-creator' }, - { - headers: { - 'xc-auth': admin_response.data.token, + const user = await axios.get( + `http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/me`, + { + headers: { + 'xc-auth': response.data.token, + }, }, - }, - ); + ); + + const response2 = await axios.patch( + `http://localhost:${process.env.PORT || 8080}/api/v1/users/${ + user.data.id + }`, + { roles: 'org-level-creator' }, + { + headers: { + 'xc-auth': admin_response.data.token, + }, + }, + ); - console.log(response2.data); + console.log(response2.data); + } } - } - }); + }); + } })().catch((e) => console.log(e));