Browse Source

fix: avoid pushing all links of record at once

pull/7506/head
mertmit 10 months ago
parent
commit
0f33979dd4
  1. 70
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

70
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -17,7 +17,7 @@ const BULK_DATA_BATCH_COUNT =
const BULK_DATA_BATCH_SIZE = const BULK_DATA_BATCH_SIZE =
+process.env.AT_IMPORT_BULK_DATA_BATCH_SIZE || 20 * 1024; // import N bytes at a time +process.env.AT_IMPORT_BULK_DATA_BATCH_SIZE || 20 * 1024; // import N bytes at a time
const BULK_LINK_BATCH_COUNT = const BULK_LINK_BATCH_COUNT =
+process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 200; // import N links at a time +process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 500; // import N links at a time
const BULK_PARALLEL_PROCESS = +process.env.AT_IMPORT_BULK_PARALLEL_PROCESS || 2; // process N records at a time const BULK_PARALLEL_PROCESS = +process.env.AT_IMPORT_BULK_PARALLEL_PROCESS || 2; // process N records at a time
const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100; // pause reading if we have more than N records to avoid backpressure const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100; // pause reading if we have more than N records to avoid backpressure
const QUEUE_BUFFER_LIMIT = +process.env.AT_IMPORT_QUEUE_BUFFER_LIMIT || 20; // pause streaming if we have more than N records in the queue const QUEUE_BUFFER_LIMIT = +process.env.AT_IMPORT_QUEUE_BUFFER_LIMIT || 20; // pause streaming if we have more than N records in the queue
@ -406,44 +406,44 @@ export async function importLTARData({
const { id: _atId, ...rec } = record; const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized // todo: use actual alias instead of sanitized
assocTableData[assocMeta.modelMeta.id].push( const links =
...( rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [];
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [] for (const id of links) {
).map((id) => ({ assocTableData[assocMeta.modelMeta.id].push({
[assocMeta.curCol.title]: record.id, [assocMeta.curCol.title]: record.id,
[assocMeta.refCol.title]: id, [assocMeta.refCol.title]: id,
})),
);
if (
assocTableData[assocMeta.modelMeta.id].length >=
BULK_LINK_BATCH_COUNT
) {
let insertArray = assocTableData[
assocMeta.modelMeta.id
].splice(0, assocTableData[assocMeta.modelMeta.id].length);
const lastImportedCount = importedCount;
importedCount += insertArray.length;
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${lastImportedCount} - ${
lastImportedCount + insertArray.length
}`,
);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: assocMeta.modelMeta.id,
body: insertArray,
cookie: {},
skip_hooks: true,
foreign_key_checks: !!source.isMeta(),
}); });
insertArray = []; if (
assocTableData[assocMeta.modelMeta.id].length >=
BULK_LINK_BATCH_COUNT
) {
let insertArray = assocTableData[
assocMeta.modelMeta.id
].splice(0, assocTableData[assocMeta.modelMeta.id].length);
const lastImportedCount = importedCount;
importedCount += insertArray.length;
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${lastImportedCount} - ${
lastImportedCount + insertArray.length
}`,
);
await services.bulkDataService.bulkDataInsert({
baseName,
tableName: assocMeta.modelMeta.id,
body: insertArray,
cookie: {},
skip_hooks: true,
foreign_key_checks: !!source.isMeta(),
});
insertArray = [];
}
} }
if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume(); if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();

Loading…
Cancel
Save