From d69337e8e855eeb7ca4fd1ed774be0841db50d8e Mon Sep 17 00:00:00 2001 From: mertmit Date: Sun, 23 Oct 2022 22:26:52 +0300 Subject: [PATCH] feat: use dynamic entity map for AT import Signed-off-by: mertmit --- .../lib/meta/api/sync/helpers/EntityMap.ts | 18 +- .../src/lib/meta/api/sync/helpers/job.ts | 86 ++++---- .../api/sync/helpers/readAndProcessData.ts | 206 +++++++++++------- 3 files changed, 182 insertions(+), 128 deletions(-) diff --git a/packages/nocodb/src/lib/meta/api/sync/helpers/EntityMap.ts b/packages/nocodb/src/lib/meta/api/sync/helpers/EntityMap.ts index 9664779a28..311c38e477 100644 --- a/packages/nocodb/src/lib/meta/api/sync/helpers/EntityMap.ts +++ b/packages/nocodb/src/lib/meta/api/sync/helpers/EntityMap.ts @@ -92,7 +92,11 @@ class EntityMap { if (rs) { for (const key of Object.keys(rs)) { if (rs[key] && rs[key].startsWith('JSON::')) { - rs[key] = JSON.parse(rs[key].replace('JSON::', '')); + try { + rs[key] = JSON.parse(rs[key].replace('JSON::', '')); + } catch (e) { + console.log(e); + } } } } @@ -136,7 +140,11 @@ class EntityMap { for (const row of rs) { for (const key of Object.keys(row)) { if (row[key] && row[key].startsWith('JSON::')) { - row[key] = JSON.parse(row[key].replace('JSON::', '')); + try { + row[key] = JSON.parse(row[key].replace('JSON::', '')); + } catch (e) { + console.log(e); + } } } } @@ -168,7 +176,11 @@ class DBStream extends Readable { if (result) { for (const key of Object.keys(result)) { if (result[key] && result[key].startsWith('JSON::')) { - result[key] = JSON.parse(result[key].replace('JSON::', '')); + try { + result[key] = JSON.parse(result[key].replace('JSON::', '')); + } catch (e) { + console.log(e); + } } } } diff --git a/packages/nocodb/src/lib/meta/api/sync/helpers/job.ts b/packages/nocodb/src/lib/meta/api/sync/helpers/job.ts index 39202e609b..ff504bc0d5 100644 --- a/packages/nocodb/src/lib/meta/api/sync/helpers/job.ts +++ b/packages/nocodb/src/lib/meta/api/sync/helpers/job.ts @@ -14,6 +14,8 @@ import utc from 'dayjs/plugin/utc'; import tinycolor from 'tinycolor2'; import { importData, importLTARData } from './readAndProcessData'; +import EntityMap from './EntityMap'; + dayjs.extend(utc); const selectColors = { @@ -67,32 +69,28 @@ export default async ( syncDB: AirtableSyncConfig, progress: (data: { msg?: string; level?: any }) => void ) => { - const sMap = { - mapTbl: {}, + const sMapEM = new EntityMap('aTblId', 'ncId', 'ncName', 'ncParent'); + await sMapEM.init(); + const sMap = { // static mapping records between aTblId && ncId - addToMappingTbl(aTblId, ncId, ncName, parent?) { - this.mapTbl[aTblId] = { - ncId: ncId, - ncParent: parent, - // name added to assist in quick debug - ncName: ncName, - }; + async addToMappingTbl(aTblId, ncId, ncName, ncParent?) { + await sMapEM.addRow({ aTblId, ncId, ncName, ncParent }); }, // get NcID from airtable ID - getNcIdFromAtId(aId) { - return this.mapTbl[aId]?.ncId; + async getNcIdFromAtId(aId) { + return (await sMapEM.getRow('aTblId', aId, ['ncId']))?.ncId; }, // get nc Parent from airtable ID - getNcParentFromAtId(aId) { - return this.mapTbl[aId]?.ncParent; + async getNcParentFromAtId(aId) { + return (await sMapEM.getRow('aTblId', aId, ['ncParent']))?.ncParent; }, // get nc-title from airtable ID - getNcNameFromAtId(aId) { - return this.mapTbl[aId]?.ncName; + async getNcNameFromAtId(aId) { + return (await sMapEM.getRow('aTblId', aId, ['ncName']))?.ncName; }, }; @@ -333,8 +331,8 @@ export default async ( // let ncCol = ncTbl.columns.find(x => x.title === aTblField.cn); // return ncCol; - const ncTblId = sMap.getNcParentFromAtId(aTblFieldId); - const ncColId = sMap.getNcIdFromAtId(aTblFieldId); + const ncTblId = await sMap.getNcParentFromAtId(aTblFieldId); + const ncColId = await sMap.getNcIdFromAtId(aTblFieldId); // not migrated column, skip if (ncColId === undefined || ncTblId === undefined) return 0; @@ -424,7 +422,7 @@ export default async ( // retrieve additional options associated with selected data types // - function getNocoTypeOptions(col: any): any { + async function getNocoTypeOptions(col: any): Promise { switch (col.type) { case 'select': case 'multiSelect': { @@ -457,7 +455,7 @@ export default async ( : tinycolor.random().toHexString(), }); - sMap.addToMappingTbl( + await sMap.addToMappingTbl( (value as any).id, undefined, (value as any).name @@ -472,7 +470,7 @@ export default async ( // convert to Nc schema (basic, excluding relations) // - function tablesPrepare(tblSchema: any[]) { + async function tablesPrepare(tblSchema: any[]) { const tables: any[] = []; for (let i = 0; i < tblSchema.length; ++i) { @@ -569,7 +567,7 @@ export default async ( } // additional column parameters when applicable - const colOptions = getNocoTypeOptions(col); + const colOptions = await getNocoTypeOptions(col); switch (colOptions.type) { case 'select': @@ -602,7 +600,7 @@ export default async ( async function nocoCreateBaseSchema(aTblSchema) { // base schema preparation: exclude - const tables: any[] = tablesPrepare(aTblSchema); + const tables: any[] = await tablesPrepare(aTblSchema); // for each table schema, create nc table for (let idx = 0; idx < tables.length; idx++) { @@ -696,7 +694,7 @@ export default async ( if (!nc_isLinkExists(aTblLinkColumns[i].id)) { // parent table ID // let srcTableId = (await nc_getTableSchema(aTblSchema[idx].name)).id; - const srcTableId = sMap.getNcIdFromAtId(aTblSchema[idx].id); + const srcTableId = await sMap.getNcIdFromAtId(aTblSchema[idx].id); // find child table name from symmetric column ID specified // self link, symmetricColumnId field will be undefined @@ -911,7 +909,7 @@ export default async ( // parent table ID // let srcTableId = (await nc_getTableSchema(aTblSchema[idx].name)).id; - const srcTableId = sMap.getNcIdFromAtId(aTblSchema[idx].id); + const srcTableId = await sMap.getNcIdFromAtId(aTblSchema[idx].id); const srcTableSchema = ncSchema.tablesById[srcTableId]; if (aTblColumns.length) { @@ -939,10 +937,10 @@ export default async ( continue; } - const ncRelationColumnId = sMap.getNcIdFromAtId( + const ncRelationColumnId = await sMap.getNcIdFromAtId( aTblColumns[i].typeOptions.relationColumnId ); - const ncLookupColumnId = sMap.getNcIdFromAtId( + const ncLookupColumnId = await sMap.getNcIdFromAtId( aTblColumns[i].typeOptions.foreignTableRollupColumnId ); @@ -1015,10 +1013,10 @@ export default async ( const srcTableId = nestedLookupTbl[0].srcTableId; const srcTableSchema = ncSchema.tablesById[srcTableId]; - const ncRelationColumnId = sMap.getNcIdFromAtId( + const ncRelationColumnId = await sMap.getNcIdFromAtId( nestedLookupTbl[0].typeOptions.relationColumnId ); - const ncLookupColumnId = sMap.getNcIdFromAtId( + const ncLookupColumnId = await sMap.getNcIdFromAtId( nestedLookupTbl[0].typeOptions.foreignTableRollupColumnId ); @@ -1101,7 +1099,7 @@ export default async ( // parent table ID // let srcTableId = (await nc_getTableSchema(aTblSchema[idx].name)).id; - const srcTableId = sMap.getNcIdFromAtId(aTblSchema[idx].id); + const srcTableId = await sMap.getNcIdFromAtId(aTblSchema[idx].id); const srcTableSchema = ncSchema.tablesById[srcTableId]; if (aTblColumns.length) { @@ -1146,10 +1144,10 @@ export default async ( continue; } - const ncRelationColumnId = sMap.getNcIdFromAtId( + const ncRelationColumnId = await sMap.getNcIdFromAtId( aTblColumns[i].typeOptions.relationColumnId ); - const ncRollupColumnId = sMap.getNcIdFromAtId( + const ncRollupColumnId = await sMap.getNcIdFromAtId( aTblColumns[i].typeOptions.foreignTableRollupColumnId ); @@ -1219,10 +1217,10 @@ export default async ( const srcTableId = nestedLookupTbl[0].srcTableId; const srcTableSchema = ncSchema.tablesById[srcTableId]; - const ncRelationColumnId = sMap.getNcIdFromAtId( + const ncRelationColumnId = await sMap.getNcIdFromAtId( nestedLookupTbl[0].typeOptions.relationColumnId ); - const ncLookupColumnId = sMap.getNcIdFromAtId( + const ncLookupColumnId = await sMap.getNcIdFromAtId( nestedLookupTbl[0].typeOptions.foreignTableRollupColumnId ); @@ -1278,7 +1276,7 @@ export default async ( ); const pColId = aTblSchema[idx].primaryColumnId; - const ncColId = sMap.getNcIdFromAtId(pColId); + const ncColId = await sMap.getNcIdFromAtId(pColId); // skip primary column configuration if we field not migrated if (ncColId) { @@ -1288,7 +1286,7 @@ export default async ( recordPerfStats(_perfStart, 'dbTableColumn.primaryColumnSet'); // update schema - const ncTblId = sMap.getNcIdFromAtId(aTblSchema[idx].id); + const ncTblId = await sMap.getNcIdFromAtId(aTblSchema[idx].id); await updateNcTblSchemaById(ncTblId); } } @@ -1408,7 +1406,7 @@ export default async ( case UITypes.MultiSelect: rec[key] = value - .map((v) => { + ?.map((v) => { if (v === '') { return 'nc_empty'; } @@ -1567,7 +1565,7 @@ export default async ( async function nocoConfigureFormView(sDB, aTblSchema) { if (!sDB.options.syncViews) return; for (let idx = 0; idx < aTblSchema.length; idx++) { - const tblId = sMap.getNcIdFromAtId(aTblSchema[idx].id); + const tblId = await sMap.getNcIdFromAtId(aTblSchema[idx].id); const formViews = aTblSchema[idx].views.filter((x) => x.type === 'form'); const configuredViews = rtc.view.grid + rtc.view.gallery + rtc.view.form; @@ -1639,7 +1637,7 @@ export default async ( async function nocoConfigureGridView(sDB, aTblSchema) { for (let idx = 0; idx < aTblSchema.length; idx++) { - const tblId = sMap.getNcIdFromAtId(aTblSchema[idx].id); + const tblId = await sMap.getNcIdFromAtId(aTblSchema[idx].id); const gridViews = aTblSchema[idx].views.filter((x) => x.type === 'grid'); let viewCnt = idx; @@ -1955,7 +1953,7 @@ export default async ( // one of not migrated column; if (!colSchema) { updateMigrationSkipLog( - sMap.getNcNameFromAtId(viewId), + await sMap.getNcNameFromAtId(viewId), colSchema.title, colSchema.uidt, `filter config skipped; column not migrated` @@ -1970,7 +1968,7 @@ export default async ( if (datatype === UITypes.Date || datatype === UITypes.DateTime) { // skip filters over data datatype updateMigrationSkipLog( - sMap.getNcNameFromAtId(viewId), + await sMap.getNcNameFromAtId(viewId), colSchema.title, colSchema.uidt, `filter config skipped; filter over date datatype not supported` @@ -1990,7 +1988,7 @@ export default async ( fk_column_id: columnId, logical_op: f.conjunction, comparison_op: filterMap[filter.operator], - value: sMap.getNcNameFromAtId(filter.value[i]), + value: await sMap.getNcNameFromAtId(filter.value[i]), }; ncFilters.push(fx); } @@ -2001,7 +1999,7 @@ export default async ( fk_column_id: columnId, logical_op: f.conjunction, comparison_op: filterMap[filter.operator], - value: sMap.getNcNameFromAtId(filter.value), + value: await sMap.getNcNameFromAtId(filter.value), }; ncFilters.push(fx); } @@ -2097,7 +2095,7 @@ export default async ( // rest of the columns from airtable- retain order & visibility property for (let j = 0; j < c.length; j++) { - const ncColumnId = sMap.getNcIdFromAtId(c[j].columnId); + const ncColumnId = await sMap.getNcIdFromAtId(c[j].columnId); const ncViewColumnId = await nc_getViewColumnId( viewId, viewType, @@ -2243,7 +2241,7 @@ export default async ( sDB: syncDB, logDetailed, }); - rtc.data.records += recordsMap[ncTbl.id].length; + rtc.data.records += await recordsMap[ncTbl.id].getCount(); logDetailed(`Data inserted from ${ncTbl.title}`); } diff --git a/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts b/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts index 9b605a0349..78bd787aab 100644 --- a/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts +++ b/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts @@ -1,59 +1,54 @@ import { AirtableBase } from 'airtable/lib/airtable_base'; import { Api, RelationTypes, TableType, UITypes } from 'nocodb-sdk'; +import EntityMap from './EntityMap'; -const BULK_DATA_BATCH_SIZE = 2000; -const ASSOC_BULK_DATA_BATCH_SIZE = 5000; + +const BULK_DATA_BATCH_SIZE = 500; +const ASSOC_BULK_DATA_BATCH_SIZE = 2000; async function readAllData({ table, fields, base, logBasic = (_str) => {}, - triggerThreshold = BULK_DATA_BATCH_SIZE, - onThreshold = async (_rec) => {}, }: { table: { title?: string }; fields?; base: AirtableBase; logBasic?: (string) => void; logDetailed?: (string) => void; - triggerThreshold?: number; - onThreshold?: ( - records: Array<{ fields: any; id: string }>, - allRecords?: Array<{ fields: any; id: string }> - ) => Promise; -}): Promise> { +}): Promise { return new Promise((resolve, reject) => { - const data = []; - let thresholdCbkData = []; + let data = null; const selectParams: any = { pageSize: 100, }; if (fields) selectParams.fields = fields; - const insertJobs: Promise[] = []; base(table.title) .select(selectParams) .eachPage( async function page(records, fetchNextPage) { - data.push(...records); - thresholdCbkData.push(...records); + if (!data) { + data = new EntityMap(); + await data.init(); + } + + for await (const record of records) { + await data.addRow({ id: record.id, ...record.fields }); + } + + const tmpLength = await data.getCount(); logBasic( `:: Reading '${table.title}' data :: ${Math.max( 1, - data.length - records.length - )} - ${data.length}` + tmpLength - records.length + )} - ${tmpLength}` ); - if (thresholdCbkData.length >= triggerThreshold) { - await Promise.all(insertJobs); - insertJobs.push(onThreshold(thresholdCbkData, data)); - thresholdCbkData = []; - } - // To fetch the next page of records, call `fetchNextPage`. // If there are more records, `page` will get called again. // If there are no more records, `done` will get called. @@ -64,11 +59,6 @@ async function readAllData({ console.error(err); return reject(err); } - if (thresholdCbkData.length) { - await Promise.all(insertJobs); - await onThreshold(thresholdCbkData, data); - thresholdCbkData = []; - } resolve(data); } ); @@ -94,7 +84,7 @@ export async function importData({ api: Api; nocoBaseDataProcessing_v2; sDB; -}): Promise { +}): Promise { try { // @ts-ignore const records = await readAllData({ @@ -102,26 +92,52 @@ export async function importData({ base, logDetailed, logBasic, - async onThreshold(records, allRecords) { - const allData = []; - for (let i = 0; i < records.length; i++) { - const r = await nocoBaseDataProcessing_v2(sDB, table, records[i]); - allData.push(r); - } + }); + + await new Promise(async (resolve) => { + const readable = records.getStream(); + const allRecordsCount = await records.getCount(); + const promises = []; + let tempData = []; + let importedCount = 0; + readable.on('data', async (record) => { + promises.push(new Promise(async (resolve) => { + const { id: rid, ...fields } = record; + const r = await nocoBaseDataProcessing_v2(sDB, table, { id: rid, fields }); + tempData.push(r); - logBasic( - `:: Importing '${table.title}' data :: ${ - allRecords.length - records.length + 1 - } - ${allRecords.length}` - ); - await api.dbTableRow.bulkCreate('nc', projectName, table.id, allData); - }, + if (tempData.length >= BULK_DATA_BATCH_SIZE) { + readable.pause(); + let insertArray = tempData.splice(0, tempData.length); + await api.dbTableRow.bulkCreate('nc', projectName, table.id, insertArray); + logBasic( + `:: Importing '${table.title}' data :: ${importedCount} - ${Math.min(importedCount + BULK_DATA_BATCH_SIZE, allRecordsCount)}` + ); + importedCount += insertArray.length; + insertArray = []; + readable.resume(); + } + resolve(true); + })); + }); + readable.on('end', async () => { + await Promise.all(promises); + if (tempData.length > 0) { + await api.dbTableRow.bulkCreate('nc', projectName, table.id, tempData); + logBasic( + `:: Importing '${table.title}' data :: ${importedCount} - ${Math.min(importedCount + BULK_DATA_BATCH_SIZE, allRecordsCount)}` + ); + importedCount += tempData.length; + tempData = []; + } + resolve(true); + }); }); return records; } catch (e) { console.log(e); - return 0; + return null; } } @@ -146,7 +162,7 @@ export async function importLTARData({ logBasic: (string) => void; api: Api; insertedAssocRef: { [assocTableId: string]: boolean }; - records?: Array<{ fields: any; id: string }>; + records?: EntityMap; atNcAliasRef: { [ncTableId: string]: { [ncTitle: string]: string; @@ -209,49 +225,77 @@ export async function importLTARData({ let nestedLinkCnt = 0; // Iterate over all related M2M associative table - for (const assocMeta of assocTableMetas) { - const assocTableData = []; + for await (const assocMeta of assocTableMetas) { + let assocTableData = []; + let importedCount = 0; // extract insert data from records - for (const record of allData) { - const rec = record.fields; + await new Promise((resolve) => { + const promises = []; + const readable = allData.getStream(); + readable.on('data', async (record) => { + promises.push(new Promise(async (resolve) => { + const { id: _atId, ...rec } = record; - // todo: use actual alias instead of sanitized - assocTableData.push( - ...(rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []).map( - (id) => ({ - [assocMeta.curCol.title]: record.id, - [assocMeta.refCol.title]: id, - }) - ) - ); - } + // todo: use actual alias instead of sanitized + assocTableData.push( + ...(rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []).map( + (id) => ({ + [assocMeta.curCol.title]: record.id, + [assocMeta.refCol.title]: id, + }) + ) + ); - nestedLinkCnt += assocTableData.length; - // Insert datas as chunks of size `ASSOC_BULK_DATA_BATCH_SIZE` - for ( - let i = 0; - i < assocTableData.length; - i += ASSOC_BULK_DATA_BATCH_SIZE - ) { - logBasic( - `:: Importing '${table.title}' LTAR data :: ${i + 1} - ${Math.min( - i + ASSOC_BULK_DATA_BATCH_SIZE, - assocTableData.length - )}` - ); + if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) { + readable.pause(); + let insertArray = assocTableData.splice(0, assocTableData.length); + logBasic( + `:: Importing '${table.title}' LTAR data :: ${importedCount} - ${Math.min( + importedCount + ASSOC_BULK_DATA_BATCH_SIZE, + insertArray.length + )}` + ); + + await api.dbTableRow.bulkCreate( + 'nc', + projectName, + assocMeta.modelMeta.id, + insertArray + ); - console.log( - assocTableData.slice(i, i + ASSOC_BULK_DATA_BATCH_SIZE).length - ); + importedCount += insertArray.length; + insertArray = []; + readable.resume(); + } + resolve(true); + })); + }); + readable.on('end', async () => { + await Promise.all(promises); + if (assocTableData.length >= 0) { + logBasic( + `:: Importing '${table.title}' LTAR data :: ${importedCount} - ${Math.min( + importedCount + ASSOC_BULK_DATA_BATCH_SIZE, + assocTableData.length + )}` + ); + + await api.dbTableRow.bulkCreate( + 'nc', + projectName, + assocMeta.modelMeta.id, + assocTableData + ); - await api.dbTableRow.bulkCreate( - 'nc', - projectName, - assocMeta.modelMeta.id, - assocTableData.slice(i, i + ASSOC_BULK_DATA_BATCH_SIZE) - ); - } + importedCount += assocTableData.length; + assocTableData = []; + } + resolve(true); + }); + }); + + nestedLinkCnt += importedCount; } return nestedLinkCnt; }