Browse Source

feat: fallback job queue

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest
mertmit 2 years ago
parent
commit
5f17a34a82
  1. 3
      packages/nc-gui/pages/index/index/index.vue
  2. 12
      packages/nc-gui/plugins/events.ts
  3. 67
      packages/nocodb-nest/package-lock.json
  4. 1
      packages/nocodb-nest/package.json
  5. 9
      packages/nocodb-nest/src/app.module.ts
  6. 15
      packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts
  7. 14
      packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts
  8. 1
      packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts
  9. 102
      packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts
  10. 29
      packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts
  11. 4
      packages/nocodb-nest/src/modules/jobs/jobs.module.ts
  12. 31
      packages/nocodb-nest/src/modules/jobs/jobs.service.ts
  13. 4
      packages/nocodb-nest/src/schema/swagger.json
  14. 8
      packages/nocodb-sdk/src/lib/Api.ts

3
packages/nc-gui/pages/index/index/index.vue

@ -85,7 +85,8 @@ const duplicateProject = (project: ProjectType) => {
try {
const jobData = await api.project.duplicate(project.id as string)
$events.subscribe(jobData.type, jobData.id, async (data: { status: string }) => {
$events.subscribe(jobData.name, jobData.id, async (data: { status: string }) => {
console.log('dataCB', data)
if (data.status === 'completed' || data.status === 'refresh') {
await loadProjects()
} else if (data.status === 'failed') {

12
packages/nc-gui/plugins/events.ts

@ -29,11 +29,11 @@ export default defineNuxtPlugin(async (nuxtApp) => {
}
const events = {
subscribe(type: string, id: string, cb: (data: any) => void) {
subscribe(name: string, id: string, cb: (data: any) => void) {
if (socket) {
socket.emit('subscribe', { type, id })
socket.emit('subscribe', { name, id })
const tempFn = (data: any) => {
if (data.id === id && data.type === type) {
if (data.id === id && data.name === name) {
cb(data)
if (data.status === 'completed' || data.status === 'failed') {
socket?.off('status', tempFn)
@ -43,12 +43,12 @@ export default defineNuxtPlugin(async (nuxtApp) => {
socket.on('status', tempFn)
}
},
getStatus(type: string, id: string): Promise<string> {
getStatus(name: string, id: string): Promise<string> {
return new Promise((resolve) => {
if (socket) {
socket.emit('status', { type, id })
socket.emit('status', { name, id })
const tempFn = (data: any) => {
if (data.id === id && data.type === type) {
if (data.id === id && data.name === name) {
resolve(data.status)
socket?.off('status', tempFn)
}

67
packages/nocodb-nest/package-lock.json generated

@ -86,6 +86,7 @@
"nodemailer": "^6.4.10",
"object-hash": "^3.0.0",
"os-locale": "^6.0.2",
"p-queue": "^6.6.2",
"papaparse": "^5.3.1",
"parse-database-url": "^0.3.0",
"passport": "^0.4.1",
@ -8160,6 +8161,11 @@
"node": ">=6"
}
},
"node_modules/eventemitter3": {
"version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
},
"node_modules/events": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
@ -13734,6 +13740,14 @@
"node": ">=0.10.0"
}
},
"node_modules/p-finally": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz",
"integrity": "sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==",
"engines": {
"node": ">=4"
}
},
"node_modules/p-limit": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz",
@ -13778,6 +13792,32 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-queue": {
"version": "6.6.2",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz",
"integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==",
"dependencies": {
"eventemitter3": "^4.0.4",
"p-timeout": "^3.2.0"
},
"engines": {
"node": ">=8"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/p-timeout": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz",
"integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==",
"dependencies": {
"p-finally": "^1.0.0"
},
"engines": {
"node": ">=8"
}
},
"node_modules/p-try": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz",
@ -24540,6 +24580,11 @@
"resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz",
"integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ=="
},
"eventemitter3": {
"version": "4.0.7",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz",
"integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw=="
},
"events": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
@ -28821,6 +28866,11 @@
"minimist": "^1.1.0"
}
},
"p-finally": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz",
"integrity": "sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow=="
},
"p-limit": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz",
@ -28847,6 +28897,23 @@
"aggregate-error": "^3.0.0"
}
},
"p-queue": {
"version": "6.6.2",
"resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz",
"integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==",
"requires": {
"eventemitter3": "^4.0.4",
"p-timeout": "^3.2.0"
}
},
"p-timeout": {
"version": "3.2.0",
"resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz",
"integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==",
"requires": {
"p-finally": "^1.0.0"
}
},
"p-try": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz",

1
packages/nocodb-nest/package.json

@ -116,6 +116,7 @@
"nodemailer": "^6.4.10",
"object-hash": "^3.0.0",
"os-locale": "^6.0.2",
"p-queue": "^6.6.2",
"papaparse": "^5.3.1",
"parse-database-url": "^0.3.0",
"passport": "^0.4.1",

9
packages/nocodb-nest/src/app.module.ts

@ -35,12 +35,13 @@ import type {
MetasModule,
DatasModule,
JobsModule,
...(process.env['NC_REDIS_URL']
? [
BullModule.forRoot({
redis: {
host: 'localhost',
port: 6379,
},
redis: process.env.NC_REDIS_URL,
}),
]
: []),
],
controllers: [],
providers: [

15
packages/nocodb-nest/src/modules/jobs/export-import/duplicate.controller.ts

@ -14,13 +14,20 @@ import {
Acl,
ExtractProjectIdMiddleware,
} from 'src/middlewares/extract-project-id/extract-project-id.middleware';
import { QueueService } from '../fallback-queue.service';
@Controller()
@UseGuards(ExtractProjectIdMiddleware, GlobalGuard)
export class DuplicateController {
activeQueue;
constructor(
@InjectQueue('duplicate') private readonly duplicateQueue: Queue,
) {}
@InjectQueue('jobs') private readonly jobsQueue: Queue,
private readonly fallbackQueueService: QueueService,
) {
this.activeQueue = process.env.NC_REDIS_URL
? this.jobsQueue
: this.fallbackQueueService;
}
@Post('/api/v1/db/meta/duplicate/:projectId/:baseId?')
@HttpCode(200)
@ -30,7 +37,7 @@ export class DuplicateController {
@Param('projectId') projectId: string,
@Param('baseId') baseId?: string,
) {
const job = await this.duplicateQueue.add('duplicate', {
const job = await this.activeQueue.add('duplicate', {
projectId,
baseId,
req: {
@ -38,6 +45,6 @@ export class DuplicateController {
clientIp: req.clientIp,
},
});
return { id: job.id, type: job.name };
return { id: job.id, name: job.name };
}
}

14
packages/nocodb-nest/src/modules/jobs/export-import/duplicate.processor.ts

@ -17,6 +17,7 @@ import {
} from 'src/helpers/exportImportHelpers';
import { BulkDataAliasService } from 'src/services/bulk-data-alias.service';
import { UITypes } from 'nocodb-sdk';
import { forwardRef, Inject } from '@nestjs/common';
import { JobsGateway } from '../jobs.gateway';
import { ExportService } from './export.service';
import { ImportService } from './import.service';
@ -24,20 +25,21 @@ import type { LinkToAnotherRecordColumn } from 'src/models';
const DEBUG = false;
@Processor('duplicate')
@Processor('jobs')
export class DuplicateProcessor {
constructor(
private readonly exportService: ExportService,
private readonly importService: ImportService,
private readonly projectsService: ProjectsService,
private readonly bulkDataService: BulkDataAliasService,
@Inject(forwardRef(() => JobsGateway))
private readonly jobsGateway: JobsGateway,
) {}
@OnQueueActive()
onActive(job: Job) {
this.jobsGateway.jobStatus({
type: job.name,
name: job.name,
id: job.id.toString(),
status: 'active',
});
@ -47,7 +49,7 @@ export class DuplicateProcessor {
onFailed(job: Job, error: Error) {
console.error(
boxen(
`---- !! JOB FAILED !! ----\ntype: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
`---- !! JOB FAILED !! ----\nname: ${job.name}\nid:${job.id}\nerror:${error.name} (${error.message})\n\nstack: ${error.stack}`,
{
padding: 1,
borderStyle: 'double',
@ -57,7 +59,7 @@ export class DuplicateProcessor {
);
this.jobsGateway.jobStatus({
type: job.name,
name: job.name,
id: job.id.toString(),
status: 'failed',
});
@ -66,7 +68,7 @@ export class DuplicateProcessor {
@OnQueueCompleted()
onCompleted(job: Job) {
this.jobsGateway.jobStatus({
type: job.name,
name: job.name,
id: job.id.toString(),
status: 'completed',
});
@ -134,7 +136,7 @@ export class DuplicateProcessor {
});
this.jobsGateway.jobStatus({
type: job.name,
name: job.name,
id: job.id.toString(),
status: 'refresh',
});

1
packages/nocodb-nest/src/modules/jobs/export-import/export.service.ts

@ -12,7 +12,6 @@ import { Base, Model, Project } from 'src/models';
import { DatasService } from 'src/services/datas.service';
import { Injectable } from '@nestjs/common';
import type { LinkToAnotherRecordColumn, View } from 'src/models';
import type { IStorageAdapterV2 } from 'nc-plugin';
@Injectable()
export class ExportService {

102
packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts

@ -0,0 +1,102 @@
import { Injectable } from '@nestjs/common';
import PQueue from 'p-queue';
import Emittery from 'emittery';
import { DuplicateProcessor } from './export-import/duplicate.processor';
interface Job {
id: string;
name: string;
status: string;
data: any;
}
@Injectable()
export class QueueService {
static queue = new PQueue({ concurrency: 1 });
static queueIndex = 1;
static processed = 0;
static queueMemory: Job[] = [];
static emitter = new Emittery();
constructor(private readonly duplicateProcessor: DuplicateProcessor) {
this.emitter.on('active', (data: any) => {
const job = this.queueMemory.find(
(job) => job.id === data.id && job.name === data.name,
);
job.status = 'active';
this.duplicateProcessor.onActive.apply(this.duplicateProcessor, [
job as any,
]);
});
this.emitter.on('completed', (data: any) => {
const job = this.queueMemory.find(
(job) => job.id === data.id && job.name === data.name,
);
job.status = 'completed';
this.duplicateProcessor.onCompleted.apply(this.duplicateProcessor, [
data as any,
]);
});
this.emitter.on('failed', (data: { job: Job; error: Error }) => {
const job = this.queueMemory.find(
(job) => job.id === data.job.id && job.name === data.job.name,
);
job.status = 'failed';
this.duplicateProcessor.onFailed.apply(this.duplicateProcessor, [
data.job as any,
data.error,
]);
});
}
jobMap = {
duplicate: this.duplicateProcessor.duplicateBase,
};
async jobWrapper(job: Job) {
this.emitter.emit('active', job);
try {
await this.jobMap[job.name].apply(this.duplicateProcessor, [job]);
this.emitter.emit('completed', job);
} catch (error) {
this.emitter.emit('failed', { job, error });
}
}
get emitter() {
return QueueService.emitter;
}
get queue() {
return QueueService.queue;
}
get queueMemory() {
return QueueService.queueMemory;
}
get queueIndex() {
return QueueService.queueIndex;
}
set queueIndex(index: number) {
QueueService.queueIndex = index;
}
async add(name: string, data: any) {
const id = `${this.queueIndex++}`;
const job = { id: `${id}`, name, status: 'waiting', data };
this.queueMemory.push(job);
this.queue.add(() => this.jobWrapper(job));
return { id, name };
}
async getJobs(types: string[] | string) {
types = Array.isArray(types) ? types : [types];
return this.queueMemory.filter((q) => types.includes(q.status));
}
async getJob(id: string) {
return this.queueMemory.find((q) => q.id === id);
}
}

29
packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts

@ -6,7 +6,7 @@ import {
WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Injectable } from '@nestjs/common';
import { forwardRef, Inject, Injectable } from '@nestjs/common';
import { ExecutionContextHost } from '@nestjs/core/helpers/execution-context-host';
import { AuthGuard } from '@nestjs/passport';
import { JobsService } from './jobs.service';
@ -22,7 +22,10 @@ import type { OnModuleInit } from '@nestjs/common';
})
@Injectable()
export class JobsGateway implements OnModuleInit {
constructor(private readonly jobsService: JobsService) {}
constructor(
@Inject(forwardRef(() => JobsService))
private readonly jobsService: JobsService,
) {}
@WebSocketServer()
server: Server;
@ -41,30 +44,32 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('subscribe')
async subscribe(
@MessageBody() data: { type: string; id: string },
@MessageBody() data: { name: string; id: string },
@ConnectedSocket() client: Socket,
): Promise<void> {
const rooms = await this.jobsService.jobList(data.type);
const room = rooms.find((r) => r.id === data.id);
const rooms = (await this.jobsService.jobList(data.name)).map(
(j) => `${j.name}-${j.id}`,
);
const room = rooms.find((r) => r === `${data.name}-${data.id}`);
if (room) {
client.join(data.id);
client.join(`${data.name}-${data.id}`);
}
}
@SubscribeMessage('status')
async status(
@MessageBody() data: { type: string; id: string },
@MessageBody() data: { name: string; id: string },
@ConnectedSocket() client: Socket,
): Promise<void> {
client.emit('status', {
id: data.id,
type: data.type,
status: await this.jobsService.jobStatus(data.type, data.id),
name: data.name,
status: await this.jobsService.jobStatus(data.id),
});
}
async jobStatus(data: {
type: string;
name: string;
id: string;
status:
| 'completed'
@ -75,9 +80,9 @@ export class JobsGateway implements OnModuleInit {
| 'paused'
| 'refresh';
}): Promise<void> {
this.server.to(data.id).emit('status', {
this.server.to(`${data.name}-${data.id}`).emit('status', {
id: data.id,
type: data.type,
name: data.name,
status: data.status,
});
}

4
packages/nocodb-nest/src/modules/jobs/jobs.module.ts

@ -9,6 +9,7 @@ import { ImportService } from './export-import/import.service';
import { DuplicateController } from './export-import/duplicate.controller';
import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsGateway } from './jobs.gateway';
import { QueueService } from './fallback-queue.service';
@Module({
imports: [
@ -16,11 +17,12 @@ import { JobsGateway } from './jobs.gateway';
DatasModule,
MetasModule,
BullModule.registerQueue({
name: 'duplicate',
name: 'jobs',
}),
],
controllers: [DuplicateController],
providers: [
QueueService,
JobsGateway,
JobsService,
DuplicateProcessor,

31
packages/nocodb-nest/src/modules/jobs/jobs.service.ts

@ -1,28 +1,27 @@
import { InjectQueue } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { QueueService } from './fallback-queue.service';
@Injectable()
export class JobsService {
constructor(@InjectQueue('duplicate') private duplicateQueue: Queue) {}
async jobStatus(jobType: string, jobId: string) {
switch (jobType) {
case 'duplicate':
default:
return await (await this.duplicateQueue.getJob(jobId)).getState();
activeQueue;
constructor(
@InjectQueue('jobs') private readonly jobsQueue: Queue,
private readonly fallbackQueueService: QueueService,
) {
this.activeQueue = process.env.NC_REDIS_URL
? this.jobsQueue
: this.fallbackQueueService;
}
async jobStatus(jobId: string) {
return await (await this.activeQueue.getJob(jobId)).getState();
}
async jobList(jobType: string) {
switch (jobType) {
case 'duplicate':
default:
return await this.duplicateQueue.getJobs([
'active',
'waiting',
'delayed',
]);
}
return (
await this.activeQueue.getJobs(['active', 'waiting', 'delayed'])
).filter((j) => j.name === jobType);
}
}

4
packages/nocodb-nest/src/schema/swagger.json

@ -2112,7 +2112,7 @@
"schema": {
"type": "object",
"properties": {
"type": {
"name": {
"type": "string"
},
"id": {
@ -2170,7 +2170,7 @@
"schema": {
"type": "object",
"properties": {
"type": {
"name": {
"type": "string"
},
"id": {

8
packages/nocodb-sdk/src/lib/Api.ts

@ -4019,7 +4019,7 @@ export class Api<
* @summary Duplicate Project Base
* @request POST:/api/v1/db/meta/duplicate/{projectId}/{baseId}
* @response `200` `{
type?: string,
name?: string,
id?: string,
}` OK
@ -4036,7 +4036,7 @@ export class Api<
) =>
this.request<
{
type?: string;
name?: string;
id?: string;
},
{
@ -4058,7 +4058,7 @@ export class Api<
* @summary Duplicate Project
* @request POST:/api/v1/db/meta/duplicate/{projectId}
* @response `200` `{
type?: string,
name?: string,
id?: string,
}` OK
@ -4071,7 +4071,7 @@ export class Api<
duplicate: (projectId: IdType, params: RequestParams = {}) =>
this.request<
{
type?: string;
name?: string;
id?: string;
},
{

Loading…
Cancel
Save