diff --git a/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts b/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts
index 78bd787aab..d24a31c011 100644
--- a/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts
+++ b/packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts
@@ -4,7 +4,8 @@ import EntityMap from './EntityMap';
 
 const BULK_DATA_BATCH_SIZE = 500;
-const ASSOC_BULK_DATA_BATCH_SIZE = 2000;
+const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
+const BULK_PARALLEL_PROCESS = 100;
 
 async function readAllData({
   table,
@@ -100,14 +101,16 @@ export async function importData({
     const promises = [];
     let tempData = [];
     let importedCount = 0;
+    let activeProcess = 0;
     readable.on('data', async (record) => {
       promises.push(new Promise(async (resolve) => {
+        activeProcess++;
+        if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
         const { id: rid, ...fields } = record;
         const r = await nocoBaseDataProcessing_v2(sDB, table, { id: rid, fields });
         tempData.push(r);
         if (tempData.length >= BULK_DATA_BATCH_SIZE) {
-          readable.pause();
           let insertArray = tempData.splice(0, tempData.length);
           await api.dbTableRow.bulkCreate('nc', projectName, table.id, insertArray);
           logBasic(
@@ -115,8 +118,9 @@
           );
           importedCount += insertArray.length;
           insertArray = [];
-          readable.resume();
         }
+        activeProcess--;
+        if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
         resolve(true);
       }));
     });
@@ -233,8 +237,11 @@ export async function importLTARData({
   await new Promise((resolve) => {
     const promises = [];
     const readable = allData.getStream();
+    let activeProcess = 0;
     readable.on('data', async (record) => {
       promises.push(new Promise(async (resolve) => {
+        activeProcess++;
+        if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
        const { id: _atId, ...rec } = record;
 
        // todo: use actual alias instead of sanitized
@@ -248,7 +255,6 @@
        );
 
        if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
-          readable.pause();
          let insertArray = assocTableData.splice(0, assocTableData.length);
          logBasic(
            `:: Importing '${table.title}' LTAR data :: ${importedCount} - ${Math.min(
@@ -266,8 +272,9 @@
          importedCount += insertArray.length;
          insertArray = [];
-          readable.resume();
        }
+        activeProcess--;
+        if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
        resolve(true);
      }));
    });
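
The sketch below illustrates, in isolation, the backpressure pattern this diff introduces: instead of pausing the stream only while a batch insert is in flight, an `activeProcess` counter tracks how many `'data'` handlers are currently running, the stream is paused once the counter reaches `BULK_PARALLEL_PROCESS`, and resumed once it drops back below that cap. This is a minimal standalone example, not nocodb code; `makeRecordStream` and `processRecord` are assumed, illustrative stand-ins for the real stream source and per-record work (`nocoBaseDataProcessing_v2` plus the bulk insert).

```ts
// Minimal sketch of the concurrency-capping pattern in the patch (assumed names, not nocodb code).
import { Readable } from 'stream';

const BULK_PARALLEL_PROCESS = 100;

// Hypothetical record source: an object-mode stream of plain records.
function makeRecordStream(records: Array<{ id: string }>): Readable {
  return Readable.from(records);
}

// Hypothetical per-record work standing in for nocoBaseDataProcessing_v2 + bulkCreate.
async function processRecord(_record: { id: string }): Promise<void> {
  await new Promise((r) => setTimeout(r, 10));
}

async function importWithBackpressure(readable: Readable): Promise<void> {
  const promises: Promise<void>[] = [];
  let activeProcess = 0;

  await new Promise<void>((resolve) => {
    readable.on('data', (record: { id: string }) => {
      promises.push(
        (async () => {
          // Count this handler as in flight; stop pulling more data from the
          // stream once the cap is reached.
          activeProcess++;
          if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();

          await processRecord(record);

          // Handler finished: let the stream flow again if we are back under the cap.
          activeProcess--;
          if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
        })()
      );
    });
    readable.on('end', resolve);
  });

  // Wait for any handlers still running after the stream has ended.
  await Promise.all(promises);
}

// Example usage: 250 fake records, processed with at most 100 handlers in flight.
const records = Array.from({ length: 250 }, (_, i) => ({ id: `rec${i}` }));
importWithBackpressure(makeRecordStream(records)).then(() => console.log('done'));
```

Because `activeProcess` is only decremented after the per-record work completes, at most `BULK_PARALLEL_PROCESS` handlers (and the records they hold) are in memory at once, while `Promise.all(promises)` still makes the caller wait for handlers that finish after the stream ends.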