Browse Source

feat: worker controller

pull/6899/head
mertmit 11 months ago
parent
commit
d9c721ed97
  1. 38
      packages/nocodb/src/Noco.ts
  2. 31
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  3. 2
      packages/nocodb/src/modules/jobs/jobs.module.ts
  4. 2
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  5. 21
      packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts
  6. 90
      packages/nocodb/src/modules/jobs/worker/worker.controller.ts
  7. 3
      packages/nocodb/src/modules/metas/metas.module.ts
  8. 12
      packages/nocodb/src/run/cloud.ts
  9. 10
      packages/nocodb/src/run/docker.ts
  10. 10
      packages/nocodb/src/run/dockerEntry.ts
  11. 10
      packages/nocodb/src/run/dockerRunMysql.ts
  12. 10
      packages/nocodb/src/run/dockerRunPG.ts
  13. 10
      packages/nocodb/src/run/dockerRunPG_CyQuick.ts
  14. 12
      packages/nocodb/src/run/local.ts
  15. 4
      packages/nocodb/src/run/testDocker.ts

38
packages/nocodb/src/Noco.ts

@ -107,35 +107,33 @@ export default class Noco {
throw new Error('NC_REDIS_URL is required');
}
process.env.NC_DISABLE_TELE = 'true';
}
nestApp.init();
} else {
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
this._httpServer = nestApp.getHttpAdapter().getInstance();
this._server = server;
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
nestApp.use(requestIp.mw());
nestApp.use(cookieParser());
this._httpServer = nestApp.getHttpAdapter().getInstance();
this._server = server;
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
nestApp.use(requestIp.mw());
nestApp.use(cookieParser());
nestApp.use(
express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }),
);
nestApp.useWebSocketAdapter(new IoAdapter(httpServer));
await nestApp.init();
nestApp.use(
express.json({ limit: process.env.NC_REQUEST_BODY_SIZE || '50mb' }),
);
const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard';
server.use(NcToolGui.expressMiddleware(dashboardPath));
server.use(express.static(path.join(__dirname, 'public')));
await nestApp.init();
if (dashboardPath !== '/' && dashboardPath !== '') {
server.get('/', (_req, res) => res.redirect(dashboardPath));
}
const dashboardPath = process.env.NC_DASHBOARD_URL ?? '/dashboard';
server.use(NcToolGui.expressMiddleware(dashboardPath));
server.use(express.static(path.join(__dirname, 'public')));
return nestApp.getHttpAdapter().getInstance();
if (dashboardPath !== '/' && dashboardPath !== '') {
server.get('/', (_req, res) => res.redirect(dashboardPath));
}
return nestApp.getHttpAdapter().getInstance();
}
public static get httpServer(): http.Server {

31
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -11,10 +11,9 @@ import {
import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid';
import { ModuleRef } from '@nestjs/core';
import { AuthGuard } from '@nestjs/passport';
import { JobsRedisService } from './redis/jobs-redis.service';
import type { OnModuleInit } from '@nestjs/common';
import { InstanceTypes, JobStatus, WorkerCommands } from '~/interface/Jobs';
import { JobStatus } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
import NocoCache from '~/cache/NocoCache';
@ -175,34 +174,6 @@ export class JobsController implements OnModuleInit {
return res;
}
// reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume
@Post('/internal/workers/resume')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async resumeWorkers(@Body() body: { global?: boolean }) {
if (body.global === true) {
await this.jobsService.resumeQueue();
} else {
await this.jobsRedisService.publish(
InstanceTypes.WORKER,
WorkerCommands.RESUME_LOCAL,
);
}
}
// reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause
@Post('/internal/workers/pause')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async pauseWorkers(@Body() body: { global?: boolean }) {
if (body.global === true) {
await this.jobsService.pauseQueue();
} else {
await this.jobsRedisService.publish(
InstanceTypes.WORKER,
WorkerCommands.PAUSE_LOCAL,
);
}
}
@OnEvent(JobEvents.STATUS)
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
let response;

2
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -31,6 +31,7 @@ import { JOBS_QUEUE } from '~/interface/Jobs';
import { MetasModule } from '~/modules/metas/metas.module';
import { DatasModule } from '~/modules/datas/datas.module';
import { GlobalModule } from '~/modules/global/global.module';
import { WorkerController } from '~/modules/jobs/worker/worker.controller';
export const JobsModuleMetadata = {
imports: [
@ -50,6 +51,7 @@ export const JobsModuleMetadata = {
],
controllers: [
JobsController,
WorkerController,
...(process.env.NC_WORKER_CONTAINER !== 'true'
? [
DuplicateController,

2
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -10,7 +10,7 @@ export class JobsService implements OnModuleInit {
private logger = new Logger(JobsService.name);
constructor(
@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue,
@InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue,
protected readonly jobsRedisService: JobsRedisService,
) {}

21
packages/nocodb/src/modules/jobs/worker/worker.controller.spec.ts

@ -0,0 +1,21 @@
import { Test } from '@nestjs/testing';
import { UtilsService } from '../../../services/utils.service';
import { WorkerController } from './worker.controller';
import type { TestingModule } from '@nestjs/testing';
describe('WorkerController', () => {
let controller: WorkerController;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [WorkerController],
providers: [UtilsService],
}).compile();
controller = module.get<WorkerController>(WorkerController);
});
it('should be defined', () => {
expect(controller).toBeDefined();
});
});

90
packages/nocodb/src/modules/jobs/worker/worker.controller.ts

@ -0,0 +1,90 @@
import { Body, Controller, Get, Inject, Post, UseGuards } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import { ModuleRef } from '@nestjs/core';
import type { OnModuleInit } from '@nestjs/common';
import type { Queue } from 'bull';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
import { UtilsService } from '~/services/utils.service';
import { InstanceTypes, WorkerCommands } from '~/interface/Jobs';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
@Controller()
export class WorkerController implements OnModuleInit {
jobsRedisService: JobsRedisService;
constructor(
protected readonly utilsService: UtilsService,
@Inject('JobsService') private readonly jobsService,
private moduleRef: ModuleRef,
) {}
onModuleInit() {
if (process.env.NC_REDIS_JOB_URL) {
this.jobsRedisService = this.moduleRef.get(JobsRedisService);
}
}
@Get('/api/v1/health')
async appHealth() {
return await this.utilsService.appHealth();
}
// reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queueresume
@Post('/internal/workers/resume')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async resumeWorkers(@Body() body: { global?: boolean }) {
if (body.global === true) {
await this.jobsService.resumeQueue();
} else {
await this.jobsRedisService.publish(
InstanceTypes.WORKER,
WorkerCommands.RESUME_LOCAL,
);
}
}
// reference: https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queuepause
@Post('/internal/workers/pause')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async pauseWorkers(@Body() body: { global?: boolean }) {
if (body.global === true) {
await this.jobsService.pauseQueue();
} else {
await this.jobsRedisService.publish(
InstanceTypes.WORKER,
WorkerCommands.PAUSE_LOCAL,
);
}
}
@Get('/internal/workers/status')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async workerStatus() {
const queue = this.jobsService.jobsQueue as Queue;
const status = (await queue.isPaused(true)) ? 'paused' : 'active';
const hasRunningJobs = await new Promise((resolve) => {
queue.whenCurrentJobsFinished().then(() => {
resolve(false);
});
setTimeout(() => {
resolve(true);
}, 2000);
});
return {
status,
hasRunningJobs,
};
}
@Get('/internal/workers/metrics')
@UseGuards(MetaApiLimiterGuard, AuthGuard('basic'))
async workerMetrics() {
const queue = this.jobsService.jobsQueue as Queue;
return {
queueStatus: (await queue.isPaused()) ? 'paused' : 'active',
jobCounts: await queue.getJobCounts(),
timestamp: Date.now(),
};
}
}

3
packages/nocodb/src/modules/metas/metas.module.ts

@ -123,7 +123,7 @@ export const metaModuleMetadata = {
SharedBasesController,
NotificationsController,
]
: [UtilsController]),
: []),
],
providers: [
/** Services */
@ -184,6 +184,7 @@ export const metaModuleMetadata = {
MetaDiffsService,
BasesService,
SourcesService,
UtilsService,
],
};

12
packages/nocodb/src/run/cloud.ts

@ -18,12 +18,8 @@ server.use(
server.set('view engine', 'ejs');
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/docker.ts

@ -28,11 +28,7 @@ process.env[`DEBUG`] = 'xc*';
// })().catch((e) => console.log(e));
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerEntry.ts

@ -13,11 +13,7 @@ server.use(cors());
server.set('view engine', 'ejs');
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerRunMysql.ts

@ -31,11 +31,7 @@ process.env[`NC_DB`] = `mysql2://localhost:3306?u=root&p=password&d=${metaDb}`;
// process.env[`DEBUG`] = 'xc*';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerRunPG.ts

@ -30,11 +30,7 @@ process.env[`NC_DB`] = `pg://localhost:5432?u=postgres&p=password&d=${metaDb}`;
// process.env[`DEBUG`] = 'xc*';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

10
packages/nocodb/src/run/dockerRunPG_CyQuick.ts

@ -24,11 +24,7 @@ process.env[
//process.env[`DEBUG`] = 'xc*';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

12
packages/nocodb/src/run/local.ts

@ -17,12 +17,8 @@ server.use(
server.set('view engine', 'ejs');
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
}
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
server.use(await Noco.init({}, httpServer, server));
});
})().catch((e) => console.log(e));

4
packages/nocodb/src/run/testDocker.ts

@ -27,7 +27,9 @@ process.env[`NC_ALLOW_LOCAL_HOOKS`] = 'true';
(async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await await Noco.init({}, null, null);
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));
});
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server));

Loading…
Cancel
Save