mirror of https://github.com/nocodb/nocodb
mertmit
2 years ago
committed by
starbirdtech383
14 changed files with 245 additions and 59 deletions
@ -0,0 +1,102 @@
|
||||
import { Injectable } from '@nestjs/common'; |
||||
import PQueue from 'p-queue'; |
||||
import Emittery from 'emittery'; |
||||
import { DuplicateProcessor } from './export-import/duplicate.processor'; |
||||
|
||||
interface Job { |
||||
id: string; |
||||
name: string; |
||||
status: string; |
||||
data: any; |
||||
} |
||||
|
||||
@Injectable() |
||||
export class QueueService { |
||||
static queue = new PQueue({ concurrency: 1 }); |
||||
static queueIndex = 1; |
||||
static processed = 0; |
||||
static queueMemory: Job[] = []; |
||||
static emitter = new Emittery(); |
||||
|
||||
constructor(private readonly duplicateProcessor: DuplicateProcessor) { |
||||
this.emitter.on('active', (data: any) => { |
||||
const job = this.queueMemory.find( |
||||
(job) => job.id === data.id && job.name === data.name, |
||||
); |
||||
job.status = 'active'; |
||||
this.duplicateProcessor.onActive.apply(this.duplicateProcessor, [ |
||||
job as any, |
||||
]); |
||||
}); |
||||
this.emitter.on('completed', (data: any) => { |
||||
const job = this.queueMemory.find( |
||||
(job) => job.id === data.id && job.name === data.name, |
||||
); |
||||
job.status = 'completed'; |
||||
this.duplicateProcessor.onCompleted.apply(this.duplicateProcessor, [ |
||||
data as any, |
||||
]); |
||||
}); |
||||
this.emitter.on('failed', (data: { job: Job; error: Error }) => { |
||||
const job = this.queueMemory.find( |
||||
(job) => job.id === data.job.id && job.name === data.job.name, |
||||
); |
||||
job.status = 'failed'; |
||||
this.duplicateProcessor.onFailed.apply(this.duplicateProcessor, [ |
||||
data.job as any, |
||||
data.error, |
||||
]); |
||||
}); |
||||
} |
||||
|
||||
jobMap = { |
||||
duplicate: this.duplicateProcessor.duplicateBase, |
||||
}; |
||||
|
||||
async jobWrapper(job: Job) { |
||||
this.emitter.emit('active', job); |
||||
try { |
||||
await this.jobMap[job.name].apply(this.duplicateProcessor, [job]); |
||||
this.emitter.emit('completed', job); |
||||
} catch (error) { |
||||
this.emitter.emit('failed', { job, error }); |
||||
} |
||||
} |
||||
|
||||
get emitter() { |
||||
return QueueService.emitter; |
||||
} |
||||
|
||||
get queue() { |
||||
return QueueService.queue; |
||||
} |
||||
|
||||
get queueMemory() { |
||||
return QueueService.queueMemory; |
||||
} |
||||
|
||||
get queueIndex() { |
||||
return QueueService.queueIndex; |
||||
} |
||||
|
||||
set queueIndex(index: number) { |
||||
QueueService.queueIndex = index; |
||||
} |
||||
|
||||
async add(name: string, data: any) { |
||||
const id = `${this.queueIndex++}`; |
||||
const job = { id: `${id}`, name, status: 'waiting', data }; |
||||
this.queueMemory.push(job); |
||||
this.queue.add(() => this.jobWrapper(job)); |
||||
return { id, name }; |
||||
} |
||||
|
||||
async getJobs(types: string[] | string) { |
||||
types = Array.isArray(types) ? types : [types]; |
||||
return this.queueMemory.filter((q) => types.includes(q.status)); |
||||
} |
||||
|
||||
async getJob(id: string) { |
||||
return this.queueMemory.find((q) => q.id === id); |
||||
} |
||||
} |
@ -1,28 +1,27 @@
|
||||
import { InjectQueue } from '@nestjs/bull'; |
||||
import { Injectable } from '@nestjs/common'; |
||||
import { Queue } from 'bull'; |
||||
import { QueueService } from './fallback-queue.service'; |
||||
|
||||
@Injectable() |
||||
export class JobsService { |
||||
constructor(@InjectQueue('duplicate') private duplicateQueue: Queue) {} |
||||
activeQueue; |
||||
constructor( |
||||
@InjectQueue('jobs') private readonly jobsQueue: Queue, |
||||
private readonly fallbackQueueService: QueueService, |
||||
) { |
||||
this.activeQueue = process.env.NC_REDIS_URL |
||||
? this.jobsQueue |
||||
: this.fallbackQueueService; |
||||
} |
||||
|
||||
async jobStatus(jobType: string, jobId: string) { |
||||
switch (jobType) { |
||||
case 'duplicate': |
||||
default: |
||||
return await (await this.duplicateQueue.getJob(jobId)).getState(); |
||||
} |
||||
async jobStatus(jobId: string) { |
||||
return await (await this.activeQueue.getJob(jobId)).getState(); |
||||
} |
||||
|
||||
async jobList(jobType: string) { |
||||
switch (jobType) { |
||||
case 'duplicate': |
||||
default: |
||||
return await this.duplicateQueue.getJobs([ |
||||
'active', |
||||
'waiting', |
||||
'delayed', |
||||
]); |
||||
} |
||||
return ( |
||||
await this.activeQueue.getJobs(['active', 'waiting', 'delayed']) |
||||
).filter((j) => j.name === jobType); |
||||
} |
||||
} |
||||
|
Loading…
Reference in new issue