
Merge pull request #5954 from nocodb/fix/at-job-size

fix: dynamic batch based on byte size
Raju Udava authored 1 year ago, committed by GitHub
commit 49c1a18d52
5 changed files:

  1. packages/nocodb/package-lock.json (51 changes)
  2. packages/nocodb/package.json (1 change)
  3. packages/nocodb/src/db/BaseModelSqlv2.ts (149 changes)
  4. packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts (153 changes)
  5. packages/nocodb/src/services/bulk-data-alias.service.ts (2 changes)
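In short: the Airtable import job used to flush insert batches at fixed row counts (500 data records, 1000 link records). This commit flushes on approximate byte size instead, sampling the buffer with `object-sizeof` every `BULK_DATA_BATCH_COUNT` rows. A minimal sketch of that policy, where `insertInSizedBatches`, `rows`, and `insert` are hypothetical stand-ins for the real stream and service call:

```ts
import sizeof from 'object-sizeof';

const BATCH_COUNT = 20;             // mirrors BULK_DATA_BATCH_COUNT
const BATCH_SIZE_BYTES = 50 * 1024; // mirrors BULK_DATA_BATCH_SIZE (~50 KiB)

async function insertInSizedBatches<T>(
  rows: AsyncIterable<T>,
  insert: (batch: T[]) => Promise<void>, // hypothetical sink, e.g. a bulk insert call
): Promise<void> {
  const buffer: T[] = [];
  let sinceCheck = 0;
  for await (const row of rows) {
    buffer.push(row);
    // sizeof walks the whole object graph, so measuring on every row
    // would be wasteful; only check every BATCH_COUNT rows
    if (++sinceCheck >= BATCH_COUNT) {
      sinceCheck = 0;
      if (sizeof(buffer) >= BATCH_SIZE_BYTES) {
        await insert(buffer.splice(0, buffer.length));
      }
    }
  }
  if (buffer.length > 0) await insert(buffer); // flush the remainder
}
```

Row-count batching breaks down when individual records are large (long text fields, many attachments); a byte budget keeps each insert payload roughly constant in size.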

packages/nocodb/package-lock.json (generated, 51 changes)

@@ -86,6 +86,7 @@
"nocodb-sdk": "file:../nocodb-sdk",
"nodemailer": "^6.4.10",
"object-hash": "^3.0.0",
"object-sizeof": "^2.6.1",
"os-locale": "^6.0.2",
"p-queue": "^6.6.2",
"papaparse": "^5.3.1",
@@ -13568,6 +13569,37 @@
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-0.4.0.tgz",
"integrity": "sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw=="
},
"node_modules/object-sizeof": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/object-sizeof/-/object-sizeof-2.6.1.tgz",
"integrity": "sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg==",
"dependencies": {
"buffer": "^6.0.3"
}
},
"node_modules/object-sizeof/node_modules/buffer": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
"funding": [
{
"type": "github",
"url": "https://github.com/sponsors/feross"
},
{
"type": "patreon",
"url": "https://www.patreon.com/feross"
},
{
"type": "consulting",
"url": "https://feross.org/support"
}
],
"dependencies": {
"base64-js": "^1.3.1",
"ieee754": "^1.2.1"
}
},
"node_modules/object.assign": {
"version": "4.1.4",
"resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.4.tgz",
@@ -28765,6 +28797,25 @@
"resolved": "https://registry.npmjs.org/object-keys/-/object-keys-0.4.0.tgz",
"integrity": "sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw=="
},
"object-sizeof": {
"version": "2.6.1",
"resolved": "https://registry.npmjs.org/object-sizeof/-/object-sizeof-2.6.1.tgz",
"integrity": "sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg==",
"requires": {
"buffer": "^6.0.3"
},
"dependencies": {
"buffer": {
"version": "6.0.3",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-6.0.3.tgz",
"integrity": "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==",
"requires": {
"base64-js": "^1.3.1",
"ieee754": "^1.2.1"
}
}
}
},
"object.assign": {
"version": "4.1.4",
"resolved": "https://registry.npmjs.org/object.assign/-/object.assign-4.1.4.tgz",

packages/nocodb/package.json (1 change)

@@ -119,6 +119,7 @@
"nocodb-sdk": "file:../nocodb-sdk",
"nodemailer": "^6.4.10",
"object-hash": "^3.0.0",
"object-sizeof": "^2.6.1",
"os-locale": "^6.0.2",
"p-queue": "^6.6.2",
"papaparse": "^5.3.1",

packages/nocodb/src/db/BaseModelSqlv2.ts (149 changes)

@@ -2244,38 +2244,152 @@ class BaseModelSqlv2 {
chunkSize: _chunkSize = 100,
cookie,
foreign_key_checks = true,
skip_hooks = false,
raw = false,
}: {
chunkSize?: number;
cookie?: any;
foreign_key_checks?: boolean;
skip_hooks?: boolean;
raw?: boolean;
} = {},
) {
let trx;
try {
// TODO: ag column handling for raw bulk insert
const insertDatas = raw
? datas
: await Promise.all(
datas.map(async (d) => {
await populatePk(this.model, d);
return this.model.mapAliasToColumn(
d,
this.clientMeta,
this.dbDriver,
);
}),
);
// await this.beforeInsertb(insertDatas, null);
const insertDatas = raw ? datas : [];
if (!raw) {
for (const data of datas) {
await this.validate(data);
await this.model.getColumns();
for (const d of datas) {
const insertObj = {};
// populate pk, map alias to column, validate data
for (let i = 0; i < this.model.columns.length; ++i) {
const col = this.model.columns[i];
// populate pk columns
if (col.pk) {
if (col.meta?.ag && !d[col.title]) {
d[col.title] =
col.meta?.ag === 'nc' ? `rc_${nanoidv2()}` : uuidv4();
}
}
// map alias to column
if (!isVirtualCol(col)) {
let val =
d?.[col.column_name] !== undefined
? d?.[col.column_name]
: d?.[col.title];
if (val !== undefined) {
if (
col.uidt === UITypes.Attachment &&
typeof val !== 'string'
) {
val = JSON.stringify(val);
}
if (col.uidt === UITypes.DateTime && dayjs(val).isValid()) {
const { isMySQL, isSqlite, isMssql, isPg } = this.clientMeta;
if (
val.indexOf('-') < 0 &&
val.indexOf('+') < 0 &&
val.slice(-1) !== 'Z'
) {
// if no timezone is given,
// then append +00:00 to make it as UTC
val += '+00:00';
}
if (isMySQL) {
// first convert the value to utc
// from UI
// e.g. 2022-01-01 20:00:00Z -> 2022-01-01 20:00:00
// from API
// e.g. 2022-01-01 20:00:00+08:00 -> 2022-01-01 12:00:00
// if timezone info is not found - considered as utc
// e.g. 2022-01-01 20:00:00 -> 2022-01-01 20:00:00
// if timezone info is found
// e.g. 2022-01-01 20:00:00Z -> 2022-01-01 20:00:00
// e.g. 2022-01-01 20:00:00+00:00 -> 2022-01-01 20:00:00
// e.g. 2022-01-01 20:00:00+08:00 -> 2022-01-01 12:00:00
// then we use CONVERT_TZ to convert that in the db timezone
val = this.dbDriver.raw(
`CONVERT_TZ(?, '+00:00', @@GLOBAL.time_zone)`,
[dayjs(val).utc().format('YYYY-MM-DD HH:mm:ss')],
);
} else if (isSqlite) {
// convert to UTC
// e.g. 2022-01-01T10:00:00.000Z -> 2022-01-01 04:30:00+00:00
val = dayjs(val).utc().format('YYYY-MM-DD HH:mm:ssZ');
} else if (isPg) {
// convert to UTC
// e.g. 2023-01-01T12:00:00.000Z -> 2023-01-01 12:00:00+00:00
// then convert to db timezone
val = this.dbDriver.raw(
`? AT TIME ZONE CURRENT_SETTING('timezone')`,
[dayjs(val).utc().format('YYYY-MM-DD HH:mm:ssZ')],
);
} else if (isMssql) {
// convert to UTC
// e.g. 2023-05-10T08:49:32.000Z -> 2023-05-10 08:49:32-08:00
// then convert to db timezone
val = this.dbDriver.raw(
`SWITCHOFFSET(CONVERT(datetimeoffset, ?), DATENAME(TzOffset, SYSDATETIMEOFFSET()))`,
[dayjs(val).utc().format('YYYY-MM-DD HH:mm:ssZ')],
);
} else {
// e.g. 2023-01-01T12:00:00.000Z -> 2023-01-01 12:00:00+00:00
val = dayjs(val).utc().format('YYYY-MM-DD HH:mm:ssZ');
}
}
insertObj[sanitize(col.column_name)] = val;
}
}
// validate data
if (col?.meta?.validate && col?.validate) {
const validate = col.getValidators();
const cn = col.column_name;
const columnTitle = col.title;
if (validate) {
const { func, msg } = validate;
for (let j = 0; j < func.length; ++j) {
const fn =
typeof func[j] === 'string'
? customValidators[func[j]]
? customValidators[func[j]]
: Validator[func[j]]
: func[j];
const columnValue =
insertObj?.[cn] || insertObj?.[columnTitle];
const arg =
typeof func[j] === 'string'
? columnValue + ''
: columnValue;
if (
![null, undefined, ''].includes(columnValue) &&
!(fn.constructor.name === 'AsyncFunction'
? await fn(arg)
: fn(arg))
) {
NcError.badRequest(
msg[j]
.replace(/\{VALUE}/g, columnValue)
.replace(/\{cn}/g, columnTitle),
);
}
}
}
}
}
insertDatas.push(insertObj);
}
}
// await this.beforeInsertb(insertDatas, null);
// fallbacks to `10` if database client is sqlite
// to avoid `too many SQL variables` error
// refer : https://www.sqlite.org/limits.html
@@ -2308,7 +2422,8 @@ class BaseModelSqlv2 {
await trx.commit();
if (!raw) await this.afterBulkInsert(insertDatas, this.dbDriver, cookie);
if (!raw && !skip_hooks)
await this.afterBulkInsert(insertDatas, this.dbDriver, cookie);
return response;
} catch (e) {

packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts (153 changes)

@@ -1,5 +1,6 @@
/* eslint-disable no-async-promise-executor */
import { RelationTypes, UITypes } from 'nocodb-sdk';
import sizeof from 'object-sizeof';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../../services/tables.service';
@@ -7,8 +8,8 @@ import type { TablesService } from '../../../../../services/tables.service';
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';
const BULK_DATA_BATCH_SIZE = 500;
const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
const BULK_DATA_BATCH_COUNT = 20; // check size for every 20 records
const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
const BULK_PARALLEL_PROCESS = 5;
interface AirtableImportContext {
@@ -42,6 +43,12 @@ async function readAllData({
.eachPage(
async function page(records, fetchNextPage) {
if (!data) {
/*
EntityMap is a temporary sqlite3-backed table populated from the provided JSON data.
Records are staged there and then streamed back out for bulk import,
which avoids heap out-of-memory errors when importing large bases.
*/
data = new EntityMap();
await data.init();
}
@@ -96,8 +103,8 @@ export async function importData({
services: AirtableImportContext;
}): Promise<EntityMap> {
try {
// @ts-ignore
const records = await readAllData({
// returns EntityMap which allows us to stream data
const records: EntityMap = await readAllData({
table,
base,
logDetailed,
@@ -108,41 +115,57 @@ export async function importData({
const readable = records.getStream();
const allRecordsCount = await records.getCount();
const promises = [];
let tempData = [];
let importedCount = 0;
let tempCount = 0;
// track how many async handlers are in flight so the stream can be paused
// and resumed; otherwise all records would be buffered in memory
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(sDB, table, {
id: rid,
fields,
});
tempData.push(r);
if (tempData.length >= BULK_DATA_BATCH_SIZE) {
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: insertArray,
cookie: {},
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
allRecordsCount,
)}`,
);
importedCount += insertArray.length;
insertArray = [];
tempCount++;
if (tempCount >= BULK_DATA_BATCH_COUNT) {
if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = tempData.splice(0, tempData.length);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: insertArray,
cookie: {},
skip_hooks: true,
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + insertArray.length,
allRecordsCount,
)}`,
);
importedCount += insertArray.length;
insertArray = [];
readable.resume();
}
tempCount = 0;
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
@@ -151,26 +174,31 @@ export async function importData({
);
});
readable.on('end', async () => {
// ensure all chunks are processed
await Promise.all(promises);
// insert remaining data
if (tempData.length > 0) {
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: table.title,
body: tempData,
cookie: {},
skip_hooks: true,
});
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
importedCount + tempData.length,
allRecordsCount,
)}`,
);
importedCount += tempData.length;
tempData = [];
}
resolve(true);
});
});
@@ -219,7 +247,7 @@ export async function importLTARData({
curCol: { title?: string };
refCol: { title?: string };
}> = [];
const allData =
const allData: EntityMap =
records ||
(await readAllData({
table,
@@ -277,17 +305,16 @@ export async function importLTARData({
for await (const assocMeta of assocTableMetas) {
let assocTableData = [];
let importedCount = 0;
let tempCount = 0;
// extract insert data from records
// extract link data from records
await new Promise((resolve) => {
const promises = [];
const readable = allData.getStream();
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized
@@ -299,42 +326,56 @@ export async function importLTARData({
[assocMeta.refCol.title]: id,
})),
);
if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
let insertArray = assocTableData.splice(0, assocTableData.length);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
insertArray.length,
)}`,
);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: assocMeta.modelMeta.title,
body: insertArray,
cookie: {},
});
importedCount += insertArray.length;
insertArray = [];
tempCount++;
if (tempCount >= BULK_DATA_BATCH_COUNT) {
if (sizeof(assocTableData) >= BULK_DATA_BATCH_SIZE) {
readable.pause();
let insertArray = assocTableData.splice(
0,
assocTableData.length,
);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + insertArray.length,
insertArray.length,
)}`,
);
await services.bulkDataService.bulkDataInsert({
projectName,
tableName: assocMeta.modelMeta.title,
body: insertArray,
cookie: {},
skip_hooks: true,
});
importedCount += insertArray.length;
insertArray = [];
readable.resume();
}
tempCount = 0;
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
}),
);
});
readable.on('end', async () => {
// ensure all chunks are processed
await Promise.all(promises);
// insert remaining data
if (assocTableData.length > 0) {
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
importedCount + assocTableData.length,
assocTableData.length,
)}`,
);
@@ -344,11 +385,13 @@ export async function importLTARData({
tableName: assocMeta.modelMeta.title,
body: assocTableData,
cookie: {},
skip_hooks: true,
});
importedCount += assocTableData.length;
assocTableData = [];
}
resolve(true);
});
});
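
Both import loops above lean on the same backpressure idiom: pause the record stream while a bulk insert is awaited (and while `BULK_PARALLEL_PROCESS` handlers are already in flight), resume afterwards, and drain all pending promises on `end`. A stripped-down sketch of the pattern, assuming an object-mode Node stream and a hypothetical `handle` callback for the per-record work:

```ts
import { Readable } from 'stream';

const MAX_IN_FLIGHT = 5; // mirrors BULK_PARALLEL_PROCESS

function consumeWithBackpressure(
  readable: Readable,
  handle: (record: unknown) => Promise<void>, // hypothetical per-record work
): Promise<void> {
  const pending: Promise<void>[] = [];
  let inFlight = 0;
  return new Promise((resolve) => {
    readable.on('data', (record) => {
      inFlight++;
      if (inFlight >= MAX_IN_FLIGHT) readable.pause(); // stop pulling rows
      pending.push(
        handle(record).finally(() => {
          inFlight--;
          if (inFlight < MAX_IN_FLIGHT) readable.resume();
        }),
      );
    });
    readable.on('end', async () => {
      await Promise.all(pending); // drain everything before finishing
      resolve();
    });
  });
}
```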

packages/nocodb/src/services/bulk-data-alias.service.ts (2 changes)

@@ -43,6 +43,7 @@ export class BulkDataAliasService {
cookie: any;
chunkSize?: number;
foreign_key_checks?: boolean;
skip_hooks?: boolean;
raw?: boolean;
},
) {
@@ -54,6 +55,7 @@
{
cookie: param.cookie,
foreign_key_checks: param.foreign_key_checks,
skip_hooks: param.skip_hooks,
raw: param.raw,
},
],
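
The service layer just threads the new flag through to `bulkInsert`. A caller wanting import-speed inserts without firing webhooks would look roughly like this (the option shape is taken from this diff; `bulkDataService` and `batch` are assumed to exist in the caller's scope):

```ts
await bulkDataService.bulkDataInsert({
  projectName: 'my_project', // illustrative
  tableName: 'my_table',     // illustrative
  body: batch,               // rows prepared by the import job
  cookie: {},
  skip_hooks: true,          // new in this PR: suppress after-insert hook fan-out
});
```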
