Browse Source

Merge pull request #4486 from nocodb/feat/socket-room

feat: reconnect quick import logs
pull/3668/head
Raju Udava 2 years ago committed by GitHub
parent
commit
1335442899
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 77
      packages/nc-gui/components/dlg/AirtableImport.vue
  2. 10
      packages/nocodb/src/lib/meta/api/index.ts
  3. 2
      packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts
  4. 38
      packages/nocodb/src/lib/meta/api/sync/importApis.ts
  5. 57
      packages/nocodb/src/lib/v1-legacy/plugins/adapters/storage/Local.ts

77
packages/nc-gui/components/dlg/AirtableImport.vue

@ -40,8 +40,12 @@ const progress = ref<Record<string, any>[]>([])
const logRef = ref<typeof AntCard>()
const enableAbort = ref(false)
let socket: Socket | null
let socketInterval: NodeJS.Timer
const syncSource = ref({
id: '',
type: 'Airtable',
@ -121,6 +125,7 @@ async function loadSyncSrc() {
srcs[0].details = srcs[0].details || {}
syncSource.value = migrateSync(srcs[0])
syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId
socket?.emit('subscribe', syncSource.value.id)
} else {
syncSource.value = {
id: '',
@ -146,7 +151,6 @@ async function loadSyncSrc() {
}
async function sync() {
step.value = 2
try {
await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/trigger`, {
baseURL,
@ -156,11 +160,36 @@ async function sync() {
id: socket?.id,
},
})
socket?.emit('subscribe', syncSource.value.id)
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}
}
async function abort() {
Modal.confirm({
title: 'Are you sure you want to abort this job?',
type: 'warn',
content:
"This is a highly experimental feature and only marks job as not started, please don't abort the job unless you are sure job is stuck.",
onOk: async () => {
try {
await $fetch(`/api/v1/db/meta/syncs/${syncSource.value.id}/abort`, {
baseURL,
method: 'POST',
headers: { 'xc-auth': $state.token.value as string },
params: {
id: socket?.id,
},
})
step.value = 1
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}
},
})
}
function migrateSync(src: any) {
if (!src.details?.options) {
src.details.options = {
@ -193,16 +222,6 @@ onMounted(async () => {
extraHeaders: { 'xc-auth': $state.token.value as string },
})
socket.on('connect_error', () => {
socket?.disconnect()
socket = null
})
// connect event does not provide data
socket.on('connect', () => {
console.log('socket connected')
})
socket.on('progress', async (d: Record<string, any>) => {
progress.value.push(d)
@ -219,13 +238,46 @@ onMounted(async () => {
}
})
socket.on('disconnect', () => {
console.log('socket disconnected')
const rcInterval = setInterval(() => {
if (socket?.connected) {
clearInterval(rcInterval)
socket?.emit('subscribe', syncSource.value.id)
} else {
socket?.connect()
}
}, 2000)
})
socket.on('job', () => {
step.value = 2
})
// connect event does not provide data
socket.on('connect', () => {
console.log('socket connected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
socket?.io.on('reconnect', () => {
console.log('socket reconnected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
await loadSyncSrc()
})
onBeforeUnmount(() => {
if (socket) {
socket.removeAllListeners()
socket.disconnect()
}
clearInterval(socketInterval)
})
</script>
@ -239,7 +291,7 @@ onBeforeUnmount(() => {
>
<div class="px-5">
<!-- Quick Import -->
<div class="mt-5 prose-xl font-weight-bold">{{ $t('title.quickImport') }} - AIRTABLE</div>
<div class="mt-5 prose-xl font-weight-bold" @dblclick="enableAbort = true">{{ $t('title.quickImport') }} - AIRTABLE</div>
<div v-if="step === 1">
<div class="mb-4">
@ -381,6 +433,7 @@ onBeforeUnmount(() => {
<a-button v-if="showGoToDashboardButton" class="mt-4" size="large" @click="dialogShow = false">
{{ $t('labels.goToDashboard') }}
</a-button>
<a-button v-else-if="enableAbort" class="mt-4" size="large" danger @click="abort()">ABORT</a-button>
</div>
</div>
</div>

10
packages/nocodb/src/lib/meta/api/index.ts

@ -55,6 +55,7 @@ import importApis from './sync/importApis';
import syncSourceApis from './sync/syncSourceApis';
const clients: { [id: string]: Socket } = {};
const jobs: { [id: string]: { last_message: any } } = {};
export default function (router: Router, server) {
initStrategies(router);
@ -138,9 +139,16 @@ export default function (router: Router, server) {
socket.on('event', (args) => {
Tele.event({ ...args, id });
});
socket.on('subscribe', (room) => {
if (room in jobs) {
socket.join(room)
socket.emit('job')
socket.emit('progress', jobs[room].last_message)
}
})
});
importApis(router, clients);
importApis(router, io, jobs);
}
function getHash(str) {

2
packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts

@ -4,7 +4,7 @@ import EntityMap from './EntityMap';
const BULK_DATA_BATCH_SIZE = 500;
const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
const BULK_PARALLEL_PROCESS = 100;
const BULK_PARALLEL_PROCESS = 5;
async function readAllData({
table,

38
packages/nocodb/src/lib/meta/api/sync/importApis.ts

@ -1,8 +1,8 @@
import { Request, Router } from 'express';
// import { Queue } from 'bullmq';
// import axios from 'axios';
import catchError from '../../helpers/catchError';
import { Socket } from 'socket.io';
import catchError, { NcError } from '../../helpers/catchError';
import { Server } from 'socket.io';
import NocoJobs from '../../../jobs/NocoJobs';
import job, { AirtableSyncConfig } from './helpers/job';
import SyncSource from '../../../models/SyncSource';
@ -17,17 +17,25 @@ enum SyncStatus {
FAILED = 'FAILED',
}
export default (router: Router, clients: { [id: string]: Socket }) => {
export default (router: Router, sv: Server, jobs: { [id: string]: { last_message: any } }) => {
// add importer job handler and progress notification job handler
NocoJobs.jobsMgr.addJobWorker(AIRTABLE_IMPORT_JOB, job);
NocoJobs.jobsMgr.addJobWorker(
AIRTABLE_PROGRESS_JOB,
({ payload, progress }) => {
clients?.[payload?.id]?.emit('progress', {
sv.to(payload?.id).emit('progress', {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
});
if (payload?.id in jobs) {
jobs[payload?.id].last_message = {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
};
}
}
);
@ -49,6 +57,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
status: SyncStatus.COMPLETED,
},
});
delete jobs[payload?.id];
});
NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
@ -58,6 +67,7 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
status: SyncStatus.FAILED,
},
});
delete jobs[payload?.id];
});
router.post(
@ -73,6 +83,10 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
router.post(
'/api/v1/db/meta/syncs/:syncId/trigger',
catchError(async (req: Request, res) => {
if (req.params.syncId in jobs) {
NcError.badRequest('Sync already in progress');
}
const syncSource = await SyncSource.get(req.params.syncId);
const user = await syncSource.getUser();
@ -90,12 +104,26 @@ export default (router: Router, clients: { [id: string]: Socket }) => {
}
NocoJobs.jobsMgr.add<AirtableSyncConfig>(AIRTABLE_IMPORT_JOB, {
id: req.query.id,
id: req.params.syncId,
...(syncSource?.details || {}),
projectId: syncSource.project_id,
authToken: token,
baseURL,
});
jobs[req.params.syncId] = {
last_message: {
msg: 'Sync started'
}
};
res.json({});
})
);
router.post(
'/api/v1/db/meta/syncs/:syncId/abort',
catchError(async (req: Request, res) => {
if (req.params.syncId in jobs) {
delete jobs[req.params.syncId];
}
res.json({});
})
);

57
packages/nocodb/src/lib/v1-legacy/plugins/adapters/storage/Local.ts

@ -6,7 +6,7 @@ import mkdirp from 'mkdirp';
import { IStorageAdapterV2, XcFile } from 'nc-plugin';
import NcConfigFactory from '../../../../utils/NcConfigFactory';
import request from 'request';
import axios from 'axios';
export default class Local implements IStorageAdapterV2 {
constructor() {}
@ -27,37 +27,36 @@ export default class Local implements IStorageAdapterV2 {
async fileCreateByUrl(key: string, url: string): Promise<any> {
const destPath = path.join(NcConfigFactory.getToolDir(), ...key.split('/'));
return new Promise((resolve, reject) => {
mkdirp.sync(path.dirname(destPath));
const file = fs.createWriteStream(destPath);
const sendReq = request.get(url);
// verify response code
sendReq.on('response', (response) => {
if (response.statusCode !== 200) {
return reject('Response status was ' + response.statusCode);
}
sendReq.pipe(file);
});
// close() is async, call cb after close completes
file.on('finish', () => {
file.close((err) => {
if (err) {
return reject(err);
}
resolve(null);
axios.get((url), { responseType: "stream", headers: {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"pragma": "no-cache",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
"origin": "https://www.airtable.com/",
} })
.then(response => {
mkdirp.sync(path.dirname(destPath));
const file = fs.createWriteStream(destPath);
// close() is async, call cb after close completes
file.on('finish', () => {
file.close((err) => {
if (err) {
return reject(err);
}
resolve(null);
});
});
});
// check for request errors
sendReq.on('error', (err) => {
fs.unlink(destPath, () => reject(err.message)); // delete the (partial) file and then return the error
});
file.on('error', (err) => {
// Handle errors
fs.unlink(destPath, () => reject(err.message)); // delete the (partial) file and then return the error
});
file.on('error', (err) => {
// Handle errors
fs.unlink(destPath, () => reject(err.message)); // delete the (partial) file and then return the error
response.data.pipe(file);
})
.catch((err) => {
reject(err.message)
});
});
}

Loading…
Cancel
Save