Browse Source

fix: worker for all db types

pull/6528/head
mertmit 1 year ago
parent
commit
5fb2c31234
  1. 28
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  2. 8
      packages/nocodb/src/run/cloud.ts
  3. 4
      packages/nocodb/src/run/docker.ts
  4. 4
      packages/nocodb/src/run/dockerEntry.ts
  5. 4
      packages/nocodb/src/run/dockerRunMysql.ts
  6. 4
      packages/nocodb/src/run/dockerRunPG.ts
  7. 4
      packages/nocodb/src/run/dockerRunPG_CyQuick.ts
  8. 8
      packages/nocodb/src/run/local.ts

28
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -10,24 +10,34 @@ import {
} from '@nestjs/common'; } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter'; import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid'; import { customAlphabet } from 'nanoid';
import { ModuleRef } from '@nestjs/core';
import { JobsRedisService } from './redis/jobs-redis.service'; import { JobsRedisService } from './redis/jobs-redis.service';
import type { OnModuleInit } from '@nestjs/common';
import { JobStatus } from '~/interface/Jobs'; import { JobStatus } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs'; import { JobEvents } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard'; import { GlobalGuard } from '~/guards/global/global.guard';
import NocoCache from '~/cache/NocoCache'; import NocoCache from '~/cache/NocoCache';
import { CacheDelDirection, CacheGetType, CacheScope } from '~/utils/globals'; import { CacheGetType, CacheScope } from '~/utils/globals';
const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14); const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14);
const POLLING_INTERVAL = 30000; const POLLING_INTERVAL = 30000;
@Controller() @Controller()
@UseGuards(GlobalGuard) @UseGuards(GlobalGuard)
export class JobsController { export class JobsController implements OnModuleInit {
jobsRedisService: JobsRedisService;
constructor( constructor(
@Inject('JobsService') private readonly jobsService, @Inject('JobsService') private readonly jobsService,
private readonly jobsRedisService: JobsRedisService, private moduleRef: ModuleRef,
) {} ) {}
onModuleInit() {
if (process.env.NC_REDIS_JOB_URL) {
this.jobsRedisService = this.moduleRef.get(JobsRedisService);
}
}
private jobRooms = {}; private jobRooms = {};
private localJobs = {}; private localJobs = {};
private closedJobs = []; private closedJobs = [];
@ -88,6 +98,7 @@ export class JobsController {
listeners: [res], listeners: [res],
}; };
// subscribe to job events // subscribe to job events
if (this.jobsRedisService) {
this.jobsRedisService.subscribe(jobId, (data) => { this.jobsRedisService.subscribe(jobId, (data) => {
if (this.jobRooms[jobId]) { if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => { this.jobRooms[jobId].listeners.forEach((res) => {
@ -103,7 +114,9 @@ export class JobsController {
delete data.cmd; delete data.cmd;
switch (cmd) { switch (cmd) {
case JobEvents.STATUS: case JobEvents.STATUS:
if ([JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)) { if (
[JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)
) {
this.jobsRedisService.unsubscribe(jobId); this.jobsRedisService.unsubscribe(jobId);
delete this.jobRooms[jobId]; delete this.jobRooms[jobId];
this.closedJobs.push(jobId); this.closedJobs.push(jobId);
@ -115,6 +128,7 @@ export class JobsController {
} }
}); });
} }
}
res.on('close', () => { res.on('close', () => {
if (jobId && this.jobRooms[jobId]?.listeners) { if (jobId && this.jobRooms[jobId]?.listeners) {
@ -203,7 +217,7 @@ export class JobsController {
}); });
} }
if (process.env.NC_WORKER_CONTAINER === 'true') { if (process.env.NC_WORKER_CONTAINER === 'true' && this.jobsRedisService) {
this.jobsRedisService.publish(jobId, { this.jobsRedisService.publish(jobId, {
cmd: JobEvents.STATUS, cmd: JobEvents.STATUS,
...data, ...data,
@ -219,7 +233,7 @@ export class JobsController {
setTimeout(() => { setTimeout(() => {
delete this.jobRooms[jobId]; delete this.jobRooms[jobId];
delete this.localJobs[jobId]; delete this.localJobs[jobId];
NocoCache.deepDel(`jobs`, jobId, CacheDelDirection.CHILD_TO_PARENT); NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`);
}, POLLING_INTERVAL); }, POLLING_INTERVAL);
} }
} }
@ -268,7 +282,7 @@ export class JobsController {
}); });
} }
if (process.env.NC_WORKER_CONTAINER === 'true') { if (process.env.NC_WORKER_CONTAINER === 'true' && this.jobsRedisService) {
this.jobsRedisService.publish(jobId, { this.jobsRedisService.publish(jobId, {
cmd: JobEvents.LOG, cmd: JobEvents.LOG,
...data, ...data,

8
packages/nocodb/src/run/cloud.ts

@ -18,8 +18,12 @@ server.use(
server.set('view engine', 'ejs'); server.set('view engine', 'ejs');
(async () => { (async () => {
const httpServer = server.listen(process.env.PORT || 8080, () => { if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
});
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
});
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

4
packages/nocodb/src/run/docker.ts

@ -28,7 +28,11 @@ process.env[`DEBUG`] = 'xc*';
// })().catch((e) => console.log(e)); // })().catch((e) => console.log(e));
(async () => { (async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => { const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
}); });
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

4
packages/nocodb/src/run/dockerEntry.ts

@ -13,7 +13,11 @@ server.use(cors());
server.set('view engine', 'ejs'); server.set('view engine', 'ejs');
(async () => { (async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => { const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
}); });
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

4
packages/nocodb/src/run/dockerRunMysql.ts

@ -31,7 +31,11 @@ process.env[`NC_DB`] = `mysql2://localhost:3306?u=root&p=password&d=${metaDb}`;
// process.env[`DEBUG`] = 'xc*'; // process.env[`DEBUG`] = 'xc*';
(async () => { (async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => { const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
}); });
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

4
packages/nocodb/src/run/dockerRunPG.ts

@ -30,7 +30,11 @@ process.env[`NC_DB`] = `pg://localhost:5432?u=postgres&p=password&d=${metaDb}`;
// process.env[`DEBUG`] = 'xc*'; // process.env[`DEBUG`] = 'xc*';
(async () => { (async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => { const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
}); });
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

4
packages/nocodb/src/run/dockerRunPG_CyQuick.ts

@ -24,7 +24,11 @@ process.env[
//process.env[`DEBUG`] = 'xc*'; //process.env[`DEBUG`] = 'xc*';
(async () => { (async () => {
if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => { const httpServer = server.listen(process.env.PORT || 8080, async () => {
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
}); });
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

8
packages/nocodb/src/run/local.ts

@ -17,8 +17,12 @@ server.use(
server.set('view engine', 'ejs'); server.set('view engine', 'ejs');
(async () => { (async () => {
const httpServer = server.listen(process.env.PORT || 8080, () => { if (process.env.NC_WORKER_CONTAINER === 'true') {
await Noco.init({}, null, null);
} else {
const httpServer = server.listen(process.env.PORT || 8080, async () => {
console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`); console.log(`App started successfully.\nVisit -> ${Noco.dashboardUrl}`);
});
server.use(await Noco.init({}, httpServer, server)); server.use(await Noco.init({}, httpServer, server));
});
}
})().catch((e) => console.log(e)); })().catch((e) => console.log(e));

Loading…
Cancel
Save