diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts index 03655dbb4f..0fb1059e1a 100644 --- a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts +++ b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts @@ -17,7 +17,7 @@ const BULK_DATA_BATCH_COUNT = const BULK_DATA_BATCH_SIZE = +process.env.AT_IMPORT_BULK_DATA_BATCH_SIZE || 20 * 1024; // import N bytes at a time const BULK_LINK_BATCH_COUNT = - +process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 200; // import N links at a time + +process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 500; // import N links at a time const BULK_PARALLEL_PROCESS = +process.env.AT_IMPORT_BULK_PARALLEL_PROCESS || 2; // process N records at a time const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100; // pause reading if we have more than N records to avoid backpressure const QUEUE_BUFFER_LIMIT = +process.env.AT_IMPORT_QUEUE_BUFFER_LIMIT || 20; // pause streaming if we have more than N records in the queue @@ -406,44 +406,44 @@ export async function importLTARData({ const { id: _atId, ...rec } = record; // todo: use actual alias instead of sanitized - assocTableData[assocMeta.modelMeta.id].push( - ...( - rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [] - ).map((id) => ({ + const links = + rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []; + for (const id of links) { + assocTableData[assocMeta.modelMeta.id].push({ [assocMeta.curCol.title]: record.id, [assocMeta.refCol.title]: id, - })), - ); - - if ( - assocTableData[assocMeta.modelMeta.id].length >= - BULK_LINK_BATCH_COUNT - ) { - let insertArray = assocTableData[ - assocMeta.modelMeta.id - ].splice(0, assocTableData[assocMeta.modelMeta.id].length); - - const lastImportedCount = importedCount; - importedCount += insertArray.length; - - logBasic( - `:: Importing '${ - table.title - }' LTAR data :: ${lastImportedCount} - ${ - lastImportedCount + insertArray.length - }`, - ); - - await services.bulkDataService.bulkDataInsert({ - baseName, - tableName: assocMeta.modelMeta.id, - body: insertArray, - cookie: {}, - skip_hooks: true, - foreign_key_checks: !!source.isMeta(), }); - insertArray = []; + if ( + assocTableData[assocMeta.modelMeta.id].length >= + BULK_LINK_BATCH_COUNT + ) { + let insertArray = assocTableData[ + assocMeta.modelMeta.id + ].splice(0, assocTableData[assocMeta.modelMeta.id].length); + + const lastImportedCount = importedCount; + importedCount += insertArray.length; + + logBasic( + `:: Importing '${ + table.title + }' LTAR data :: ${lastImportedCount} - ${ + lastImportedCount + insertArray.length + }`, + ); + + await services.bulkDataService.bulkDataInsert({ + baseName, + tableName: assocMeta.modelMeta.id, + body: insertArray, + cookie: {}, + skip_hooks: true, + foreign_key_checks: !!source.isMeta(), + }); + + insertArray = []; + } } if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();