From 4d6db667bfbe78154c44999aa650b4d96ab6293c Mon Sep 17 00:00:00 2001 From: "Mert E." Date: Thu, 19 Sep 2024 03:32:44 +0300 Subject: [PATCH] feat: restrict concurrent operations (#9509) * feat: restrict concurrent operations * fix: skip after first error Signed-off-by: mertmit --------- Signed-off-by: mertmit --- packages/nocodb/src/helpers/populateMeta.ts | 11 ++++- packages/nocodb/src/utils/NcHelp.ts | 45 ++++++++++++++++----- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/packages/nocodb/src/helpers/populateMeta.ts b/packages/nocodb/src/helpers/populateMeta.ts index 1fe5e5df16..6e1c22f2e2 100644 --- a/packages/nocodb/src/helpers/populateMeta.ts +++ b/packages/nocodb/src/helpers/populateMeta.ts @@ -274,8 +274,8 @@ export async function populateMeta( // await this.syncRelations(); const tableMetasInsert = tables.map((table) => { - logger?.(`Populating meta for table '${table.title}'`); return async () => { + logger?.(`Populating meta for table '${table.title}'`); /* filter relation where this table is present */ const tableRelations = relations.filter( (r) => r.tn === table.tn || r.rtn === table.tn, @@ -386,6 +386,8 @@ export async function populateMeta( column.title = `${column.title}${c || ''}`; columnNames[column.title] = true; + logger?.(`Populating meta for column '${column.title}'`); + const rel = column.hm || column.bt; const rel_column_id = ( @@ -429,8 +431,12 @@ export async function populateMeta( } catch (e) { console.log(e); } + + logger?.(`Populated meta for column '${column.title}'`); } }); + + logger?.(`Populated meta for table '${table.title}'`); }; }); @@ -462,7 +468,6 @@ export async function populateMeta( info.viewsCount = views.length; const viewMetasInsert = views.map((table) => { - logger?.(`Populating meta for view '${table.title}'`); return async () => { const columns = ( await sqlClient.columnList({ @@ -501,6 +506,8 @@ export async function populateMeta( uidt: getColumnUiType(source, column), }); } + + logger?.(`Populated meta for view '${table.title}'`); }; }); diff --git a/packages/nocodb/src/utils/NcHelp.ts b/packages/nocodb/src/utils/NcHelp.ts index 743c0612c7..042c6a9337 100644 --- a/packages/nocodb/src/utils/NcHelp.ts +++ b/packages/nocodb/src/utils/NcHelp.ts @@ -1,20 +1,45 @@ import debug from 'debug'; +import PQueue from 'p-queue'; +import { Logger } from '@nestjs/common'; + +const NC_EXECUTE_OPERATIONS_CONCURRENCY = + parseInt(process.env.NC_EXECUTE_OPERATIONS_CONCURRENCY) || 5; export default class NcHelp { + public static logger = new Logger('NcHelp'); + public static async executeOperations( fns: Array<() => Promise>, dbType: string, ): Promise { - if (dbType === 'oracledb' || dbType === 'mssql') { - for (const fn of fns) { - await fn(); - } - } else { - await Promise.all( - fns.map(async (f) => { - await f(); - }), - ); + const queue = new PQueue({ + concurrency: + dbType === 'oracledb' || dbType === 'mssql' + ? 1 + : NC_EXECUTE_OPERATIONS_CONCURRENCY, + }); + + const errors = []; + + for (const fn of fns) { + queue.add(async () => { + if (errors.length) { + return; + } + + try { + await fn(); + } catch (e) { + this.logger.error(e); + errors.push(e); + } + }); + } + + await queue.onIdle(); + + if (errors.length) { + throw errors[0]; } }