Browse Source

feat: use emit to avoid circular dependency

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest
mertmit 2 years ago
parent
commit
ab5236075d
  1. 32
      packages/nocodb-nest/package-lock.json
  2. 1
      packages/nocodb-nest/package.json
  3. 2
      packages/nocodb-nest/src/app.module.ts
  4. 6
      packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts
  5. 65
      packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
  6. 21
      packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts
  7. 60
      packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts
  8. 55
      packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts
  9. 2
      packages/nocodb-nest/src/modules/jobs/jobs.module.ts
  10. 24
      packages/nocodb-nest/src/modules/jobs/jobs.service.ts

32
packages/nocodb-nest/package-lock.json generated

@ -14,6 +14,7 @@
"@nestjs/bull": "^0.6.3", "@nestjs/bull": "^0.6.3",
"@nestjs/common": "^9.4.0", "@nestjs/common": "^9.4.0",
"@nestjs/core": "^9.4.0", "@nestjs/core": "^9.4.0",
"@nestjs/event-emitter": "^1.4.1",
"@nestjs/jwt": "^10.0.3", "@nestjs/jwt": "^10.0.3",
"@nestjs/mapped-types": "*", "@nestjs/mapped-types": "*",
"@nestjs/passport": "^9.0.3", "@nestjs/passport": "^9.0.3",
@ -2532,6 +2533,19 @@
} }
} }
}, },
"node_modules/@nestjs/event-emitter": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/@nestjs/event-emitter/-/event-emitter-1.4.1.tgz",
"integrity": "sha512-PmLpzMYgEKJNxOUrRjb6kNSm2PC6J+BeLTuF/bkYViGM/mVGvYOgU5jq8DQnXmiSmDmyWN+tO2cHSnR7odJJRA==",
"dependencies": {
"eventemitter2": "6.4.9"
},
"peerDependencies": {
"@nestjs/common": "^7.0.0 || ^8.0.0 || ^9.0.0",
"@nestjs/core": "^7.0.0 || ^8.0.0 || ^9.0.0",
"reflect-metadata": "^0.1.12"
}
},
"node_modules/@nestjs/jwt": { "node_modules/@nestjs/jwt": {
"version": "10.0.3", "version": "10.0.3",
"resolved": "https://registry.npmjs.org/@nestjs/jwt/-/jwt-10.0.3.tgz", "resolved": "https://registry.npmjs.org/@nestjs/jwt/-/jwt-10.0.3.tgz",
@ -8161,6 +8175,11 @@
"node": ">=6" "node": ">=6"
} }
}, },
"node_modules/eventemitter2": {
"version": "6.4.9",
"resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz",
"integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg=="
},
"node_modules/eventemitter3": { "node_modules/eventemitter3": {
"version": "4.0.7", "version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
@ -20195,6 +20214,14 @@
"uid": "2.0.2" "uid": "2.0.2"
} }
}, },
"@nestjs/event-emitter": {
"version": "1.4.1",
"resolved": "https://registry.npmjs.org/@nestjs/event-emitter/-/event-emitter-1.4.1.tgz",
"integrity": "sha512-PmLpzMYgEKJNxOUrRjb6kNSm2PC6J+BeLTuF/bkYViGM/mVGvYOgU5jq8DQnXmiSmDmyWN+tO2cHSnR7odJJRA==",
"requires": {
"eventemitter2": "6.4.9"
}
},
"@nestjs/jwt": { "@nestjs/jwt": {
"version": "10.0.3", "version": "10.0.3",
"resolved": "https://registry.npmjs.org/@nestjs/jwt/-/jwt-10.0.3.tgz", "resolved": "https://registry.npmjs.org/@nestjs/jwt/-/jwt-10.0.3.tgz",
@ -24580,6 +24607,11 @@
"resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz",
"integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==" "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ=="
}, },
"eventemitter2": {
"version": "6.4.9",
"resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz",
"integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg=="
},
"eventemitter3": { "eventemitter3": {
"version": "4.0.7", "version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",

1
packages/nocodb-nest/package.json

@ -44,6 +44,7 @@
"@nestjs/bull": "^0.6.3", "@nestjs/bull": "^0.6.3",
"@nestjs/common": "^9.4.0", "@nestjs/common": "^9.4.0",
"@nestjs/core": "^9.4.0", "@nestjs/core": "^9.4.0",
"@nestjs/event-emitter": "^1.4.1",
"@nestjs/jwt": "^10.0.3", "@nestjs/jwt": "^10.0.3",
"@nestjs/mapped-types": "*", "@nestjs/mapped-types": "*",
"@nestjs/passport": "^9.0.3", "@nestjs/passport": "^9.0.3",

2
packages/nocodb-nest/src/app.module.ts

@ -1,6 +1,7 @@
import { Module, RequestMethod } from '@nestjs/common'; import { Module, RequestMethod } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core'; import { APP_FILTER } from '@nestjs/core';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bull';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { Connection } from './connection/connection'; import { Connection } from './connection/connection';
import { GlobalExceptionFilter } from './filters/global-exception/global-exception.filter'; import { GlobalExceptionFilter } from './filters/global-exception/global-exception.filter';
import NcPluginMgrv2 from './helpers/NcPluginMgrv2'; import NcPluginMgrv2 from './helpers/NcPluginMgrv2';
@ -35,6 +36,7 @@ import type {
MetasModule, MetasModule,
DatasModule, DatasModule,
JobsModule, JobsModule,
EventEmitterModule.forRoot(),
...(process.env['NC_REDIS_URL'] ...(process.env['NC_REDIS_URL']
? [ ? [
BullModule.forRoot({ BullModule.forRoot({

6
packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts

@ -68,9 +68,9 @@ export class DuplicateController {
}); });
const job = await this.activeQueue.add('duplicate', { const job = await this.activeQueue.add('duplicate', {
project, projectId: project.id,
base, baseId: base.id,
dupProject, dupProjectId: dupProject.id,
req: { req: {
user: req.user, user: req.user,
clientIp: req.clientIp, clientIp: req.clientIp,

65
packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts

@ -1,21 +1,12 @@
import { Readable } from 'stream'; import { Readable } from 'stream';
import { import { Process, Processor } from '@nestjs/bull';
OnQueueActive, import { Base, Column, Model, Project } from 'src/models';
OnQueueCompleted,
OnQueueFailed,
Process,
Processor,
} from '@nestjs/bull';
import { Column, Model } from 'src/models';
import { Job } from 'bull'; import { Job } from 'bull';
import { ProjectsService } from 'src/services/projects.service'; import { ProjectsService } from 'src/services/projects.service';
import boxen from 'boxen';
import papaparse from 'papaparse'; import papaparse from 'papaparse';
import { findWithIdentifier } from 'src/helpers/exportImportHelpers'; import { findWithIdentifier } from 'src/helpers/exportImportHelpers';
import { BulkDataAliasService } from 'src/services/bulk-data-alias.service'; import { BulkDataAliasService } from 'src/services/bulk-data-alias.service';
import { UITypes } from 'nocodb-sdk'; import { UITypes } from 'nocodb-sdk';
import { forwardRef, Inject } from '@nestjs/common';
import { JobsGateway } from '../jobs.gateway';
import { ExportService } from './export.service'; import { ExportService } from './export.service';
import { ImportService } from './import.service'; import { ImportService } from './import.service';
import type { LinkToAnotherRecordColumn } from 'src/models'; import type { LinkToAnotherRecordColumn } from 'src/models';
@ -29,53 +20,21 @@ export class DuplicateProcessor {
private readonly importService: ImportService, private readonly importService: ImportService,
private readonly projectsService: ProjectsService, private readonly projectsService: ProjectsService,
private readonly bulkDataService: BulkDataAliasService, private readonly bulkDataService: BulkDataAliasService,
@Inject(forwardRef(() => JobsGateway))
private readonly jobsGateway: JobsGateway,
) {} ) {}
@OnQueueActive()
onActive(job: Job) {
this.jobsGateway.jobStatus({
name: job.name,
id: job.id.toString(),
status: 'active',
});
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(
boxen(
`---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
{
padding: 1,
borderStyle: 'double',
borderColor: 'yellow',
},
),
);
this.jobsGateway.jobStatus({
name: job.name,
id: job.id.toString(),
status: 'failed',
});
}
@OnQueueCompleted()
onCompleted(job: Job) {
this.jobsGateway.jobStatus({
name: job.name,
id: job.id.toString(),
status: 'completed',
});
}
@Process('duplicate') @Process('duplicate')
async duplicateBase(job: Job) { async duplicateBase(job: Job) {
const { project, base, dupProject, req } = job.data; const { projectId, baseId, dupProjectId, req } = job.data;
const project = await Project.get(projectId);
const dupProject = await Project.get(dupProjectId);
const base = await Base.get(baseId);
try { try {
if (!project || !dupProject || !base) {
throw new Error(`Project or base not found!`);
}
let start = process.hrtime(); let start = process.hrtime();
const debugLog = function (...args: any[]) { const debugLog = function (...args: any[]) {
@ -107,6 +66,8 @@ export class DuplicateProcessor {
throw new Error(`Export failed for base '${base.id}'`); throw new Error(`Export failed for base '${base.id}'`);
} }
await dupProject.getBases();
const dupBaseId = dupProject.bases[0].id; const dupBaseId = dupProject.bases[0].id;
elapsedTime('projectCreate'); elapsedTime('projectCreate');

21
packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts

@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common';
import PQueue from 'p-queue'; import PQueue from 'p-queue';
import Emittery from 'emittery'; import Emittery from 'emittery';
import { DuplicateProcessor } from './export-import/duplicate.processor'; import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsEventService } from './jobs-event.service';
interface Job { interface Job {
id: string; id: string;
@ -18,22 +19,23 @@ export class QueueService {
static queueMemory: Job[] = []; static queueMemory: Job[] = [];
static emitter = new Emittery(); static emitter = new Emittery();
constructor(private readonly duplicateProcessor: DuplicateProcessor) { constructor(
private readonly jobsEventService: JobsEventService,
private readonly duplicateProcessor: DuplicateProcessor,
) {
this.emitter.on('active', (data: any) => { this.emitter.on('active', (data: any) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
(job) => job.id === data.id && job.name === data.name, (job) => job.id === data.id && job.name === data.name,
); );
job.status = 'active'; job.status = 'active';
this.duplicateProcessor.onActive.apply(this.duplicateProcessor, [ this.jobsEventService.onActive.apply(this.jobsEventService, [job as any]);
job as any,
]);
}); });
this.emitter.on('completed', (data: any) => { this.emitter.on('completed', (data: any) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
(job) => job.id === data.id && job.name === data.name, (job) => job.id === data.id && job.name === data.name,
); );
job.status = 'completed'; job.status = 'completed';
this.duplicateProcessor.onCompleted.apply(this.duplicateProcessor, [ this.jobsEventService.onCompleted.apply(this.jobsEventService, [
data as any, data as any,
]); ]);
}); });
@ -42,7 +44,7 @@ export class QueueService {
(job) => job.id === data.job.id && job.name === data.job.name, (job) => job.id === data.job.id && job.name === data.job.name,
); );
job.status = 'failed'; job.status = 'failed';
this.duplicateProcessor.onFailed.apply(this.duplicateProcessor, [ this.jobsEventService.onFailed.apply(this.jobsEventService, [
data.job as any, data.job as any,
data.error, data.error,
]); ]);
@ -50,13 +52,16 @@ export class QueueService {
} }
jobMap = { jobMap = {
duplicate: this.duplicateProcessor.duplicateBase, duplicate: {
this: this.duplicateProcessor,
fn: this.duplicateProcessor.duplicateBase,
},
}; };
async jobWrapper(job: Job) { async jobWrapper(job: Job) {
this.emitter.emit('active', job); this.emitter.emit('active', job);
try { try {
await this.jobMap[job.name].apply(this.duplicateProcessor, [job]); await this.jobMap[job.name].fn.apply(this.jobMap[job.name].this, [job]);
this.emitter.emit('completed', job); this.emitter.emit('completed', job);
} catch (error) { } catch (error) {
this.emitter.emit('failed', { job, error }); this.emitter.emit('failed', { job, error });

60
packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts

@ -0,0 +1,60 @@
import {
OnQueueActive,
OnQueueCompleted,
OnQueueFailed,
Processor,
} from '@nestjs/bull';
import { Job } from 'bull';
import boxen from 'boxen';
import { EventEmitter2 } from '@nestjs/event-emitter';
@Processor('jobs')
export class JobsEventService {
constructor(private eventEmitter: EventEmitter2) {}
@OnQueueActive()
onActive(job: Job) {
this.eventEmitter.emit('job.status', {
name: job.name,
id: job.id.toString(),
status: 'active',
});
}
@OnQueueFailed()
onFailed(job: Job, error: Error) {
console.error(
boxen(
`---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
{
padding: 1,
borderStyle: 'double',
borderColor: 'yellow',
},
),
);
this.eventEmitter.emit('job.status', {
name: job.name,
id: job.id.toString(),
status: 'failed',
});
}
@OnQueueCompleted()
onCompleted(job: Job) {
this.eventEmitter.emit('job.status', {
name: job.name,
id: job.id.toString(),
status: 'completed',
});
}
sendLog(job: Job, data: { message: string }) {
this.eventEmitter.emit('job.log', {
name: job.name,
id: job.id.toString(),
data,
});
}
}

55
packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts

@ -6,9 +6,9 @@ import {
WebSocketServer, WebSocketServer,
} from '@nestjs/websockets'; } from '@nestjs/websockets';
import { Server, Socket } from 'socket.io'; import { Server, Socket } from 'socket.io';
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host'; import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { AuthGuard } from '@nestjs/passport'; import { AuthGuard } from '@nestjs/passport';
import { OnEvent } from '@nestjs/event-emitter';
import { JobsService } from './jobs.service'; import { JobsService } from './jobs.service';
import type { OnModuleInit } from '@nestjs/common'; import type { OnModuleInit } from '@nestjs/common';
@ -20,12 +20,8 @@ import type { OnModuleInit } from '@nestjs/common';
}, },
namespace: 'jobs', namespace: 'jobs',
}) })
@Injectable()
export class JobsGateway implements OnModuleInit { export class JobsGateway implements OnModuleInit {
constructor( constructor(private readonly jobsService: JobsService) {}
@Inject(forwardRef(() => JobsService))
private readonly jobsService: JobsService,
) {}
@WebSocketServer() @WebSocketServer()
server: Server; server: Server;
@ -44,15 +40,32 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('subscribe') @SubscribeMessage('subscribe')
async subscribe( async subscribe(
@MessageBody() data: { name: string; id: string }, @MessageBody() data: { name: string; id: string } | { id: string },
@ConnectedSocket() client: Socket, @ConnectedSocket() client: Socket,
): Promise<void> { ): Promise<void> {
const rooms = (await this.jobsService.jobList(data.name)).map( if ('name' in data) {
(j) => `${j.name}-${j.id}`, const rooms = (await this.jobsService.jobList(data.name)).map(
); (j) => `${j.name}-${j.id}`,
const room = rooms.find((r) => r === `${data.name}-${data.id}`); );
if (room) { const room = rooms.find((r) => r === `${data.name}-${data.id}`);
client.join(`${data.name}-${data.id}`); if (room) {
client.join(`${data.name}-${data.id}`);
client.emit('subscribed', {
subbed: data.id,
name: data.name,
id: data.id,
});
}
} else {
const job = await this.jobsService.getJobWithData({ id: data.id });
if (job) {
client.join(`${job.name}-${job.id}`);
client.emit('subscribed', {
subbed: data.id,
name: job.name,
id: job.id,
});
}
} }
} }
@ -68,7 +81,8 @@ export class JobsGateway implements OnModuleInit {
}); });
} }
async jobStatus(data: { @OnEvent('job.status')
async sendJobStatus(data: {
name: string; name: string;
id: string; id: string;
status: status:
@ -86,4 +100,17 @@ export class JobsGateway implements OnModuleInit {
status: data.status, status: data.status,
}); });
} }
@OnEvent('job.log')
async sendJobLog(data: {
name: string;
id: string;
data: { message: string };
}): Promise<void> {
this.server.to(`${data.name}-${data.id}`).emit('log', {
id: data.id,
name: data.name,
data: data.data,
});
}
} }

2
packages/nocodb-nest/src/modules/jobs/jobs.module.ts

@ -10,6 +10,7 @@ import { DuplicateController } from './export-import/duplicate.controller';
import { DuplicateProcessor } from './export-import/duplicate.processor'; import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsGateway } from './jobs.gateway'; import { JobsGateway } from './jobs.gateway';
import { QueueService } from './fallback-queue.service'; import { QueueService } from './fallback-queue.service';
import { JobsEventService } from './jobs-event.service';
@Module({ @Module({
imports: [ imports: [
@ -25,6 +26,7 @@ import { QueueService } from './fallback-queue.service';
QueueService, QueueService,
JobsGateway, JobsGateway,
JobsService, JobsService,
JobsEventService,
DuplicateProcessor, DuplicateProcessor,
ExportService, ExportService,
ImportService, ImportService,

24
packages/nocodb-nest/src/modules/jobs/jobs.service.ts

@ -24,4 +24,28 @@ export class JobsService {
await this.activeQueue.getJobs(['active', 'waiting', 'delayed']) await this.activeQueue.getJobs(['active', 'waiting', 'delayed'])
).filter((j) => j.name === jobType); ).filter((j) => j.name === jobType);
} }
async getJobWithData(data: any) {
const jobs = await this.activeQueue.getJobs([
'completed',
'waiting',
'active',
'delayed',
'failed',
'paused',
]);
const job = jobs.find((j) => {
for (const key in data) {
if (j.data[key]) {
if (j.data[key] !== data[key]) return false;
} else {
return false;
}
}
return true;
});
return job;
}
} }

Loading…
Cancel
Save