
feat: use queue for concurrency handling

Branch: nc-fix/at-attachments
Author: mertmit, 10 months ago
Commit: fcf8b93e51
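
The commit replaces the previous `activeProcess` counter and `Promise.all(promises)` bookkeeping with a `p-queue` instance that bounds concurrency and applies backpressure to a paused `Readable` stream. A minimal standalone sketch of that pattern (not taken from the NocoDB codebase; the producer loop, `processRecord` helper, and exact values below are illustrative):

import { Readable } from 'node:stream';
import PQueue from 'p-queue';

// Illustrative values; the commit itself sets BULK_PARALLEL_PROCESS = 5 and QUEUE_BUFFER_LIMIT = 50.
const CONCURRENCY = 5;
const QUEUE_BUFFER_LIMIT = 50;

// Object mode so every pushed JSON string arrives as its own chunk.
const dataStream = new Readable({ objectMode: true, read() {} });
const queue = new PQueue({ concurrency: CONCURRENCY });

// Hypothetical per-record work; stands in for record processing plus batched bulk inserts.
async function processRecord(record: { id: string }): Promise<void> {
  await new Promise<void>((r) => setTimeout(r, 10));
  console.log('processed', record.id);
}

dataStream.on('data', (chunk: string) => {
  const record = JSON.parse(chunk);
  queue.add(async () => {
    try {
      await processRecord(record);
    } catch (e) {
      console.error(e);
    } finally {
      // Resume reading once the backlog has drained enough.
      if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
    }
  });
  // Backpressure: stop reading while too many tasks are queued.
  if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
});

dataStream.on('end', async () => {
  // Drain every queued task before declaring the import finished.
  await queue.onIdle();
  console.log('all records processed');
});

// Stand-in producer: in the commit, readAllData() pushes Airtable records into the stream.
for (let i = 0; i < 200; i++) dataStream.push(JSON.stringify({ id: `rec${i}` }));
dataStream.push(null);

The pause threshold (`QUEUE_BUFFER_LIMIT`) and the resume threshold (half of it) are what keep memory bounded: the stream stops feeding the queue when the backlog grows and only restarts once enough tasks have completed.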
1 changed file with 280 changed lines:

packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts
@@ -2,6 +2,7 @@
 import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk';
 import sizeof from 'object-sizeof';
 import { Logger } from '@nestjs/common';
+import PQueue from 'p-queue';
 import type { BulkDataAliasService } from '~/services/bulk-data-alias.service';
 import type { TablesService } from '~/services/tables.service';
 import type { AirtableBase } from 'airtable/lib/airtable_base';
@@ -10,11 +11,12 @@ import type { Source } from '~/models';
 const logger = new Logger('BaseModelSqlv2');
 
-const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
-const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
-const BULK_LINK_BATCH_COUNT = 1000; // process 1000 links at a time
-const BULK_PARALLEL_PROCESS = 2;
+const BULK_DATA_BATCH_COUNT = 20; // check size for every 20 records
+const BULK_DATA_BATCH_SIZE = 20 * 1024; // in bytes
+const BULK_LINK_BATCH_COUNT = 200; // process 200 links at a time
+const BULK_PARALLEL_PROCESS = 5;
 const STREAM_BUFFER_LIMIT = 200;
+const QUEUE_BUFFER_LIMIT = 50;
 
 interface AirtableImportContext {
   bulkDataService: BulkDataAliasService;
@@ -31,6 +33,8 @@ async function readAllData({
   table: { title?: string };
   fields?;
   atBase: AirtableBase;
+  dataStream: Readable;
+  counter?: { streamingCounter: number };
   logBasic?: (string) => void;
   logDetailed?: (string) => void;
   logWarning?: (string) => void;
@@ -127,18 +131,25 @@ export async function importData({
   services: AirtableImportContext;
 }): Promise<EntityMap> {
   try {
-    // returns EntityMap which allows us to stream data
-    const records: EntityMap = await readAllData({
+    const counter = {
+      streamingCounter: 0,
+    };
+
+    const dataStream = new Readable({
+      read() {},
+    });
+
+    dataStream.pause();
+
+    readAllData({
       table,
       atBase,
       logDetailed,
       logBasic,
     });
 
-    await new Promise(async (resolve) => {
-      const readable = records.getStream();
-      const allRecordsCount = await records.getCount();
-      const promises = [];
+    return new Promise(async (resolve) => {
+      const queue = new PQueue({ concurrency: BULK_PARALLEL_PROCESS });
 
       const ltarPromise = importLTARData({
         table,
@@ -150,6 +161,7 @@ export async function importData({
         syncDB,
         source,
         services,
+        queue,
         logBasic,
         logDetailed,
         logWarning,
@@ -163,73 +175,68 @@ export async function importData({
       let importedCount = 0;
       let tempCount = 0;
 
-      // we keep track of active process to pause and resume the stream as we have async calls within the stream and we don't want to load all data in memory
-      let activeProcess = 0;
-
-      readable.on('data', async (record) => {
-        promises.push(
-          new Promise(async (resolve) => {
-            try {
-              activeProcess++;
-              if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
-
-              const { id: rid, ...fields } = record;
-              const r = await nocoBaseDataProcessing_v2(sDB, table, {
-                id: rid,
-                fields,
-              });
-              tempData.push(r);
-              tempCount++;
-
-              if (tempCount >= BULK_DATA_BATCH_COUNT) {
-                if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
-                  readable.pause();
-
-                  let insertArray = tempData.splice(0, tempData.length);
-
-                  await services.bulkDataService.bulkDataInsert({
-                    baseName,
-                    tableName: table.id,
-                    body: insertArray,
-                    cookie: {},
-                    skip_hooks: true,
-                    foreign_key_checks: !!source.isMeta(),
-                  });
-
-                  logBasic(
-                    `:: Importing '${
-                      table.title
-                    }' data :: ${importedCount} - ${Math.min(
-                      importedCount + insertArray.length,
-                      allRecordsCount,
-                    )}`,
-                  );
-
-                  importedCount += insertArray.length;
-                  insertArray = [];
-
-                  readable.resume();
-                }
-                tempCount = 0;
-              }
-              activeProcess--;
-              if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
-              resolve(true);
-            } catch (e) {
-              logger.error(e);
-              logWarning(
-                `There were errors on importing '${table.title}' data :: ${e}`,
-              );
-              readable.resume();
-              resolve(true);
-            }
-          }),
+      dataStream.on('data', async (record) => {
+        counter.streamingCounter--;
+        record = JSON.parse(record);
+        queue.add(
+          () =>
+            new Promise(async (resolve) => {
+              try {
+                const { id: rid, ...fields } = record;
+                const r = await nocoBaseDataProcessing_v2(syncDB, table, {
+                  id: rid,
+                  fields,
+                });
+                tempData.push(r);
+                tempCount++;
+
+                if (tempCount >= BULK_DATA_BATCH_COUNT) {
+                  if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
+                    let insertArray = tempData.splice(0, tempData.length);
+
+                    await services.bulkDataService.bulkDataInsert({
+                      baseName,
+                      tableName: table.id,
+                      body: insertArray,
+                      cookie: {},
+                      skip_hooks: true,
+                      foreign_key_checks: !!source.isMeta(),
+                    });
+
+                    logBasic(
+                      `:: Importing '${
+                        table.title
+                      }' data :: ${importedCount} - ${
+                        importedCount + insertArray.length
+                      }`,
+                    );
+
+                    importedCount += insertArray.length;
+                    insertArray = [];
+                  }
+                  tempCount = 0;
+                }
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+
+                resolve(true);
+              } catch (e) {
+                logger.error(e);
+                logWarning(
+                  `There were errors on importing '${table.title}' data :: ${e}`,
+                );
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+                resolve(true);
+              }
+            }),
         );
+
+        if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
       });
 
       readable.on('end', async () => {
         try {
           // ensure all chunks are processed
-          await Promise.all(promises);
+          await queue.onIdle();
 
           // insert remaining data
           if (tempData.length > 0) {
@@ -282,6 +289,7 @@ export async function importLTARData({
   syncDB,
   source,
   services,
+  queue,
   logBasic = (_str) => {},
   logDetailed = (_str) => {},
   logWarning = (_str) => {},
@@ -301,6 +309,7 @@ export async function importLTARData({
   syncDB;
   source: Source;
   services: AirtableImportContext;
+  queue: PQueue;
   logBasic: (string) => void;
   logDetailed: (string) => void;
   logWarning: (string) => void;
@@ -366,76 +375,83 @@ export async function importLTARData({
   let nestedLinkCnt = 0;
   let importedCount = 0;
 
-  let assocTableData = [];
-  // Iterate over all related M2M associative table
-  for (const assocMeta of assocTableMetas) {
-    // extract link data from records
-    await new Promise((resolve) => {
-      const promises = [];
-      const readable = allData.getStream();
-
-      readable.on('data', async (record) => {
-        promises.push(
-          new Promise(async (resolve) => {
-            try {
-              const { id: _atId, ...rec } = record;
-
-              // todo: use actual alias instead of sanitized
-              assocTableData.push(
-                ...(
-                  rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
-                ).map((id) => ({
-                  [assocMeta.curCol.title]: record.id,
-                  [assocMeta.refCol.title]: id,
-                })),
-              );
-
-              if (assocTableData.length >= BULK_LINK_BATCH_COUNT) {
-                readable.pause();
-
-                let insertArray = assocTableData.splice(
-                  0,
-                  assocTableData.length,
-                );
-
-                logBasic(
-                  `:: Importing '${
-                    table.title
-                  }' LTAR data :: ${importedCount} - ${
-                    importedCount + insertArray.length
-                  }`,
-                );
-
-                await services.bulkDataService.bulkDataInsert({
-                  baseName,
-                  tableName: assocMeta.modelMeta.id,
-                  body: insertArray,
-                  cookie: {},
-                  skip_hooks: true,
-                  foreign_key_checks: !!source.isMeta(),
-                });
-
-                importedCount += insertArray.length;
-                insertArray = [];
-
-                readable.resume();
-              }
-              resolve(true);
-            } catch (e) {
-              logger.error(e);
-              logWarning(
-                `There were errors on importing '${table.title}' LTAR data :: ${e}`,
-              );
-              readable.resume();
-              resolve(true);
-            }
-          }),
-        );
-      });
-
-      readable.on('end', async () => {
-        try {
-          // ensure all chunks are processed
-          await Promise.all(promises);
+  const assocTableData = {};
+  // extract link data from records
+  return new Promise((resolve, reject) => {
+    dataStream.on('data', async (record) => {
+      record = JSON.parse(record);
+      // Iterate over all related M2M associative table
+      for (const assocMeta of assocTableMetas) {
+        if (!assocTableData[assocMeta.modelMeta.id]) {
+          assocTableData[assocMeta.modelMeta.id] = [];
+        }
+        queue.add(
+          () =>
+            new Promise(async (resolve) => {
+              try {
+                const { id: _atId, ...rec } = record;
+
+                // todo: use actual alias instead of sanitized
+                assocTableData[assocMeta.modelMeta.id].push(
+                  ...(
+                    rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
+                  ).map((id) => ({
+                    [assocMeta.curCol.title]: record.id,
+                    [assocMeta.refCol.title]: id,
+                  })),
+                );
+
+                if (
+                  assocTableData[assocMeta.modelMeta.id].length >=
+                  BULK_LINK_BATCH_COUNT
+                ) {
+                  let insertArray = assocTableData[
+                    assocMeta.modelMeta.id
+                  ].splice(0, assocTableData[assocMeta.modelMeta.id].length);
+
+                  const lastImportedCount = importedCount;
+                  importedCount += insertArray.length;
+
+                  logBasic(
+                    `:: Importing '${
+                      table.title
+                    }' LTAR data :: ${lastImportedCount} - ${
+                      lastImportedCount + insertArray.length
+                    }`,
+                  );
+
+                  await services.bulkDataService.bulkDataInsert({
+                    baseName,
+                    tableName: assocMeta.modelMeta.id,
+                    body: insertArray,
+                    cookie: {},
+                    skip_hooks: true,
+                    foreign_key_checks: !!source.isMeta(),
+                  });
+
+                  insertArray = [];
+                }
+
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+                resolve(true);
+              } catch (e) {
+                logger.error(e);
+                logWarning(
+                  `There were errors on importing '${table.title}' LTAR data :: ${e}`,
+                );
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+                resolve(true);
+              }
+            }),
+        );
+      }
+
+      if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
+    });
+
+    dataStream.on('end', async () => {
+      try {
+        // ensure all chunks are processed
+        await queue.onIdle();
 
         // insert remaining data
         if (assocTableData.length >= 0) {
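
In both the data and the LTAR paths, the 'end' listener follows the same shape: wait for the queue to go idle, then flush whatever is still sitting in the batch buffer. A rough standalone sketch of that flush step (the `insertBatch` callback and the generic buffer type are hypothetical, not the commit's `bulkDataInsert` signature):

import { Readable } from 'node:stream';
import PQueue from 'p-queue';

// Hypothetical helper mirroring the 'end' handlers shown in the diff above.
function flushOnEnd<T>(
  dataStream: Readable,
  queue: PQueue,
  buffer: T[],
  insertBatch: (rows: T[]) => Promise<void>,
): Promise<void> {
  return new Promise((resolve, reject) => {
    dataStream.on('end', async () => {
      try {
        // Ensure all queued chunk handlers have finished.
        await queue.onIdle();
        // Insert whatever remained below the batch threshold.
        if (buffer.length > 0) {
          await insertBatch(buffer.splice(0, buffer.length));
        }
        resolve();
      } catch (e) {
        reject(e);
      }
    });
  });
}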
