From c7d91be0f00cad1a74528d5e6b892c75e1f57245 Mon Sep 17 00:00:00 2001 From: Pranav C Date: Fri, 5 Jan 2024 11:32:32 +0000 Subject: [PATCH] refactor: introduce queue and better log --- .../ncXcdbCreatedAndUpdatedTimeUpgrader.ts | 108 +++++++++++++++--- 1 file changed, 91 insertions(+), 17 deletions(-) diff --git a/packages/nocodb/src/version-upgrader/ncXcdbCreatedAndUpdatedTimeUpgrader.ts b/packages/nocodb/src/version-upgrader/ncXcdbCreatedAndUpdatedTimeUpgrader.ts index 136584b493..50be6c17ab 100644 --- a/packages/nocodb/src/version-upgrader/ncXcdbCreatedAndUpdatedTimeUpgrader.ts +++ b/packages/nocodb/src/version-upgrader/ncXcdbCreatedAndUpdatedTimeUpgrader.ts @@ -1,5 +1,4 @@ import { UITypes } from 'nocodb-sdk'; -import { Logger } from '@nestjs/common'; import type { NcUpgraderCtx } from './NcUpgrader'; import type { MetaService } from '~/meta/meta.service'; import type { Base } from '~/models'; @@ -15,6 +14,51 @@ import { Altered } from '~/services/columns.service'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; import getColumnUiType from '~/helpers/getColumnUiType'; +// todo: move to utils +class RequestQueue { + private runningCount: number; + private queue: any[]; + private maxParallelRequests: number; + + constructor(maxParallelRequests = 10) { + this.maxParallelRequests = maxParallelRequests; + this.queue = []; + this.runningCount = 0; + } + + async enqueue(requestFunction) { + return new Promise((resolve, reject) => { + const execute = async () => { + this.runningCount++; + try { + const result = await requestFunction(); + resolve(result); + } catch (error) { + reject(error); + } finally { + this.runningCount--; + this.processQueue(); + } + }; + + if (this.runningCount < this.maxParallelRequests) { + execute(); + } else { + this.queue.push(execute); + } + }); + } + + processQueue() { + if (this.runningCount < this.maxParallelRequests && this.queue.length > 0) { + const nextRequest = this.queue.shift(); + nextRequest(); + } + } +} + +// Example Usage: + // An upgrader for upgrading created_at and updated_at columns // to system column and convert to new uidt CreatedTime and LastModifiedTime @@ -69,7 +113,9 @@ async function upgradeModels({ models.map(async (model: any) => { if (model.mm) return; - logger.log(`Upgrading model ${model.title} from base ${base.title}`); + logger.log( + `Upgrading model '${model.title}'(${model.id}) from base '${base.title}'(${base.id}})`, + ); const columns = await model.getColumns(ncMeta); const oldColumns = columns.map((c) => ({ ...c, cn: c.column_name })); @@ -131,6 +177,14 @@ async function upgradeModels({ }) )?.data?.list?.map((c) => ({ ...c, column_name: c.cn })) || []; + // if no columns found skip since table might not be there + if (!dbColumns.length) { + logger.log( + `Skipping upgrade of model '${model.title}'(${model.id}) from base '${base.title}'(${base.id}}) since columns not found`, + ); + return; + } + // create created_at and updated_at columns const newColumns = []; const existingDbColumns = []; @@ -211,7 +265,7 @@ async function upgradeModels({ // alter table and add new columns if any if (newColumns.length) { logger.log( - `Altering table ${model.title} from base ${base.title} for new columns`, + `Altering table '${model.title}'(${model.id}) from base '${base.title}'(${base.id}}) for new columns`, ); // update column in db const tableUpdateBody = { @@ -238,7 +292,9 @@ async function upgradeModels({ } } - logger.log(`Upgraded model ${model.title} from base ${base.title}`); + logger.log( + `Upgraded model '${model.title}'(${model.id}) from base '${base.title}'(${base.id}})`, + ); }), ); } @@ -263,22 +319,40 @@ export default async function ({ ncMeta }: NcUpgraderCtx) { }, }); + const requestQueue = new RequestQueue(); // iterate and upgrade each base - for (let i = 0; i < sources.length; i++) { - const source = new Source(sources[i]); + await Promise.all( + sources.map(async (_source, i) => { + const source = new Source(_source); - const base = await source.getProject(ncMeta); + const base = await source.getProject(ncMeta); - // skip deleted base bases - if (!base || base.deleted) { - logger.log(`Skipped deleted base source ${source.alias || source.id}`); - continue; - } + // skip deleted base bases + if (!base || base.deleted) { + logger.log( + `Skipped deleted base source '${source.alias || source.id}' - ${ + base.id + }`, + ); + return Promise.resolve(); + } - logger.log(`Upgrading base ${base.title} (${i + 1}/${sources.length})`); - // update the meta props - await upgradeModels({ ncMeta, source, base }); + // update the meta props + return requestQueue.enqueue(async () => { + logger.log( + `Upgrading base ${base.title}(${base.id},${source.id}) (${i + 1}/${ + sources.length + })`, + ); - logger.log(`Upgraded base ${base.title} (${i + 1}/${sources.length})`); - } + return upgradeModels({ ncMeta, source, base }).then(() => { + logger.log( + `Upgraded base '${base.title}'(${base.id},${source.id}) (${i + 1}/${ + sources.length + })`, + ); + }); + }); + }), + ); }