From 713c5243c0bb08493732645019de58b83d49d884 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 25 Apr 2023 19:13:30 +0300 Subject: [PATCH] feat: use emit to avoid circular dependency Signed-off-by: mertmit --- packages/nocodb-nest/package-lock.json | 32 +++++++++ packages/nocodb-nest/package.json | 1 + packages/nocodb-nest/src/app.module.ts | 2 + .../export-import/duplicate.controller.ts | 6 +- .../jobs/export-import/duplicate.processor.ts | 65 ++++--------------- .../modules/jobs/fallback-queue.service.ts | 21 +++--- .../src/modules/jobs/jobs-event.service.ts | 60 +++++++++++++++++ .../src/modules/jobs/jobs.gateway.ts | 55 ++++++++++++---- .../src/modules/jobs/jobs.module.ts | 2 + .../src/modules/jobs/jobs.service.ts | 24 +++++++ 10 files changed, 191 insertions(+), 77 deletions(-) create mode 100644 packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts diff --git a/packages/nocodb-nest/package-lock.json b/packages/nocodb-nest/package-lock.json index fcc7026cb2..08ae4864d6 100644 --- a/packages/nocodb-nest/package-lock.json +++ b/packages/nocodb-nest/package-lock.json @@ -14,6 +14,7 @@ "@nestjs/bull": "^0.6.3", "@nestjs/common": "^9.4.0", "@nestjs/core": "^9.4.0", + "@nestjs/event-emitter": "^1.4.1", "@nestjs/jwt": "^10.0.3", "@nestjs/mapped-types": "*", "@nestjs/passport": "^9.0.3", @@ -16634,6 +16635,19 @@ } } }, + "node_modules/@nestjs/event-emitter": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/@nestjs/event-emitter/-/event-emitter-1.4.1.tgz", + "integrity": "sha512-PmLpzMYgEKJNxOUrRjb6kNSm2PC6J+BeLTuF/bkYViGM/mVGvYOgU5jq8DQnXmiSmDmyWN+tO2cHSnR7odJJRA==", + "dependencies": { + "eventemitter2": "6.4.9" + }, + "peerDependencies": { + "@nestjs/common": "^7.0.0 || ^8.0.0 || ^9.0.0", + "@nestjs/core": "^7.0.0 || ^8.0.0 || ^9.0.0", + "reflect-metadata": "^0.1.12" + } + }, "node_modules/@nestjs/jwt": { "version": "10.0.3", "license": "MIT", @@ -21304,6 +21318,11 @@ "node": ">=6" } }, + "node_modules/eventemitter2": { + "version": "6.4.9", + "resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz", + "integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==" + }, "node_modules/eventemitter3": { "version": "4.0.7", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", @@ -31170,6 +31189,14 @@ "uid": "2.0.2" } }, + "@nestjs/event-emitter": { + "version": "1.4.1", + "resolved": "https://registry.npmjs.org/@nestjs/event-emitter/-/event-emitter-1.4.1.tgz", + "integrity": "sha512-PmLpzMYgEKJNxOUrRjb6kNSm2PC6J+BeLTuF/bkYViGM/mVGvYOgU5jq8DQnXmiSmDmyWN+tO2cHSnR7odJJRA==", + "requires": { + "eventemitter2": "6.4.9" + } + }, "@nestjs/jwt": { "version": "10.0.3", "requires": { @@ -34279,6 +34306,11 @@ "event-target-shim": { "version": "5.0.1" }, + "eventemitter2": { + "version": "6.4.9", + "resolved": "https://registry.npmjs.org/eventemitter2/-/eventemitter2-6.4.9.tgz", + "integrity": "sha512-JEPTiaOt9f04oa6NOkc4aH+nVp5I3wEjpHbIPqfgCdD5v5bUzy7xQqwcVO2aDQgOWhI28da57HksMrzK9HlRxg==" + }, "eventemitter3": { "version": "4.0.7", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", diff --git a/packages/nocodb-nest/package.json b/packages/nocodb-nest/package.json index c093b66395..fa5013702d 100644 --- a/packages/nocodb-nest/package.json +++ b/packages/nocodb-nest/package.json @@ -45,6 +45,7 @@ "@nestjs/bull": "^0.6.3", "@nestjs/common": "^9.4.0", "@nestjs/core": "^9.4.0", + "@nestjs/event-emitter": "^1.4.1", "@nestjs/jwt": "^10.0.3", "@nestjs/mapped-types": "*", "@nestjs/passport": "^9.0.3", diff --git a/packages/nocodb-nest/src/app.module.ts b/packages/nocodb-nest/src/app.module.ts index a8166dba87..70ab72e351 100644 --- a/packages/nocodb-nest/src/app.module.ts +++ b/packages/nocodb-nest/src/app.module.ts @@ -1,6 +1,7 @@ import { Module, RequestMethod } from '@nestjs/common'; import { APP_FILTER } from '@nestjs/core'; import { BullModule } from '@nestjs/bull'; +import { EventEmitterModule } from '@nestjs/event-emitter'; import { Connection } from './connection/connection'; import { GlobalExceptionFilter } from './filters/global-exception/global-exception.filter'; import NcPluginMgrv2 from './helpers/NcPluginMgrv2'; @@ -35,6 +36,7 @@ import type { MetasModule, DatasModule, JobsModule, + EventEmitterModule.forRoot(), ...(process.env['NC_REDIS_URL'] ? [ BullModule.forRoot({ diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts index f35444a6b9..ced65903a6 100644 --- a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts +++ b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts @@ -68,9 +68,9 @@ export class DuplicateController { }); const job = await this.activeQueue.add('duplicate', { - project, - base, - dupProject, + projectId: project.id, + baseId: base.id, + dupProjectId: dupProject.id, req: { user: req.user, clientIp: req.clientIp, diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts index 7b11ef7c16..60ba3c1dda 100644 --- a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts +++ b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts @@ -1,21 +1,12 @@ import { Readable } from 'stream'; -import { - OnQueueActive, - OnQueueCompleted, - OnQueueFailed, - Process, - Processor, -} from '@nestjs/bull'; -import { Column, Model } from 'src/models'; +import { Process, Processor } from '@nestjs/bull'; +import { Base, Column, Model, Project } from 'src/models'; import { Job } from 'bull'; import { ProjectsService } from 'src/services/projects.service'; -import boxen from 'boxen'; import papaparse from 'papaparse'; import { findWithIdentifier } from 'src/helpers/exportImportHelpers'; import { BulkDataAliasService } from 'src/services/bulk-data-alias.service'; import { UITypes } from 'nocodb-sdk'; -import { forwardRef, Inject } from '@nestjs/common'; -import { JobsGateway } from '../jobs.gateway'; import { ExportService } from './export.service'; import { ImportService } from './import.service'; import type { LinkToAnotherRecordColumn } from 'src/models'; @@ -29,53 +20,21 @@ export class DuplicateProcessor { private readonly importService: ImportService, private readonly projectsService: ProjectsService, private readonly bulkDataService: BulkDataAliasService, - @Inject(forwardRef(() => JobsGateway)) - private readonly jobsGateway: JobsGateway, ) {} - @OnQueueActive() - onActive(job: Job) { - this.jobsGateway.jobStatus({ - name: job.name, - id: job.id.toString(), - status: 'active', - }); - } - - @OnQueueFailed() - onFailed(job: Job, error: Error) { - console.error( - boxen( - `---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, - { - padding: 1, - borderStyle: 'double', - borderColor: 'yellow', - }, - ), - ); - - this.jobsGateway.jobStatus({ - name: job.name, - id: job.id.toString(), - status: 'failed', - }); - } - - @OnQueueCompleted() - onCompleted(job: Job) { - this.jobsGateway.jobStatus({ - name: job.name, - id: job.id.toString(), - status: 'completed', - }); - } - @Process('duplicate') async duplicateBase(job: Job) { - const { project, base, dupProject, req } = job.data; + const { projectId, baseId, dupProjectId, req } = job.data; + + const project = await Project.get(projectId); + const dupProject = await Project.get(dupProjectId); + const base = await Base.get(baseId); try { + if (!project || !dupProject || !base) { + throw new Error(`Project or base not found!`); + } + let start = process.hrtime(); const debugLog = function (...args: any[]) { @@ -107,6 +66,8 @@ export class DuplicateProcessor { throw new Error(`Export failed for base '${base.id}'`); } + await dupProject.getBases(); + const dupBaseId = dupProject.bases[0].id; elapsedTime('projectCreate'); diff --git a/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts b/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts index 1859bf41a8..3ac9ced9ff 100644 --- a/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts +++ b/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts @@ -2,6 +2,7 @@ import { Injectable } from '@nestjs/common'; import PQueue from 'p-queue'; import Emittery from 'emittery'; import { DuplicateProcessor } from './export-import/duplicate.processor'; +import { JobsEventService } from './jobs-event.service'; interface Job { id: string; @@ -18,22 +19,23 @@ export class QueueService { static queueMemory: Job[] = []; static emitter = new Emittery(); - constructor(private readonly duplicateProcessor: DuplicateProcessor) { + constructor( + private readonly jobsEventService: JobsEventService, + private readonly duplicateProcessor: DuplicateProcessor, + ) { this.emitter.on('active', (data: any) => { const job = this.queueMemory.find( (job) => job.id === data.id && job.name === data.name, ); job.status = 'active'; - this.duplicateProcessor.onActive.apply(this.duplicateProcessor, [ - job as any, - ]); + this.jobsEventService.onActive.apply(this.jobsEventService, [job as any]); }); this.emitter.on('completed', (data: any) => { const job = this.queueMemory.find( (job) => job.id === data.id && job.name === data.name, ); job.status = 'completed'; - this.duplicateProcessor.onCompleted.apply(this.duplicateProcessor, [ + this.jobsEventService.onCompleted.apply(this.jobsEventService, [ data as any, ]); }); @@ -42,7 +44,7 @@ export class QueueService { (job) => job.id === data.job.id && job.name === data.job.name, ); job.status = 'failed'; - this.duplicateProcessor.onFailed.apply(this.duplicateProcessor, [ + this.jobsEventService.onFailed.apply(this.jobsEventService, [ data.job as any, data.error, ]); @@ -50,13 +52,16 @@ export class QueueService { } jobMap = { - duplicate: this.duplicateProcessor.duplicateBase, + duplicate: { + this: this.duplicateProcessor, + fn: this.duplicateProcessor.duplicateBase, + }, }; async jobWrapper(job: Job) { this.emitter.emit('active', job); try { - await this.jobMap[job.name].apply(this.duplicateProcessor, [job]); + await this.jobMap[job.name].fn.apply(this.jobMap[job.name].this, [job]); this.emitter.emit('completed', job); } catch (error) { this.emitter.emit('failed', { job, error }); diff --git a/packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts b/packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts new file mode 100644 index 0000000000..dfc400e4e7 --- /dev/null +++ b/packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts @@ -0,0 +1,60 @@ +import { + OnQueueActive, + OnQueueCompleted, + OnQueueFailed, + Processor, +} from '@nestjs/bull'; +import { Job } from 'bull'; +import boxen from 'boxen'; +import { EventEmitter2 } from '@nestjs/event-emitter'; + +@Processor('jobs') +export class JobsEventService { + constructor(private eventEmitter: EventEmitter2) {} + + @OnQueueActive() + onActive(job: Job) { + this.eventEmitter.emit('job.status', { + name: job.name, + id: job.id.toString(), + status: 'active', + }); + } + + @OnQueueFailed() + onFailed(job: Job, error: Error) { + console.error( + boxen( + `---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, + { + padding: 1, + borderStyle: 'double', + borderColor: 'yellow', + }, + ), + ); + + this.eventEmitter.emit('job.status', { + name: job.name, + id: job.id.toString(), + status: 'failed', + }); + } + + @OnQueueCompleted() + onCompleted(job: Job) { + this.eventEmitter.emit('job.status', { + name: job.name, + id: job.id.toString(), + status: 'completed', + }); + } + + sendLog(job: Job, data: { message: string }) { + this.eventEmitter.emit('job.log', { + name: job.name, + id: job.id.toString(), + data, + }); + } +} diff --git a/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts b/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts index ef0035a890..f9b521afa1 100644 --- a/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts +++ b/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts @@ -6,9 +6,9 @@ import { WebSocketServer, } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; -import { forwardRef, Inject, Injectable } from '@nestjs/common'; import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host'; import { AuthGuard } from '@nestjs/passport'; +import { OnEvent } from '@nestjs/event-emitter'; import { JobsService } from './jobs.service'; import type { OnModuleInit } from '@nestjs/common'; @@ -20,12 +20,8 @@ import type { OnModuleInit } from '@nestjs/common'; }, namespace: 'jobs', }) -@Injectable() export class JobsGateway implements OnModuleInit { - constructor( - @Inject(forwardRef(() => JobsService)) - private readonly jobsService: JobsService, - ) {} + constructor(private readonly jobsService: JobsService) {} @WebSocketServer() server: Server; @@ -44,15 +40,32 @@ export class JobsGateway implements OnModuleInit { @SubscribeMessage('subscribe') async subscribe( - @MessageBody() data: { name: string; id: string }, + @MessageBody() data: { name: string; id: string } | { id: string }, @ConnectedSocket() client: Socket, ): Promise { - const rooms = (await this.jobsService.jobList(data.name)).map( - (j) => `${j.name}-${j.id}`, - ); - const room = rooms.find((r) => r === `${data.name}-${data.id}`); - if (room) { - client.join(`${data.name}-${data.id}`); + if ('name' in data) { + const rooms = (await this.jobsService.jobList(data.name)).map( + (j) => `${j.name}-${j.id}`, + ); + const room = rooms.find((r) => r === `${data.name}-${data.id}`); + if (room) { + client.join(`${data.name}-${data.id}`); + client.emit('subscribed', { + subbed: data.id, + name: data.name, + id: data.id, + }); + } + } else { + const job = await this.jobsService.getJobWithData({ id: data.id }); + if (job) { + client.join(`${job.name}-${job.id}`); + client.emit('subscribed', { + subbed: data.id, + name: job.name, + id: job.id, + }); + } } } @@ -68,7 +81,8 @@ export class JobsGateway implements OnModuleInit { }); } - async jobStatus(data: { + @OnEvent('job.status') + async sendJobStatus(data: { name: string; id: string; status: @@ -86,4 +100,17 @@ export class JobsGateway implements OnModuleInit { status: data.status, }); } + + @OnEvent('job.log') + async sendJobLog(data: { + name: string; + id: string; + data: { message: string }; + }): Promise { + this.server.to(`${data.name}-${data.id}`).emit('log', { + id: data.id, + name: data.name, + data: data.data, + }); + } } diff --git a/packages/nocodb-nest/src/modules/jobs/jobs.module.ts b/packages/nocodb-nest/src/modules/jobs/jobs.module.ts index e376f6917e..33c8b81471 100644 --- a/packages/nocodb-nest/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb-nest/src/modules/jobs/jobs.module.ts @@ -10,6 +10,7 @@ import { DuplicateController } from './export-import/duplicate.controller'; import { DuplicateProcessor } from './export-import/duplicate.processor'; import { JobsGateway } from './jobs.gateway'; import { QueueService } from './fallback-queue.service'; +import { JobsEventService } from './jobs-event.service'; @Module({ imports: [ @@ -25,6 +26,7 @@ import { QueueService } from './fallback-queue.service'; QueueService, JobsGateway, JobsService, + JobsEventService, DuplicateProcessor, ExportService, ImportService, diff --git a/packages/nocodb-nest/src/modules/jobs/jobs.service.ts b/packages/nocodb-nest/src/modules/jobs/jobs.service.ts index 2d27763fc3..3271f631bb 100644 --- a/packages/nocodb-nest/src/modules/jobs/jobs.service.ts +++ b/packages/nocodb-nest/src/modules/jobs/jobs.service.ts @@ -24,4 +24,28 @@ export class JobsService { await this.activeQueue.getJobs(['active', 'waiting', 'delayed']) ).filter((j) => j.name === jobType); } + + async getJobWithData(data: any) { + const jobs = await this.activeQueue.getJobs([ + 'completed', + 'waiting', + 'active', + 'delayed', + 'failed', + 'paused', + ]); + + const job = jobs.find((j) => { + for (const key in data) { + if (j.data[key]) { + if (j.data[key] !== data[key]) return false; + } else { + return false; + } + } + return true; + }); + + return job; + } }