From 99eaa5c26c696d4fc5fbe9e9e58164bca21b7e07 Mon Sep 17 00:00:00 2001 From: mertmit Date: Tue, 9 May 2023 21:20:38 +0300 Subject: [PATCH] feat: clear in-memory queue after job finished Signed-off-by: mertmit --- .../modules/jobs/fallback-queue.service.ts | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/packages/nocodb/src/modules/jobs/fallback-queue.service.ts b/packages/nocodb/src/modules/jobs/fallback-queue.service.ts index 255714b72b..3250f90506 100644 --- a/packages/nocodb/src/modules/jobs/fallback-queue.service.ts +++ b/packages/nocodb/src/modules/jobs/fallback-queue.service.ts @@ -16,7 +16,7 @@ interface Job { @Injectable() export class QueueService { static queue = new PQueue({ concurrency: 1 }); - static queueIndex = 1; + static queueIdCounter = 1; static processed = 0; static queueMemory: Job[] = []; static emitter = new Emittery(); @@ -42,6 +42,8 @@ export class QueueService { job, data.result, ]); + // clear job from memory + this.removeJob(job); }); this.emitter.on(JobStatus.FAILED, (data: { job: Job; error: Error }) => { const job = this.queueMemory.find( @@ -52,6 +54,8 @@ export class QueueService { job, data.error, ]); + // clear job from memory + this.removeJob(job); }); } @@ -96,14 +100,14 @@ export class QueueService { } get queueIndex() { - return QueueService.queueIndex; + return QueueService.queueIdCounter; } set queueIndex(index: number) { - QueueService.queueIndex = index; + QueueService.queueIdCounter = index; } - async add(name: string, data: any) { + add(name: string, data: any) { const id = `${this.queueIndex++}`; const job = { id: `${id}`, name, status: JobStatus.WAITING, data }; this.queueMemory.push(job); @@ -111,12 +115,22 @@ export class QueueService { return { id, name }; } - async getJobs(types: string[] | string) { + getJobs(types: string[] | string) { types = Array.isArray(types) ? types : [types]; return this.queueMemory.filter((q) => types.includes(q.status)); } - async getJob(id: string) { + getJob(id: string) { return this.queueMemory.find((q) => q.id === id); } + + // remove job from memory + private removeJob(job: Job) { + const fIndex = this.queueMemory.findIndex( + (q) => q.id === job.id && q.name === job.name, + ); + if (fIndex) { + this.queueMemory.splice(fIndex, 1); + } + } }