Browse Source

feat: clear in-memory queue after job finished

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5608/head
mertmit 1 year ago
parent
commit
99eaa5c26c
  1. 26
      packages/nocodb/src/modules/jobs/fallback-queue.service.ts

26
packages/nocodb/src/modules/jobs/fallback-queue.service.ts

@ -16,7 +16,7 @@ interface Job {
@Injectable() @Injectable()
export class QueueService { export class QueueService {
static queue = new PQueue({ concurrency: 1 }); static queue = new PQueue({ concurrency: 1 });
static queueIndex = 1; static queueIdCounter = 1;
static processed = 0; static processed = 0;
static queueMemory: Job[] = []; static queueMemory: Job[] = [];
static emitter = new Emittery(); static emitter = new Emittery();
@ -42,6 +42,8 @@ export class QueueService {
job, job,
data.result, data.result,
]); ]);
// clear job from memory
this.removeJob(job);
}); });
this.emitter.on(JobStatus.FAILED, (data: { job: Job; error: Error }) => { this.emitter.on(JobStatus.FAILED, (data: { job: Job; error: Error }) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
@ -52,6 +54,8 @@ export class QueueService {
job, job,
data.error, data.error,
]); ]);
// clear job from memory
this.removeJob(job);
}); });
} }
@ -96,14 +100,14 @@ export class QueueService {
} }
get queueIndex() { get queueIndex() {
return QueueService.queueIndex; return QueueService.queueIdCounter;
} }
set queueIndex(index: number) { set queueIndex(index: number) {
QueueService.queueIndex = index; QueueService.queueIdCounter = index;
} }
async add(name: string, data: any) { add(name: string, data: any) {
const id = `${this.queueIndex++}`; const id = `${this.queueIndex++}`;
const job = { id: `${id}`, name, status: JobStatus.WAITING, data }; const job = { id: `${id}`, name, status: JobStatus.WAITING, data };
this.queueMemory.push(job); this.queueMemory.push(job);
@ -111,12 +115,22 @@ export class QueueService {
return { id, name }; return { id, name };
} }
async getJobs(types: string[] | string) { getJobs(types: string[] | string) {
types = Array.isArray(types) ? types : [types]; types = Array.isArray(types) ? types : [types];
return this.queueMemory.filter((q) => types.includes(q.status)); return this.queueMemory.filter((q) => types.includes(q.status));
} }
async getJob(id: string) { getJob(id: string) {
return this.queueMemory.find((q) => q.id === id); return this.queueMemory.find((q) => q.id === id);
} }
// remove job from memory
private removeJob(job: Job) {
const fIndex = this.queueMemory.findIndex(
(q) => q.id === job.id && q.name === job.name,
);
if (fIndex) {
this.queueMemory.splice(fIndex, 1);
}
}
} }

Loading…
Cancel
Save