From 7c4dfc99cc1a4bc26807981cbe6d0b4ec0b0ec2f Mon Sep 17 00:00:00 2001 From: mertmit Date: Sat, 20 May 2023 22:28:08 +0300 Subject: [PATCH] feat: solely use id for jobs Signed-off-by: mertmit --- .../nc-gui/components/dashboard/TreeView.vue | 4 +- packages/nc-gui/nuxt-shim.d.ts | 1 - packages/nc-gui/pages/index/index/index.vue | 4 +- packages/nc-gui/plugins/jobs.ts | 17 ++++---- .../jobs/at-import/at-import.controller.ts | 6 +-- .../export-import/duplicate.controller.ts | 4 +- .../modules/jobs/fallback-queue.service.ts | 16 ++----- .../src/modules/jobs/jobs-event.service.ts | 6 +-- .../nocodb/src/modules/jobs/jobs.gateway.ts | 42 ++++++------------- .../nocodb/src/modules/jobs/jobs.service.ts | 16 ++++--- 10 files changed, 41 insertions(+), 75 deletions(-) diff --git a/packages/nc-gui/components/dashboard/TreeView.vue b/packages/nc-gui/components/dashboard/TreeView.vue index 2f5f97ee4b..5940b634c5 100644 --- a/packages/nc-gui/components/dashboard/TreeView.vue +++ b/packages/nc-gui/components/dashboard/TreeView.vue @@ -399,8 +399,8 @@ const duplicateTable = async (table: TableType) => { const { close } = useDialog(resolveComponent('DlgTableDuplicate'), { 'modelValue': isOpen, 'table': table, - 'onOk': async (jobData: { name: string; id: string }) => { - $jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string, data?: any) => { + 'onOk': async (jobData: { id: string }) => { + $jobs.subscribe({ id: jobData.id }, undefined, async (status: string, data?: any) => { if (status === JobStatus.COMPLETED) { await loadTables() const newTable = tables.value.find((el) => el.id === data?.result?.id) diff --git a/packages/nc-gui/nuxt-shim.d.ts b/packages/nc-gui/nuxt-shim.d.ts index dec8a6e113..4dc4ac605a 100644 --- a/packages/nc-gui/nuxt-shim.d.ts +++ b/packages/nc-gui/nuxt-shim.d.ts @@ -18,7 +18,6 @@ declare module '#app/nuxt' { job: | { id: string - name: string } | any, subscribedCb?: () => void, diff --git a/packages/nc-gui/pages/index/index/index.vue b/packages/nc-gui/pages/index/index/index.vue index bc22e036b3..486786b72b 100644 --- a/packages/nc-gui/pages/index/index/index.vue +++ b/packages/nc-gui/pages/index/index/index.vue @@ -90,10 +90,10 @@ const duplicateProject = (project: ProjectType) => { const { close } = useDialog(resolveComponent('DlgProjectDuplicate'), { 'modelValue': isOpen, 'project': project, - 'onOk': async (jobData: { name: string; id: string }) => { + 'onOk': async (jobData: { id: string }) => { await loadProjects() - $jobs.subscribe({ name: jobData.name, id: jobData.id }, undefined, async (status: string) => { + $jobs.subscribe({ id: jobData.id }, undefined, async (status: string) => { if (status === JobStatus.COMPLETED) { await loadProjects() } else if (status === JobStatus.FAILED) { diff --git a/packages/nc-gui/plugins/jobs.ts b/packages/nc-gui/plugins/jobs.ts index aa6b2cddb7..cc12c3aa86 100644 --- a/packages/nc-gui/plugins/jobs.ts +++ b/packages/nc-gui/plugins/jobs.ts @@ -29,22 +29,22 @@ export default defineNuxtPlugin(async (nuxtApp) => { await init(nuxtApp.$state.token.value) } - const send = (name: string, data: any) => { + const send = (evt: string, data: any) => { if (socket) { const _id = messageIndex++ - socket.emit(name, { _id, data }) + socket.emit(evt, { _id, data }) return _id } } const jobs = { subscribe( - job: { id: string; name: string } | any, + job: { id: string } | any, subscribedCb?: () => void, statusCb?: (status: JobStatus, data?: any) => void, logCb?: (data: { message: string }) => void, ) { - const logFn = (data: { id: string; name: string; data: { message: string } }) => { + const logFn = (data: { id: string; data: { message: string } }) => { if (data.id === job.id) { if (logCb) logCb(data.data) } @@ -61,11 +61,10 @@ export default defineNuxtPlugin(async (nuxtApp) => { const _id = send('subscribe', job) - const subscribeFn = (data: { _id: number; name: string; id: string }) => { + const subscribeFn = (data: { _id: number; id: string }) => { if (data._id === _id) { - if (data.id !== job.id || data.name !== job.name) { + if (data.id !== job.id) { job.id = data.id - job.name = data.name } if (subscribedCb) subscribedCb() socket?.on('log', logFn) @@ -75,10 +74,10 @@ export default defineNuxtPlugin(async (nuxtApp) => { } socket?.on('subscribed', subscribeFn) }, - getStatus(name: string, id: string): Promise { + getStatus(id: string): Promise { return new Promise((resolve) => { if (socket) { - const _id = send('status', { name, id }) + const _id = send('status', { id }) const tempFn = (data: any) => { if (data._id === _id) { resolve(data.status) diff --git a/packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts b/packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts index b8db84f927..289894d944 100644 --- a/packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts +++ b/packages/nocodb/src/modules/jobs/at-import/at-import.controller.ts @@ -18,13 +18,13 @@ export class AtImportController { ...req.body, }); - return { id: job.id, name: job.name }; + return { id: job.id }; } @Post('/api/v1/db/meta/syncs/:syncId/trigger') @HttpCode(200) async triggerSync(@Request() req) { - const jobs = await this.jobsService.jobList(JobTypes.AtImport); + const jobs = await this.jobsService.jobList(); const fnd = jobs.find((j) => j.data.syncId === req.params.syncId); if (fnd) { @@ -54,7 +54,7 @@ export class AtImportController { user: user, }); - return { id: job.id, name: job.name }; + return { id: job.id }; } @Post('/api/v1/db/meta/syncs/:syncId/abort') diff --git a/packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts b/packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts index f8f785aa44..d8bf734263 100644 --- a/packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts +++ b/packages/nocodb/src/modules/jobs/export-import/duplicate.controller.ts @@ -78,7 +78,7 @@ export class DuplicateController { }, }); - return { id: job.id, name: job.name }; + return { id: job.id }; } @Post('/api/v1/db/meta/duplicate/:projectId/table/:modelId') @@ -128,6 +128,6 @@ export class DuplicateController { }, }); - return { id: job.id, name: job.name }; + return { id: job.id }; } } diff --git a/packages/nocodb/src/modules/jobs/fallback-queue.service.ts b/packages/nocodb/src/modules/jobs/fallback-queue.service.ts index 3250f90506..c8c45cfe13 100644 --- a/packages/nocodb/src/modules/jobs/fallback-queue.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback-queue.service.ts @@ -27,16 +27,12 @@ export class QueueService { private readonly atImportProcessor: AtImportProcessor, ) { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { - const job = this.queueMemory.find( - (job) => job.id === data.job.id && job.name === data.job.name, - ); + const job = this.queueMemory.find((job) => job.id === data.job.id); job.status = JobStatus.ACTIVE; this.jobsEventService.onActive.apply(this.jobsEventService, [job as any]); }); this.emitter.on(JobStatus.COMPLETED, (data: { job: Job; result: any }) => { - const job = this.queueMemory.find( - (job) => job.id === data.job.id && job.name === data.job.name, - ); + const job = this.queueMemory.find((job) => job.id === data.job.id); job.status = JobStatus.COMPLETED; this.jobsEventService.onCompleted.apply(this.jobsEventService, [ job, @@ -46,9 +42,7 @@ export class QueueService { this.removeJob(job); }); this.emitter.on(JobStatus.FAILED, (data: { job: Job; error: Error }) => { - const job = this.queueMemory.find( - (job) => job.id === data.job.id && job.name === data.job.name, - ); + const job = this.queueMemory.find((job) => job.id === data.job.id); job.status = JobStatus.FAILED; this.jobsEventService.onFailed.apply(this.jobsEventService, [ job, @@ -126,9 +120,7 @@ export class QueueService { // remove job from memory private removeJob(job: Job) { - const fIndex = this.queueMemory.findIndex( - (q) => q.id === job.id && q.name === job.name, - ); + const fIndex = this.queueMemory.findIndex((q) => q.id === job.id); if (fIndex) { this.queueMemory.splice(fIndex, 1); } diff --git a/packages/nocodb/src/modules/jobs/jobs-event.service.ts b/packages/nocodb/src/modules/jobs/jobs-event.service.ts index 7e72857204..6f113cc9fc 100644 --- a/packages/nocodb/src/modules/jobs/jobs-event.service.ts +++ b/packages/nocodb/src/modules/jobs/jobs-event.service.ts @@ -16,7 +16,6 @@ export class JobsEventService { @OnQueueActive() onActive(job: Job) { this.eventEmitter.emit(JobEvents.STATUS, { - name: job.name, id: job.id.toString(), status: JobStatus.ACTIVE, }); @@ -26,7 +25,7 @@ export class JobsEventService { onFailed(job: Job, error: Error) { console.error( boxen( - `---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, + `---- !! JOB FAILED !! ----\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`, { padding: 1, borderStyle: 'double', @@ -36,7 +35,6 @@ export class JobsEventService { ); this.eventEmitter.emit(JobEvents.STATUS, { - name: job.name, id: job.id.toString(), status: JobStatus.FAILED, data: { @@ -50,7 +48,6 @@ export class JobsEventService { @OnQueueCompleted() onCompleted(job: Job, data: any) { this.eventEmitter.emit(JobEvents.STATUS, { - name: job.name, id: job.id.toString(), status: JobStatus.COMPLETED, data: { @@ -61,7 +58,6 @@ export class JobsEventService { sendLog(job: Job, data: { message: string }) { this.eventEmitter.emit(JobEvents.LOG, { - name: job.name, id: job.id.toString(), data, }); diff --git a/packages/nocodb/src/modules/jobs/jobs.gateway.ts b/packages/nocodb/src/modules/jobs/jobs.gateway.ts index 119462e69f..2576c71728 100644 --- a/packages/nocodb/src/modules/jobs/jobs.gateway.ts +++ b/packages/nocodb/src/modules/jobs/jobs.gateway.ts @@ -43,34 +43,28 @@ export class JobsGateway implements OnModuleInit { @SubscribeMessage('subscribe') async subscribe( @MessageBody() - body: { _id: number; data: { id: string; name: string } | any }, + body: { _id: number; data: { id: string } | any }, @ConnectedSocket() client: Socket, ): Promise { const { _id, data } = body; - if ( - Object.keys(data).every((k) => ['name', 'id'].includes(k)) && - data?.name && - data?.id - ) { - const rooms = (await this.jobsService.jobList(data.name)).map( - (j) => `${j.name}-${j.id}`, + if (Object.keys(data).every((k) => ['id'].includes(k)) && data?.id) { + const rooms = (await this.jobsService.jobList()).map( + (j) => `jobs-${j.id}`, ); - const room = rooms.find((r) => r === `${data.name}-${data.id}`); + const room = rooms.find((r) => r === `jobs-${data.id}`); if (room) { - client.join(`${data.name}-${data.id}`); + client.join(`jobs-${data.id}`); client.emit('subscribed', { _id, - name: data.name, id: data.id, }); } } else { const job = await this.jobsService.getJobWithData(data); if (job) { - client.join(`${job.name}-${job.id}`); + client.join(`jobs-${job.id}`); client.emit('subscribed', { _id, - name: job.name, id: job.id, }); } @@ -79,42 +73,30 @@ export class JobsGateway implements OnModuleInit { @SubscribeMessage('status') async status( - @MessageBody() body: { _id: number; data: { id: string; name: string } }, + @MessageBody() body: { _id: number; data: { id: string } }, @ConnectedSocket() client: Socket, ): Promise { const { _id, data } = body; client.emit('status', { _id, id: data.id, - name: data.name, status: await this.jobsService.jobStatus(data.id), }); } @OnEvent(JobEvents.STATUS) - async sendJobStatus(data: { - name: string; - id: string; - status: JobStatus; - data?: any; - }): Promise { - this.server.to(`${data.name}-${data.id}`).emit('status', { + sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void { + this.server.to(`jobs-${data.id}`).emit('status', { id: data.id, - name: data.name, status: data.status, data: data.data, }); } @OnEvent(JobEvents.LOG) - async sendJobLog(data: { - name: string; - id: string; - data: { message: string }; - }): Promise { - this.server.to(`${data.name}-${data.id}`).emit('log', { + sendJobLog(data: { id: string; data: { message: string } }): void { + this.server.to(`jobs-${data.id}`).emit('log', { id: data.id, - name: data.name, data: data.data, }); } diff --git a/packages/nocodb/src/modules/jobs/jobs.service.ts b/packages/nocodb/src/modules/jobs/jobs.service.ts index f3b9dc326a..9087e4c8a3 100644 --- a/packages/nocodb/src/modules/jobs/jobs.service.ts +++ b/packages/nocodb/src/modules/jobs/jobs.service.ts @@ -28,15 +28,13 @@ export class JobsService { return await (await this.activeQueue.getJob(jobId)).getState(); } - async jobList(jobType: string) { - return ( - await this.activeQueue.getJobs([ - JobStatus.ACTIVE, - JobStatus.WAITING, - JobStatus.DELAYED, - JobStatus.PAUSED, - ]) - ).filter((j) => j.name === jobType); + async jobList() { + return await this.activeQueue.getJobs([ + JobStatus.ACTIVE, + JobStatus.WAITING, + JobStatus.DELAYED, + JobStatus.PAUSED, + ]); } async getJobWithData(data: any) {