From 5fb2c312341b9932c7721efbf352d2ebbbd1700f Mon Sep 17 00:00:00 2001 From: mertmit Date: Sat, 30 Sep 2023 17:01:46 +0000 Subject: [PATCH] fix: worker for all db types --- .../src/modules/jobs/jobs.controller.ts | 78 +++++++++++-------- packages/nocodb/src/run/cloud.ts | 12 ++- packages/nocodb/src/run/docker.ts | 10 ++- packages/nocodb/src/run/dockerEntry.ts | 10 ++- packages/nocodb/src/run/dockerRunMysql.ts | 10 ++- packages/nocodb/src/run/dockerRunPG.ts | 10 ++- .../nocodb/src/run/dockerRunPG_CyQuick.ts | 10 ++- packages/nocodb/src/run/local.ts | 12 ++- 8 files changed, 97 insertions(+), 55 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/jobs.controller.ts b/packages/nocodb/src/modules/jobs/jobs.controller.ts index b2a2a15152..da92780a6d 100644 --- a/packages/nocodb/src/modules/jobs/jobs.controller.ts +++ b/packages/nocodb/src/modules/jobs/jobs.controller.ts @@ -10,24 +10,34 @@ import { } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { customAlphabet } from 'nanoid'; +import { ModuleRef } from '@nestjs/core'; import { JobsRedisService } from './redis/jobs-redis.service'; +import type { OnModuleInit } from '@nestjs/common'; import { JobStatus } from '~/interface/Jobs'; import { JobEvents } from '~/interface/Jobs'; import { GlobalGuard } from '~/guards/global/global.guard'; import NocoCache from '~/cache/NocoCache'; -import { CacheDelDirection, CacheGetType, CacheScope } from '~/utils/globals'; +import { CacheGetType, CacheScope } from '~/utils/globals'; const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14); const POLLING_INTERVAL = 30000; @Controller() @UseGuards(GlobalGuard) -export class JobsController { +export class JobsController implements OnModuleInit { + jobsRedisService: JobsRedisService; + constructor( @Inject('JobsService') private readonly jobsService, - private readonly jobsRedisService: JobsRedisService, + private moduleRef: ModuleRef, ) {} + onModuleInit() { + if (process.env.NC_REDIS_JOB_URL) { + this.jobsRedisService = this.moduleRef.get(JobsRedisService); + } + } + private jobRooms = {}; private localJobs = {}; private closedJobs = []; @@ -88,32 +98,36 @@ export class JobsController { listeners: [res], }; // subscribe to job events - this.jobsRedisService.subscribe(jobId, (data) => { - if (this.jobRooms[jobId]) { - this.jobRooms[jobId].listeners.forEach((res) => { - if (!res.headersSent) { - res.send({ - status: 'refresh', - }); - } - }); - } - - const cmd = data.cmd; - delete data.cmd; - switch (cmd) { - case JobEvents.STATUS: - if ([JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)) { - this.jobsRedisService.unsubscribe(jobId); - delete this.jobRooms[jobId]; - this.closedJobs.push(jobId); - setTimeout(() => { - this.closedJobs = this.closedJobs.filter((j) => j !== jobId); - }, POLLING_INTERVAL * 1.5); - } - break; - } - }); + if (this.jobsRedisService) { + this.jobsRedisService.subscribe(jobId, (data) => { + if (this.jobRooms[jobId]) { + this.jobRooms[jobId].listeners.forEach((res) => { + if (!res.headersSent) { + res.send({ + status: 'refresh', + }); + } + }); + } + + const cmd = data.cmd; + delete data.cmd; + switch (cmd) { + case JobEvents.STATUS: + if ( + [JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status) + ) { + this.jobsRedisService.unsubscribe(jobId); + delete this.jobRooms[jobId]; + this.closedJobs.push(jobId); + setTimeout(() => { + this.closedJobs = this.closedJobs.filter((j) => j !== jobId); + }, POLLING_INTERVAL * 1.5); + } + break; + } + }); + } } res.on('close', () => { @@ -203,7 +217,7 @@ export class JobsController { }); } - if (process.env.NC_WORKER_CONTAINER === 'true') { + if (process.env.NC_WORKER_CONTAINER === 'true' && this.jobsRedisService) { this.jobsRedisService.publish(jobId, { cmd: JobEvents.STATUS, ...data, @@ -219,7 +233,7 @@ export class JobsController { setTimeout(() => { delete this.jobRooms[jobId]; delete this.localJobs[jobId]; - NocoCache.deepDel(`jobs`, jobId, CacheDelDirection.CHILD_TO_PARENT); + NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`); }, POLLING_INTERVAL); } } @@ -268,7 +282,7 @@ export class JobsController { }); } - if (process.env.NC_WORKER_CONTAINER === 'true') { + if (process.env.NC_WORKER_CONTAINER === 'true' && this.jobsRedisService) { this.jobsRedisService.publish(jobId, { cmd: JobEvents.LOG, ...data, diff --git a/packages/nocodb/src/run/cloud.ts b/packages/nocodb/src/run/cloud.ts index 50883ded16..8f428139c4 100644 --- a/packages/nocodb/src/run/cloud.ts +++ b/packages/nocodb/src/run/cloud.ts @@ -18,8 +18,12 @@ server.use( server.set('view engine', 'ejs'); (async () => { - const httpServer = server.listen(process.env.PORT || 8080, () => { - console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); - }); - server.use(await Noco.init({}, httpServer, server)); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/docker.ts b/packages/nocodb/src/run/docker.ts index b413d1efa8..f80a313eb2 100644 --- a/packages/nocodb/src/run/docker.ts +++ b/packages/nocodb/src/run/docker.ts @@ -28,7 +28,11 @@ process.env[`DEBUG`] = 'xc*'; // })().catch((e) => console.log(e)); (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerEntry.ts b/packages/nocodb/src/run/dockerEntry.ts index c0f4cbaa0e..9ac361d24d 100644 --- a/packages/nocodb/src/run/dockerEntry.ts +++ b/packages/nocodb/src/run/dockerEntry.ts @@ -13,7 +13,11 @@ server.use(cors()); server.set('view engine', 'ejs'); (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerRunMysql.ts b/packages/nocodb/src/run/dockerRunMysql.ts index 91ee249bf2..d1b8cd2954 100644 --- a/packages/nocodb/src/run/dockerRunMysql.ts +++ b/packages/nocodb/src/run/dockerRunMysql.ts @@ -31,7 +31,11 @@ process.env[`NC_DB`] = `mysql2://localhost:3306?u=root&p=password&d=${metaDb}`; // process.env[`DEBUG`] = 'xc*'; (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerRunPG.ts b/packages/nocodb/src/run/dockerRunPG.ts index e18b7d66af..e47e73af9c 100644 --- a/packages/nocodb/src/run/dockerRunPG.ts +++ b/packages/nocodb/src/run/dockerRunPG.ts @@ -30,7 +30,11 @@ process.env[`NC_DB`] = `pg://localhost:5432?u=postgres&p=password&d=${metaDb}`; // process.env[`DEBUG`] = 'xc*'; (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/dockerRunPG_CyQuick.ts b/packages/nocodb/src/run/dockerRunPG_CyQuick.ts index f48965ce6f..f0585c8b2a 100644 --- a/packages/nocodb/src/run/dockerRunPG_CyQuick.ts +++ b/packages/nocodb/src/run/dockerRunPG_CyQuick.ts @@ -24,7 +24,11 @@ process.env[ //process.env[`DEBUG`] = 'xc*'; (async () => { - const httpServer = server.listen(process.env.PORT || 8080, async () => { - server.use(await Noco.init({}, httpServer, server)); - }); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e)); diff --git a/packages/nocodb/src/run/local.ts b/packages/nocodb/src/run/local.ts index 36e7c53820..660c932d25 100644 --- a/packages/nocodb/src/run/local.ts +++ b/packages/nocodb/src/run/local.ts @@ -17,8 +17,12 @@ server.use( server.set('view engine', 'ejs'); (async () => { - const httpServer = server.listen(process.env.PORT || 8080, () => { - console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); - }); - server.use(await Noco.init({}, httpServer, server)); + if (process.env.NC_WORKER_CONTAINER === 'true') { + await Noco.init({}, null, null); + } else { + const httpServer = server.listen(process.env.PORT || 8080, async () => { + console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); + server.use(await Noco.init({}, httpServer, server)); + }); + } })().catch((e) => console.log(e));