Browse Source

feat: restrict concurrent operations (#9509)

* feat: restrict concurrent operations

* fix: skip after first error

Signed-off-by: mertmit <mertmit99@gmail.com>

---------

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/9519/head
Mert E. 2 months ago committed by GitHub
parent
commit
4d6db667bf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 11
      packages/nocodb/src/helpers/populateMeta.ts
  2. 39
      packages/nocodb/src/utils/NcHelp.ts

11
packages/nocodb/src/helpers/populateMeta.ts

@ -274,8 +274,8 @@ export async function populateMeta(
// await this.syncRelations();
const tableMetasInsert = tables.map((table) => {
logger?.(`Populating meta for table '${table.title}'`);
return async () => {
logger?.(`Populating meta for table '${table.title}'`);
/* filter relation where this table is present */
const tableRelations = relations.filter(
(r) => r.tn === table.tn || r.rtn === table.tn,
@ -386,6 +386,8 @@ export async function populateMeta(
column.title = `${column.title}${c || ''}`;
columnNames[column.title] = true;
logger?.(`Populating meta for column '${column.title}'`);
const rel = column.hm || column.bt;
const rel_column_id = (
@ -429,8 +431,12 @@ export async function populateMeta(
} catch (e) {
console.log(e);
}
logger?.(`Populated meta for column '${column.title}'`);
}
});
logger?.(`Populated meta for table '${table.title}'`);
};
});
@ -462,7 +468,6 @@ export async function populateMeta(
info.viewsCount = views.length;
const viewMetasInsert = views.map((table) => {
logger?.(`Populating meta for view '${table.title}'`);
return async () => {
const columns = (
await sqlClient.columnList({
@ -501,6 +506,8 @@ export async function populateMeta(
uidt: getColumnUiType(source, column),
});
}
logger?.(`Populated meta for view '${table.title}'`);
};
});

39
packages/nocodb/src/utils/NcHelp.ts

@ -1,20 +1,45 @@
import debug from 'debug';
import PQueue from 'p-queue';
import { Logger } from '@nestjs/common';
const NC_EXECUTE_OPERATIONS_CONCURRENCY =
parseInt(process.env.NC_EXECUTE_OPERATIONS_CONCURRENCY) || 5;
export default class NcHelp {
public static logger = new Logger('NcHelp');
public static async executeOperations(
fns: Array<() => Promise<any>>,
dbType: string,
): Promise<any> {
if (dbType === 'oracledb' || dbType === 'mssql') {
const queue = new PQueue({
concurrency:
dbType === 'oracledb' || dbType === 'mssql'
? 1
: NC_EXECUTE_OPERATIONS_CONCURRENCY,
});
const errors = [];
for (const fn of fns) {
queue.add(async () => {
if (errors.length) {
return;
}
try {
await fn();
} catch (e) {
this.logger.error(e);
errors.push(e);
}
} else {
await Promise.all(
fns.map(async (f) => {
await f();
}),
);
});
}
await queue.onIdle();
if (errors.length) {
throw errors[0];
}
}

Loading…
Cancel
Save