Browse Source

feat: at import background job

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest
mertmit 2 years ago
parent
commit
7dd332c847
  1. 111
      packages/nc-gui/components/dlg/AirtableImport.vue
  2. 2
      packages/nc-gui/pages/index/index/index.vue
  3. 56
      packages/nc-gui/plugins/jobs.ts
  4. 6
      packages/nocodb-nest/src/controllers/imports/helpers/NocoSyncDestAdapter.ts
  5. 7
      packages/nocodb-nest/src/controllers/imports/helpers/NocoSyncSourceAdapter.ts
  6. 2480
      packages/nocodb-nest/src/controllers/imports/helpers/job.ts
  7. 21
      packages/nocodb-nest/src/controllers/imports/import.controller.spec.ts
  8. 148
      packages/nocodb-nest/src/controllers/imports/import.controller.ts
  9. 76
      packages/nocodb-nest/src/modules/jobs/at-import/at-import.controller.ts
  10. 2515
      packages/nocodb-nest/src/modules/jobs/at-import/at-import.processor.ts
  11. 0
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/EntityMap.ts
  12. 0
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/fetchAT.ts
  13. 5
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/readAndProcessData.ts
  14. 0
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/syncMap.ts
  15. 6
      packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts
  16. 1
      packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts
  17. 22
      packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts
  18. 5
      packages/nocodb-nest/src/modules/jobs/jobs.module.ts
  19. 6
      packages/nocodb-nest/src/modules/jobs/jobs.service.ts
  20. 4
      packages/nocodb-nest/src/modules/metas/metas.module.ts
  21. 12
      packages/nocodb-nest/src/services/socket.gateway.ts

111
packages/nc-gui/components/dlg/AirtableImport.vue

@ -1,6 +1,4 @@
<script setup lang="ts"> <script setup lang="ts">
import type { Socket } from 'socket.io-client'
import io from 'socket.io-client'
import type { Card as AntCard } from 'ant-design-vue' import type { Card as AntCard } from 'ant-design-vue'
import { import {
Form, Form,
@ -10,7 +8,6 @@ import {
iconMap, iconMap,
message, message,
nextTick, nextTick,
onBeforeUnmount,
onMounted, onMounted,
ref, ref,
storeToRefs, storeToRefs,
@ -31,7 +28,7 @@ const { appInfo } = $(useGlobal())
const baseURL = appInfo.ncSiteUrl const baseURL = appInfo.ncSiteUrl
const { $state } = useNuxtApp() const { $state, $jobs } = useNuxtApp()
const projectStore = useProject() const projectStore = useProject()
@ -49,8 +46,6 @@ const logRef = ref<typeof AntCard>()
const enableAbort = ref(false) const enableAbort = ref(false)
let socket: Socket | null
const syncSource = ref({ const syncSource = ref({
id: '', id: '',
type: 'Airtable', type: 'Airtable',
@ -72,6 +67,35 @@ const syncSource = ref({
}, },
}) })
const pushProgress = async (message: string, status: 'completed' | 'failed' | 'progress') => {
progress.value.push({ msg: message, status })
await nextTick(() => {
const container: HTMLDivElement = logRef.value?.$el?.firstElementChild
if (!container) return
container.scrollTop = container.scrollHeight
})
}
const onSubscribe = () => {
step.value = 2
}
const onStatus = async (status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => {
if (status === 'completed') {
showGoToDashboardButton.value = true
await loadTables()
pushProgress('Done!', status)
// TODO: add tab of the first table
} else if (status === 'failed') {
pushProgress(error, status)
}
}
const onLog = (data: { message: string }) => {
pushProgress(data.message, 'progress')
}
const validators = computed(() => ({ const validators = computed(() => ({
'details.apiKey': [fieldRequiredValidator()], 'details.apiKey': [fieldRequiredValidator()],
'details.syncSourceUrlOrId': [fieldRequiredValidator()], 'details.syncSourceUrlOrId': [fieldRequiredValidator()],
@ -130,7 +154,7 @@ async function loadSyncSrc() {
srcs[0].details = srcs[0].details || {} srcs[0].details = srcs[0].details || {}
syncSource.value = migrateSync(srcs[0]) syncSource.value = migrateSync(srcs[0])
syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId
socket?.emit('subscribe', syncSource.value.id) $jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
} else { } else {
syncSource.value = { syncSource.value = {
id: '', id: '',
@ -161,11 +185,8 @@ async function sync() {
baseURL, baseURL,
method: 'POST', method: 'POST',
headers: { 'xc-auth': $state.token.value as string }, headers: { 'xc-auth': $state.token.value as string },
params: {
id: socket?.id,
},
}) })
socket?.emit('subscribe', syncSource.value.id) $jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
} catch (e: any) { } catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e)) message.error(await extractSdkResponseErrorMsg(e))
} }
@ -183,9 +204,6 @@ async function abort() {
baseURL, baseURL,
method: 'POST', method: 'POST',
headers: { 'xc-auth': $state.token.value as string }, headers: { 'xc-auth': $state.token.value as string },
params: {
id: socket?.id,
},
}) })
step.value = 1 step.value = 1
} catch (e: any) { } catch (e: any) {
@ -223,67 +241,12 @@ watch(
) )
onMounted(async () => { onMounted(async () => {
socket = io(new URL(baseURL, window.location.href.split(/[?#]/)[0]).href, { if (syncSource.value.id) {
extraHeaders: { 'xc-auth': $state.token.value as string }, $jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
}) }
socket.on('progress', async (d: Record<string, any>) => {
progress.value.push(d)
await nextTick(() => {
const container: HTMLDivElement = logRef.value?.$el?.firstElementChild
if (!container) return
container.scrollTop = container.scrollHeight
})
if (d.status === 'COMPLETED') {
showGoToDashboardButton.value = true
await loadTables()
// TODO: add tab of the first table
}
})
socket.on('disconnect', () => {
console.log('socket disconnected')
const rcInterval = setInterval(() => {
if (socket?.connected) {
clearInterval(rcInterval)
socket?.emit('subscribe', syncSource.value.id)
} else {
socket?.connect()
}
}, 2000)
})
socket.on('job', () => {
step.value = 2
})
// connect event does not provide data
socket.on('connect', () => {
console.log('socket connected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
socket?.io.on('reconnect', () => {
console.log('socket reconnected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
await loadSyncSrc() await loadSyncSrc()
}) })
onBeforeUnmount(() => {
if (socket) {
socket.off('disconnect')
socket.disconnect()
socket.removeAllListeners()
}
})
</script> </script>
<template> <template>
@ -407,7 +370,7 @@ onBeforeUnmount(() => {
<a-card ref="logRef" :body-style="{ backgroundColor: '#000000', height: '400px', overflow: 'auto' }"> <a-card ref="logRef" :body-style="{ backgroundColor: '#000000', height: '400px', overflow: 'auto' }">
<div v-for="({ msg, status }, i) in progress" :key="i"> <div v-for="({ msg, status }, i) in progress" :key="i">
<div v-if="status === 'FAILED'" class="flex items-center"> <div v-if="status === 'failed'" class="flex items-center">
<component :is="iconMap.closeCircle" class="text-red-500" /> <component :is="iconMap.closeCircle" class="text-red-500" />
<span class="text-red-500 ml-2">{{ msg }}</span> <span class="text-red-500 ml-2">{{ msg }}</span>
@ -424,7 +387,7 @@ onBeforeUnmount(() => {
v-if=" v-if="
!progress || !progress ||
!progress.length || !progress.length ||
(progress[progress.length - 1].status !== 'COMPLETED' && progress[progress.length - 1].status !== 'FAILED') (progress[progress.length - 1].status !== 'completed' && progress[progress.length - 1].status !== 'failed')
" "
class="flex items-center" class="flex items-center"
> >

2
packages/nc-gui/pages/index/index/index.vue

@ -95,7 +95,7 @@ const duplicateProject = (project: ProjectType) => {
await loadProjects() await loadProjects()
$jobs.subscribe(jobData.name, jobData.id, async (data: { status: string }) => { $jobs.subscribe({ name: jobData.name, id: jobData.id }, null, async (data: { status: string }) => {
if (data.status === 'completed') { if (data.status === 'completed') {
await loadProjects() await loadProjects()
} else if (data.status === 'failed') { } else if (data.status === 'failed') {

56
packages/nc-gui/plugins/jobs.ts

@ -6,6 +6,7 @@ export default defineNuxtPlugin(async (nuxtApp) => {
const { appInfo } = $(useGlobal()) const { appInfo } = $(useGlobal())
let socket: Socket | null = null let socket: Socket | null = null
let messageIndex = 0
const init = async (token: string) => { const init = async (token: string) => {
try { try {
@ -28,27 +29,58 @@ export default defineNuxtPlugin(async (nuxtApp) => {
await init(nuxtApp.$state.token.value) await init(nuxtApp.$state.token.value)
} }
const send = (name: string, data: any) => {
if (socket) {
const _id = messageIndex++
socket.emit(name, { _id, data })
return _id
}
}
const jobs = { const jobs = {
subscribe(name: string, id: string, cb: (data: any) => void) { subscribe(
if (socket) { job: { id: string; name: string } | any,
socket.emit('subscribe', { name, id }) subscribedCb?: () => void,
const tempFn = (data: any) => { statusCb?: (status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => void,
if (data.id === id && data.name === name) { logCb?: (data: { message: string }) => void,
cb(data) ) {
if (data.status === 'completed' || data.status === 'failed') { const logFn = (data: { id: string; name: string; data: { message: string } }) => {
socket?.off('status', tempFn) if (data.id === job.id) {
} if (logCb) logCb(data.data)
}
}
const statusFn = (data: any) => {
if (data.id === job.id) {
if (statusCb) statusCb(data.status, data.error)
if (data.status === 'completed' || data.status === 'failed') {
socket?.off('status', statusFn)
socket?.off('log', logFn)
}
}
}
const _id = send('subscribe', job)
const subscribeFn = (data: { _id: number; name: string; id: string }) => {
if (data._id === _id) {
if (data.id !== job.id || data.name !== job.name) {
job.id = data.id
job.name = data.name
} }
if (subscribedCb) subscribedCb()
socket?.on('log', logFn)
socket?.on('status', statusFn)
socket?.off('subscribed', subscribeFn)
} }
socket.on('status', tempFn)
} }
socket?.on('subscribed', subscribeFn)
}, },
getStatus(name: string, id: string): Promise<string> { getStatus(name: string, id: string): Promise<string> {
return new Promise((resolve) => { return new Promise((resolve) => {
if (socket) { if (socket) {
socket.emit('status', { name, id }) const _id = send('status', { name, id })
const tempFn = (data: any) => { const tempFn = (data: any) => {
if (data.id === id && data.name === name) { if (data._id === _id) {
resolve(data.status) resolve(data.status)
socket?.off('status', tempFn) socket?.off('status', tempFn)
} }

6
packages/nocodb-nest/src/controllers/imports/helpers/NocoSyncDestAdapter.ts

@ -1,6 +0,0 @@
export abstract class NocoSyncSourceAdapter {
public abstract init(): Promise<void>;
public abstract destProjectWrite(): Promise<any>;
public abstract destSchemaWrite(): Promise<any>;
public abstract destDataWrite(): Promise<any>;
}

7
packages/nocodb-nest/src/controllers/imports/helpers/NocoSyncSourceAdapter.ts

@ -1,7 +0,0 @@
export abstract class NocoSyncSourceAdapter {
public abstract init(): Promise<void>;
public abstract srcSchemaGet(): Promise<any>;
public abstract srcDataLoad(): Promise<any>;
public abstract srcDataListen(): Promise<any>;
public abstract srcDataPoll(): Promise<any>;
}

2480
packages/nocodb-nest/src/controllers/imports/helpers/job.ts

File diff suppressed because it is too large Load Diff

21
packages/nocodb-nest/src/controllers/imports/import.controller.spec.ts

@ -1,21 +0,0 @@
import { Test } from '@nestjs/testing';
import { ImportService } from '../../services/import.service';
import { ImportController } from './import.controller';
import type { TestingModule } from '@nestjs/testing';
describe('ImportController', () => {
let controller: ImportController;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
controllers: [ImportController],
providers: [ImportService],
}).compile();
controller = module.get<ImportController>(ImportController);
});
it('should be defined', () => {
expect(controller).toBeDefined();
});
});

148
packages/nocodb-nest/src/controllers/imports/import.controller.ts

@ -1,148 +0,0 @@
import { Controller, HttpCode, Post, Request, UseGuards } from '@nestjs/common';
import { forwardRef, Inject } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { SocketGateway } from 'src/services/socket.gateway';
import { GlobalGuard } from '../../guards/global/global.guard';
import { NcError } from '../../helpers/catchError';
import { ExtractProjectIdMiddleware } from '../../middlewares/extract-project-id/extract-project-id.middleware';
import { SyncSource } from '../../models';
import NocoJobs from '../../jobs/NocoJobs';
import airtableSyncJob from './helpers/job';
import type { AirtableSyncConfig } from './helpers/job';
import type { Server } from 'socket.io';
const AIRTABLE_IMPORT_JOB = 'AIRTABLE_IMPORT_JOB';
const AIRTABLE_PROGRESS_JOB = 'AIRTABLE_PROGRESS_JOB';
enum SyncStatus {
PROGRESS = 'PROGRESS',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
}
const initJob = (sv: Server, jobs: { [p: string]: { last_message: any } }) => {
// add importer job handler and progress notification job handler
NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, airtableSyncJob);
NocoJobs.jobsMgr.addJobWorker(
AIRTABLE_PROGRESS_JOB,
({ payload, progress }) => {
sv.to(payload?.id).emit('progress', {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
});
if (payload?.id in jobs) {
jobs[payload?.id].last_message = {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
};
}
},
);
NocoJobs.jobsMgr.addProgressCbk(AIRTABLE_IMPORT_JOB, (payload, progress) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
payload,
progress: {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
},
});
});
NocoJobs.jobsMgr.addSuccessCbk(AIRTABLE_IMPORT_JOB, (payload) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
payload,
progress: {
msg: 'Complete!',
status: SyncStatus.COMPLETED,
},
});
delete jobs[payload?.id];
});
NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
payload,
progress: {
msg: error?.message || 'Failed due to some internal error',
status: SyncStatus.FAILED,
},
});
delete jobs[payload?.id];
});
};
@Controller()
@UseGuards(ExtractProjectIdMiddleware, GlobalGuard)
export class ImportController {
constructor(
private readonly socketGateway: SocketGateway,
@Inject(forwardRef(() => ModuleRef)) private readonly moduleRef: ModuleRef,
) {}
@Post('/api/v1/db/meta/import/airtable')
@HttpCode(200)
importAirtable(@Request() req) {
NocoJobs.jobsMgr.add(AIRTABLE_IMPORT_JOB, {
id: req.query.id,
...req.body,
});
return {};
}
@Post('/api/v1/db/meta/syncs/:syncId/trigger')
@HttpCode(200)
async triggerSync(@Request() req) {
if (req.params.syncId in this.socketGateway.jobs) {
NcError.badRequest('Sync already in progress');
}
const syncSource = await SyncSource.get(req.params.syncId);
const user = await syncSource.getUser();
// Treat default baseUrl as siteUrl from req object
let baseURL = (req as any).ncSiteUrl;
// if environment value avail use it
// or if it's docker construct using `PORT`
if (process.env.NC_DOCKER) {
baseURL = `http://localhost:${process.env.PORT || 8080}`;
}
setTimeout(() => {
NocoJobs.jobsMgr.add<AirtableSyncConfig>(AIRTABLE_IMPORT_JOB, {
id: req.params.syncId,
...(syncSource?.details || {}),
projectId: syncSource.project_id,
baseId: syncSource.base_id,
authToken: '',
baseURL,
user: user,
moduleRef: this.moduleRef,
});
}, 1000);
this.socketGateway.jobs[req.params.syncId] = {
last_message: {
msg: 'Sync started',
},
};
return {};
}
@Post('/api/v1/db/meta/syncs/:syncId/abort')
@HttpCode(200)
async abortImport(@Request() req) {
if (req.params.syncId in this.socketGateway.jobs) {
delete this.socketGateway.jobs[req.params.syncId];
}
return {};
}
async onModuleInit() {
initJob(this.socketGateway.io, this.socketGateway.jobs);
}
}

76
packages/nocodb-nest/src/modules/jobs/at-import/at-import.controller.ts

@ -0,0 +1,76 @@
import { InjectQueue } from '@nestjs/bull';
import { Controller, HttpCode, Post, Request, UseGuards } from '@nestjs/common';
import { GlobalGuard } from 'src/guards/global/global.guard';
import { ExtractProjectIdMiddleware } from 'src/middlewares/extract-project-id/extract-project-id.middleware';
import { Queue } from 'bull';
import { SyncSource } from 'src/models';
import { NcError } from 'src/helpers/catchError';
import { QueueService } from '../fallback-queue.service';
import { JobsService } from '../jobs.service';
@Controller()
@UseGuards(ExtractProjectIdMiddleware, GlobalGuard)
export class AtImportController {
activeQueue;
constructor(
@InjectQueue('jobs') private readonly jobsQueue: Queue,
private readonly fallbackQueueService: QueueService,
private readonly jobsService: JobsService,
) {
this.activeQueue = process.env.NC_REDIS_URL
? this.jobsQueue
: this.fallbackQueueService;
}
@Post('/api/v1/db/meta/import/airtable')
@HttpCode(200)
async importAirtable(@Request() req) {
const job = await this.activeQueue.add('at-import', {
...req.body,
});
return { id: job.id, name: job.name };
}
@Post('/api/v1/db/meta/syncs/:syncId/trigger')
@HttpCode(200)
async triggerSync(@Request() req) {
const jobs = await this.jobsService.jobList('at-import');
const fnd = jobs.find((j) => j.data.syncId === req.params.syncId);
if (fnd) {
NcError.badRequest('Sync already in progress');
}
const syncSource = await SyncSource.get(req.params.syncId);
const user = await syncSource.getUser();
// Treat default baseUrl as siteUrl from req object
let baseURL = (req as any).ncSiteUrl;
// if environment value avail use it
// or if it's docker construct using `PORT`
if (process.env.NC_DOCKER) {
baseURL = `http://localhost:${process.env.PORT || 8080}`;
}
const job = await this.activeQueue.add('at-import', {
syncId: req.params.syncId,
...(syncSource?.details || {}),
projectId: syncSource.project_id,
baseId: syncSource.base_id,
authToken: '',
baseURL,
user: user,
});
return { id: job.id, name: job.name };
}
@Post('/api/v1/db/meta/syncs/:syncId/abort')
@HttpCode(200)
async abortImport(@Request() req) {
return {};
}
}

2515
packages/nocodb-nest/src/modules/jobs/at-import/at-import.processor.ts

File diff suppressed because it is too large Load Diff

0
packages/nocodb-nest/src/controllers/imports/helpers/EntityMap.ts → packages/nocodb-nest/src/modules/jobs/at-import/helpers/EntityMap.ts

0
packages/nocodb-nest/src/controllers/imports/helpers/fetchAT.ts → packages/nocodb-nest/src/modules/jobs/at-import/helpers/fetchAT.ts

5
packages/nocodb-nest/src/controllers/imports/helpers/readAndProcessData.ts → packages/nocodb-nest/src/modules/jobs/at-import/helpers/readAndProcessData.ts

@ -1,7 +1,8 @@
/* eslint-disable no-async-promise-executor */
import { RelationTypes, UITypes } from 'nocodb-sdk'; import { RelationTypes, UITypes } from 'nocodb-sdk';
import EntityMap from './EntityMap'; import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../services/bulk-data-alias.service'; import type { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../services/tables.service'; import type { TablesService } from '../../../../services/tables.service';
// @ts-ignore // @ts-ignore
import type { AirtableBase } from 'airtable/lib/airtable_base'; import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk'; import type { TableType } from 'nocodb-sdk';

0
packages/nocodb-nest/src/controllers/imports/helpers/syncMap.ts → packages/nocodb-nest/src/modules/jobs/at-import/helpers/syncMap.ts

6
packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts

@ -3,6 +3,7 @@ import PQueue from 'p-queue';
import Emittery from 'emittery'; import Emittery from 'emittery';
import { DuplicateProcessor } from './export-import/duplicate.processor'; import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsEventService } from './jobs-event.service'; import { JobsEventService } from './jobs-event.service';
import { AtImportProcessor } from './at-import/at-import.processor';
interface Job { interface Job {
id: string; id: string;
@ -22,6 +23,7 @@ export class QueueService {
constructor( constructor(
private readonly jobsEventService: JobsEventService, private readonly jobsEventService: JobsEventService,
private readonly duplicateProcessor: DuplicateProcessor, private readonly duplicateProcessor: DuplicateProcessor,
private readonly atImportProcessor: AtImportProcessor,
) { ) {
this.emitter.on('active', (data: any) => { this.emitter.on('active', (data: any) => {
const job = this.queueMemory.find( const job = this.queueMemory.find(
@ -56,6 +58,10 @@ export class QueueService {
this: this.duplicateProcessor, this: this.duplicateProcessor,
fn: this.duplicateProcessor.duplicateBase, fn: this.duplicateProcessor.duplicateBase,
}, },
'at-import': {
this: this.atImportProcessor,
fn: this.atImportProcessor.job,
},
}; };
async jobWrapper(job: Job) { async jobWrapper(job: Job) {

1
packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts

@ -38,6 +38,7 @@ export class JobsEventService {
name: job.name, name: job.name,
id: job.id.toString(), id: job.id.toString(),
status: 'failed', status: 'failed',
error: error?.message,
}); });
} }

22
packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts

@ -40,10 +40,16 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('subscribe') @SubscribeMessage('subscribe')
async subscribe( async subscribe(
@MessageBody() data: { name: string; id: string } | { id: string }, @MessageBody()
body: { _id: number; data: { id: string; name: string } | any },
@ConnectedSocket() client: Socket, @ConnectedSocket() client: Socket,
): Promise<void> { ): Promise<void> {
if ('name' in data) { const { _id, data } = body;
if (
Object.keys(data).every((k) => ['name', 'id'].includes(k)) &&
data?.name &&
data?.id
) {
const rooms = (await this.jobsService.jobList(data.name)).map( const rooms = (await this.jobsService.jobList(data.name)).map(
(j) => `${j.name}-${j.id}`, (j) => `${j.name}-${j.id}`,
); );
@ -51,17 +57,17 @@ export class JobsGateway implements OnModuleInit {
if (room) { if (room) {
client.join(`${data.name}-${data.id}`); client.join(`${data.name}-${data.id}`);
client.emit('subscribed', { client.emit('subscribed', {
subbed: data.id, _id,
name: data.name, name: data.name,
id: data.id, id: data.id,
}); });
} }
} else { } else {
const job = await this.jobsService.getJobWithData({ id: data.id }); const job = await this.jobsService.getJobWithData(data);
if (job) { if (job) {
client.join(`${job.name}-${job.id}`); client.join(`${job.name}-${job.id}`);
client.emit('subscribed', { client.emit('subscribed', {
subbed: data.id, _id,
name: job.name, name: job.name,
id: job.id, id: job.id,
}); });
@ -71,10 +77,12 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('status') @SubscribeMessage('status')
async status( async status(
@MessageBody() data: { name: string; id: string }, @MessageBody() body: { _id: number; data: { id: string; name: string } },
@ConnectedSocket() client: Socket, @ConnectedSocket() client: Socket,
): Promise<void> { ): Promise<void> {
const { _id, data } = body;
client.emit('status', { client.emit('status', {
_id,
id: data.id, id: data.id,
name: data.name, name: data.name,
status: await this.jobsService.jobStatus(data.id), status: await this.jobsService.jobStatus(data.id),
@ -93,11 +101,13 @@ export class JobsGateway implements OnModuleInit {
| 'failed' | 'failed'
| 'paused' | 'paused'
| 'refresh'; | 'refresh';
error?: any;
}): Promise<void> { }): Promise<void> {
this.server.to(`${data.name}-${data.id}`).emit('status', { this.server.to(`${data.name}-${data.id}`).emit('status', {
id: data.id, id: data.id,
name: data.name, name: data.name,
status: data.status, status: data.status,
error: data.error,
}); });
} }

5
packages/nocodb-nest/src/modules/jobs/jobs.module.ts

@ -11,6 +11,8 @@ import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsGateway } from './jobs.gateway'; import { JobsGateway } from './jobs.gateway';
import { QueueService } from './fallback-queue.service'; import { QueueService } from './fallback-queue.service';
import { JobsEventService } from './jobs-event.service'; import { JobsEventService } from './jobs-event.service';
import { AtImportController } from './at-import/at-import.controller';
import { AtImportProcessor } from './at-import/at-import.processor';
@Module({ @Module({
imports: [ imports: [
@ -21,7 +23,7 @@ import { JobsEventService } from './jobs-event.service';
name: 'jobs', name: 'jobs',
}), }),
], ],
controllers: [DuplicateController], controllers: [DuplicateController, AtImportController],
providers: [ providers: [
QueueService, QueueService,
JobsGateway, JobsGateway,
@ -30,6 +32,7 @@ import { JobsEventService } from './jobs-event.service';
DuplicateProcessor, DuplicateProcessor,
ExportService, ExportService,
ImportService, ImportService,
AtImportProcessor,
], ],
}) })
export class JobsModule {} export class JobsModule {}

6
packages/nocodb-nest/src/modules/jobs/jobs.service.ts

@ -21,17 +21,17 @@ export class JobsService {
async jobList(jobType: string) { async jobList(jobType: string) {
return ( return (
await this.activeQueue.getJobs(['active', 'waiting', 'delayed']) await this.activeQueue.getJobs(['active', 'waiting', 'delayed', 'paused'])
).filter((j) => j.name === jobType); ).filter((j) => j.name === jobType);
} }
async getJobWithData(data: any) { async getJobWithData(data: any) {
const jobs = await this.activeQueue.getJobs([ const jobs = await this.activeQueue.getJobs([
'completed', // 'completed',
'waiting', 'waiting',
'active', 'active',
'delayed', 'delayed',
'failed', // 'failed',
'paused', 'paused',
]); ]);

4
packages/nocodb-nest/src/modules/metas/metas.module.ts

@ -16,7 +16,6 @@ import { GalleriesController } from '../../controllers/galleries.controller';
import { GridColumnsController } from '../../controllers/grid-columns.controller'; import { GridColumnsController } from '../../controllers/grid-columns.controller';
import { GridsController } from '../../controllers/grids.controller'; import { GridsController } from '../../controllers/grids.controller';
import { HooksController } from '../../controllers/hooks.controller'; import { HooksController } from '../../controllers/hooks.controller';
import { ImportController } from '../../controllers/imports/import.controller';
import { KanbansController } from '../../controllers/kanbans.controller'; import { KanbansController } from '../../controllers/kanbans.controller';
import { MapsController } from '../../controllers/maps.controller'; import { MapsController } from '../../controllers/maps.controller';
import { MetaDiffsController } from '../../controllers/meta-diffs.controller'; import { MetaDiffsController } from '../../controllers/meta-diffs.controller';
@ -98,7 +97,6 @@ import { DatasModule } from '../datas/datas.module';
GridColumnsController, GridColumnsController,
GridsController, GridsController,
HooksController, HooksController,
ImportController,
KanbansController, KanbansController,
MapsController, MapsController,
MetaDiffsController, MetaDiffsController,
@ -170,6 +168,8 @@ import { DatasModule } from '../datas/datas.module';
GalleriesService, GalleriesService,
KanbansService, KanbansService,
ProjectsService, ProjectsService,
AttachmentsService,
ProjectUsersService,
], ],
}) })
export class MetasModule {} export class MetasModule {}

12
packages/nocodb-nest/src/services/socket.gateway.ts

@ -25,7 +25,6 @@ function getHash(str) {
export class SocketGateway implements OnModuleInit { export class SocketGateway implements OnModuleInit {
// private server: HttpServer; // private server: HttpServer;
private clients: { [id: string]: Socket } = {}; private clients: { [id: string]: Socket } = {};
private _jobs: { [id: string]: { last_message: any } } = {};
constructor( constructor(
private jwtStrategy: JwtStrategy, private jwtStrategy: JwtStrategy,
@ -59,21 +58,10 @@ export class SocketGateway implements OnModuleInit {
socket.on('event', (args) => { socket.on('event', (args) => {
T.event({ ...args, id }); T.event({ ...args, id });
}); });
socket.on('subscribe', (room) => {
if (room in this.jobs) {
socket.join(room);
socket.emit('job');
socket.emit('progress', this.jobs[room].last_message);
}
});
}); });
} }
public get io() { public get io() {
return this.server; return this.server;
} }
public get jobs() {
return this._jobs;
}
} }

Loading…
Cancel
Save