Browse Source

feat: limit parallel processing for at import job

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/4156/head
mertmit 2 years ago
parent
commit
ac9c0ee82c
  1. 17
      packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts

17
packages/nocodb/src/lib/meta/api/sync/helpers/readAndProcessData.ts

@ -4,7 +4,8 @@ import EntityMap from './EntityMap';
const BULK_DATA_BATCH_SIZE = 500;
const ASSOC_BULK_DATA_BATCH_SIZE = 2000;
const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
const BULK_PARALLEL_PROCESS = 100;
async function readAllData({
table,
@ -100,14 +101,16 @@ export async function importData({
const promises = [];
let tempData = [];
let importedCount = 0;
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(sDB, table, { id: rid, fields });
tempData.push(r);
if (tempData.length >= BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = tempData.splice(0, tempData.length);
await api.dbTableRow.bulkCreate('nc', projectName, table.id, insertArray);
logBasic(
@ -115,8 +118,9 @@ export async function importData({
);
importedCount += insertArray.length;
insertArray = [];
readable.resume();
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
}));
});
@ -233,8 +237,11 @@ export async function importLTARData({
await new Promise((resolve) => {
const promises = [];
const readable = allData.getStream();
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized
@ -248,7 +255,6 @@ export async function importLTARData({
);
if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = assocTableData.splice(0, assocTableData.length);
logBasic(
`:: Importing '${table.title}' LTAR data :: ${importedCount} - ${Math.min(
@ -266,8 +272,9 @@ export async function importLTARData({
importedCount += insertArray.length;
insertArray = [];
readable.resume();
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
}));
});

Loading…
Cancel
Save