diff --git a/packages/nc-gui/components/dlg/AirtableImport.vue b/packages/nc-gui/components/dlg/AirtableImport.vue index 1f0ccbe26c..1b762f351d 100644 --- a/packages/nc-gui/components/dlg/AirtableImport.vue +++ b/packages/nc-gui/components/dlg/AirtableImport.vue @@ -40,8 +40,12 @@ const progress = ref[]>([]) const logRef = ref() +const enableAbort = ref(false) + let socket: Socket | null +let socketInterval: NodeJS.Timer + const syncSource = ref({ id: '', type: 'Airtable', @@ -121,6 +125,7 @@ async function loadSyncSrc() { srcs[0].details = srcs[0].details || {} syncSource.value = migrateSync(srcs[0]) syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId + socket?.emit('subscribe', syncSource.value.id) } else { syncSource.value = { id: '', @@ -146,7 +151,6 @@ async function loadSyncSrc() { } async function sync() { - step.value = 2 try { await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/trigger`, { baseURL, @@ -156,11 +160,36 @@ async function sync() { id: socket?.id, }, }) + socket?.emit('subscribe', syncSource.value.id) } catch (e: any) { message.error(await extractSdkResponseErrorMsg(e)) } } +async function abort() { + Modal.confirm({ + title: 'Are you sure you want to abort this job?', + type: 'warn', + content: + "This is a highly experimental feature and only marks job as not started, please don't abort the job unless you are sure job is stuck.", + onOk: async () => { + try { + await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/abort`, { + baseURL, + method: 'POST', + headers: { 'xc-auth': $state.token.value as string }, + params: { + id: socket?.id, + }, + }) + step.value = 1 + } catch (e: any) { + message.error(await extractSdkResponseErrorMsg(e)) + } + }, + }) +} + function migrateSync(src: any) { if (!src.details?.options) { src.details.options = { @@ -193,16 +222,6 @@ onMounted(async () => { extraHeaders: { 'xc-auth': $state.token.value as string }, }) - socket.on('connect_error', () => { - socket?.disconnect() - socket = null - }) - - // connect event does not provide data - socket.on('connect', () => { - console.log('socket connected') - }) - socket.on('progress', async (d: Record) => { progress.value.push(d) @@ -219,13 +238,46 @@ onMounted(async () => { } }) + socket.on('disconnect', () => { + console.log('socket disconnected') + const rcInterval = setInterval(() => { + if (socket?.connected) { + clearInterval(rcInterval) + socket?.emit('subscribe', syncSource.value.id) + } else { + socket?.connect() + } + }, 2000) + }) + + socket.on('job', () => { + step.value = 2 + }) + + // connect event does not provide data + socket.on('connect', () => { + console.log('socket connected') + if (syncSource.value.id) { + socket?.emit('subscribe', syncSource.value.id) + } + }) + + socket?.io.on('reconnect', () => { + console.log('socket reconnected') + if (syncSource.value.id) { + socket?.emit('subscribe', syncSource.value.id) + } + }) + await loadSyncSrc() }) onBeforeUnmount(() => { if (socket) { + socket.removeAllListeners() socket.disconnect() } + clearInterval(socketInterval) }) @@ -239,7 +291,7 @@ onBeforeUnmount(() => { >
-
{{ $t('title.quickImport') }} - AIRTABLE
+
{{ $t('title.quickImport') }} - AIRTABLE
@@ -381,6 +433,7 @@ onBeforeUnmount(() => { {{ $t('labels.goToDashboard') }} + ABORT
diff --git a/packages/nocodb/src/lib/meta/api/index.ts b/packages/nocodb/src/lib/meta/api/index.ts index ee1d8ab7fc..28ee746356 100644 --- a/packages/nocodb/src/lib/meta/api/index.ts +++ b/packages/nocodb/src/lib/meta/api/index.ts @@ -55,6 +55,7 @@ import importApis from './sync/importApis'; import syncSourceApis from './sync/syncSourceApis'; const clients: { [id: string]: Socket } = {}; +const jobs: { [id: string]: { last_message: any } } = {}; export default function (router: Router, server) { initStrategies(router); @@ -138,9 +139,16 @@ export default function (router: Router, server) { socket.on('event', (args) => { Tele.event({ ...args, id }); }); + socket.on('subscribe', (room) => { + if (room in jobs) { + socket.join(room) + socket.emit('job') + socket.emit('progress', jobs[room].last_message) + } + }) }); - importApis(router, clients); + importApis(router, io, jobs); } function getHash(str) { diff --git a/packages/nocodb/src/lib/meta/api/sync/importApis.ts b/packages/nocodb/src/lib/meta/api/sync/importApis.ts index 15ae1b5e63..fd73736fa4 100644 --- a/packages/nocodb/src/lib/meta/api/sync/importApis.ts +++ b/packages/nocodb/src/lib/meta/api/sync/importApis.ts @@ -1,8 +1,8 @@ import { Request, Router } from 'express'; // import { Queue } from 'bullmq'; // import axios from 'axios'; -import catchError from '../../helpers/catchError'; -import { Socket } from 'socket.io'; +import catchError, { NcError } from '../../helpers/catchError'; +import { Server } from 'socket.io'; import NocoJobs from '../../../jobs/NocoJobs'; import job, { AirtableSyncConfig } from './helpers/job'; import SyncSource from '../../../models/SyncSource'; @@ -17,17 +17,25 @@ enum SyncStatus { FAILED = 'FAILED', } -export default (router: Router, clients: { [id: string]: Socket }) => { +export default (router: Router, sv: Server, jobs: { [id: string]: { last_message: any } }) => { // add importer job handler and progress notification job handler NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job); NocoJobs.jobsMgr.addJobWorker( AIRTABLE_PROGRESS_JOB, ({ payload, progress }) => { - clients?.[payload?.id]?.emit('progress', { + sv.to(payload?.id).emit('progress', { msg: progress?.msg, level: progress?.level, status: progress?.status, }); + + if (payload?.id in jobs) { + jobs[payload?.id].last_message = { + msg: progress?.msg, + level: progress?.level, + status: progress?.status, + }; + } } ); @@ -49,6 +57,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => { status: SyncStatus.COMPLETED, }, }); + delete jobs[payload?.id]; }); NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => { NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, { @@ -58,6 +67,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => { status: SyncStatus.FAILED, }, }); + delete jobs[payload?.id]; }); router.post( @@ -73,6 +83,10 @@ export default (router: Router, clients: { [id: string]: Socket }) => { router.post( '/api/v1/db/meta/syncs/:syncId/trigger', catchError(async (req: Request, res) => { + if (req.params.syncId in jobs) { + NcError.badRequest('Sync already in progress'); + } + const syncSource = await SyncSource.get(req.params.syncId); const user = await syncSource.getUser(); @@ -90,12 +104,26 @@ export default (router: Router, clients: { [id: string]: Socket }) => { } NocoJobs.jobsMgr.add(AIRTABLE_IMPORT_JOB, { - id: req.query.id, + id: req.params.syncId, ...(syncSource?.details || {}), projectId: syncSource.project_id, authToken: token, baseURL, }); + jobs[req.params.syncId] = { + last_message: { + msg: 'Sync started' + } + }; + res.json({}); + }) + ); + router.post( + '/api/v1/db/meta/syncs/:syncId/abort', + catchError(async (req: Request, res) => { + if (req.params.syncId in jobs) { + delete jobs[req.params.syncId]; + } res.json({}); }) );