Browse Source

Merge pull request #2218 from nocodb/feat/at-import-optimization

Feat - Import optimization
pull/2219/head
Pranav C 2 years ago committed by GitHub
parent
commit
d81e7bf968
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      packages/nc-gui/components/import/ImportFromAirtable.vue
  2. 10
      packages/nc-gui/components/project/spreadsheet/components/editableCell/EditableAttachmentCell.vue
  3. 4
      packages/nc-gui/components/project/spreadsheet/helpers/imageExt.js
  4. 11
      packages/nocodb/src/lib/dataMapper/lib/sql/BaseModelSqlv2.ts
  5. 2
      packages/nocodb/src/lib/noco/Noco.ts
  6. 305
      packages/nocodb/src/lib/noco/meta/api/sync/helpers/job.ts
  7. 252
      packages/nocodb/src/lib/noco/meta/api/sync/helpers/readAndProcessData.ts

2
packages/nc-gui/components/import/ImportFromAirtable.vue

@ -169,7 +169,7 @@
<v-icon color="green" size="15">
mdi-loading mdi-spin
</v-icon>
<span class="caption nc-text">Syncing
<span class="caption nc-text">Importing
</span>
<!-- <div class="nc-progress" />-->
</div>

10
packages/nc-gui/components/project/spreadsheet/components/editableCell/EditableAttachmentCell.vue

@ -27,7 +27,7 @@
<v-tooltip bottom>
<template #activator="{on}">
<v-img
v-if="isImage(item.title)"
v-if="isImage(item.title, item.mimetype)"
lazy-src="https://via.placeholder.com/60.png?text=Loading..."
alt="#"
max-height="99px"
@ -150,7 +150,7 @@
</v-icon>
<div class="pa-2 d-flex align-center" style="height:200px">
<img
v-if="isImage(item.title)"
v-if="isImage(item.title, item.mimetype)"
style="max-height: 100%;max-width: 100%"
alt="#"
:src="item.url || item.data"
@ -195,7 +195,7 @@
</p>
<div style="width:90vh;height:calc(100vh - 150px)" class="d-flex align-center justify-center">
<img
v-if="isImage(item.title)"
v-if="isImage(item.title, item.mimetype)"
style="max-width:90vh;max-height:calc(100vh - 100px)"
:src="item.url || item.data"
>
@ -234,7 +234,7 @@
@click="carousel = i"
>
<img
v-if="isImage(item.title)"
v-if="isImage(item.title, item.mimetype)"
style="max-width:100%;max-height:100%"
:src="item.url || item.data"
>
@ -341,7 +341,7 @@ export default {
if (this.isPublicForm) {
this.localFilesState.push(...Array.from(this.$refs.file.files).map((file) => {
const res = { file, title: file.name }
if (isImage(file.name)) {
if (isImage(file.name, file.mimetype)) {
const reader = new FileReader()
reader.onload = (e) => {
this.$set(res, 'data', e.target.result)

4
packages/nc-gui/components/project/spreadsheet/helpers/imageExt.js

@ -12,8 +12,8 @@ const imageExt = [
export default imageExt
const isImage = (name) => {
return imageExt.some(e => name.toLowerCase().endsWith(`.${e}`))
const isImage = (name, type) => {
return imageExt.some(e => name.toLowerCase().endsWith(`.${e}`)) || (type || '').startsWith('image/')
}
export { isImage }

11
packages/nocodb/src/lib/dataMapper/lib/sql/BaseModelSqlv2.ts

@ -1513,7 +1513,14 @@ class BaseModelSqlv2 {
}
}
async bulkInsert(datas: any[]) {
async bulkInsert(
datas: any[],
{
chunkSize: _chunkSize = 100
}: {
chunkSize?: number;
} = {}
) {
try {
const insertDatas = await Promise.all(
datas.map(async d => {
@ -1536,7 +1543,7 @@ class BaseModelSqlv2 {
// fallbacks to `10` if database client is sqlite
// to avoid `too many SQL variables` error
// refer : https://www.sqlite.org/limits.html
const chunkSize = this.isSqlite ? 10 : 50;
const chunkSize = this.isSqlite ? 10 : _chunkSize;
const response = await this.dbDriver
.batchInsert(this.model.table_name, insertDatas, chunkSize)

2
packages/nocodb/src/lib/noco/Noco.ts

@ -211,7 +211,7 @@ export default class Noco {
this.router.use(cookieParser());
this.router.use(
bodyParser.json({
limit: process.env.NC_REQUEST_BODY_SIZE || 1024 * 1024
limit: process.env.NC_REQUEST_BODY_SIZE || '50mb'
})
);
this.router.use(morgan('tiny'));

305
packages/nocodb/src/lib/noco/meta/api/sync/helpers/job.ts

@ -2,17 +2,16 @@ import FetchAT from './fetchAT';
import { UITypes } from 'nocodb-sdk';
import { Tele } from 'nc-help';
// import * as sMap from './syncMap';
import FormData from 'form-data';
import { Api } from 'nocodb-sdk';
import axios from 'axios';
import Airtable from 'airtable';
import jsonfile from 'jsonfile';
import hash from 'object-hash';
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';
import { importData, importLTARData } from './readAndProcessData';
dayjs.extend(utc);
@ -82,6 +81,13 @@ export default async (
const ncSysFields = { id: 'ncRecordId', hash: 'ncRecordHash' };
const storeLinks = false;
const ncLinkDataStore: any = {};
const insertedAssocRef: any = {};
const atNcAliasRef: {
[ncTableId: string]: {
[ncTitle: string]: string;
};
} = {};
const uniqueTableNameGen = getUniqueNameGenerator('sheet');
@ -102,6 +108,10 @@ export default async (
migrationSkipLog: {
count: 0,
log: []
},
data: {
records: 0,
nestedLinks: 0
}
};
@ -117,6 +127,13 @@ export default async (
async function getAirtableSchema(sDB) {
const start = Date.now();
if (!sDB.shareId)
throw {
message:
'Invalid Shared Base ID :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details'
};
if (sDB.shareId.startsWith('exp')) {
const template = await FetchAT.readTemplate(sDB.shareId);
await FetchAT.initialize(template.template.exploreApplication.shareId);
@ -208,6 +225,7 @@ export default async (
// aTbl: retrieve table name from table ID
//
// @ts-ignore
function aTbl_getTableName(tblId) {
const sheetObj = g_aTblSchema.find(tbl => tbl.id === tblId);
return {
@ -607,6 +625,10 @@ export default async (
aTblLinkColumns[i].name
);
// LTAR alias ref to AT
atNcAliasRef[srcTbl.id] = atNcAliasRef[srcTbl.id] || {};
atNcAliasRef[srcTbl.id][ncName.title] = aTblLinkColumns[i].name;
logDetailed(
`NC API: dbTableColumn.create LinkToAnotherRecord ${ncName.title}`
);
@ -1188,38 +1210,9 @@ export default async (
////////// Data processing
async function nocoLinkProcessing(projName, table, record, _field) {
const rec = record.fields;
for (const [key, value] of Object.entries(rec)) {
const refRowIdList: any = value;
const referenceColumnName = key;
if (refRowIdList.length) {
for (let i = 0; i < refRowIdList.length; i++) {
logDetailed(
`NC API: dbTableRow.nestedAdd ${record.id}/mm/${referenceColumnName}/${refRowIdList[0][i]}`
);
const _perfStart = recordPerfStart();
await api.dbTableRow.nestedAdd(
'noco',
projName,
table.id,
`${record.id}`,
'mm',
encodeURIComponent(referenceColumnName),
`${refRowIdList[i]}`
);
recordPerfStats(_perfStart, 'dbTableRow.nestedAdd');
}
}
}
}
async function nocoBaseDataProcessing_v2(sDB, table, record) {
const recordHash = hash(record);
const rec = record.fields;
const rec = { ...record.fields };
// kludge -
// trim spaces on either side of column name
@ -1313,51 +1306,26 @@ export default async (
break;
case UITypes.Attachment:
if (syncDB.options.syncLookup) rec[key] = null;
if (!syncDB.options.syncAttachment) rec[key] = null;
else {
const tempArr = [];
for (const v of value) {
const binaryImage = await axios
.get(v.url, {
responseType: 'stream',
headers: {
'Content-Type': v.type
}
})
.then(response => {
return response.data;
})
.catch(error => {
console.log(error);
return false;
});
const imageFile: any = new FormData();
imageFile.append('files', binaryImage, {
filename: v.filename.includes('?')
? v.filename.split('?')[0]
: v.filename
});
const rs = await axios
.post(sDB.baseURL + '/api/v1/db/storage/upload', imageFile, {
params: {
path: `noco/${sDB.projectName}/${table.title}/${key}`
},
headers: {
'Content-Type': `multipart/form-data; boundary=${imageFile._boundary}`,
'xc-auth': sDB.authToken
}
})
.then(response => {
return response.data;
})
.catch(e => {
console.log(e);
});
tempArr.push(...rs);
let tempArr = [];
try {
tempArr = await api.storage.uploadByUrl(
{
path: `noco/${sDB.projectName}/${table.title}/${key}`
},
value?.map(attachment => ({
fileName: attachment.filename?.split('?')?.[0],
url: attachment.url,
size: attachment.size,
mimetype: attachment.type
}))
);
} catch (e) {
console.log(e);
}
rec[key] = JSON.stringify(tempArr);
}
break;
@ -1374,71 +1342,7 @@ export default async (
return rec;
}
async function nocoReadData(sDB, table) {
ncLinkDataStore[table.title] = {};
const insertJobs: Promise<any>[] = [];
return new Promise((resolve, reject) => {
base(table.title)
.select({
pageSize: 100
// maxRecords: 1,
})
.eachPage(
async function page(records, fetchNextPage) {
// console.log(JSON.stringify(records, null, 2));
// This function (`page`) will get called for each page of records.
logBasic(
`:: ${table.title} : ${recordCnt + 1} ~ ${(recordCnt += 100)}`
);
// await Promise.all(
// records.map(record => _callback(sDB, table, record))
// );
const ncRecords = [];
for (let i = 0; i < records.length; i++) {
const r = await nocoBaseDataProcessing_v2(sDB, table, records[i]);
ncRecords.push(r);
}
// wait for previous job's to finish
await Promise.all(insertJobs);
const _perfStart = recordPerfStart();
insertJobs.push(
api.dbTableRow.bulkCreate(
'nc',
sDB.projectName,
table.id, // encodeURIComponent(table.title),
ncRecords
)
);
recordPerfStats(_perfStart, 'dbTableRow.bulkCreate');
// To fetch the next page of records, call `fetchNextPage`.
// If there are more records, `page` will get called again.
// If there are no more records, `done` will get called.
// logBasic(
// `:: ${Date.now()} Awaiting response from Airtable Data API ...`
// );
fetchNextPage();
},
async function done(err) {
if (err) {
console.error(err);
reject(err);
}
// wait for all jobs to be completed
await Promise.all(insertJobs);
resolve(null);
}
);
});
}
// @ts-ignore
async function nocoReadDataSelected(projName, table, callback, fields) {
return new Promise((resolve, reject) => {
base(table.title)
@ -1851,6 +1755,8 @@ export default async (
logBasic(`:: Grid: ${rtc.view.grid}`);
logBasic(`:: Gallery: ${rtc.view.gallery}`);
logBasic(`:: Form: ${rtc.view.form}`);
logBasic(`:: Total Records: ${rtc.data.records}`);
logBasic(`:: Total Nested Links: ${rtc.data.nestedLinks}`);
const duration = Date.now() - start;
logBasic(`:: Migration time: ${duration}`);
@ -1890,7 +1796,9 @@ export default async (
axios: {
count: rtc.fetchAt.count,
time: rtc.fetchAt.time
}
},
totalRecords: rtc.data.records,
nestedLinks: rtc.data.nestedLinks
}
}
});
@ -2186,6 +2094,8 @@ export default async (
logBasic('Reading Records...');
const recordsMap = {};
for (let i = 0; i < ncTblList.list.length; i++) {
const _perfStart = recordPerfStart();
const ncTbl = await api.dbTable.read(ncTblList.list[i].id);
@ -2196,52 +2106,91 @@ export default async (
continue;
recordCnt = 0;
await nocoReadData(syncDB, ncTbl);
// await nocoReadData(syncDB, ncTbl);
recordsMap[ncTbl.id] = await importData({
projectName: syncDB.projectName,
table: ncTbl,
base,
api,
logBasic,
nocoBaseDataProcessing_v2,
sDB: syncDB,
logDetailed
});
rtc.data.records += recordsMap[ncTbl.id].length;
logDetailed(`Data inserted from ${ncTbl.title}`);
}
logBasic('Configuring Record Links...');
for (let i = 0; i < ncTblList.list.length; i++) {
const ncTbl = await api.dbTable.read(ncTblList.list[i].id);
rtc.data.nestedLinks += await importLTARData({
table: ncTbl,
projectName: syncDB.projectName,
api,
base,
fields: null, //Object.values(tblLinkGroup).flat(),
logBasic,
insertedAssocRef,
logDetailed,
records: recordsMap[ncTbl.id],
atNcAliasRef
});
}
if (storeLinks) {
// const insertJobs: Promise<any>[] = [];
for (const [pTitle, v] of Object.entries(ncLinkDataStore)) {
logBasic(`:: ${pTitle}`);
for (const [, record] of Object.entries(v)) {
const tbl = ncTblList.list.find(a => a.title === pTitle);
await nocoLinkProcessing(syncDB.projectName, tbl, record, 0);
// insertJobs.push(
// nocoLinkProcessing(syncDB.projectName, tbl, record, 0)
// );
}
}
// for (const [pTitle, v] of Object.entries(ncLinkDataStore)) {
// logBasic(`:: ${pTitle}`);
// for (const [, record] of Object.entries(v)) {
// const tbl = ncTblList.list.find(a => a.title === pTitle);
// await nocoLinkProcessing(syncDB.projectName, tbl, record, 0);
// // insertJobs.push(
// // nocoLinkProcessing(syncDB.projectName, tbl, record, 0)
// // );
// }
// }
// await Promise.all(insertJobs);
// await nocoLinkProcessing(syncDB.projectName, 0, 0, 0);
} else {
// create link groups (table: link fields)
const tblLinkGroup = {};
for (let idx = 0; idx < ncLinkMappingTable.length; idx++) {
const x = ncLinkMappingTable[idx];
if (tblLinkGroup[x.aTbl.tblId] === undefined)
tblLinkGroup[x.aTbl.tblId] = [x.aTbl.name];
else tblLinkGroup[x.aTbl.tblId].push(x.aTbl.name);
}
for (const [k, v] of Object.entries(tblLinkGroup)) {
const ncTbl = await nc_getTableSchema(aTbl_getTableName(k).tn);
// not a migrated table, skip
if (undefined === aTblSchema.find(x => x.name === ncTbl.title))
continue;
recordCnt = 0;
await nocoReadDataSelected(
syncDB.projectName,
ncTbl,
async (projName, table, record, _field) => {
await nocoLinkProcessing(projName, table, record, _field);
},
v
);
}
// // create link groups (table: link fields)
// // const tblLinkGroup = {};
// // for (let idx = 0; idx < ncLinkMappingTable.length; idx++) {
// // const x = ncLinkMappingTable[idx];
// // if (tblLinkGroup[x.aTbl.tblId] === undefined)
// // tblLinkGroup[x.aTbl.tblId] = [x.aTbl.name];
// // else tblLinkGroup[x.aTbl.tblId].push(x.aTbl.name);
// // }
// //
// // const ncTbl = await nc_getTableSchema(aTbl_getTableName(k).tn);
// //
// // await importLTARData({
// // table: ncTbl,
// // projectName: syncDB.projectName,
// // api,
// // base,
// // fields: Object.values(tblLinkGroup).flat(),
// // logBasic
// // });
// for (const [k, v] of Object.entries(tblLinkGroup)) {
// const ncTbl = await nc_getTableSchema(aTbl_getTableName(k).tn);
//
// // not a migrated table, skip
// if (undefined === aTblSchema.find(x => x.name === ncTbl.title))
// continue;
//
// recordCnt = 0;
// await nocoReadDataSelected(
// syncDB.projectName,
// ncTbl,
// async (projName, table, record, _field) => {
// await nocoLinkProcessing(projName, table, record, _field);
// },
// v
// );
// }
}
} catch (error) {
logDetailed(

252
packages/nocodb/src/lib/noco/meta/api/sync/helpers/readAndProcessData.ts

@ -0,0 +1,252 @@
import { AirtableBase } from 'airtable/lib/airtable_base';
import { Api, RelationTypes, TableType, UITypes } from 'nocodb-sdk';
const BULK_DATA_BATCH_SIZE = 2000;
const ASSOC_BULK_DATA_BATCH_SIZE = 5000;
async function readAllData({
table,
fields,
base,
logBasic = _str => {},
triggerThreshold = BULK_DATA_BATCH_SIZE,
onThreshold = async _rec => {}
}: {
table: { title?: string };
fields?;
base: AirtableBase;
logBasic?: (string) => void;
logDetailed?: (string) => void;
triggerThreshold?: number;
onThreshold?: (
records: Array<{ fields: any; id: string }>,
allRecords?: Array<{ fields: any; id: string }>
) => Promise<void>;
}): Promise<Array<any>> {
return new Promise((resolve, reject) => {
const data = [];
let thresholdCbkData = [];
const selectParams: any = {
pageSize: 100
};
if (fields) selectParams.fields = fields;
const insertJobs: Promise<any>[] = [];
base(table.title)
.select(selectParams)
.eachPage(
async function page(records, fetchNextPage) {
data.push(...records);
thresholdCbkData.push(...records);
logBasic(
`:: Reading '${table.title}' data :: ${Math.max(
1,
data.length - records.length
)} - ${data.length}`
);
if (thresholdCbkData.length >= triggerThreshold) {
await Promise.all(insertJobs);
insertJobs.push(onThreshold(thresholdCbkData, data));
thresholdCbkData = [];
}
// To fetch the next page of records, call `fetchNextPage`.
// If there are more records, `page` will get called again.
// If there are no more records, `done` will get called.
fetchNextPage();
},
async function done(err) {
if (err) {
console.error(err);
return reject(err);
}
if (thresholdCbkData.length) {
await Promise.all(insertJobs);
await onThreshold(thresholdCbkData, data);
thresholdCbkData = [];
}
resolve(data);
}
);
});
}
export async function importData({
projectName,
table,
base,
api,
nocoBaseDataProcessing_v2,
sDB,
logDetailed = _str => {},
logBasic = _str => {}
}: {
projectName: string;
table: { title?: string; id?: string };
fields?;
base: AirtableBase;
logBasic: (string) => void;
logDetailed: (string) => void;
api: Api<any>;
nocoBaseDataProcessing_v2;
sDB;
}): Promise<any> {
try {
// @ts-ignore
const records = await readAllData({
table,
base,
logDetailed,
logBasic,
async onThreshold(records, allRecords) {
const allData = [];
for (let i = 0; i < records.length; i++) {
const r = await nocoBaseDataProcessing_v2(sDB, table, records[i]);
allData.push(r);
}
logBasic(
`:: Importing '${table.title}' data :: ${allRecords.length -
records.length +
1} - ${allRecords.length}`
);
await api.dbTableRow.bulkCreate('nc', projectName, table.id, allData);
}
});
return records;
} catch (e) {
console.log(e);
return 0;
}
}
export async function importLTARData({
table,
fields,
base,
api,
projectName,
insertedAssocRef = {},
logDetailed = _str => {},
logBasic = _str => {},
records,
atNcAliasRef
}: {
projectName: string;
table: { title?: string; id?: string };
fields;
base: AirtableBase;
logDetailed: (string) => void;
logBasic: (string) => void;
api: Api<any>;
insertedAssocRef: { [assocTableId: string]: boolean };
records?: Array<{ fields: any; id: string }>;
atNcAliasRef: {
[ncTableId: string]: {
[ncTitle: string]: string;
};
};
}) {
const assocTableMetas: Array<{
modelMeta: { id?: string; title?: string };
colMeta: { title?: string };
curCol: { title?: string };
refCol: { title?: string };
}> = [];
const allData =
records ||
(await readAllData({
table,
fields,
base,
logDetailed,
logBasic
}));
const modelMeta: any = await api.dbTable.read(table.id);
for (const colMeta of modelMeta.columns) {
// skip columns which are not LTAR and Many to many
if (
colMeta.uidt !== UITypes.LinkToAnotherRecord ||
colMeta.colOptions.type !== RelationTypes.MANY_TO_MANY
) {
continue;
}
// skip if already inserted
if (colMeta.colOptions.fk_mm_model_id in insertedAssocRef) continue;
// mark as inserted
insertedAssocRef[colMeta.colOptions.fk_mm_model_id] = true;
const assocModelMeta: TableType = (await api.dbTable.read(
colMeta.colOptions.fk_mm_model_id
)) as any;
// extract associative table and columns meta
assocTableMetas.push({
modelMeta: assocModelMeta,
colMeta,
curCol: assocModelMeta.columns.find(
c => c.id === colMeta.colOptions.fk_mm_child_column_id
),
refCol: assocModelMeta.columns.find(
c => c.id === colMeta.colOptions.fk_mm_parent_column_id
)
});
}
let nestedLinkCnt = 0;
// Iterate over all related M2M associative table
for (const assocMeta of assocTableMetas) {
const assocTableData = [];
// extract insert data from records
for (const record of allData) {
const rec = record.fields;
// todo: use actual alias instead of sanitized
assocTableData.push(
...(rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []).map(
id => ({
[assocMeta.curCol.title]: record.id,
[assocMeta.refCol.title]: id
})
)
);
}
nestedLinkCnt += assocTableData.length;
// Insert datas as chunks of size `ASSOC_BULK_DATA_BATCH_SIZE`
for (
let i = 0;
i < assocTableData.length;
i += ASSOC_BULK_DATA_BATCH_SIZE
) {
logBasic(
`:: Importing '${table.title}' LTAR data :: ${i + 1} - ${Math.min(
i + ASSOC_BULK_DATA_BATCH_SIZE,
assocTableData.length
)}`
);
console.log(
assocTableData.slice(i, i + ASSOC_BULK_DATA_BATCH_SIZE).length
);
await api.dbTableRow.bulkCreate(
'nc',
projectName,
assocMeta.modelMeta.id,
assocTableData.slice(i, i + ASSOC_BULK_DATA_BATCH_SIZE)
);
}
}
return nestedLinkCnt;
}
Loading…
Cancel
Save