Browse Source

Merge pull request #7476 from nocodb/nc-fix/at-import-data

fix: at import data
pull/7479/head
Mert E 11 months ago committed by GitHub
parent
commit
e833934581
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 31
      packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts
  2. 59
      packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

31
packages/nocodb/src/modules/jobs/jobs/at-import/at-import.processor.ts

@ -1528,6 +1528,11 @@ export class AtImportProcessor {
} }
break; break;
case UITypes.LongText:
// eslint-disable-next-line no-control-regex
rec[key] = value.replace(/\u0000/g, '');
break;
default: default:
break; break;
} }
@ -2352,6 +2357,22 @@ export class AtImportProcessor {
try { try {
logBasic('SDK initialized'); logBasic('SDK initialized');
// clear all tables if debug mode
if (debugMode) {
const tables = await this.tablesService.getAccessibleTables({
baseId: syncDB.baseId,
sourceId: syncDB.sourceId,
roles: { ...userRole, owner: true },
});
for (const table of tables) {
await this.tablesService.tableDelete({
tableId: table.id,
user: syncDB.user,
forceDeleteRelations: true,
});
}
}
logDetailed('Base initialization started'); logDetailed('Base initialization started');
logDetailed('Base initialized'); logDetailed('Base initialized');
@ -2462,14 +2483,15 @@ export class AtImportProcessor {
baseName: syncDB.baseId, baseName: syncDB.baseId,
table: ncTbl, table: ncTbl,
atBase, atBase,
logBasic,
nocoBaseDataProcessing_v2, nocoBaseDataProcessing_v2,
sDB: syncDB, sDB: syncDB,
logDetailed,
services: { services: {
tableService: this.tablesService, tableService: this.tablesService,
bulkDataService: this.bulkDataAliasService, bulkDataService: this.bulkDataAliasService,
}, },
logBasic,
logDetailed,
logWarning,
}); });
rtc.data.records += await recordsMap[ncTbl.id].getCount(); rtc.data.records += await recordsMap[ncTbl.id].getCount();
@ -2497,9 +2519,7 @@ export class AtImportProcessor {
baseName: syncDB.baseId, baseName: syncDB.baseId,
atBase, atBase,
fields: null, //Object.values(tblLinkGroup).flat(), fields: null, //Object.values(tblLinkGroup).flat(),
logBasic,
insertedAssocRef, insertedAssocRef,
logDetailed,
records: recordsMap[ncTbl.id], records: recordsMap[ncTbl.id],
atNcAliasRef, atNcAliasRef,
ncLinkMappingTable, ncLinkMappingTable,
@ -2508,6 +2528,9 @@ export class AtImportProcessor {
tableService: this.tablesService, tableService: this.tablesService,
bulkDataService: this.bulkDataAliasService, bulkDataService: this.bulkDataAliasService,
}, },
logBasic,
logDetailed,
logWarning,
}); });
} }
} catch (error) { } catch (error) {

59
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@ -1,6 +1,7 @@
/* eslint-disable no-async-promise-executor */ /* eslint-disable no-async-promise-executor */
import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk'; import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk';
import sizeof from 'object-sizeof'; import sizeof from 'object-sizeof';
import { Logger } from '@nestjs/common';
import EntityMap from './EntityMap'; import EntityMap from './EntityMap';
import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service'; import type { BulkDataAliasService } from '../../../../../services/bulk-data-alias.service';
import type { TablesService } from '../../../../../services/tables.service'; import type { TablesService } from '../../../../../services/tables.service';
@ -8,6 +9,8 @@ import type { TablesService } from '../../../../../services/tables.service';
import type { AirtableBase } from 'airtable/lib/airtable_base'; import type { AirtableBase } from 'airtable/lib/airtable_base';
import type { TableType } from 'nocodb-sdk'; import type { TableType } from 'nocodb-sdk';
const logger = new Logger('BaseModelSqlv2');
const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
const BULK_LINK_BATCH_COUNT = 1000; // process 1000 records at a time const BULK_LINK_BATCH_COUNT = 1000; // process 1000 records at a time
@ -23,14 +26,16 @@ async function readAllData({
fields, fields,
atBase, atBase,
logBasic = (_str) => {}, logBasic = (_str) => {},
logWarning = (_str) => {},
}: { }: {
table: { title?: string }; table: { title?: string };
fields?; fields?;
atBase: AirtableBase; atBase: AirtableBase;
logBasic?: (string) => void; logBasic?: (string) => void;
logDetailed?: (string) => void; logDetailed?: (string) => void;
logWarning?: (string) => void;
}): Promise<EntityMap> { }): Promise<EntityMap> {
return new Promise((resolve, reject) => { return new Promise((resolve) => {
let data = null; let data = null;
const selectParams: any = { const selectParams: any = {
@ -74,8 +79,10 @@ async function readAllData({
}, },
async function done(err) { async function done(err) {
if (err) { if (err) {
console.error(err); logger.error(err);
return reject(err); logWarning(
`There were errors on reading '${table.title}' data :: ${err}`,
);
} }
resolve(data); resolve(data);
}, },
@ -89,8 +96,9 @@ export async function importData({
atBase, atBase,
nocoBaseDataProcessing_v2, nocoBaseDataProcessing_v2,
sDB, sDB,
logDetailed = (_str) => {},
logBasic = (_str) => {}, logBasic = (_str) => {},
logDetailed = (_str) => {},
logWarning = (_str) => {},
services, services,
}: { }: {
baseName: string; baseName: string;
@ -99,6 +107,7 @@ export async function importData({
atBase: AirtableBase; atBase: AirtableBase;
logBasic: (string) => void; logBasic: (string) => void;
logDetailed: (string) => void; logDetailed: (string) => void;
logWarning: (string) => void;
nocoBaseDataProcessing_v2; nocoBaseDataProcessing_v2;
sDB; sDB;
services: AirtableImportContext; services: AirtableImportContext;
@ -112,7 +121,7 @@ export async function importData({
logBasic, logBasic,
}); });
await new Promise(async (resolve, reject) => { await new Promise(async (resolve) => {
const readable = records.getStream(); const readable = records.getStream();
const allRecordsCount = await records.getCount(); const allRecordsCount = await records.getCount();
const promises = []; const promises = [];
@ -126,7 +135,7 @@ export async function importData({
readable.on('data', async (record) => { readable.on('data', async (record) => {
promises.push( promises.push(
new Promise(async (resolve, reject) => { new Promise(async (resolve) => {
try { try {
activeProcess++; activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause(); if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
@ -173,7 +182,12 @@ export async function importData({
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume(); if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true); resolve(true);
} catch (e) { } catch (e) {
reject(e); logger.error(e);
logWarning(
`There were errors on importing '${table.title}' data :: ${e}`,
);
readable.resume();
resolve(true);
} }
}), }),
); );
@ -206,7 +220,11 @@ export async function importData({
} }
resolve(true); resolve(true);
} catch (e) { } catch (e) {
return reject(e); logger.error(e);
logWarning(
`There were errors on importing '${table.title}' data :: ${e}`,
);
resolve(true);
} }
}); });
}); });
@ -223,20 +241,19 @@ export async function importLTARData({
atBase, atBase,
baseName, baseName,
insertedAssocRef = {}, insertedAssocRef = {},
logDetailed = (_str) => {},
logBasic = (_str) => {},
records, records,
atNcAliasRef, atNcAliasRef,
ncLinkMappingTable, ncLinkMappingTable,
syncDB, syncDB,
services, services,
logBasic = (_str) => {},
logDetailed = (_str) => {},
logWarning = (_str) => {},
}: { }: {
baseName: string; baseName: string;
table: { title?: string; id?: string }; table: { title?: string; id?: string };
fields; fields;
atBase: AirtableBase; atBase: AirtableBase;
logDetailed: (string) => void;
logBasic: (string) => void;
insertedAssocRef: { [assocTableId: string]: boolean }; insertedAssocRef: { [assocTableId: string]: boolean };
records?: EntityMap; records?: EntityMap;
atNcAliasRef: { atNcAliasRef: {
@ -247,6 +264,9 @@ export async function importLTARData({
ncLinkMappingTable: Record<string, Record<string, any>>[]; ncLinkMappingTable: Record<string, Record<string, any>>[];
syncDB; syncDB;
services: AirtableImportContext; services: AirtableImportContext;
logBasic: (string) => void;
logDetailed: (string) => void;
logWarning: (string) => void;
}) { }) {
const assocTableMetas: Array<{ const assocTableMetas: Array<{
modelMeta: { id?: string; title?: string }; modelMeta: { id?: string; title?: string };
@ -313,7 +333,7 @@ export async function importLTARData({
// Iterate over all related M2M associative table // Iterate over all related M2M associative table
for (const assocMeta of assocTableMetas) { for (const assocMeta of assocTableMetas) {
// extract link data from records // extract link data from records
await new Promise((resolve, reject) => { await new Promise((resolve) => {
const promises = []; const promises = [];
const readable = allData.getStream(); const readable = allData.getStream();
@ -364,7 +384,12 @@ export async function importLTARData({
} }
resolve(true); resolve(true);
} catch (e) { } catch (e) {
reject(e); logger.error(e);
logWarning(
`There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
readable.resume();
resolve(true);
} }
}), }),
); );
@ -398,7 +423,11 @@ export async function importLTARData({
resolve(true); resolve(true);
} catch (e) { } catch (e) {
reject(e); logger.error(e);
logWarning(
`There were errors on importing '${table.title}' LTAR data :: ${e}`,
);
resolve(true);
} }
}); });
}); });

Loading…
Cancel
Save