Browse Source

fix: improved readAndProcessData error handling

pull/7476/head
mertmit 8 months ago
parent
commit
f1bebdd2b1
  1. 43
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

43
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -1,6 +1,7 @@
/* eslint-disable no-async-promise-executor */
import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk';
import sizeof from 'object-sizeof';
import { Logger } from '@nestjs/common';
import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../../services/tables.service';
@ -8,6 +9,8 @@ import type { TablesService } from '../../../../../services/tables.service';
import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk';
const logger = new Logger('BaseModelSqlv2');
const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
const BULK_LINK_BATCH_COUNT = 1000; // process 1000 records at a time
@ -30,7 +33,7 @@ async function readAllData({
logBasic?: (string) => void;
logDetailed?: (string) => void;
}): Promise<EntityMap> {
return new Promise((resolve, reject) => {
return new Promise((resolve) => {
let data = null;
const selectParams: any = {
@ -74,8 +77,10 @@ async function readAllData({
},
async function done(err) {
if (err) {
console.error(err);
return reject(err);
logger.error(err);
logBasic(
`:: There were errors on reading '${table.title}' data :: ${err}`,
);
}
resolve(data);
},
@ -112,7 +117,7 @@ export async function importData({
logBasic,
});
await new Promise(async (resolve, reject) => {
await new Promise(async (resolve) => {
const readable = records.getStream();
const allRecordsCount = await records.getCount();
const promises = [];
@ -126,7 +131,7 @@ export async function importData({
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve, reject) => {
new Promise(async (resolve) => {
try {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
@ -173,7 +178,12 @@ export async function importData({
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
} catch (e) {
reject(e);
logger.error(e);
logBasic(
`:: There were errors on importing '${table.title}' data :: ${e}`,
);
readable.resume();
resolve(true);
}
}),
);
@ -206,7 +216,11 @@ export async function importData({
}
resolve(true);
} catch (e) {
return reject(e);
logger.error(e);
logBasic(
`:: There were errors on importing '${table.title}' data :: ${e}`,
);
resolve(true);
}
});
});
@ -313,7 +327,7 @@ export async function importLTARData({
// Iterate over all related M2M associative table
for (const assocMeta of assocTableMetas) {
// extract link data from records
await new Promise((resolve, reject) => {
await new Promise((resolve) => {
const promises = [];
const readable = allData.getStream();
@ -364,7 +378,12 @@ export async function importLTARData({
}
resolve(true);
} catch (e) {
reject(e);
logger.error(e);
logBasic(
`:: There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
readable.resume();
resolve(true);
}
}),
);
@ -398,7 +417,11 @@ export async function importLTARData({
resolve(true);
} catch (e) {
reject(e);
logger.error(e);
logBasic(
`:: There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
resolve(true);
}
});
});

Loading…
Cancel
Save