diff --git a/packages/nc-gui/components/dlg/AirtableImport.vue b/packages/nc-gui/components/dlg/AirtableImport.vue index 1f0ccbe26c..1b762f351d 100644 --- a/packages/nc-gui/components/dlg/AirtableImport.vue +++ b/packages/nc-gui/components/dlg/AirtableImport.vue @@ -40,8 +40,12 @@ const progress = ref[]>([]) const logRef = ref() +const enableAbort = ref(false) + let socket: Socket | null +let socketInterval: NodeJS.Timer + const syncSource = ref({ id: '', type: 'Airtable', @@ -121,6 +125,7 @@ async function loadSyncSrc() { srcs[0].details = srcs[0].details || {} syncSource.value = migrateSync(srcs[0]) syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId + socket?.emit('subscribe', syncSource.value.id) } else { syncSource.value = { id: '', @@ -146,7 +151,6 @@ async function loadSyncSrc() { } async function sync() { - step.value = 2 try { await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/trigger`, { baseURL, @@ -156,11 +160,36 @@ async function sync() { id: socket?.id, }, }) + socket?.emit('subscribe', syncSource.value.id) } catch (e: any) { message.error(await extractSdkResponseErrorMsg(e)) } } +async function abort() { + Modal.confirm({ + title: 'Are you sure you want to abort this job?', + type: 'warn', + content: + "This is a highly experimental feature and only marks job as not started, please don't abort the job unless you are sure job is stuck.", + onOk: async () => { + try { + await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/abort`, { + baseURL, + method: 'POST', + headers: { 'xc-auth': $state.token.value as string }, + params: { + id: socket?.id, + }, + }) + step.value = 1 + } catch (e: any) { + message.error(await extractSdkResponseErrorMsg(e)) + } + }, + }) +} + function migrateSync(src: any) { if (!src.details?.options) { src.details.options = { @@ -193,16 +222,6 @@ onMounted(async () => { extraHeaders: { 'xc-auth': $state.token.value as string }, }) - socket.on('connect_error', () => { - socket?.disconnect() - socket = null - }) - - // connect event does not provide data - socket.on('connect', () => { - console.log('socket connected') - }) - socket.on('progress', async (d: Record) => { progress.value.push(d) @@ -219,13 +238,46 @@ onMounted(async () => { } }) + socket.on('disconnect', () => { + console.log('socket disconnected') + const rcInterval = setInterval(() => { + if (socket?.connected) { + clearInterval(rcInterval) + socket?.emit('subscribe', syncSource.value.id) + } else { + socket?.connect() + } + }, 2000) + }) + + socket.on('job', () => { + step.value = 2 + }) + + // connect event does not provide data + socket.on('connect', () => { + console.log('socket connected') + if (syncSource.value.id) { + socket?.emit('subscribe', syncSource.value.id) + } + }) + + socket?.io.on('reconnect', () => { + console.log('socket reconnected') + if (syncSource.value.id) { + socket?.emit('subscribe', syncSource.value.id) + } + }) + await loadSyncSrc() }) onBeforeUnmount(() => { if (socket) { + socket.removeAllListeners() socket.disconnect() } + clearInterval(socketInterval) }) @@ -239,7 +291,7 @@ onBeforeUnmount(() => { >
-
{{ $t('title.quickImport') }} - AIRTABLE
+
{{ $t('title.quickImport') }} - AIRTABLE
@@ -381,6 +433,7 @@ onBeforeUnmount(() => { {{ $t('labels.goToDashboard') }} + ABORT
diff --git a/packages/nocodb/src/lib/meta/api/index.ts b/packages/nocodb/src/lib/meta/api/index.ts index ee1d8ab7fc..28ee746356 100644 --- a/packages/nocodb/src/lib/meta/api/index.ts +++ b/packages/nocodb/src/lib/meta/api/index.ts @@ -55,6 +55,7 @@ import importApis from './sync/importApis'; import syncSourceApis from './sync/syncSourceApis'; const clients: { [id: string]: Socket } = {}; +const jobs: { [id: string]: { last_message: any } } = {}; export default function (router: Router, server) { initStrategies(router); @@ -138,9 +139,16 @@ export default function (router: Router, server) { socket.on('event', (args) => { Tele.event({ ...args, id }); }); + socket.on('subscribe', (room) => { + if (room in jobs) { + socket.join(room) + socket.emit('job') + socket.emit('progress', jobs[room].last_message) + } + }) }); - importApis(router, clients); + importApis(router, io, jobs); } function getHash(str) { diff --git a/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts b/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts index 5fe7f76b5d..ad004205d1 100644 --- a/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts +++ b/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts @@ -4,7 +4,7 @@ import EntityMap from './EntityMap'; const BULK_DATA_BATCH_SIZE = 500; const ASSOC_BULK_DATA_BATCH_SIZE = 1000; -const BULK_PARALLEL_PROCESS = 100; +const BULK_PARALLEL_PROCESS = 5; async function readAllData({ table, diff --git a/packages/nocodb/src/lib/meta/api/sync/importApis.ts b/packages/nocodb/src/lib/meta/api/sync/importApis.ts index 15ae1b5e63..fd73736fa4 100644 --- a/packages/nocodb/src/lib/meta/api/sync/importApis.ts +++ b/packages/nocodb/src/lib/meta/api/sync/importApis.ts @@ -1,8 +1,8 @@ import { Request, Router } from 'express'; // import { Queue } from 'bullmq'; // import axios from 'axios'; -import catchError from '../../helpers/catchError'; -import { Socket } from 'socket.io'; +import catchError, { NcError } from '../../helpers/catchError'; +import { Server } from 'socket.io'; import NocoJobs from '../../../jobs/NocoJobs'; import job, { AirtableSyncConfig } from './helpers/job'; import SyncSource from '../../../models/SyncSource'; @@ -17,17 +17,25 @@ enum SyncStatus { FAILED = 'FAILED', } -export default (router: Router, clients: { [id: string]: Socket }) => { +export default (router: Router, sv: Server, jobs: { [id: string]: { last_message: any } }) => { // add importer job handler and progress notification job handler NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job); NocoJobs.jobsMgr.addJobWorker( AIRTABLE_PROGRESS_JOB, ({ payload, progress }) => { - clients?.[payload?.id]?.emit('progress', { + sv.to(payload?.id).emit('progress', { msg: progress?.msg, level: progress?.level, status: progress?.status, }); + + if (payload?.id in jobs) { + jobs[payload?.id].last_message = { + msg: progress?.msg, + level: progress?.level, + status: progress?.status, + }; + } } ); @@ -49,6 +57,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => { status: SyncStatus.COMPLETED, }, }); + delete jobs[payload?.id]; }); NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => { NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, { @@ -58,6 +67,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => { status: SyncStatus.FAILED, }, }); + delete jobs[payload?.id]; }); router.post( @@ -73,6 +83,10 @@ export default (router: Router, clients: { [id: string]: Socket }) => { router.post( '/api/v1/db/meta/syncs/:syncId/trigger', catchError(async (req: Request, res) => { + if (req.params.syncId in jobs) { + NcError.badRequest('Sync already in progress'); + } + const syncSource = await SyncSource.get(req.params.syncId); const user = await syncSource.getUser(); @@ -90,12 +104,26 @@ export default (router: Router, clients: { [id: string]: Socket }) => { } NocoJobs.jobsMgr.add(AIRTABLE_IMPORT_JOB, { - id: req.query.id, + id: req.params.syncId, ...(syncSource?.details || {}), projectId: syncSource.project_id, authToken: token, baseURL, }); + jobs[req.params.syncId] = { + last_message: { + msg: 'Sync started' + } + }; + res.json({}); + }) + ); + router.post( + '/api/v1/db/meta/syncs/:syncId/abort', + catchError(async (req: Request, res) => { + if (req.params.syncId in jobs) { + delete jobs[req.params.syncId]; + } res.json({}); }) ); diff --git a/packages/nocodb/src/lib/v1-legacy/plugins/adapters/storage/Local.ts b/packages/nocodb/src/lib/v1-legacy/plugins/adapters/storage/Local.ts index edcc20b385..48b695b878 100644 --- a/packages/nocodb/src/lib/v1-legacy/plugins/adapters/storage/Local.ts +++ b/packages/nocodb/src/lib/v1-legacy/plugins/adapters/storage/Local.ts @@ -6,7 +6,7 @@ import mkdirp from 'mkdirp'; import { IStorageAdapterV2, XcFile } from 'nc-plugin'; import NcConfigFactory from '../../../../utils/NcConfigFactory'; -import request from 'request'; +import axios from 'axios'; export default class Local implements IStorageAdapterV2 { constructor() {} @@ -27,37 +27,36 @@ export default class Local implements IStorageAdapterV2 { async fileCreateByUrl(key: string, url: string): Promise { const destPath = path.join(NcConfigFactory.getToolDir(), ...key.split('/')); return new Promise((resolve, reject) => { - mkdirp.sync(path.dirname(destPath)); - const file = fs.createWriteStream(destPath); - const sendReq = request.get(url); - - // verify response code - sendReq.on('response', (response) => { - if (response.statusCode !== 200) { - return reject('Response status was ' + response.statusCode); - } - - sendReq.pipe(file); - }); - - // close() is async, call cb after close completes - file.on('finish', () => { - file.close((err) => { - if (err) { - return reject(err); - } - resolve(null); + axios.get((url), { responseType: "stream", headers: { + "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", + "accept-language": "en-US,en;q=0.9", + "cache-control": "no-cache", + "pragma": "no-cache", + "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36", + "origin": "https://www.airtable.com/", + } }) + .then(response => { + mkdirp.sync(path.dirname(destPath)); + const file = fs.createWriteStream(destPath); + // close() is async, call cb after close completes + file.on('finish', () => { + file.close((err) => { + if (err) { + return reject(err); + } + resolve(null); + }); }); - }); - // check for request errors - sendReq.on('error', (err) => { - fs.unlink(destPath, () => reject(err.message)); // delete the (partial) file and then return the error - }); + file.on('error', (err) => { + // Handle errors + fs.unlink(destPath, () => reject(err.message)); // delete the (partial) file and then return the error + }); - file.on('error', (err) => { - // Handle errors - fs.unlink(destPath, () => reject(err.message)); // delete the (partial) file and then return the error + response.data.pipe(file); + }) + .catch((err) => { + reject(err.message) }); }); }