From fcf8b93e51291b845033feed15c6c1fa6e97ad99 Mon Sep 17 00:00:00 2001
From: mertmit
Date: Thu, 25 Jan 2024 16:11:11 +0000
Subject: [PATCH] feat: use queue for concurrency handling

---
 .../at-import/helpers/readAndProcessData.ts   | 280 +++++++++---------
 1 file changed, 148 insertions(+), 132 deletions(-)

diff --git a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts
index 46a5ae3abb..6dc5ea7849 100644
--- a/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts
+++ b/packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts
@@ -2,6 +2,7 @@ import { isLinksOrLTAR, RelationTypes } from 'nocodb-sdk';
 import sizeof from 'object-sizeof';
 import { Logger } from '@nestjs/common';
+import PQueue from 'p-queue';
 import type { BulkDataAliasService } from '~/services/bulk-data-alias.service';
 import type { TablesService } from '~/services/tables.service';
 import type { AirtableBase } from 'airtable/lib/airtable_base';
@@ -10,11 +11,12 @@ import type { Source } from '~/models';
 
 const logger = new Logger('BaseModelSqlv2');
 
-const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
-const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
-const BULK_LINK_BATCH_COUNT = 1000; // process 1000 links at a time
-const BULK_PARALLEL_PROCESS = 2;
+const BULK_DATA_BATCH_COUNT = 20; // check size for every 20 records
+const BULK_DATA_BATCH_SIZE = 20 * 1024; // in bytes
+const BULK_LINK_BATCH_COUNT = 200; // process 200 links at a time
+const BULK_PARALLEL_PROCESS = 5;
 const STREAM_BUFFER_LIMIT = 200;
+const QUEUE_BUFFER_LIMIT = 50;
 
 interface AirtableImportContext {
   bulkDataService: BulkDataAliasService;
@@ -31,6 +33,8 @@ async function readAllData({
   table: { title?: string };
   fields?;
   atBase: AirtableBase;
+  dataStream: Readable;
+  counter?: { streamingCounter: number };
   logBasic?: (string) => void;
   logDetailed?: (string) => void;
   logWarning?: (string) => void;
@@ -127,18 +131,25 @@ export async function importData({
   services: AirtableImportContext;
 }): Promise {
   try {
-    // returns EntityMap which allows us to stream data
-    const records: EntityMap = await readAllData({
+    const counter = {
+      streamingCounter: 0,
+    };
+
+    const dataStream = new Readable({
+      read() {},
+    });
+
+    dataStream.pause();
+
+    readAllData({
       table,
       atBase,
       logDetailed,
       logBasic,
     });
-    await new Promise(async (resolve) => {
-      const readable = records.getStream();
-      const allRecordsCount = await records.getCount();
-      const promises = [];
+    return new Promise(async (resolve) => {
+      const queue = new PQueue({ concurrency: BULK_PARALLEL_PROCESS });
 
       const ltarPromise = importLTARData({
         table,
@@ -150,6 +161,7 @@ export async function importData({
         syncDB,
         source,
         services,
+        queue,
         logBasic,
         logDetailed,
         logWarning,
@@ -163,73 +175,68 @@ export async function importData({
       let importedCount = 0;
       let tempCount = 0;
 
-      // we keep track of active process to pause and resume the stream as we have async calls within the stream and we don't want to load all data in memory
-      let activeProcess = 0;
-
-      readable.on('data', async (record) => {
-        promises.push(
-          new Promise(async (resolve) => {
-            try {
-              activeProcess++;
-              if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
-
-              const { id: rid, ...fields } = record;
-              const r = await nocoBaseDataProcessing_v2(sDB, table, {
-                id: rid,
-                fields,
-              });
-              tempData.push(r);
-              tempCount++;
-
-              if (tempCount >= BULK_DATA_BATCH_COUNT) {
-                if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
-                  readable.pause();
-
-                  let insertArray = tempData.splice(0, tempData.length);
-
-                  await services.bulkDataService.bulkDataInsert({
-                    baseName,
-                    tableName: table.id,
-                    body: insertArray,
-                    cookie: {},
-                    skip_hooks: true,
-                    foreign_key_checks: !!source.isMeta(),
-                  });
-
-                  logBasic(
-                    `:: Importing '${
-                      table.title
-                    }' data :: ${importedCount} - ${Math.min(
-                      importedCount + insertArray.length,
-                      allRecordsCount,
-                    )}`,
-                  );
-
-                  importedCount += insertArray.length;
-                  insertArray = [];
-
-                  readable.resume();
-                }
-                tempCount = 0;
-              }
-              activeProcess--;
-              if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
-              resolve(true);
-            } catch (e) {
-              logger.error(e);
-              logWarning(
-                `There were errors on importing '${table.title}' data :: ${e}`,
-              );
-              readable.resume();
-              resolve(true);
-            }
-          }),
-        );
-      });
-      readable.on('end', async () => {
+      dataStream.on('data', async (record) => {
+        counter.streamingCounter--;
+        record = JSON.parse(record);
+        queue.add(
+          () =>
+            new Promise(async (resolve) => {
+              try {
+                const { id: rid, ...fields } = record;
+                const r = await nocoBaseDataProcessing_v2(syncDB, table, {
+                  id: rid,
+                  fields,
+                });
+                tempData.push(r);
+                tempCount++;
+
+                if (tempCount >= BULK_DATA_BATCH_COUNT) {
+                  if (sizeof(tempData) >= BULK_DATA_BATCH_SIZE) {
+                    let insertArray = tempData.splice(0, tempData.length);
+
+                    await services.bulkDataService.bulkDataInsert({
+                      baseName,
+                      tableName: table.id,
+                      body: insertArray,
+                      cookie: {},
+                      skip_hooks: true,
+                      foreign_key_checks: !!source.isMeta(),
+                    });
+
+                    logBasic(
+                      `:: Importing '${
+                        table.title
+                      }' data :: ${importedCount} - ${
+                        importedCount + insertArray.length
+                      }`,
+                    );
+
+                    importedCount += insertArray.length;
+                    insertArray = [];
+                  }
+                  tempCount = 0;
+                }
+
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+
+                resolve(true);
+              } catch (e) {
+                logger.error(e);
+                logWarning(
+                  `There were errors on importing '${table.title}' data :: ${e}`,
+                );
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+                resolve(true);
+              }
+            }),
+        );
+
+        if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
+      });
+      dataStream.on('end', async () => {
         try {
           // ensure all chunks are processed
-          await Promise.all(promises);
+          await queue.onIdle();
 
           // insert remaining data
           if (tempData.length > 0) {
@@ -282,6 +289,7 @@ export async function importLTARData({
   syncDB,
   source,
   services,
+  queue,
   logBasic = (_str) => {},
   logDetailed = (_str) => {},
   logWarning = (_str) => {},
@@ -301,6 +309,7 @@ export async function importLTARData({
   syncDB;
   source: Source;
   services: AirtableImportContext;
+  queue: PQueue;
   logBasic: (string) => void;
   logDetailed: (string) => void;
   logWarning: (string) => void;
@@ -366,76 +375,83 @@ export async function importLTARData({
   let nestedLinkCnt = 0;
   let importedCount = 0;
 
-  let assocTableData = [];
-  // Iterate over all related M2M associative table
-  for (const assocMeta of assocTableMetas) {
-    // extract link data from records
-    await new Promise((resolve) => {
-      const promises = [];
-      const readable = allData.getStream();
-
-      readable.on('data', async (record) => {
-        promises.push(
-          new Promise(async (resolve) => {
-            try {
-              const { id: _atId, ...rec } = record;
-
-              // todo: use actual alias instead of sanitized
-              assocTableData.push(
-                ...(
-                  rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
-                ).map((id) => ({
-                  [assocMeta.curCol.title]: record.id,
-                  [assocMeta.refCol.title]: id,
-                })),
-              );
-
-              if (assocTableData.length >= BULK_LINK_BATCH_COUNT) {
-                readable.pause();
-
-                let insertArray = assocTableData.splice(
-                  0,
-                  assocTableData.length,
-                );
-
-                logBasic(
-                  `:: Importing '${
-                    table.title
-                  }' LTAR data :: ${importedCount} - ${
-                    importedCount + insertArray.length
-                  }`,
-                );
-
-                await services.bulkDataService.bulkDataInsert({
-                  baseName,
-                  tableName: assocMeta.modelMeta.id,
-                  body: insertArray,
-                  cookie: {},
-                  skip_hooks: true,
-                  foreign_key_checks: !!source.isMeta(),
-                });
-
-                importedCount += insertArray.length;
-                insertArray = [];
-
-                readable.resume();
-              }
-              resolve(true);
-            } catch (e) {
-              logger.error(e);
-              logWarning(
-                `There were errors on importing '${table.title}' LTAR data :: ${e}`,
-              );
-              readable.resume();
-              resolve(true);
-            }
-          }),
-        );
-      });
-      readable.on('end', async () => {
-        try {
-          // ensure all chunks are processed
-          await Promise.all(promises);
+  const assocTableData = {};
+  // extract link data from records
+  return new Promise((resolve, reject) => {
+    dataStream.on('data', async (record) => {
+      record = JSON.parse(record);
+      // Iterate over all related M2M associative table
+      for (const assocMeta of assocTableMetas) {
+        if (!assocTableData[assocMeta.modelMeta.id]) {
+          assocTableData[assocMeta.modelMeta.id] = [];
+        }
+        queue.add(
+          () =>
+            new Promise(async (resolve) => {
+              try {
+                const { id: _atId, ...rec } = record;
+
+                // todo: use actual alias instead of sanitized
+                assocTableData[assocMeta.modelMeta.id].push(
+                  ...(
+                    rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
+                  ).map((id) => ({
+                    [assocMeta.curCol.title]: record.id,
+                    [assocMeta.refCol.title]: id,
+                  })),
+                );
+
+                if (
+                  assocTableData[assocMeta.modelMeta.id].length >=
+                  BULK_LINK_BATCH_COUNT
+                ) {
+                  let insertArray = assocTableData[
+                    assocMeta.modelMeta.id
+                  ].splice(0, assocTableData[assocMeta.modelMeta.id].length);
+
+                  const lastImportedCount = importedCount;
+                  importedCount += insertArray.length;
+
+                  logBasic(
+                    `:: Importing '${
+                      table.title
+                    }' LTAR data :: ${lastImportedCount} - ${
+                      lastImportedCount + insertArray.length
+                    }`,
+                  );
+
+                  await services.bulkDataService.bulkDataInsert({
+                    baseName,
+                    tableName: assocMeta.modelMeta.id,
+                    body: insertArray,
+                    cookie: {},
+                    skip_hooks: true,
+                    foreign_key_checks: !!source.isMeta(),
+                  });
+
+                  insertArray = [];
+                }
+
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+                resolve(true);
+              } catch (e) {
+                logger.error(e);
+                logWarning(
+                  `There were errors on importing '${table.title}' LTAR data :: ${e}`,
+                );
+                if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
+                resolve(true);
+              }
+            }),
+        );
+      }
+
+      if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
+    });
+    dataStream.on('end', async () => {
+      try {
+        // ensure all chunks are processed
+        await queue.onIdle();
 
         // insert remaining data
         if (assocTableData.length >= 0) {
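Note on the concurrency pattern this patch adopts: records are pushed into a paused Readable as JSON strings, a p-queue with a fixed concurrency (BULK_PARALLEL_PROCESS) picks them up on the 'data' event, the stream is paused whenever the queued backlog reaches QUEUE_BUFFER_LIMIT and resumed once it drains below half of it, and queue.onIdle() is awaited on 'end' before the remaining batch is flushed. The code below is a minimal, self-contained sketch of that backpressure loop, not part of the patch; importWithQueue and processRecord are illustrative names, and the producer side (readAllData, whose body changes are not visible in this excerpt) is assumed to push() JSON strings into the stream and push(null) when done.

import { Readable } from 'stream';
import PQueue from 'p-queue';

const BULK_PARALLEL_PROCESS = 5; // max records processed concurrently
const QUEUE_BUFFER_LIMIT = 50; // pause the stream when this many tasks are waiting

// Stand-in for the real per-record work (processing + batched bulk insert).
async function processRecord(record: Record<string, unknown>): Promise<void> {
  // ... transform and insert ...
}

export function importWithQueue(dataStream: Readable): Promise<void> {
  const queue = new PQueue({ concurrency: BULK_PARALLEL_PROCESS });

  return new Promise((resolve, reject) => {
    dataStream.on('data', (chunk) => {
      const record = JSON.parse(chunk.toString());

      queue
        .add(() => processRecord(record))
        .catch((e) => console.error(e)) // log and keep going, as the patch does
        .finally(() => {
          // resume reading once the backlog has drained enough
          if (queue.size < QUEUE_BUFFER_LIMIT / 2) dataStream.resume();
        });

      // backpressure: stop reading while the queue is saturated
      if (queue.size >= QUEUE_BUFFER_LIMIT) dataStream.pause();
    });

    dataStream.on('end', async () => {
      // wait for every queued task to settle before finishing
      await queue.onIdle();
      resolve();
    });

    dataStream.on('error', reject);
  });
}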