Browse Source

feat: at import background job

Signed-off-by: mertmit <mertmit99@gmail.com>
feat/export-nest-dir-restructure
mertmit 2 years ago committed by starbirdtech383
parent
commit
6d51ce7bfc
  1. 111
      packages/nc-gui/components/dlg/AirtableImport.vue
  2. 2
      packages/nc-gui/pages/index/index/index.vue
  3. 56
      packages/nc-gui/plugins/jobs.ts
  4. 5
      packages/nocodb-nest/src/controllers/imports/helpers/readAndProcessData.ts
  5. 76
      packages/nocodb-nest/src/modules/jobs/at-import/at-import.controller.ts
  6. 2515
      packages/nocodb-nest/src/modules/jobs/at-import/at-import.processor.ts
  7. 222
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/EntityMap.ts
  8. 242
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/fetchAT.ts
  9. 362
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/readAndProcessData.ts
  10. 31
      packages/nocodb-nest/src/modules/jobs/at-import/helpers/syncMap.ts
  11. 6
      packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts
  12. 1
      packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts
  13. 22
      packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts
  14. 5
      packages/nocodb-nest/src/modules/jobs/jobs.module.ts
  15. 6
      packages/nocodb-nest/src/modules/jobs/jobs.service.ts
  16. 4
      packages/nocodb-nest/src/modules/metas/metas.module.ts
  17. 12
      packages/nocodb-nest/src/services/socket.gateway.ts

111
packages/nc-gui/components/dlg/AirtableImport.vue

@ -1,6 +1,4 @@
<script setup lang="ts">
import type { Socket } from 'socket.io-client'
import io from 'socket.io-client'
import type { Card as AntCard } from 'ant-design-vue'
import {
Form,
@ -10,7 +8,6 @@ import {
iconMap,
message,
nextTick,
onBeforeUnmount,
onMounted,
ref,
storeToRefs,
@ -31,7 +28,7 @@ const { appInfo } = $(useGlobal())
const baseURL = appInfo.ncSiteUrl
const { $state } = useNuxtApp()
const { $state, $jobs } = useNuxtApp()
const projectStore = useProject()
@ -49,8 +46,6 @@ const logRef = ref<typeof AntCard>()
const enableAbort = ref(false)
let socket: Socket | null
const syncSource = ref({
id: '',
type: 'Airtable',
@ -72,6 +67,35 @@ const syncSource = ref({
},
})
const pushProgress = async (message: string, status: 'completed' | 'failed' | 'progress') => {
progress.value.push({ msg: message, status })
await nextTick(() => {
const container: HTMLDivElement = logRef.value?.$el?.firstElementChild
if (!container) return
container.scrollTop = container.scrollHeight
})
}
const onSubscribe = () => {
step.value = 2
}
const onStatus = async (status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => {
if (status === 'completed') {
showGoToDashboardButton.value = true
await loadTables()
pushProgress('Done!', status)
// TODO: add tab of the first table
} else if (status === 'failed') {
pushProgress(error, status)
}
}
const onLog = (data: { message: string }) => {
pushProgress(data.message, 'progress')
}
const validators = computed(() => ({
'details.apiKey': [fieldRequiredValidator()],
'details.syncSourceUrlOrId': [fieldRequiredValidator()],
@ -130,7 +154,7 @@ async function loadSyncSrc() {
srcs[0].details = srcs[0].details || {}
syncSource.value = migrateSync(srcs[0])
syncSource.value.details.syncSourceUrlOrId = srcs[0].details.shareId
socket?.emit('subscribe', syncSource.value.id)
$jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
} else {
syncSource.value = {
id: '',
@ -161,11 +185,8 @@ async function sync() {
baseURL,
method: 'POST',
headers: { 'xc-auth': $state.token.value as string },
params: {
id: socket?.id,
},
})
socket?.emit('subscribe', syncSource.value.id)
$jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
} catch (e: any) {
message.error(await extractSdkResponseErrorMsg(e))
}
@ -183,9 +204,6 @@ async function abort() {
baseURL,
method: 'POST',
headers: { 'xc-auth': $state.token.value as string },
params: {
id: socket?.id,
},
})
step.value = 1
} catch (e: any) {
@ -223,67 +241,12 @@ watch(
)
onMounted(async () => {
socket = io(new URL(baseURL, window.location.href.split(/[?#]/)[0]).href, {
extraHeaders: { 'xc-auth': $state.token.value as string },
})
socket.on('progress', async (d: Record<string, any>) => {
progress.value.push(d)
await nextTick(() => {
const container: HTMLDivElement = logRef.value?.$el?.firstElementChild
if (!container) return
container.scrollTop = container.scrollHeight
})
if (d.status === 'COMPLETED') {
showGoToDashboardButton.value = true
await loadTables()
// TODO: add tab of the first table
}
})
socket.on('disconnect', () => {
console.log('socket disconnected')
const rcInterval = setInterval(() => {
if (socket?.connected) {
clearInterval(rcInterval)
socket?.emit('subscribe', syncSource.value.id)
} else {
socket?.connect()
}
}, 2000)
})
socket.on('job', () => {
step.value = 2
})
// connect event does not provide data
socket.on('connect', () => {
console.log('socket connected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
socket?.io.on('reconnect', () => {
console.log('socket reconnected')
if (syncSource.value.id) {
socket?.emit('subscribe', syncSource.value.id)
}
})
if (syncSource.value.id) {
$jobs.subscribe({ syncId: syncSource.value.id }, onSubscribe, onStatus, onLog)
}
await loadSyncSrc()
})
onBeforeUnmount(() => {
if (socket) {
socket.off('disconnect')
socket.disconnect()
socket.removeAllListeners()
}
})
</script>
<template>
@ -407,7 +370,7 @@ onBeforeUnmount(() => {
<a-card ref="logRef" :body-style="{ backgroundColor: '#000000', height: '400px', overflow: 'auto' }">
<div v-for="({ msg, status }, i) in progress" :key="i">
<div v-if="status === 'FAILED'" class="flex items-center">
<div v-if="status === 'failed'" class="flex items-center">
<component :is="iconMap.closeCircle" class="text-red-500" />
<span class="text-red-500 ml-2">{{ msg }}</span>
@ -424,7 +387,7 @@ onBeforeUnmount(() => {
v-if="
!progress ||
!progress.length ||
(progress[progress.length - 1].status !== 'COMPLETED' && progress[progress.length - 1].status !== 'FAILED')
(progress[progress.length - 1].status !== 'completed' && progress[progress.length - 1].status !== 'failed')
"
class="flex items-center"
>

2
packages/nc-gui/pages/index/index/index.vue

@ -95,7 +95,7 @@ const duplicateProject = (project: ProjectType) => {
await loadProjects()
$jobs.subscribe(jobData.name, jobData.id, async (data: { status: string }) => {
$jobs.subscribe({ name: jobData.name, id: jobData.id }, null, async (data: { status: string }) => {
if (data.status === 'completed') {
await loadProjects()
} else if (data.status === 'failed') {

56
packages/nc-gui/plugins/jobs.ts

@ -6,6 +6,7 @@ export default defineNuxtPlugin(async (nuxtApp) => {
const { appInfo } = $(useGlobal())
let socket: Socket | null = null
let messageIndex = 0
const init = async (token: string) => {
try {
@ -28,27 +29,58 @@ export default defineNuxtPlugin(async (nuxtApp) => {
await init(nuxtApp.$state.token.value)
}
const send = (name: string, data: any) => {
if (socket) {
const _id = messageIndex++
socket.emit(name, { _id, data })
return _id
}
}
const jobs = {
subscribe(name: string, id: string, cb: (data: any) => void) {
if (socket) {
socket.emit('subscribe', { name, id })
const tempFn = (data: any) => {
if (data.id === id && data.name === name) {
cb(data)
if (data.status === 'completed' || data.status === 'failed') {
socket?.off('status', tempFn)
}
subscribe(
job: { id: string; name: string } | any,
subscribedCb?: () => void,
statusCb?: (status: 'active' | 'completed' | 'failed' | 'refresh', error?: any) => void,
logCb?: (data: { message: string }) => void,
) {
const logFn = (data: { id: string; name: string; data: { message: string } }) => {
if (data.id === job.id) {
if (logCb) logCb(data.data)
}
}
const statusFn = (data: any) => {
if (data.id === job.id) {
if (statusCb) statusCb(data.status, data.error)
if (data.status === 'completed' || data.status === 'failed') {
socket?.off('status', statusFn)
socket?.off('log', logFn)
}
}
}
const _id = send('subscribe', job)
const subscribeFn = (data: { _id: number; name: string; id: string }) => {
if (data._id === _id) {
if (data.id !== job.id || data.name !== job.name) {
job.id = data.id
job.name = data.name
}
if (subscribedCb) subscribedCb()
socket?.on('log', logFn)
socket?.on('status', statusFn)
socket?.off('subscribed', subscribeFn)
}
socket.on('status', tempFn)
}
socket?.on('subscribed', subscribeFn)
},
getStatus(name: string, id: string): Promise<string> {
return new Promise((resolve) => {
if (socket) {
socket.emit('status', { name, id })
const _id = send('status', { name, id })
const tempFn = (data: any) => {
if (data.id === id && data.name === name) {
if (data._id === _id) {
resolve(data.status)
socket?.off('status', tempFn)
}

5
packages/nocodb-nest/src/controllers/imports/helpers/readAndProcessData.ts

@ -1,7 +1,8 @@
/* eslint-disable no-async-promise-executor */
import { RelationTypes, UITypes } from 'nocodb-sdk';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../services/tables.service';
import type { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../services/tables.service';
// @ts-ignore
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';

76
packages/nocodb-nest/src/modules/jobs/at-import/at-import.controller.ts

@ -0,0 +1,76 @@
import { InjectQueue } from '@nestjs/bull';
import { Controller, HttpCode, Post, Request, UseGuards } from '@nestjs/common';
import { GlobalGuard } from 'src/guards/global/global.guard';
import { ExtractProjectIdMiddleware } from 'src/middlewares/extract-project-id/extract-project-id.middleware';
import { Queue } from 'bull';
import { SyncSource } from 'src/models';
import { NcError } from 'src/helpers/catchError';
import { QueueService } from '../fallback-queue.service';
import { JobsService } from '../jobs.service';
@Controller()
@UseGuards(ExtractProjectIdMiddleware, GlobalGuard)
export class AtImportController {
activeQueue;
constructor(
@InjectQueue('jobs') private readonly jobsQueue: Queue,
private readonly fallbackQueueService: QueueService,
private readonly jobsService: JobsService,
) {
this.activeQueue = process.env.NC_REDIS_URL
? this.jobsQueue
: this.fallbackQueueService;
}
@Post('/api/v1/db/meta/import/airtable')
@HttpCode(200)
async importAirtable(@Request() req) {
const job = await this.activeQueue.add('at-import', {
...req.body,
});
return { id: job.id, name: job.name };
}
@Post('/api/v1/db/meta/syncs/:syncId/trigger')
@HttpCode(200)
async triggerSync(@Request() req) {
const jobs = await this.jobsService.jobList('at-import');
const fnd = jobs.find((j) => j.data.syncId === req.params.syncId);
if (fnd) {
NcError.badRequest('Sync already in progress');
}
const syncSource = await SyncSource.get(req.params.syncId);
const user = await syncSource.getUser();
// Treat default baseUrl as siteUrl from req object
let baseURL = (req as any).ncSiteUrl;
// if environment value avail use it
// or if it's docker construct using `PORT`
if (process.env.NC_DOCKER) {
baseURL = `http://localhost:${process.env.PORT || 8080}`;
}
const job = await this.activeQueue.add('at-import', {
syncId: req.params.syncId,
...(syncSource?.details || {}),
projectId: syncSource.project_id,
baseId: syncSource.base_id,
authToken: '',
baseURL,
user: user,
});
return { id: job.id, name: job.name };
}
@Post('/api/v1/db/meta/syncs/:syncId/abort')
@HttpCode(200)
async abortImport(@Request() req) {
return {};
}
}

2515
packages/nocodb-nest/src/modules/jobs/at-import/at-import.processor.ts

File diff suppressed because it is too large Load Diff

222
packages/nocodb-nest/src/modules/jobs/at-import/helpers/EntityMap.ts

@ -0,0 +1,222 @@
import { Readable } from 'stream';
import sqlite3 from 'sqlite3';
class EntityMap {
initialized: boolean;
cols: string[];
db: any;
constructor(...args) {
this.initialized = false;
this.cols = args.map((arg) => processKey(arg));
this.db = new Promise((resolve, reject) => {
const db = new sqlite3.Database(':memory:');
const colStatement =
this.cols.length > 0
? this.cols.join(' TEXT, ') + ' TEXT'
: 'mappingPlaceholder TEXT';
db.run(`CREATE TABLE mapping (${colStatement})`, (err) => {
if (err) {
console.log(err);
reject(err);
}
resolve(db);
});
});
}
async init() {
if (!this.initialized) {
this.db = await this.db;
this.initialized = true;
}
}
destroy() {
if (this.initialized && this.db) {
this.db.close();
}
}
async addRow(row) {
if (!this.initialized) {
throw 'Please initialize first!';
}
const cols = Object.keys(row).map((key) => processKey(key));
const colStatement = cols.map((key) => `'${key}'`).join(', ');
const questionMarks = cols.map(() => '?').join(', ');
const promises = [];
for (const col of cols.filter((col) => !this.cols.includes(col))) {
promises.push(
new Promise((resolve, reject) => {
this.db.run(`ALTER TABLE mapping ADD '${col}' TEXT;`, (err) => {
if (err) {
console.log(err);
reject(err);
}
this.cols.push(col);
resolve(true);
});
}),
);
}
await Promise.all(promises);
const values = Object.values(row).map((val) => {
if (typeof val === 'object') {
return `JSON::${JSON.stringify(val)}`;
}
return val;
});
return new Promise((resolve, reject) => {
this.db.run(
`INSERT INTO mapping (${colStatement}) VALUES (${questionMarks})`,
values,
(err) => {
if (err) {
console.log(err);
reject(err);
}
resolve(true);
},
);
});
}
getRow(col, val, res = []): Promise<Record<string, any>> {
if (!this.initialized) {
throw 'Please initialize first!';
}
return new Promise((resolve, reject) => {
col = processKey(col);
res = res.map((r) => processKey(r));
this.db.get(
`SELECT ${
res.length ? res.join(', ') : '*'
} FROM mapping WHERE ${col} = ?`,
[val],
(err, rs) => {
if (err) {
console.log(err);
reject(err);
}
if (rs) {
rs = processResponseRow(rs);
}
resolve(rs);
},
);
});
}
getCount(): Promise<number> {
if (!this.initialized) {
throw 'Please initialize first!';
}
return new Promise((resolve, reject) => {
this.db.get(`SELECT COUNT(*) as count FROM mapping`, (err, rs) => {
if (err) {
console.log(err);
reject(err);
}
resolve(rs.count);
});
});
}
getStream(res = []): DBStream {
if (!this.initialized) {
throw 'Please initialize first!';
}
res = res.map((r) => processKey(r));
return new DBStream(
this.db,
`SELECT ${res.length ? res.join(', ') : '*'} FROM mapping`,
);
}
getLimit(limit, offset, res = []): Promise<Record<string, any>[]> {
if (!this.initialized) {
throw 'Please initialize first!';
}
return new Promise((resolve, reject) => {
res = res.map((r) => processKey(r));
this.db.all(
`SELECT ${
res.length ? res.join(', ') : '*'
} FROM mapping LIMIT ${limit} OFFSET ${offset}`,
(err, rs) => {
if (err) {
console.log(err);
reject(err);
}
for (let row of rs) {
row = processResponseRow(row);
}
resolve(rs);
},
);
});
}
}
class DBStream extends Readable {
db: any;
stmt: any;
sql: any;
constructor(db, sql) {
super({ objectMode: true });
this.db = db;
this.sql = sql;
this.stmt = this.db.prepare(this.sql);
this.on('end', () => this.stmt.finalize());
}
_read() {
const stream = this;
this.stmt.get(function (err, result) {
if (err) {
stream.emit('error', err);
} else {
if (result) {
result = processResponseRow(result);
}
stream.push(result || null);
}
});
}
}
function processResponseRow(res: any) {
for (const key of Object.keys(res)) {
if (res[key] && res[key].startsWith('JSON::')) {
try {
res[key] = JSON.parse(res[key].replace('JSON::', ''));
} catch (e) {
console.log(e);
}
}
if (revertKey(key) !== key) {
res[revertKey(key)] = res[key];
delete res[key];
}
}
return res;
}
function processKey(key) {
return key.replace(/'/g, "''").replace(/[A-Z]/g, (match) => `_${match}`);
}
function revertKey(key) {
return key.replace(/''/g, "'").replace(/_[A-Z]/g, (match) => match[1]);
}
export default EntityMap;

242
packages/nocodb-nest/src/modules/jobs/at-import/helpers/fetchAT.ts

@ -0,0 +1,242 @@
import axios from 'axios';
const info: any = {
initialized: false,
};
async function initialize(shareId) {
info.cookie = '';
const url = `https://airtable.com/${shareId}`;
try {
const hreq = await axios
.get(url, {
headers: {
accept:
'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
},
// @ts-ignore
referrerPolicy: 'strict-origin-when-cross-origin',
body: null,
method: 'GET',
})
.then((response) => {
for (const ck of response.headers['set-cookie']) {
info.cookie += ck.split(';')[0] + '; ';
}
return response.data;
})
.catch(() => {
throw {
message:
'Invalid Shared Base ID :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
});
info.headers = JSON.parse(
hreq.match(/(?<=var headers =)(.*)(?=;)/g)[0].trim(),
);
info.link = unicodeToChar(hreq.match(/(?<=fetch\(")(.*)(?=")/g)[0].trim());
info.baseInfo = decodeURIComponent(info.link)
.match(/{(.*)}/g)[0]
.split('&')
.reduce((result, el) => {
try {
return Object.assign(
result,
JSON.parse(el.includes('=') ? el.split('=')[1] : el),
);
} catch (e) {
if (el.includes('=')) {
return Object.assign(result, {
[el.split('=')[0]]: el.split('=')[1],
});
}
}
}, {});
info.baseId = info.baseInfo.applicationId;
info.initialized = true;
} catch (e) {
console.log(e);
info.initialized = false;
if (e.message) {
throw e;
} else {
throw {
message:
'Error processing Shared Base :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
}
}
}
async function read() {
if (info.initialized) {
const resreq = await axios('https://airtable.com' + info.link, {
headers: {
accept: '*/*',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
'x-time-zone': 'Europe/Berlin',
cookie: info.cookie,
...info.headers,
},
// @ts-ignore
referrerPolicy: 'no-referrer',
body: null,
method: 'GET',
})
.then((response) => {
return response.data;
})
.catch(() => {
throw {
message:
'Error Reading :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
});
return {
schema: resreq.data,
baseId: info.baseId,
baseInfo: info.baseInfo,
};
} else {
throw {
message: 'Error Initializing :: please try again !!',
};
}
}
async function readView(viewId) {
if (info.initialized) {
const resreq = await axios(
`https://airtable.com/v0.3/view/${viewId}/readData?` +
`stringifiedObjectParams=${encodeURIComponent('{}')}&requestId=${
info.baseInfo.requestId
}&accessPolicy=${encodeURIComponent(
JSON.stringify({
allowedActions: info.baseInfo.allowedActions,
shareId: info.baseInfo.shareId,
applicationId: info.baseInfo.applicationId,
generationNumber: info.baseInfo.generationNumber,
expires: info.baseInfo.expires,
signature: info.baseInfo.signature,
}),
)}`,
{
headers: {
accept: '*/*',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
'x-time-zone': 'Europe/Berlin',
cookie: info.cookie,
...info.headers,
},
// @ts-ignore
referrerPolicy: 'no-referrer',
body: null,
method: 'GET',
},
)
.then((response) => {
return response.data;
})
.catch(() => {
throw {
message:
'Error Reading View :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
});
return { view: resreq.data };
} else {
throw {
message: 'Error Initializing :: please try again !!',
};
}
}
async function readTemplate(templateId) {
if (!info.initialized) {
await initialize('shrO8aYf3ybwSdDKn');
}
const resreq = await axios(
`https://www.airtable.com/v0.3/exploreApplications/${templateId}`,
{
headers: {
accept: '*/*',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
'x-time-zone': 'Europe/Berlin',
cookie: info.cookie,
...info.headers,
},
// @ts-ignore
referrer: 'https://www.airtable.com/',
referrerPolicy: 'same-origin',
body: null,
method: 'GET',
mode: 'cors',
credentials: 'include',
},
)
.then((response) => {
return response.data;
})
.catch(() => {
throw {
message:
'Error Fetching :: Ensure www.airtable.com/templates/featured/<TemplateID> is accessible.',
};
});
return { template: resreq };
}
function unicodeToChar(text) {
return text.replace(/\\u[\dA-F]{4}/gi, function (match) {
return String.fromCharCode(parseInt(match.replace(/\\u/g, ''), 16));
});
}
export default {
initialize,
read,
readView,
readTemplate,
};

362
packages/nocodb-nest/src/modules/jobs/at-import/helpers/readAndProcessData.ts

@ -0,0 +1,362 @@
/* eslint-disable no-async-promise-executor */
import { RelationTypes, UITypes } from 'nocodb-sdk';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../services/tables.service';
// @ts-ignore
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';
const BULK_DATA_BATCH_SIZE = 500;
const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
const BULK_PARALLEL_PROCESS = 5;
interface AirtableImportContext {
bulkDataService: BulkDataAliasService;
tableService: TablesService;
}
async function readAllData({
table,
fields,
base,
logBasic = (_str) => {},
services,
}: {
table: { title?: string };
fields?;
base: AirtableBase;
logBasic?: (string) => void;
logDetailed?: (string) => void;
services: AirtableImportContext;
}): Promise<EntityMap> {
return new Promise((resolve, reject) => {
let data = null;
const selectParams: any = {
pageSize: 100,
};
if (fields) selectParams.fields = fields;
base(table.title)
.select(selectParams)
.eachPage(
async function page(records, fetchNextPage) {
if (!data) {
data = new EntityMap();
await data.init();
}
for await (const record of records) {
await data.addRow({ id: record.id, ...record.fields });
}
const tmpLength = await data.getCount();
logBasic(
`:: Reading '${table.title}' data :: ${Math.max(
1,
tmpLength - records.length,
)} - ${tmpLength}`,
);
// To fetch the next page of records, call `fetchNextPage`.
// If there are more records, `page` will get called again.
// If there are no more records, `done` will get called.
fetchNextPage();
},
async function done(err) {
if (err) {
console.error(err);
return reject(err);
}
resolve(data);
},
);
});
}
export async function importData({
projectName,
table,
base,
nocoBaseDataProcessing_v2,
sDB,
logDetailed = (_str) => {},
logBasic = (_str) => {},
services,
}: {
projectName: string;
table: { title?: string; id?: string };
fields?;
base: AirtableBase;
logBasic: (string) => void;
logDetailed: (string) => void;
nocoBaseDataProcessing_v2;
sDB;
services: AirtableImportContext;
}): Promise<EntityMap> {
try {
// @ts-ignore
const records = await readAllData({
table,
base,
logDetailed,
logBasic,
});
await new Promise(async (resolve) => {
const readable = records.getStream();
const allRecordsCount = await records.getCount();
const promises = [];
let tempData = [];
let importedCount = 0;
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(sDB, table, {
id: rid,
fields,
});
tempData.push(r);
if (tempData.length >= BULK_DATA_BATCH_SIZE) {
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: insertArray,
cookie: {},
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
allRecordsCount,
)}`,
);
importedCount += insertArray.length;
insertArray = [];
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
}),
);
});
readable.on('end', async () => {
await Promise.all(promises);
if (tempData.length > 0) {
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: tempData,
cookie: {},
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
allRecordsCount,
)}`,
);
importedCount += tempData.length;
tempData = [];
}
resolve(true);
});
});
return records;
} catch (e) {
console.log(e);
return null;
}
}
export async function importLTARData({
table,
fields,
base,
projectName,
insertedAssocRef = {},
logDetailed = (_str) => {},
logBasic = (_str) => {},
records,
atNcAliasRef,
ncLinkMappingTable,
syncDB,
services,
}: {
projectName: string;
table: { title?: string; id?: string };
fields;
base: AirtableBase;
logDetailed: (string) => void;
logBasic: (string) => void;
insertedAssocRef: { [assocTableId: string]: boolean };
records?: EntityMap;
atNcAliasRef: {
[ncTableId: string]: {
[ncTitle: string]: string;
};
};
ncLinkMappingTable: Record<string, Record<string, any>>[];
syncDB;
services: AirtableImportContext;
}) {
const assocTableMetas: Array<{
modelMeta: { id?: string; title?: string };
colMeta: { title?: string };
curCol: { title?: string };
refCol: { title?: string };
}> = [];
const allData =
records ||
(await readAllData({
table,
fields,
base,
logDetailed,
logBasic,
services,
}));
const modelMeta: any =
await services.tableService.getTableWithAccessibleViews({
tableId: table.id,
user: syncDB.user,
});
for (const colMeta of modelMeta.columns) {
// skip columns which are not LTAR and Many to many
if (
colMeta.uidt !== UITypes.LinkToAnotherRecord ||
colMeta.colOptions.type !== RelationTypes.MANY_TO_MANY
) {
continue;
}
// skip if already inserted
if (colMeta.colOptions.fk_mm_model_id in insertedAssocRef) continue;
// self links: skip if the column under consideration is the add-on column NocoDB creates
if (ncLinkMappingTable.every((a) => a.nc.title !== colMeta.title)) continue;
// mark as inserted
insertedAssocRef[colMeta.colOptions.fk_mm_model_id] = true;
const assocModelMeta: TableType =
(await services.tableService.getTableWithAccessibleViews({
tableId: colMeta.colOptions.fk_mm_model_id,
user: syncDB.user,
})) as any;
// extract associative table and columns meta
assocTableMetas.push({
modelMeta: assocModelMeta,
colMeta,
curCol: assocModelMeta.columns.find(
(c) => c.id === colMeta.colOptions.fk_mm_child_column_id,
),
refCol: assocModelMeta.columns.find(
(c) => c.id === colMeta.colOptions.fk_mm_parent_column_id,
),
});
}
let nestedLinkCnt = 0;
// Iterate over all related M2M associative table
for await (const assocMeta of assocTableMetas) {
let assocTableData = [];
let importedCount = 0;
// extract insert data from records
await new Promise((resolve) => {
const promises = [];
const readable = allData.getStream();
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized
assocTableData.push(
...(
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
).map((id) => ({
[assocMeta.curCol.title]: record.id,
[assocMeta.refCol.title]: id,
})),
);
if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
let insertArray = assocTableData.splice(0, assocTableData.length);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
insertArray.length,
)}`,
);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: assocMeta.modelMeta.title,
body: insertArray,
cookie: {},
});
importedCount += insertArray.length;
insertArray = [];
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
}),
);
});
readable.on('end', async () => {
await Promise.all(promises);
if (assocTableData.length >= 0) {
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
assocTableData.length,
)}`,
);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: assocMeta.modelMeta.title,
body: assocTableData,
cookie: {},
});
importedCount += assocTableData.length;
assocTableData = [];
}
resolve(true);
});
});
nestedLinkCnt += importedCount;
}
return nestedLinkCnt;
}

31
packages/nocodb-nest/src/modules/jobs/at-import/helpers/syncMap.ts

@ -0,0 +1,31 @@
export const mapTbl = {};
// static mapping records between aTblId && ncId
export const addToMappingTbl = function addToMappingTbl(
aTblId,
ncId,
ncName,
parent?,
) {
mapTbl[aTblId] = {
ncId: ncId,
ncParent: parent,
// name added to assist in quick debug
ncName: ncName,
};
};
// get NcID from airtable ID
export const getNcIdFromAtId = function getNcIdFromAtId(aId) {
return mapTbl[aId]?.ncId;
};
// get nc Parent from airtable ID
export const getNcParentFromAtId = function getNcParentFromAtId(aId) {
return mapTbl[aId]?.ncParent;
};
// get nc-title from airtable ID
export const getNcNameFromAtId = function getNcNameFromAtId(aId) {
return mapTbl[aId]?.ncName;
};

6
packages/nocodb-nest/src/modules/jobs/fallback-queue.service.ts

@ -3,6 +3,7 @@ import PQueue from 'p-queue';
import Emittery from 'emittery';
import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsEventService } from './jobs-event.service';
import { AtImportProcessor } from './at-import/at-import.processor';
interface Job {
id: string;
@ -22,6 +23,7 @@ export class QueueService {
constructor(
private readonly jobsEventService: JobsEventService,
private readonly duplicateProcessor: DuplicateProcessor,
private readonly atImportProcessor: AtImportProcessor,
) {
this.emitter.on('active', (data: any) => {
const job = this.queueMemory.find(
@ -56,6 +58,10 @@ export class QueueService {
this: this.duplicateProcessor,
fn: this.duplicateProcessor.duplicateBase,
},
'at-import': {
this: this.atImportProcessor,
fn: this.atImportProcessor.job,
},
};
async jobWrapper(job: Job) {

1
packages/nocodb-nest/src/modules/jobs/jobs-event.service.ts

@ -38,6 +38,7 @@ export class JobsEventService {
name: job.name,
id: job.id.toString(),
status: 'failed',
error: error?.message,
});
}

22
packages/nocodb-nest/src/modules/jobs/jobs.gateway.ts

@ -40,10 +40,16 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('subscribe')
async subscribe(
@MessageBody() data: { name: string; id: string } | { id: string },
@MessageBody()
body: { _id: number; data: { id: string; name: string } | any },
@ConnectedSocket() client: Socket,
): Promise<void> {
if ('name' in data) {
const { _id, data } = body;
if (
Object.keys(data).every((k) => ['name', 'id'].includes(k)) &&
data?.name &&
data?.id
) {
const rooms = (await this.jobsService.jobList(data.name)).map(
(j) => `${j.name}-${j.id}`,
);
@ -51,17 +57,17 @@ export class JobsGateway implements OnModuleInit {
if (room) {
client.join(`${data.name}-${data.id}`);
client.emit('subscribed', {
subbed: data.id,
_id,
name: data.name,
id: data.id,
});
}
} else {
const job = await this.jobsService.getJobWithData({ id: data.id });
const job = await this.jobsService.getJobWithData(data);
if (job) {
client.join(`${job.name}-${job.id}`);
client.emit('subscribed', {
subbed: data.id,
_id,
name: job.name,
id: job.id,
});
@ -71,10 +77,12 @@ export class JobsGateway implements OnModuleInit {
@SubscribeMessage('status')
async status(
@MessageBody() data: { name: string; id: string },
@MessageBody() body: { _id: number; data: { id: string; name: string } },
@ConnectedSocket() client: Socket,
): Promise<void> {
const { _id, data } = body;
client.emit('status', {
_id,
id: data.id,
name: data.name,
status: await this.jobsService.jobStatus(data.id),
@ -93,11 +101,13 @@ export class JobsGateway implements OnModuleInit {
| 'failed'
| 'paused'
| 'refresh';
error?: any;
}): Promise<void> {
this.server.to(`${data.name}-${data.id}`).emit('status', {
id: data.id,
name: data.name,
status: data.status,
error: data.error,
});
}

5
packages/nocodb-nest/src/modules/jobs/jobs.module.ts

@ -11,6 +11,8 @@ import { DuplicateProcessor } from './export-import/duplicate.processor';
import { JobsGateway } from './jobs.gateway';
import { QueueService } from './fallback-queue.service';
import { JobsEventService } from './jobs-event.service';
import { AtImportController } from './at-import/at-import.controller';
import { AtImportProcessor } from './at-import/at-import.processor';
@Module({
imports: [
@ -21,7 +23,7 @@ import { JobsEventService } from './jobs-event.service';
name: 'jobs',
}),
],
controllers: [DuplicateController],
controllers: [DuplicateController, AtImportController],
providers: [
QueueService,
JobsGateway,
@ -30,6 +32,7 @@ import { JobsEventService } from './jobs-event.service';
DuplicateProcessor,
ExportService,
ImportService,
AtImportProcessor,
],
})
export class JobsModule {}

6
packages/nocodb-nest/src/modules/jobs/jobs.service.ts

@ -21,17 +21,17 @@ export class JobsService {
async jobList(jobType: string) {
return (
await this.activeQueue.getJobs(['active', 'waiting', 'delayed'])
await this.activeQueue.getJobs(['active', 'waiting', 'delayed', 'paused'])
).filter((j) => j.name === jobType);
}
async getJobWithData(data: any) {
const jobs = await this.activeQueue.getJobs([
'completed',
// 'completed',
'waiting',
'active',
'delayed',
'failed',
// 'failed',
'paused',
]);

4
packages/nocodb-nest/src/modules/metas/metas.module.ts

@ -16,7 +16,6 @@ import { GalleriesController } from '../../controllers/galleries.controller';
import { GridColumnsController } from '../../controllers/grid-columns.controller';
import { GridsController } from '../../controllers/grids.controller';
import { HooksController } from '../../controllers/hooks.controller';
import { ImportController } from '../../controllers/imports/import.controller';
import { KanbansController } from '../../controllers/kanbans.controller';
import { MapsController } from '../../controllers/maps.controller';
import { MetaDiffsController } from '../../controllers/meta-diffs.controller';
@ -98,7 +97,6 @@ import { DatasModule } from '../datas/datas.module';
GridColumnsController,
GridsController,
HooksController,
ImportController,
KanbansController,
MapsController,
MetaDiffsController,
@ -170,6 +168,8 @@ import { DatasModule } from '../datas/datas.module';
GalleriesService,
KanbansService,
ProjectsService,
AttachmentsService,
ProjectUsersService,
],
})
export class MetasModule {}

12
packages/nocodb-nest/src/services/socket.gateway.ts

@ -25,7 +25,6 @@ function getHash(str) {
export class SocketGateway implements OnModuleInit {
// private server: HttpServer;
private clients: { [id: string]: Socket } = {};
private _jobs: { [id: string]: { last_message: any } } = {};
constructor(
private jwtStrategy: JwtStrategy,
@ -59,21 +58,10 @@ export class SocketGateway implements OnModuleInit {
socket.on('event', (args) => {
T.event({ ...args, id });
});
socket.on('subscribe', (room) => {
if (room in this.jobs) {
socket.join(room);
socket.emit('job');
socket.emit('progress', this.jobs[room].last_message);
}
});
});
}
public get io() {
return this.server;
}
public get jobs() {
return this._jobs;
}
}

Loading…
Cancel
Save