Browse Source

Nc feat/cleanup (#8508)

* feat: clean-up job

* feat: source cleanup

* fix: remove unused method

* feat: move jobs-redis to a static class

* feat: release sources from in-memory db on update & delete

* fix: skip calls if job redis not available

* fix: error handling on connection delete

---------

Co-authored-by: mertmit <mertmit99@gmail.com>
pull/8557/head
Raju Udava 6 months ago committed by GitHub
parent
commit
6a334f7351
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      packages/nocodb/src/interface/Jobs.ts
  2. 18
      packages/nocodb/src/models/Source.ts
  3. 5
      packages/nocodb/src/modules/jobs/fallback/jobs.service.ts
  4. 52
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  5. 3
      packages/nocodb/src/modules/jobs/jobs.module.ts
  6. 91
      packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts
  7. 155
      packages/nocodb/src/modules/jobs/redis/jobs-redis.ts
  8. 40
      packages/nocodb/src/modules/jobs/redis/jobs.service.ts
  9. 33
      packages/nocodb/src/utils/common/NcConnectionMgrv2.ts

1
packages/nocodb/src/interface/Jobs.ts

@ -15,6 +15,7 @@ export enum JobTypes {
UpdateSrcStat = 'update-source-stat',
HealthCheck = 'health-check',
HandleWebhook = 'handle-webhook',
CleanUp = 'clean-up',
}
export enum JobStatus {

18
packages/nocodb/src/models/Source.ts

@ -21,6 +21,8 @@ import {
prepareForResponse,
stringifyMetaProp,
} from '~/utils/modelUtils';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
import { InstanceCommands } from '~/interface/Jobs';
// todo: hide credentials
export default class Source implements SourceType {
@ -182,6 +184,11 @@ export default class Source implements SourceType {
prepareForResponse(updateObj),
);
if (JobsRedis.available) {
await JobsRedis.emitWorkerCommand(InstanceCommands.RELEASE, sourceId);
await JobsRedis.emitPrimaryCommand(InstanceCommands.RELEASE, sourceId);
}
// call before reorder to update cache
const returnBase = await this.get(oldBase.id, false, ncMeta);
@ -351,6 +358,15 @@ export default class Source implements SourceType {
return Base.get(this.base_id, ncMeta);
}
async sourceCleanup(_ncMeta = Noco.ncMeta) {
await NcConnectionMgrv2.deleteAwait(this);
if (JobsRedis.available) {
await JobsRedis.emitWorkerCommand(InstanceCommands.RELEASE, this.id);
await JobsRedis.emitPrimaryCommand(InstanceCommands.RELEASE, this.id);
}
}
async delete(ncMeta = Noco.ncMeta, { force }: { force?: boolean } = {}) {
const sources = await Source.list({ baseId: this.base_id }, ncMeta);
@ -422,7 +438,7 @@ export default class Source implements SourceType {
await SyncSource.delete(syncSource.id, ncMeta);
}
await NcConnectionMgrv2.deleteAwait(this);
await this.sourceCleanup(ncMeta);
const res = await ncMeta.metaDelete(null, null, MetaTable.BASES, this.id);

5
packages/nocodb/src/modules/jobs/fallback/jobs.service.ts

@ -1,11 +1,14 @@
import { Injectable } from '@nestjs/common';
import type { OnModuleInit } from '@nestjs/common';
import { QueueService } from '~/modules/jobs/fallback/fallback-queue.service';
import { JobStatus } from '~/interface/Jobs';
@Injectable()
export class JobsService {
export class JobsService implements OnModuleInit {
constructor(private readonly fallbackQueueService: QueueService) {}
async onModuleInit() {}
async add(name: string, data: any) {
return this.fallbackQueueService.add(name, data);
}

52
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -11,10 +11,7 @@ import {
import { Request } from 'express';
import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid';
import { ModuleRef } from '@nestjs/core';
import { JobsRedisService } from './redis/jobs-redis.service';
import type { Response } from 'express';
import type { OnModuleInit } from '@nestjs/common';
import { JobStatus } from '~/interface/Jobs';
import { JobEvents } from '~/interface/Jobs';
import { GlobalGuard } from '~/guards/global/global.guard';
@ -22,26 +19,18 @@ import NocoCache from '~/cache/NocoCache';
import { CacheGetType, CacheScope } from '~/utils/globals';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14);
const POLLING_INTERVAL = 30000;
@Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard)
export class JobsController implements OnModuleInit {
jobsRedisService: JobsRedisService;
export class JobsController {
constructor(
@Inject('JobsService') private readonly jobsService: IJobsService,
private moduleRef: ModuleRef,
) {}
onModuleInit() {
if (process.env.NC_REDIS_JOB_URL) {
this.jobsRedisService = this.moduleRef.get(JobsRedisService);
}
}
private jobRooms = {};
private localJobs = {};
private closedJobs = [];
@ -102,8 +91,8 @@ export class JobsController implements OnModuleInit {
listeners: [res],
};
// subscribe to job events
if (this.jobsRedisService) {
this.jobsRedisService.subscribe(jobId, (data) => {
if (JobsRedis.available) {
await JobsRedis.subscribe(jobId, async (data) => {
if (this.jobRooms[jobId]) {
this.jobRooms[jobId].listeners.forEach((res) => {
if (!res.headersSent) {
@ -121,7 +110,7 @@ export class JobsController implements OnModuleInit {
if (
[JobStatus.COMPLETED, JobStatus.FAILED].includes(data.status)
) {
this.jobsRedisService.unsubscribe(jobId);
await JobsRedis.unsubscribe(jobId);
delete this.jobRooms[jobId];
this.closedJobs.push(jobId);
setTimeout(() => {
@ -178,7 +167,11 @@ export class JobsController implements OnModuleInit {
}
@OnEvent(JobEvents.STATUS)
sendJobStatus(data: { id: string; status: JobStatus; data?: any }): void {
async sendJobStatus(data: {
id: string;
status: JobStatus;
data?: any;
}): Promise<void> {
let response;
const jobId = data.id;
@ -196,7 +189,7 @@ export class JobsController implements OnModuleInit {
this.localJobs[jobId].messages.shift();
}
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
} else {
@ -211,7 +204,7 @@ export class JobsController implements OnModuleInit {
_mid: 1,
};
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
}
@ -224,8 +217,8 @@ export class JobsController implements OnModuleInit {
});
}
if (this.jobsRedisService) {
this.jobsRedisService.publish(jobId, {
if (JobsRedis.available) {
await JobsRedis.publish(jobId, {
cmd: JobEvents.STATUS,
...data,
});
@ -237,16 +230,19 @@ export class JobsController implements OnModuleInit {
this.closedJobs = this.closedJobs.filter((j) => j !== jobId);
}, POLLING_INTERVAL * 2);
setTimeout(() => {
setTimeout(async () => {
delete this.jobRooms[jobId];
delete this.localJobs[jobId];
NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`);
await NocoCache.del(`${CacheScope.JOBS}:${jobId}:messages`);
}, POLLING_INTERVAL * 2);
}
}
@OnEvent(JobEvents.LOG)
sendJobLog(data: { id: string; data: { message: string } }): void {
async sendJobLog(data: {
id: string;
data: { message: string };
}): Promise<void> {
let response;
const jobId = data.id;
@ -265,7 +261,7 @@ export class JobsController implements OnModuleInit {
this.localJobs[jobId].messages.shift();
}
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
} else {
@ -280,7 +276,7 @@ export class JobsController implements OnModuleInit {
_mid: 1,
};
NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
await NocoCache.set(`${CacheScope.JOBS}:${jobId}:messages`, {
messages: this.localJobs[jobId].messages,
});
}
@ -293,8 +289,8 @@ export class JobsController implements OnModuleInit {
});
}
if (this.jobsRedisService) {
this.jobsRedisService.publish(jobId, {
if (JobsRedis.available) {
await JobsRedis.publish(jobId, {
cmd: JobEvents.LOG,
...data,
});

3
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -22,7 +22,6 @@ import { JobsLogService } from '~/modules/jobs/jobs/jobs-log.service';
// import { JobsGateway } from '~/modules/jobs/jobs.gateway';
import { JobsController } from '~/modules/jobs/jobs.controller';
import { JobsService } from '~/modules/jobs/redis/jobs.service';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
import { JobsEventService } from '~/modules/jobs/redis/jobs-event.service';
// Fallback
@ -60,7 +59,7 @@ export const JobsModuleMetadata = {
providers: [
...(process.env.NC_WORKER_CONTAINER !== 'true' ? [] : []),
...(process.env.NC_REDIS_JOB_URL
? [JobsRedisService, JobsEventService]
? [JobsEventService]
: [FallbackQueueService, FallbackJobsEventService]),
{
provide: 'JobsService',

91
packages/nocodb/src/modules/jobs/redis/jobs-redis.service.ts

@ -1,91 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
import Redis from 'ioredis';
import { InstanceTypes } from '~/interface/Jobs';
@Injectable()
export class JobsRedisService {
protected logger = new Logger(JobsRedisService.name);
private redisClient: Redis;
private redisSubscriber: Redis;
private unsubscribeCallbacks: { [key: string]: () => void } = {};
public primaryCallbacks: { [key: string]: (...args) => void } = {};
public workerCallbacks: { [key: string]: (...args) => void } = {};
constructor() {
this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
if (process.env.NC_WORKER_CONTAINER === 'true') {
this.redisSubscriber.subscribe(InstanceTypes.WORKER);
} else {
this.redisSubscriber.subscribe(InstanceTypes.PRIMARY);
}
const onMessage = (channel, message) => {
const args = message.split(':');
const command = args.shift();
if (channel === InstanceTypes.WORKER) {
this.workerCallbacks[command] && this.workerCallbacks[command](...args);
} else if (channel === InstanceTypes.PRIMARY) {
this.primaryCallbacks[command] &&
this.primaryCallbacks[command](...args);
}
};
this.redisSubscriber.on('message', onMessage);
}
publish(channel: string, message: string | any) {
if (typeof message === 'string') {
this.redisClient.publish(channel, message);
} else {
try {
this.redisClient.publish(channel, JSON.stringify(message));
} catch (e) {
this.logger.error(e);
}
}
}
subscribe(channel: string, callback: (message: any) => void) {
this.redisSubscriber.subscribe(channel);
const onMessage = (_channel, message) => {
try {
message = JSON.parse(message);
} catch (e) {}
callback(message);
};
this.redisSubscriber.on('message', onMessage);
this.unsubscribeCallbacks[channel] = () => {
this.redisSubscriber.unsubscribe(channel);
this.redisSubscriber.off('message', onMessage);
};
}
unsubscribe(channel: string) {
if (this.unsubscribeCallbacks[channel]) {
this.unsubscribeCallbacks[channel]();
delete this.unsubscribeCallbacks[channel];
}
}
workerCount(): Promise<number> {
return new Promise((resolve, reject) => {
this.redisClient.publish(
InstanceTypes.WORKER,
'count',
(error, numberOfSubscribers) => {
if (error) {
reject(0);
} else {
resolve(numberOfSubscribers);
}
},
);
});
}
}

155
packages/nocodb/src/modules/jobs/redis/jobs-redis.ts

@ -0,0 +1,155 @@
import { Logger } from '@nestjs/common';
import Redis from 'ioredis';
import type { InstanceCommands } from '~/interface/Jobs';
import { InstanceTypes } from '~/interface/Jobs';
export class JobsRedis {
private static initialized = false;
public static available = process.env.NC_REDIS_JOB_URL ? true : false;
protected static logger = new Logger(JobsRedis.name);
private static redisClient: Redis;
private static redisSubscriber: Redis;
private static unsubscribeCallbacks: { [key: string]: () => Promise<void> } =
{};
public static primaryCallbacks: {
[key: string]: (...args) => Promise<void>;
} = {};
public static workerCallbacks: { [key: string]: (...args) => Promise<void> } =
{};
static async init() {
if (this.initialized) {
return;
}
if (!JobsRedis.available) {
return;
}
this.initialized = true;
this.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
this.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
if (process.env.NC_WORKER_CONTAINER === 'true') {
await this.redisSubscriber.subscribe(InstanceTypes.WORKER);
} else {
await this.redisSubscriber.subscribe(InstanceTypes.PRIMARY);
}
const onMessage = async (channel, message) => {
const args = message.split(':');
const command = args.shift();
if (channel === InstanceTypes.WORKER) {
this.workerCallbacks[command] &&
(await this.workerCallbacks[command](...args));
} else if (channel === InstanceTypes.PRIMARY) {
this.primaryCallbacks[command] &&
(await this.primaryCallbacks[command](...args));
}
};
this.redisSubscriber.on('message', onMessage);
}
static async publish(channel: string, message: string | any) {
if (!this.initialized) {
if (!JobsRedis.available) {
return;
}
await this.init();
}
if (typeof message === 'string') {
await this.redisClient.publish(channel, message);
} else {
try {
await this.redisClient.publish(channel, JSON.stringify(message));
} catch (e) {
this.logger.error(e);
}
}
}
static async subscribe(
channel: string,
callback: (message: any) => Promise<void>,
) {
if (!this.initialized) {
if (!JobsRedis.available) {
return;
}
await this.init();
}
await this.redisSubscriber.subscribe(channel);
const onMessage = async (_channel, message) => {
try {
message = JSON.parse(message);
} catch (e) {}
await callback(message);
};
this.redisSubscriber.on('message', onMessage);
this.unsubscribeCallbacks[channel] = async () => {
await this.redisSubscriber.unsubscribe(channel);
this.redisSubscriber.off('message', onMessage);
};
}
static async unsubscribe(channel: string) {
if (!this.initialized) {
if (!JobsRedis.available) {
return;
}
await this.init();
}
if (this.unsubscribeCallbacks[channel]) {
await this.unsubscribeCallbacks[channel]();
delete this.unsubscribeCallbacks[channel];
}
}
static async workerCount(): Promise<number> {
if (!this.initialized) {
if (!JobsRedis.available) {
return;
}
await this.init();
}
return new Promise((resolve, reject) => {
this.redisClient.publish(
InstanceTypes.WORKER,
'count',
(error, numberOfSubscribers) => {
if (error) {
reject(0);
} else {
resolve(numberOfSubscribers);
}
},
);
});
}
static async emitWorkerCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await JobsRedis.publish(InstanceTypes.WORKER, data);
}
static async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await JobsRedis.publish(InstanceTypes.PRIMARY, data);
}
}

40
packages/nocodb/src/modules/jobs/redis/jobs.service.ts

@ -2,37 +2,27 @@ import { InjectQueue } from '@nestjs/bull';
import { Injectable, Logger } from '@nestjs/common';
import { Queue } from 'bull';
import type { OnModuleInit } from '@nestjs/common';
import {
InstanceCommands,
InstanceTypes,
JOBS_QUEUE,
JobStatus,
} from '~/interface/Jobs';
import { JobsRedisService } from '~/modules/jobs/redis/jobs-redis.service';
import { InstanceCommands, JOBS_QUEUE, JobStatus } from '~/interface/Jobs';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
@Injectable()
export class JobsService implements OnModuleInit {
protected logger = new Logger(JobsService.name);
constructor(
@InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue,
protected readonly jobsRedisService: JobsRedisService,
) {}
constructor(@InjectQueue(JOBS_QUEUE) public readonly jobsQueue: Queue) {}
// pause primary instance queue
async onModuleInit() {
await this.toggleQueue();
this.jobsRedisService.workerCallbacks[InstanceCommands.RESUME_LOCAL] =
async () => {
this.logger.log('Resuming local queue');
await this.jobsQueue.resume(true);
};
this.jobsRedisService.workerCallbacks[InstanceCommands.PAUSE_LOCAL] =
async () => {
this.logger.log('Pausing local queue');
await this.jobsQueue.pause(true);
};
JobsRedis.workerCallbacks[InstanceCommands.RESUME_LOCAL] = async () => {
this.logger.log('Resuming local queue');
await this.jobsQueue.resume(true);
};
JobsRedis.workerCallbacks[InstanceCommands.PAUSE_LOCAL] = async () => {
this.logger.log('Pausing local queue');
await this.jobsQueue.pause(true);
};
}
async toggleQueue() {
@ -40,7 +30,7 @@ export class JobsService implements OnModuleInit {
await this.jobsQueue.pause(true);
} else if (process.env.NC_WORKER_CONTAINER !== 'true') {
// resume primary instance queue if there is no worker
const workerCount = await this.jobsRedisService.workerCount();
const workerCount = await JobsRedis.workerCount();
const localWorkerPaused = await this.jobsQueue.isPaused(true);
// if there is no worker and primary instance queue is paused, resume it
@ -112,12 +102,10 @@ export class JobsService implements OnModuleInit {
}
async emitWorkerCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await this.jobsRedisService.publish(InstanceTypes.WORKER, data);
return JobsRedis.emitWorkerCommand(command, ...args);
}
async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await this.jobsRedisService.publish(InstanceTypes.PRIMARY, data);
return JobsRedis.emitPrimaryCommand(command, ...args);
}
}

33
packages/nocodb/src/utils/common/NcConnectionMgrv2.ts

@ -1,3 +1,4 @@
import { Logger } from '@nestjs/common';
import type Source from '~/models/Source';
import {
defaultConnectionConfig,
@ -8,6 +9,8 @@ import { XKnex } from '~/db/CustomKnex';
import Noco from '~/Noco';
export default class NcConnectionMgrv2 {
private static logger = new Logger('NcConnectionMgrv2');
protected static connectionRefs: {
[baseId: string]: {
[sourceId: string]: XKnex;
@ -22,31 +25,39 @@ export default class NcConnectionMgrv2 {
}
}
// Todo: Should await on connection destroy
public static delete(source: Source) {
public static async deleteAwait(source: Source) {
// todo: ignore meta bases
if (this.connectionRefs?.[source.base_id]?.[source.id]) {
try {
const conn = this.connectionRefs?.[source.base_id]?.[source.id];
conn.destroy();
await conn.destroy();
delete this.connectionRefs?.[source.base_id][source.id];
} catch (e) {
console.log(e);
this.logger.error({
error: e,
details: 'Error deleting connection ref',
});
}
}
}
public static async deleteAwait(source: Source) {
// todo: ignore meta bases
if (this.connectionRefs?.[source.base_id]?.[source.id]) {
public static async deleteConnectionRef(sourceId: string) {
let deleted = false;
for (const baseId in this.connectionRefs) {
try {
const conn = this.connectionRefs?.[source.base_id]?.[source.id];
await conn.destroy();
delete this.connectionRefs?.[source.base_id][source.id];
if (this.connectionRefs[baseId][sourceId]) {
await this.connectionRefs[baseId][sourceId].destroy();
delete this.connectionRefs[baseId][sourceId];
deleted = true;
}
} catch (e) {
console.log(e);
this.logger.error({
error: e,
details: 'Error deleting connection ref',
});
}
}
return deleted;
}
public static async get(source: Source): Promise<XKnex> {

Loading…
Cancel
Save