From d3903461351e2e11ef3fa58471ef831b391a1a47 Mon Sep 17 00:00:00 2001 From: mertmit Date: Fri, 21 Apr 2023 15:13:29 +0300 Subject: [PATCH] feat: fallback job queue Signed-off-by: mertmit --- packages/nc-gui/pages/index/index/index.vue | 3 +- packages/nc-gui/plugins/events.ts | 12 +-- packages/nocodb-nest/package-lock.json | 67 ++++++++++++ packages/nocodb-nest/package.json | 1 + packages/nocodb-nest/src/app.module.ts | 13 +-- .../export-import/duplicate.controller.ts | 15 ++- .../jobs/export-import/duplicate.processor.ts | 14 +-- .../jobs/export-import/export.service.ts | 1 - .../modules/jobs/fallback-queue.service.ts | 102 ++++++++++++++++++ .../src/modules/jobs/jobs.gateway.ts | 29 ++--- .../src/modules/jobs/jobs.module.ts | 4 +- .../src/modules/jobs/jobs.service.ts | 31 +++--- packages/nocodb-nest/src/schema/swagger.json | 4 +- packages/nocodb-sdk/src/lib/Api.ts | 8 +- 14 files changed, 245 insertions(+), 59 deletions(-) create mode 100644 packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts diff --git a/packages/nc-gui/pages/index/index/index.vue b/packages/nc-gui/pages/index/index/index.vue index d770cb9438..250de32f12 100644 --- a/packages/nc-gui/pages/index/index/index.vue +++ b/packages/nc-gui/pages/index/index/index.vue @@ -85,7 +85,8 @@ const duplicateProject = (project: ProjectType) => { try { const jobData = await api.project.duplicate(project.id as string) - $events.subscribe(jobData.type, jobData.id, async (data: { status: string }) => { + $events.subscribe(jobData.name, jobData.id, async (data: { status: string }) => { + console.log('dataCB', data) if (data.status === 'completed' || data.status === 'refresh') { await loadProjects() } else if (data.status === 'failed') { diff --git a/packages/nc-gui/plugins/events.ts b/packages/nc-gui/plugins/events.ts index b39f2086b4..f90d4064fe 100644 --- a/packages/nc-gui/plugins/events.ts +++ b/packages/nc-gui/plugins/events.ts @@ -29,11 +29,11 @@ export default defineNuxtPlugin(async (nuxtApp) => { } const events = { - subscribe(type: string, id: string, cb: (data: any) => void) { + subscribe(name: string, id: string, cb: (data: any) => void) { if (socket) { - socket.emit('subscribe', { type, id }) + socket.emit('subscribe', { name, id }) const tempFn = (data: any) => { - if (data.id === id && data.type === type) { + if (data.id === id && data.name === name) { cb(data) if (data.status === 'completed' || data.status === 'failed') { socket?.off('status', tempFn) @@ -43,12 +43,12 @@ export default defineNuxtPlugin(async (nuxtApp) => { socket.on('status', tempFn) } }, - getStatus(type: string, id: string): Promise { + getStatus(name: string, id: string): Promise { return new Promise((resolve) => { if (socket) { - socket.emit('status', { type, id }) + socket.emit('status', { name, id }) const tempFn = (data: any) => { - if (data.id === id && data.type === type) { + if (data.id === id && data.name === name) { resolve(data.status) socket?.off('status', tempFn) } diff --git a/packages/nocodb-nest/package-lock.json b/packages/nocodb-nest/package-lock.json index 1ef6b4a2b7..fcc7026cb2 100644 --- a/packages/nocodb-nest/package-lock.json +++ b/packages/nocodb-nest/package-lock.json @@ -86,6 +86,7 @@ "nodemailer": "^6.4.10", "object-hash": "^3.0.0", "os-locale": "^6.0.2", + "p-queue": "^6.6.2", "papaparse": "^5.3.1", "parse-database-url": "^0.3.0", "passport": "^0.4.1", @@ -21303,6 +21304,11 @@ "node": ">=6" } }, + "node_modules/eventemitter3": { + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + }, "node_modules/events": { "version": "3.3.0", "license": "MIT", @@ -26088,6 +26094,14 @@ "node": ">=0.10.0" } }, + "node_modules/p-finally": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz", + "integrity": "sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==", + "engines": { + "node": ">=4" + } + }, "node_modules/p-limit": { "version": "3.1.0", "license": "MIT", @@ -26129,6 +26143,32 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/p-queue": { + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz", + "integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==", + "dependencies": { + "eventemitter3": "^4.0.4", + "p-timeout": "^3.2.0" + }, + "engines": { + "node": ">=8" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/p-timeout": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", + "integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==", + "dependencies": { + "p-finally": "^1.0.0" + }, + "engines": { + "node": ">=8" + } + }, "node_modules/p-try": { "version": "2.2.0", "dev": true, @@ -34239,6 +34279,11 @@ "event-target-shim": { "version": "5.0.1" }, + "eventemitter3": { + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" + }, "events": { "version": "3.3.0" }, @@ -46431,6 +46476,11 @@ "minimist": "^1.1.0" } }, + "p-finally": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz", + "integrity": "sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==" + }, "p-limit": { "version": "3.1.0", "requires": { @@ -46451,6 +46501,23 @@ "aggregate-error": "^3.0.0" } }, + "p-queue": { + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz", + "integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==", + "requires": { + "eventemitter3": "^4.0.4", + "p-timeout": "^3.2.0" + } + }, + "p-timeout": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", + "integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==", + "requires": { + "p-finally": "^1.0.0" + } + }, "p-try": { "version": "2.2.0", "dev": true diff --git a/packages/nocodb-nest/package.json b/packages/nocodb-nest/package.json index 6ce75495f0..c093b66395 100644 --- a/packages/nocodb-nest/package.json +++ b/packages/nocodb-nest/package.json @@ -117,6 +117,7 @@ "nodemailer": "^6.4.10", "object-hash": "^3.0.0", "os-locale": "^6.0.2", + "p-queue": "^6.6.2", "papaparse": "^5.3.1", "parse-database-url": "^0.3.0", "passport": "^0.4.1", diff --git a/packages/nocodb-nest/src/app.module.ts b/packages/nocodb-nest/src/app.module.ts index 81597ae773..a8166dba87 100644 --- a/packages/nocodb-nest/src/app.module.ts +++ b/packages/nocodb-nest/src/app.module.ts @@ -35,12 +35,13 @@ import type { MetasModule, DatasModule, JobsModule, - BullModule.forRoot({ - redis: { - host: 'localhost', - port: 6379, - }, - }), + ...(process.env['NC_REDIS_URL'] + ? [ + BullModule.forRoot({ + redis: process.env.NC_REDIS_URL, + }), + ] + : []), ], controllers: [], providers: [ diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts index 61a0ca3f4f..d6b4dd9297 100644 --- a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts +++ b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts @@ -14,13 +14,20 @@ import { Acl, ExtractProjectIdMiddleware, } from 'src/middlewares/extract-project-id/extract-project-id.middleware'; +import { QueueService } from '../fallback-queue.service'; @Controller() @UseGuards(ExtractProjectIdMiddleware, GlobalGuard) export class DuplicateController { + activeQueue; constructor( - @InjectQueue('duplicate') private readonly duplicateQueue: Queue, - ) {} + @InjectQueue('jobs') private readonly jobsQueue: Queue, + private readonly fallbackQueueService: QueueService, + ) { + this.activeQueue = process.env.NC_REDIS_URL + ? this.jobsQueue + : this.fallbackQueueService; + } @Post('/api/v1/db/meta/duplicate/:projectId/:baseId?') @HttpCode(200) @@ -30,7 +37,7 @@ export class DuplicateController { @Param('projectId') projectId: string, @Param('baseId') baseId?: string, ) { - const job = await this.duplicateQueue.add('duplicate', { + const job = await this.activeQueue.add('duplicate', { projectId, baseId, req: { @@ -38,6 +45,6 @@ export class DuplicateController { clientIp: req.clientIp, }, }); - return { id: job.id, type: job.name }; + return { id: job.id, name: job.name }; } } diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts index 83b8abcdbf..4d0f84a3e2 100644 --- a/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts +++ b/packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts @@ -17,6 +17,7 @@ import { } from 'src/helpers/exportImportHelpers'; import { BulkDataAliasService } from 'src/services/bulk-data-alias.service'; import { UITypes } from 'nocodb-sdk'; +import { forwardRef, Inject } from '@nestjs/common'; import { JobsGateway } from '../jobs.gateway'; import { ExportService } from './export.service'; import { ImportService } from './import.service'; @@ -24,20 +25,21 @@ import type { LinkToAnotherRecordColumn } from 'src/models'; const DEBUG = false; -@Processor('duplicate') +@Processor('jobs') export class DuplicateProcessor { constructor( private readonly exportService: ExportService, private readonly importService: ImportService, private readonly projectsService: ProjectsService, private readonly bulkDataService: BulkDataAliasService, + @Inject(forwardRef(() => JobsGateway)) private readonly jobsGateway: JobsGateway, ) {} @OnQueueActive() onActive(job: Job) { this.jobsGateway.jobStatus({ - type: job.name, + name: job.name, id: job.id.toString(), status: 'active', }); @@ -47,7 +49,7 @@ export class DuplicateProcessor { onFailed(job: Job, error: Error) { console.error( boxen( - `---- !! JOB FAILED !! ----\ntype: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, + `---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, { padding: 1, borderStyle: 'double', @@ -57,7 +59,7 @@ export class DuplicateProcessor { ); this.jobsGateway.jobStatus({ - type: job.name, + name: job.name, id: job.id.toString(), status: 'failed', }); @@ -66,7 +68,7 @@ export class DuplicateProcessor { @OnQueueCompleted() onCompleted(job: Job) { this.jobsGateway.jobStatus({ - type: job.name, + name: job.name, id: job.id.toString(), status: 'completed', }); @@ -134,7 +136,7 @@ export class DuplicateProcessor { }); this.jobsGateway.jobStatus({ - type: job.name, + name: job.name, id: job.id.toString(), status: 'refresh', }); diff --git a/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts b/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts index 5b185d8d24..e1ee6fdd99 100644 --- a/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts +++ b/packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts @@ -12,7 +12,6 @@ import { Base, Model, Project } from 'src/models'; import { DatasService } from 'src/services/datas.service'; import { Injectable } from '@nestjs/common'; import type { LinkToAnotherRecordColumn, View } from 'src/models'; -import type { IStorageAdapterV2 } from 'nc-plugin'; @Injectable() export class ExportService { diff --git a/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts b/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts new file mode 100644 index 0000000000..1859bf41a8 --- /dev/null +++ b/packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts @@ -0,0 +1,102 @@ +import { Injectable } from '@nestjs/common'; +import PQueue from 'p-queue'; +import Emittery from 'emittery'; +import { DuplicateProcessor } from './export-import/duplicate.processor'; + +interface Job { + id: string; + name: string; + status: string; + data: any; +} + +@Injectable() +export class QueueService { + static queue = new PQueue({ concurrency: 1 }); + static queueIndex = 1; + static processed = 0; + static queueMemory: Job[] = []; + static emitter = new Emittery(); + + constructor(private readonly duplicateProcessor: DuplicateProcessor) { + this.emitter.on('active', (data: any) => { + const job = this.queueMemory.find( + (job) => job.id === data.id && job.name === data.name, + ); + job.status = 'active'; + this.duplicateProcessor.onActive.apply(this.duplicateProcessor, [ + job as any, + ]); + }); + this.emitter.on('completed', (data: any) => { + const job = this.queueMemory.find( + (job) => job.id === data.id && job.name === data.name, + ); + job.status = 'completed'; + this.duplicateProcessor.onCompleted.apply(this.duplicateProcessor, [ + data as any, + ]); + }); + this.emitter.on('failed', (data: { job: Job; error: Error }) => { + const job = this.queueMemory.find( + (job) => job.id === data.job.id && job.name === data.job.name, + ); + job.status = 'failed'; + this.duplicateProcessor.onFailed.apply(this.duplicateProcessor, [ + data.job as any, + data.error, + ]); + }); + } + + jobMap = { + duplicate: this.duplicateProcessor.duplicateBase, + }; + + async jobWrapper(job: Job) { + this.emitter.emit('active', job); + try { + await this.jobMap[job.name].apply(this.duplicateProcessor, [job]); + this.emitter.emit('completed', job); + } catch (error) { + this.emitter.emit('failed', { job, error }); + } + } + + get emitter() { + return QueueService.emitter; + } + + get queue() { + return QueueService.queue; + } + + get queueMemory() { + return QueueService.queueMemory; + } + + get queueIndex() { + return QueueService.queueIndex; + } + + set queueIndex(index: number) { + QueueService.queueIndex = index; + } + + async add(name: string, data: any) { + const id = `${this.queueIndex++}`; + const job = { id: `${id}`, name, status: 'waiting', data }; + this.queueMemory.push(job); + this.queue.add(() => this.jobWrapper(job)); + return { id, name }; + } + + async getJobs(types: string[] | string) { + types = Array.isArray(types) ? types : [types]; + return this.queueMemory.filter((q) => types.includes(q.status)); + } + + async getJob(id: string) { + return this.queueMemory.find((q) => q.id === id); + } +} diff --git a/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts b/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts index a645eee474..ef0035a890 100644 --- a/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts +++ b/packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts @@ -6,7 +6,7 @@ import { WebSocketServer, } from '@nestjs/websockets'; import { Server, Socket } from 'socket.io'; -import { Injectable } from '@nestjs/common'; +import { forwardRef, Inject, Injectable } from '@nestjs/common'; import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host'; import { AuthGuard } from '@nestjs/passport'; import { JobsService } from './jobs.service'; @@ -22,7 +22,10 @@ import type { OnModuleInit } from '@nestjs/common'; }) @Injectable() export class JobsGateway implements OnModuleInit { - constructor(private readonly jobsService: JobsService) {} + constructor( + @Inject(forwardRef(() => JobsService)) + private readonly jobsService: JobsService, + ) {} @WebSocketServer() server: Server; @@ -41,30 +44,32 @@ export class JobsGateway implements OnModuleInit { @SubscribeMessage('subscribe') async subscribe( - @MessageBody() data: { type: string; id: string }, + @MessageBody() data: { name: string; id: string }, @ConnectedSocket() client: Socket, ): Promise { - const rooms = await this.jobsService.jobList(data.type); - const room = rooms.find((r) => r.id === data.id); + const rooms = (await this.jobsService.jobList(data.name)).map( + (j) => `${j.name}-${j.id}`, + ); + const room = rooms.find((r) => r === `${data.name}-${data.id}`); if (room) { - client.join(data.id); + client.join(`${data.name}-${data.id}`); } } @SubscribeMessage('status') async status( - @MessageBody() data: { type: string; id: string }, + @MessageBody() data: { name: string; id: string }, @ConnectedSocket() client: Socket, ): Promise { client.emit('status', { id: data.id, - type: data.type, - status: await this.jobsService.jobStatus(data.type, data.id), + name: data.name, + status: await this.jobsService.jobStatus(data.id), }); } async jobStatus(data: { - type: string; + name: string; id: string; status: | 'completed' @@ -75,9 +80,9 @@ export class JobsGateway implements OnModuleInit { | 'paused' | 'refresh'; }): Promise { - this.server.to(data.id).emit('status', { + this.server.to(`${data.name}-${data.id}`).emit('status', { id: data.id, - type: data.type, + name: data.name, status: data.status, }); } diff --git a/packages/nocodb-nest/src/modules/jobs/jobs.module.ts b/packages/nocodb-nest/src/modules/jobs/jobs.module.ts index dc0f94a304..e376f6917e 100644 --- a/packages/nocodb-nest/src/modules/jobs/jobs.module.ts +++ b/packages/nocodb-nest/src/modules/jobs/jobs.module.ts @@ -9,6 +9,7 @@ import { ImportService } from './export-import/import.service'; import { DuplicateController } from './export-import/duplicate.controller'; import { DuplicateProcessor } from './export-import/duplicate.processor'; import { JobsGateway } from './jobs.gateway'; +import { QueueService } from './fallback-queue.service'; @Module({ imports: [ @@ -16,11 +17,12 @@ import { JobsGateway } from './jobs.gateway'; DatasModule, MetasModule, BullModule.registerQueue({ - name: 'duplicate', + name: 'jobs', }), ], controllers: [DuplicateController], providers: [ + QueueService, JobsGateway, JobsService, DuplicateProcessor, diff --git a/packages/nocodb-nest/src/modules/jobs/jobs.service.ts b/packages/nocodb-nest/src/modules/jobs/jobs.service.ts index bbab813a02..2d27763fc3 100644 --- a/packages/nocodb-nest/src/modules/jobs/jobs.service.ts +++ b/packages/nocodb-nest/src/modules/jobs/jobs.service.ts @@ -1,28 +1,27 @@ import { InjectQueue } from '@nestjs/bull'; import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; +import { QueueService } from './fallback-queue.service'; @Injectable() export class JobsService { - constructor(@InjectQueue('duplicate') private duplicateQueue: Queue) {} + activeQueue; + constructor( + @InjectQueue('jobs') private readonly jobsQueue: Queue, + private readonly fallbackQueueService: QueueService, + ) { + this.activeQueue = process.env.NC_REDIS_URL + ? this.jobsQueue + : this.fallbackQueueService; + } - async jobStatus(jobType: string, jobId: string) { - switch (jobType) { - case 'duplicate': - default: - return await (await this.duplicateQueue.getJob(jobId)).getState(); - } + async jobStatus(jobId: string) { + return await (await this.activeQueue.getJob(jobId)).getState(); } async jobList(jobType: string) { - switch (jobType) { - case 'duplicate': - default: - return await this.duplicateQueue.getJobs([ - 'active', - 'waiting', - 'delayed', - ]); - } + return ( + await this.activeQueue.getJobs(['active', 'waiting', 'delayed']) + ).filter((j) => j.name === jobType); } } diff --git a/packages/nocodb-nest/src/schema/swagger.json b/packages/nocodb-nest/src/schema/swagger.json index 6f52de1311..7f1cbb7784 100644 --- a/packages/nocodb-nest/src/schema/swagger.json +++ b/packages/nocodb-nest/src/schema/swagger.json @@ -2112,7 +2112,7 @@ "schema": { "type": "object", "properties": { - "type": { + "name": { "type": "string" }, "id": { @@ -2170,7 +2170,7 @@ "schema": { "type": "object", "properties": { - "type": { + "name": { "type": "string" }, "id": { diff --git a/packages/nocodb-sdk/src/lib/Api.ts b/packages/nocodb-sdk/src/lib/Api.ts index bf098a6ff7..8c6b60930e 100644 --- a/packages/nocodb-sdk/src/lib/Api.ts +++ b/packages/nocodb-sdk/src/lib/Api.ts @@ -4019,7 +4019,7 @@ export class Api< * @summary Duplicate Project Base * @request POST:/api/v1/db/meta/duplicate/{projectId}/{baseId} * @response `200` `{ - type?: string, + name?: string, id?: string, }` OK @@ -4036,7 +4036,7 @@ export class Api< ) => this.request< { - type?: string; + name?: string; id?: string; }, { @@ -4058,7 +4058,7 @@ export class Api< * @summary Duplicate Project * @request POST:/api/v1/db/meta/duplicate/{projectId} * @response `200` `{ - type?: string, + name?: string, id?: string, }` OK @@ -4071,7 +4071,7 @@ export class Api< duplicate: (projectId: IdType, params: RequestParams = {}) => this.request< { - type?: string; + name?: string; id?: string; }, {