Browse Source

feat: prepare worker cluster

pull/6899/head
mertmit 1 year ago
parent
commit
cd267cad78
  1. 8
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  2. 23
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  3. 36
      packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts
  4. 38
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  5. 8
      packages/nocodb/src/run/testDocker.ts

8
packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -48,4 +48,12 @@ export class JobsService {
return job; return job;
} }
async resumeQueue() {
await this.fallbackQueueService.queue.start();
}
async pauseQueue() {
await this.fallbackQueueService.queue.pause();
}
} }

23
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -11,6 +11,7 @@ import {
import { OnEvent } from '@nestjs/event-emitter'; import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid'; import { customAlphabet } from 'nanoid';
import { ModuleRef } from '@nestjs/core'; import { ModuleRef } from '@nestjs/core';
import { AuthGuard } from '@nestjs/passport';
import { JobsRedisService } from './redis/jobs-redis.service'; import { JobsRedisService } from './redis/jobs-redis.service';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
import { JobStatus } from '~/interface/Jobs'; import { JobStatus } from '~/interface/Jobs';
@ -174,6 +175,28 @@ export class JobsController implements OnModuleInit {
return res; return res;
} }
// reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume
@Post('/internal/workers/resume')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async resumeWorkers(@Body() body: { global?: boolean }) {
if (body.global === true) {
await this.jobsService.resumeQueue();
} else {
await this.jobsRedisService.publish('workers', 'resumeLocal');
}
}
// reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause
@Post('/internal/workers/pause')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async pauseWorkers(@Body() body: { global?: boolean }) {
if (body.global === true) {
await this.jobsService.pauseQueue();
} else {
await this.jobsRedisService.publish('workers', 'pauseLocal');
}
}
@OnEvent(JobEvents.STATUS) @OnEvent(JobEvents.STATUS)
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void { sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
let response; let response;

36
packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts

@ -7,9 +7,29 @@ export class JobsRedisService {
private redisSubscriber: Redis; private redisSubscriber: Redis;
private unsubscribeCallbacks: { [key: string]: () => void } = {}; private unsubscribeCallbacks: { [key: string]: () => void } = {};
public workerCallbacks: { [key: string]: () => void } = {};
public instanceCallbacks: { [key: string]: () => void } = {};
constructor() { constructor() {
this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.redisSubscriber.subscribe('workers');
} else {
this.redisSubscriber.subscribe('instances');
}
const onMessage = (channel, message) => {
console.log('onMessage', channel, message);
if (channel === 'workers') {
this.workerCallbacks[message] && this.workerCallbacks[message]();
} else if (channel === 'instances') {
this.instanceCallbacks[message] && this.instanceCallbacks[message]();
}
};
this.redisSubscriber.on('message', onMessage);
} }
publish(channel: string, message: string | any) { publish(channel: string, message: string | any) {
@ -47,4 +67,20 @@ export class JobsRedisService {
delete this.unsubscribeCallbacks[channel]; delete this.unsubscribeCallbacks[channel];
} }
} }
workerCount(): Promise<number> {
return new Promise((resolve, reject) => {
this.redisClient.publish(
'workers',
'count',
(error, numberOfSubscribers) => {
if (error) {
reject(0);
} else {
resolve(numberOfSubscribers);
}
},
);
});
}
} }

38
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -3,30 +3,48 @@ import { Injectable } from '@nestjs/common';
import { Queue } from 'bull'; import { Queue } from 'bull';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
@Injectable() @Injectable()
export class JobsService implements OnModuleInit { export class JobsService implements OnModuleInit {
constructor(@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue) {} constructor(
@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue,
protected readonly jobsRedisService: JobsRedisService,
) {}
// pause primary instance queue // pause primary instance queue
async onModuleInit() { async onModuleInit() {
if (process.env.NC_WORKER_CONTAINER !== 'true') { if (process.env.NC_WORKER_CONTAINER !== 'true') {
// await this.jobsQueue.pause(true); await this.jobsQueue.pause(true);
this.jobsRedisService.publish('workers', 'pause');
} else {
this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => {
await this.jobsQueue.resume(true);
};
this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => {
await this.jobsQueue.pause(true);
};
this.jobsRedisService.workerCallbacks['resume'] = async () => {
await this.jobsQueue.resume();
};
this.jobsRedisService.workerCallbacks['pause'] = async () => {
await this.jobsQueue.pause();
};
} }
} }
async add(name: string, data: any) { async add(name: string, data: any) {
// resume primary instance queue if there is no worker // resume primary instance queue if there is no worker
/* const workerCount = (await this.jobsQueue.getWorkers()).length; const workerCount = await this.jobsRedisService.workerCount();
const localWorkerPaused = await this.jobsQueue.isPaused(true); const localWorkerPaused = await this.jobsQueue.isPaused(true);
// if there is no worker and primary instance queue is paused, resume it // if there is no worker and primary instance queue is paused, resume it
// if there is any worker and primary instance queue is not paused, pause it // if there is any worker and primary instance queue is not paused, pause it
if (workerCount === 1 && localWorkerPaused) { if (workerCount === 0 && localWorkerPaused) {
await this.jobsQueue.resume(true); await this.jobsQueue.resume(true);
} else if (workerCount > 1 && !localWorkerPaused) { } else if (workerCount > 0 && !localWorkerPaused) {
await this.jobsQueue.pause(true); await this.jobsQueue.pause(true);
} */ }
const job = await this.jobsQueue.add(name, data); const job = await this.jobsQueue.add(name, data);
@ -72,4 +90,12 @@ export class JobsService implements OnModuleInit {
return job; return job;
} }
async resumeQueue() {
await this.jobsQueue.resume();
}
async pauseQueue() {
await this.jobsQueue.pause();
}
} }

8
packages/nocodb/src/run/testDocker.ts

@ -26,13 +26,18 @@ process.env[`DEBUG`] = 'xc*';
process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true'; process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true';
(async () => { (async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => { const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
let admin_response; let admin_response;
if (!(await User.getByEmail('user@nocodb.com'))) { if (!(await User.getByEmail('user@nocodb.com'))) {
admin_response = await axios.post( admin_response = await axios.post(
`http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/signup`, `http://localhost:${
process.env.PORT || 8080
}/api/v1/auth/user/signup`,
{ {
email: 'user@nocodb.com', email: 'user@nocodb.com',
password: 'Password123.', password: 'Password123.',
@ -79,4 +84,5 @@ process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true';
} }
} }
}); });
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

Loading…
Cancel
Save