Browse Source

fix: dynamic batch based on byte size

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/5954/head
mertmit 1 year ago
parent
commit
e78428a9c1
  1. 51
      packages/nocodb/package-lock.json
  2. 1
      packages/nocodb/package.json
  3. 97
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

51
packages/nocodb/package-lock.json generated

@ -86,6 +86,7 @@
"nocodb-sdk": "file:../nocodb-sdk",
"nodemailer": "^6.4.10",
"object-hash": "^3.0.0",
"object-sizeof": "^2.6.1",
"os-locale": "^6.0.2",
"p-queue": "^6.6.2",
"papaparse": "^5.3.1",
@ -13568,6 +13569,37 @@
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-0.4.0.tgz",
"integrity": "sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw=="
},
"node_modules/object-sizeof": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/object-sizeof/-/object-sizeof-2.6.1.tgz",
"integrity": "sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg==",
"dependencies": {
"buffer": "^6.0.3"
}
},
"node_modules/object-sizeof/node_modules/buffer": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"dependencies": {
"base64-js": "^1.3.1",
"ieee754": "^1.2.1"
}
},
"node_modules/object.assign": {
"version": "4.1.4",
"resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.4.tgz",
@ -28765,6 +28797,25 @@
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-0.4.0.tgz",
"integrity": "sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw=="
},
"object-sizeof": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/object-sizeof/-/object-sizeof-2.6.1.tgz",
"integrity": "sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg==",
"requires": {
"buffer": "^6.0.3"
},
"dependencies": {
"buffer": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
"requires": {
"base64-js": "^1.3.1",
"ieee754": "^1.2.1"
}
}
}
},
"object.assign": {
"version": "4.1.4",
"resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.4.tgz",

1
packages/nocodb/package.json

@ -119,6 +119,7 @@
"nocodb-sdk": "file:../nocodb-sdk",
"nodemailer": "^6.4.10",
"object-hash": "^3.0.0",
"object-sizeof": "^2.6.1",
"os-locale": "^6.0.2",
"p-queue": "^6.6.2",
"papaparse": "^5.3.1",

97
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -1,5 +1,6 @@
/* eslint-disable no-async-promise-executor */
import { RelationTypes, UITypes } from 'nocodb-sdk';
import sizeof from 'object-sizeof';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../../services/tables.service';
@ -7,8 +8,8 @@ import type { TablesService } from '../../../../../services/tables.service';
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';
const BULK_DATA_BATCH_SIZE = 500;
const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
const BULK_DATA_BATCH_COUNT = 100; // check size for every 100 records
const BULK_DATA_BATCH_SIZE = 102400; // in bytes
const BULK_PARALLEL_PROCESS = 5;
interface AirtableImportContext {
@ -122,27 +123,28 @@ export async function importData({
fields,
});
tempData.push(r);
if (tempData.length >= BULK_DATA_BATCH_SIZE) {
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: insertArray,
cookie: {},
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
allRecordsCount,
)}`,
);
importedCount += insertArray.length;
insertArray = [];
if (tempData.length % BULK_DATA_BATCH_COUNT === 0) {
if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: insertArray,
cookie: {},
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + insertArray.length,
allRecordsCount,
)}`,
);
importedCount += insertArray.length;
insertArray = [];
}
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
@ -164,7 +166,7 @@ export async function importData({
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
importedCount + tempData.length,
allRecordsCount,
)}`,
);
@ -300,26 +302,31 @@ export async function importLTARData({
})),
);
if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
let insertArray = assocTableData.splice(0, assocTableData.length);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
insertArray.length,
)}`,
);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: assocMeta.modelMeta.title,
body: insertArray,
cookie: {},
});
importedCount += insertArray.length;
insertArray = [];
if (assocTableData.length % BULK_DATA_BATCH_COUNT === 0) {
if (sizeof(assocTableData) >= BULK_DATA_BATCH_SIZE) {
let insertArray = assocTableData.splice(
0,
assocTableData.length,
);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + insertArray.length,
insertArray.length,
)}`,
);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: assocMeta.modelMeta.title,
body: insertArray,
cookie: {},
});
importedCount += insertArray.length;
insertArray = [];
}
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
@ -334,7 +341,7 @@ export async function importLTARData({
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
importedCount + assocTableData.length,
assocTableData.length,
)}`,
);

Loading…
Cancel
Save