Browse Source

fix: keep track of batch count instead of mod

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5954/head
mertmit 1 year ago
parent
commit
c86bdae4c0
  1. 24
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

24
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -8,8 +8,8 @@ import type { TablesService } from '../../../../../services/tables.service';
import type { AirtableBase } from 'airtable/lib/airtable_base'; import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk'; import type { TableType } from 'nocodb-sdk';
const BULK_DATA_BATCH_COUNT = 100; // check size for every 100 records const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
const BULK_DATA_BATCH_SIZE = 102400; // in bytes const BULK_DATA_BATCH_SIZE = 51200; // in bytes
const BULK_PARALLEL_PROCESS = 5; const BULK_PARALLEL_PROCESS = 5;
interface AirtableImportContext { interface AirtableImportContext {
@ -112,19 +112,25 @@ export async function importData({
let tempData = []; let tempData = [];
let importedCount = 0; let importedCount = 0;
let activeProcess = 0; let activeProcess = 0;
let tempCount = 0;
readable.on('data', async (record) => { readable.on('data', async (record) => {
promises.push( promises.push(
new Promise(async (resolve) => { new Promise(async (resolve) => {
activeProcess++; activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause(); if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: rid, ...fields } = record; const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(sDB, table, { const r = await nocoBaseDataProcessing_v2(sDB, table, {
id: rid, id: rid,
fields, fields,
}); });
tempData.push(r); tempData.push(r);
if (tempData.length % BULK_DATA_BATCH_COUNT === 0) { tempCount++;
if (tempCount >= BULK_DATA_BATCH_COUNT) {
if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) { if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = tempData.splice(0, tempData.length); let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({ await services.bulkDataService.bulkDataInsert({
@ -144,7 +150,10 @@ export async function importData({
); );
importedCount += insertArray.length; importedCount += insertArray.length;
insertArray = []; insertArray = [];
readable.resume();
} }
tempCount = 0;
} }
activeProcess--; activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume(); if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
@ -285,6 +294,7 @@ export async function importLTARData({
const promises = []; const promises = [];
const readable = allData.getStream(); const readable = allData.getStream();
let activeProcess = 0; let activeProcess = 0;
let tempCount = 0;
readable.on('data', async (record) => { readable.on('data', async (record) => {
promises.push( promises.push(
new Promise(async (resolve) => { new Promise(async (resolve) => {
@ -301,9 +311,12 @@ export async function importLTARData({
[assocMeta.refCol.title]: id, [assocMeta.refCol.title]: id,
})), })),
); );
tempCount++;
if (assocTableData.length % BULK_DATA_BATCH_COUNT === 0) { if (tempCount >= BULK_DATA_BATCH_COUNT) {
if (sizeof(assocTableData) >= BULK_DATA_BATCH_SIZE) { if (sizeof(assocTableData) >= BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = assocTableData.splice( let insertArray = assocTableData.splice(
0, 0,
assocTableData.length, assocTableData.length,
@ -326,7 +339,10 @@ export async function importLTARData({
importedCount += insertArray.length; importedCount += insertArray.length;
insertArray = []; insertArray = [];
readable.resume();
} }
tempCount = 0;
} }
activeProcess--; activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume(); if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();

Loading…
Cancel
Save