Browse Source

feat: reconnect quick import logs

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/4486/head
mertmit 2 years ago
parent
commit
76f33ae4e6
  1. 77
      packages/nc-gui/components/dlg/AirtableImport.vue
  2. 10
      packages/nocodb/src/lib/meta/api/index.ts
  3. 38
      packages/nocodb/src/lib/meta/api/sync/importApis.ts

77
packages/nc-gui/components/dlg/AirtableImport.vue

@ -40,8 +40,12 @@ const progress = ref<Record<string, any>[]>([])
const logRef = ref<typeof AntCard>() const logRef = ref<typeof AntCard>()
const enableAbort = ref(false)
let socket: Socket | null let socket: Socket | null
let socketInterval: NodeJS.Timer
const syncSource = ref({ const syncSource = ref({
id: '', id: '',
type: 'Airtable', type: 'Airtable',
@ -121,6 +125,7 @@ async function loadSyncSrc() {
srcs[0].details = srcs[0].details || {} srcs[0].details = srcs[0].details || {}
syncSource.value = migrateSync(srcs[0]) syncSource.value = migrateSync(srcs[0])
syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId
socket?.emit('subscribe', syncSource.value.id)
} else { } else {
syncSource.value = { syncSource.value = {
id: '', id: '',
@ -146,7 +151,6 @@ async function loadSyncSrc() {
} }
async function sync() { async function sync() {
step.value = 2
try { try {
await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/trigger`, { await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/trigger`, {
baseURL, baseURL,
@ -156,11 +160,36 @@ async function sync() {
id: socket?.id, id: socket?.id,
}, },
}) })
socket?.emit('subscribe', syncSource.value.id)
} catch (e: any) { } catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e)) message.error(await extractSdkResponseErrorMsg(e))
} }
} }
async function abort() {
Modal.confirm({
title: 'Are you sure you want to abort this job?',
type: 'warn',
content:
"This is a highly experimental feature and only marks job as not started, please don't abort the job unless you are sure job is stuck.",
onOk: async () => {
try {
await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/abort`, {
baseURL,
method: 'POST',
headers: { 'xc-auth': $state.token.value as string },
params: {
id: socket?.id,
},
})
step.value = 1
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}
},
})
}
function migrateSync(src: any) { function migrateSync(src: any) {
if (!src.details?.options) { if (!src.details?.options) {
src.details.options = { src.details.options = {
@ -193,16 +222,6 @@ onMounted(async () => {
extraHeaders: { 'xc-auth': $state.token.value as string }, extraHeaders: { 'xc-auth': $state.token.value as string },
}) })
socket.on('connect_error', () => {
socket?.disconnect()
socket = null
})
// connect event does not provide data
socket.on('connect', () => {
console.log('socket connected')
})
socket.on('progress', async (d: Record<string, any>) => { socket.on('progress', async (d: Record<string, any>) => {
progress.value.push(d) progress.value.push(d)
@ -219,13 +238,46 @@ onMounted(async () => {
} }
}) })
socket.on('disconnect', () => {
console.log('socket disconnected')
const rcInterval = setInterval(() => {
if (socket?.connected) {
clearInterval(rcInterval)
socket?.emit('subscribe', syncSource.value.id)
} else {
socket?.connect()
}
}, 2000)
})
socket.on('job', () => {
step.value = 2
})
// connect event does not provide data
socket.on('connect', () => {
console.log('socket connected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
socket?.io.on('reconnect', () => {
console.log('socket reconnected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
await loadSyncSrc() await loadSyncSrc()
}) })
onBeforeUnmount(() => { onBeforeUnmount(() => {
if (socket) { if (socket) {
socket.removeAllListeners()
socket.disconnect() socket.disconnect()
} }
clearInterval(socketInterval)
}) })
</script> </script>
@ -239,7 +291,7 @@ onBeforeUnmount(() => {
> >
<div class="px-5"> <div class="px-5">
<!-- Quick Import --> <!-- Quick Import -->
<div class="mt-5 prose-xl font-weight-bold">{{ $t('title.quickImport') }} - AIRTABLE</div> <div class="mt-5 prose-xl font-weight-bold" @dblclick="enableAbort = true">{{ $t('title.quickImport') }} - AIRTABLE</div>
<div v-if="step === 1"> <div v-if="step === 1">
<div class="mb-4"> <div class="mb-4">
@ -381,6 +433,7 @@ onBeforeUnmount(() => {
<a-button v-if="showGoToDashboardButton" class="mt-4" size="large" @click="dialogShow = false"> <a-button v-if="showGoToDashboardButton" class="mt-4" size="large" @click="dialogShow = false">
{{ $t('labels.goToDashboard') }} {{ $t('labels.goToDashboard') }}
</a-button> </a-button>
<a-button v-else-if="enableAbort" class="mt-4" size="large" danger @click="abort()">ABORT</a-button>
</div> </div>
</div> </div>
</div> </div>

10
packages/nocodb/src/lib/meta/api/index.ts

@ -55,6 +55,7 @@ import importApis from './sync/importApis';
import syncSourceApis from './sync/syncSourceApis'; import syncSourceApis from './sync/syncSourceApis';
const clients: { [id: string]: Socket } = {}; const clients: { [id: string]: Socket } = {};
const jobs: { [id: string]: { last_message: any } } = {};
export default function (router: Router, server) { export default function (router: Router, server) {
initStrategies(router); initStrategies(router);
@ -138,9 +139,16 @@ export default function (router: Router, server) {
socket.on('event', (args) => { socket.on('event', (args) => {
Tele.event({ ...args, id }); Tele.event({ ...args, id });
}); });
socket.on('subscribe', (room) => {
if (room in jobs) {
socket.join(room)
socket.emit('job')
socket.emit('progress', jobs[room].last_message)
}
})
}); });
importApis(router, clients); importApis(router, io, jobs);
} }
function getHash(str) { function getHash(str) {

38
packages/nocodb/src/lib/meta/api/sync/importApis.ts

@ -1,8 +1,8 @@
import { Request, Router } from 'express'; import { Request, Router } from 'express';
// import { Queue } from 'bullmq'; // import { Queue } from 'bullmq';
// import axios from 'axios'; // import axios from 'axios';
import catchError from '../../helpers/catchError'; import catchError, { NcError } from '../../helpers/catchError';
import { Socket } from 'socket.io'; import { Server } from 'socket.io';
import NocoJobs from '../../../jobs/NocoJobs'; import NocoJobs from '../../../jobs/NocoJobs';
import job, { AirtableSyncConfig } from './helpers/job'; import job, { AirtableSyncConfig } from './helpers/job';
import SyncSource from '../../../models/SyncSource'; import SyncSource from '../../../models/SyncSource';
@ -17,17 +17,25 @@ enum SyncStatus {
FAILED = 'FAILED', FAILED = 'FAILED',
} }
export default (router: Router, clients: { [id: string]: Socket }) => { export default (router: Router, sv: Server, jobs: { [id: string]: { last_message: any } }) => {
// add importer job handler and progress notification job handler // add importer job handler and progress notification job handler
NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job); NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job);
NocoJobs.jobsMgr.addJobWorker( NocoJobs.jobsMgr.addJobWorker(
AIRTABLE_PROGRESS_JOB, AIRTABLE_PROGRESS_JOB,
({ payload, progress }) => { ({ payload, progress }) => {
clients?.[payload?.id]?.emit('progress', { sv.to(payload?.id).emit('progress', {
msg: progress?.msg, msg: progress?.msg,
level: progress?.level, level: progress?.level,
status: progress?.status, status: progress?.status,
}); });
if (payload?.id in jobs) {
jobs[payload?.id].last_message = {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
};
}
} }
); );
@ -49,6 +57,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
status: SyncStatus.COMPLETED, status: SyncStatus.COMPLETED,
}, },
}); });
delete jobs[payload?.id];
}); });
NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => { NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, { NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
@ -58,6 +67,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
status: SyncStatus.FAILED, status: SyncStatus.FAILED,
}, },
}); });
delete jobs[payload?.id];
}); });
router.post( router.post(
@ -73,6 +83,10 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
router.post( router.post(
'/api/v1/db/meta/syncs/:syncId/trigger', '/api/v1/db/meta/syncs/:syncId/trigger',
catchError(async (req: Request, res) => { catchError(async (req: Request, res) => {
if (req.params.syncId in jobs) {
NcError.badRequest('Sync already in progress');
}
const syncSource = await SyncSource.get(req.params.syncId); const syncSource = await SyncSource.get(req.params.syncId);
const user = await syncSource.getUser(); const user = await syncSource.getUser();
@ -90,12 +104,26 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
} }
NocoJobs.jobsMgr.add<AirtableSyncConfig>(AIRTABLE_IMPORT_JOB, { NocoJobs.jobsMgr.add<AirtableSyncConfig>(AIRTABLE_IMPORT_JOB, {
id: req.query.id, id: req.params.syncId,
...(syncSource?.details || {}), ...(syncSource?.details || {}),
projectId: syncSource.project_id, projectId: syncSource.project_id,
authToken: token, authToken: token,
baseURL, baseURL,
}); });
jobs[req.params.syncId] = {
last_message: {
msg: 'Sync started'
}
};
res.json({});
})
);
router.post(
'/api/v1/db/meta/syncs/:syncId/abort',
catchError(async (req: Request, res) => {
if (req.params.syncId in jobs) {
delete jobs[req.params.syncId];
}
res.json({}); res.json({});
}) })
); );

Loading…
Cancel
Save