|
|
|
@ -17,7 +17,7 @@ const BULK_DATA_BATCH_COUNT =
|
|
|
|
|
const BULK_DATA_BATCH_SIZE = |
|
|
|
|
+process.env.AT_IMPORT_BULK_DATA_BATCH_SIZE || 20 * 1024; // import N bytes at a time
|
|
|
|
|
const BULK_LINK_BATCH_COUNT = |
|
|
|
|
+process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 200; // import N links at a time
|
|
|
|
|
+process.env.AT_IMPORT_BULK_LINK_BATCH_COUNT || 500; // import N links at a time
|
|
|
|
|
const BULK_PARALLEL_PROCESS = +process.env.AT_IMPORT_BULK_PARALLEL_PROCESS || 2; // process N records at a time
|
|
|
|
|
const STREAM_BUFFER_LIMIT = +process.env.AT_IMPORT_STREAM_BUFFER_LIMIT || 100; // pause reading if we have more than N records to avoid backpressure
|
|
|
|
|
const QUEUE_BUFFER_LIMIT = +process.env.AT_IMPORT_QUEUE_BUFFER_LIMIT || 20; // pause streaming if we have more than N records in the queue
|
|
|
|
@ -406,14 +406,13 @@ export async function importLTARData({
|
|
|
|
|
const { id: _atId, ...rec } = record; |
|
|
|
|
|
|
|
|
|
// todo: use actual alias instead of sanitized
|
|
|
|
|
assocTableData[assocMeta.modelMeta.id].push( |
|
|
|
|
...( |
|
|
|
|
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [] |
|
|
|
|
).map((id) => ({ |
|
|
|
|
const links = |
|
|
|
|
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []; |
|
|
|
|
for (const id of links) { |
|
|
|
|
assocTableData[assocMeta.modelMeta.id].push({ |
|
|
|
|
[assocMeta.curCol.title]: record.id, |
|
|
|
|
[assocMeta.refCol.title]: id, |
|
|
|
|
})), |
|
|
|
|
); |
|
|
|
|
}); |
|
|
|
|
|
|
|
|
|
if ( |
|
|
|
|
assocTableData[assocMeta.modelMeta.id].length >= |
|
|
|
@ -445,6 +444,7 @@ export async function importLTARData({
|
|
|
|
|
|
|
|
|
|
insertArray = []; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume(); |
|
|
|
|
resolve(true); |
|
|
|
|