Browse Source

Merge pull request #6899 from nocodb/nc-feat/worker-cluster

fix: worker
pull/6901/head
mertmit 1 year ago committed by GitHub
parent
commit
6a03127189
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 38
      packages/nocodb/src/Noco.ts
  2. 11
      packages/nocodb/src/interface/Jobs.ts
  3. 8
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  4. 36
      packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts
  5. 58
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  6. 1
      packages/nocodb/src/modules/metas/metas.module.ts
  7. 12
      packages/nocodb/src/run/cloud.ts
  8. 10
      packages/nocodb/src/run/docker.ts
  9. 10
      packages/nocodb/src/run/dockerEntry.ts
  10. 10
      packages/nocodb/src/run/dockerRunMysql.ts
  11. 10
      packages/nocodb/src/run/dockerRunPG.ts
  12. 10
      packages/nocodb/src/run/dockerRunPG_CyQuick.ts
  13. 12
      packages/nocodb/src/run/local.ts
  14. 86
      packages/nocodb/src/run/testDocker.ts

38
packages/nocodb/src/Noco.ts

@ -107,35 +107,33 @@ export default class Noco {
throw new Error('NC_REDIS_URL is required');
}
process.env.NC_DISABLE_TELE = 'true';
}
nestApp.init();
} else {
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
this._httpServer = nestApp.getHttpAdapter().getInstance();
this._server = server;
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
nestApp.use(requestIp.mw());
nestApp.use(cookieParser());
this._httpServer = nestApp.getHttpAdapter().getInstance();
this._server = server;
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
nestApp.use(requestIp.mw());
nestApp.use(cookieParser());
nestApp.use(
express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }),
);
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
await nestApp.init();
nestApp.use(
express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }),
);
const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard';
server.use(NcToolGui.expressMiddleware(dashboardPath));
server.use(express.static(path.join(__dirname, 'public')));
await nestApp.init();
if (dashboardPath !== '/' && dashboardPath !== '') {
server.get('/', (_req, res) => res.redirect(dashboardPath));
}
const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard';
server.use(NcToolGui.expressMiddleware(dashboardPath));
server.use(express.static(path.join(__dirname, 'public')));
return nestApp.getHttpAdapter().getInstance();
if (dashboardPath !== '/' && dashboardPath !== '') {
server.get('/', (_req, res) => res.redirect(dashboardPath));
}
return nestApp.getHttpAdapter().getInstance();
}
public static get httpServer(): http.Server {

11
packages/nocodb/src/interface/Jobs.ts

@ -26,3 +26,14 @@ export enum JobEvents {
STATUS = 'job.status',
LOG = 'job.log',
}
export enum InstanceTypes {
PRIMARY = 'primary',
WORKER = 'worker',
}
export enum WorkerCommands {
RESUME_LOCAL = 'resumeLocal',
PAUSE_LOCAL = 'pauseLocal',
RESET = 'reset',
}

8
packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -48,4 +48,12 @@ export class JobsService {
return job;
}
async resumeQueue() {
await this.fallbackQueueService.queue.start();
}
async pauseQueue() {
await this.fallbackQueueService.queue.pause();
}
}

36
packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts

@ -1,5 +1,6 @@
import { Injectable } from '@nestjs/common';
import Redis from 'ioredis';
import { InstanceTypes } from '~/interface/Jobs';
@Injectable()
export class JobsRedisService {
@ -7,9 +8,28 @@ export class JobsRedisService {
private redisSubscriber: Redis;
private unsubscribeCallbacks: { [key: string]: () => void } = {};
public primaryCallbacks: { [key: string]: () => void } = {};
public workerCallbacks: { [key: string]: () => void } = {};
constructor() {
this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.redisSubscriber.subscribe(InstanceTypes.WORKER);
} else {
this.redisSubscriber.subscribe(InstanceTypes.PRIMARY);
}
const onMessage = (channel, message) => {
if (channel === InstanceTypes.WORKER) {
this.workerCallbacks[message] && this.workerCallbacks[message]();
} else if (channel === InstanceTypes.PRIMARY) {
this.primaryCallbacks[message] && this.primaryCallbacks[message]();
}
};
this.redisSubscriber.on('message', onMessage);
}
publish(channel: string, message: string | any) {
@ -47,4 +67,20 @@ export class JobsRedisService {
delete this.unsubscribeCallbacks[channel];
}
}
workerCount(): Promise<number> {
return new Promise((resolve, reject) => {
this.redisClient.publish(
InstanceTypes.WORKER,
'count',
(error, numberOfSubscribers) => {
if (error) {
reject(0);
} else {
resolve(numberOfSubscribers);
}
},
);
});
}
}

58
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -1,32 +1,52 @@
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { Queue } from 'bull';
import type { OnModuleInit } from '@nestjs/common';
import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { JOBS_QUEUE, JobStatus, WorkerCommands } from '~/interface/Jobs';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
@Injectable()
export class JobsService implements OnModuleInit {
constructor(@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue) {}
protected logger = new Logger(JobsService.name);
constructor(
@InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue,
protected readonly jobsRedisService: JobsRedisService,
) {}
// pause primary instance queue
async onModuleInit() {
if (process.env.NC_WORKER_CONTAINER !== 'true') {
// await this.jobsQueue.pause(true);
await this.jobsQueue.pause(true);
} else {
this.jobsRedisService.workerCallbacks[WorkerCommands.RESUME_LOCAL] =
async () => {
this.logger.log('Resuming local queue');
await this.jobsQueue.resume(true);
};
this.jobsRedisService.workerCallbacks[WorkerCommands.PAUSE_LOCAL] =
async () => {
this.logger.log('Pausing local queue');
await this.jobsQueue.pause(true);
};
}
}
async add(name: string, data: any) {
// resume primary instance queue if there is no worker
/* const workerCount = (await this.jobsQueue.getWorkers()).length;
const localWorkerPaused = await this.jobsQueue.isPaused(true);
// if NC_WORKER_CONTAINER is false, then skip dynamic queue pause/resume
if (process.env.NC_WORKER_CONTAINER !== 'false') {
// resume primary instance queue if there is no worker
const workerCount = await this.jobsRedisService.workerCount();
const localWorkerPaused = await this.jobsQueue.isPaused(true);
// if there is no worker and primary instance queue is paused, resume it
// if there is any worker and primary instance queue is not paused, pause it
if (workerCount === 1 && localWorkerPaused) {
await this.jobsQueue.resume(true);
} else if (workerCount > 1 && !localWorkerPaused) {
await this.jobsQueue.pause(true);
} */
// if there is no worker and primary instance queue is paused, resume it
// if there is any worker and primary instance queue is not paused, pause it
if (workerCount === 0 && localWorkerPaused) {
await this.jobsQueue.resume(true);
} else if (workerCount > 0 && !localWorkerPaused) {
await this.jobsQueue.pause(true);
}
}
const job = await this.jobsQueue.add(name, data);
@ -72,4 +92,14 @@ export class JobsService implements OnModuleInit {
return job;
}
async resumeQueue() {
this.logger.log('Resuming global queue');
await this.jobsQueue.resume();
}
async pauseQueue() {
this.logger.log('Pausing global queue');
await this.jobsQueue.pause();
}
}

1
packages/nocodb/src/modules/metas/metas.module.ts

@ -184,6 +184,7 @@ export const metaModuleMetadata = {
MetaDiffsService,
BasesService,
SourcesService,
UtilsService,
],
};

12
packages/nocodb/src/run/cloud.ts

@ -18,12 +18,8 @@ server.use(
server.set('view engine', 'ejs');
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/docker.ts

@ -28,11 +28,7 @@ process.env[`DEBUG`] = 'xc*';
// })().catch((e) => console.log(e));
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerEntry.ts

@ -13,11 +13,7 @@ server.use(cors());
server.set('view engine', 'ejs');
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerRunMysql.ts

@ -31,11 +31,7 @@ process.env[`NC_DB`] = `mysql2://localhost:3306?u=root&p=password&d=${metaDb}`;
// process.env[`DEBUG`] = 'xc*';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerRunPG.ts

@ -30,11 +30,7 @@ process.env[`NC_DB`] = `pg://localhost:5432?u=postgres&p=password&d=${metaDb}`;
// process.env[`DEBUG`] = 'xc*';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerRunPG_CyQuick.ts

@ -24,11 +24,7 @@ process.env[
//process.env[`DEBUG`] = 'xc*';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

12
packages/nocodb/src/run/local.ts

@ -17,12 +17,8 @@ server.use(
server.set('view engine', 'ejs');
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

86
packages/nocodb/src/run/testDocker.ts

@ -26,57 +26,65 @@ process.env[`DEBUG`] = 'xc*';
process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true';
(async () => {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
if (process.env.NC_WORKER_CONTAINER === 'true') {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
let admin_response;
if (!(await User.getByEmail('user@nocodb.com'))) {
admin_response = await axios.post(
`http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/signup`,
{
email: 'user@nocodb.com',
password: 'Password123.',
},
);
console.log(admin_response.data);
}
for (let i = 0; i < 4; i++) {
if (!(await User.getByEmail(`user-${i}@nocodb.com`))) {
const response = await axios.post(
let admin_response;
if (!(await User.getByEmail('user@nocodb.com'))) {
admin_response = await axios.post(
`http://localhost:${
process.env.PORT || 8080
}/api/v1/auth/user/signup`,
{
email: `user-${i}@nocodb.com`,
email: 'user@nocodb.com',
password: 'Password123.',
},
);
console.log(response.data);
console.log(admin_response.data);
}
const user = await axios.get(
`http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/me`,
{
headers: {
'xc-auth': response.data.token,
for (let i = 0; i < 4; i++) {
if (!(await User.getByEmail(`user-${i}@nocodb.com`))) {
const response = await axios.post(
`http://localhost:${
process.env.PORT || 8080
}/api/v1/auth/user/signup`,
{
email: `user-${i}@nocodb.com`,
password: 'Password123.',
},
},
);
);
console.log(response.data);
const response2 = await axios.patch(
`http://localhost:${process.env.PORT || 8080}/api/v1/users/${
user.data.id
}`,
{ roles: 'org-level-creator' },
{
headers: {
'xc-auth': admin_response.data.token,
const user = await axios.get(
`http://localhost:${process.env.PORT || 8080}/api/v1/auth/user/me`,
{
headers: {
'xc-auth': response.data.token,
},
},
},
);
);
const response2 = await axios.patch(
`http://localhost:${process.env.PORT || 8080}/api/v1/users/${
user.data.id
}`,
{ roles: 'org-level-creator' },
{
headers: {
'xc-auth': admin_response.data.token,
},
},
);
console.log(response2.data);
console.log(response2.data);
}
}
}
});
});
}
})().catch((e) => console.log(e));

Loading…
Cancel
Save