|
|
|
@ -1,5 +1,5 @@
|
|
|
|
|
import { InjectQueue } from '@nestjs/bull'; |
|
|
|
|
import { Injectable } from '@nestjs/common'; |
|
|
|
|
import { Injectable, Logger } from '@nestjs/common'; |
|
|
|
|
import { Queue } from 'bull'; |
|
|
|
|
import type { OnModuleInit } from '@nestjs/common'; |
|
|
|
|
import { JOBS_QUEUE, JobStatus } from '~/interface/Jobs'; |
|
|
|
@ -7,6 +7,8 @@ import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
|
|
|
|
|
|
|
|
|
|
@Injectable() |
|
|
|
|
export class JobsService implements OnModuleInit { |
|
|
|
|
private logger = new Logger(JobsService.name); |
|
|
|
|
|
|
|
|
|
constructor( |
|
|
|
|
@InjectQueue(JOBS_QUEUE) protected readonly jobsQueue: Queue, |
|
|
|
|
protected readonly jobsRedisService: JobsRedisService, |
|
|
|
@ -16,20 +18,15 @@ export class JobsService implements OnModuleInit {
|
|
|
|
|
async onModuleInit() { |
|
|
|
|
if (process.env.NC_WORKER_CONTAINER !== 'true') { |
|
|
|
|
await this.jobsQueue.pause(true); |
|
|
|
|
this.jobsRedisService.publish('workers', 'pause'); |
|
|
|
|
} else { |
|
|
|
|
this.jobsRedisService.workerCallbacks['resumeLocal'] = async () => { |
|
|
|
|
this.logger.log('Resuming local queue'); |
|
|
|
|
await this.jobsQueue.resume(true); |
|
|
|
|
}; |
|
|
|
|
this.jobsRedisService.workerCallbacks['pauseLocal'] = async () => { |
|
|
|
|
this.logger.log('Pausing local queue'); |
|
|
|
|
await this.jobsQueue.pause(true); |
|
|
|
|
}; |
|
|
|
|
this.jobsRedisService.workerCallbacks['resume'] = async () => { |
|
|
|
|
await this.jobsQueue.resume(); |
|
|
|
|
}; |
|
|
|
|
this.jobsRedisService.workerCallbacks['pause'] = async () => { |
|
|
|
|
await this.jobsQueue.pause(); |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -92,10 +89,12 @@ export class JobsService implements OnModuleInit {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async resumeQueue() { |
|
|
|
|
this.logger.log('Resuming global queue'); |
|
|
|
|
await this.jobsQueue.resume(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async pauseQueue() { |
|
|
|
|
this.logger.log('Pausing global queue'); |
|
|
|
|
await this.jobsQueue.pause(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|