Browse Source

refactor: introduce queue and better log

pull/7369/head
Pranav C 10 months ago
parent
commit
c7d91be0f0
  1. 108
      packages/nocodb/src/version-upgrader/ncXcdbCreatedAndUpdatedTimeUpgrader.ts

108
packages/nocodb/src/version-upgrader/ncXcdbCreatedAndUpdatedTimeUpgrader.ts

@ -1,5 +1,4 @@
import { UITypes } from 'nocodb-sdk'; import { UITypes } from 'nocodb-sdk';
import { Logger } from '@nestjs/common';
import type { NcUpgraderCtx } from './NcUpgrader'; import type { NcUpgraderCtx } from './NcUpgrader';
import type { MetaService } from '~/meta/meta.service'; import type { MetaService } from '~/meta/meta.service';
import type { Base } from '~/models'; import type { Base } from '~/models';
@ -15,6 +14,51 @@ import { Altered } from '~/services/columns.service';
import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2'; import NcConnectionMgrv2 from '~/utils/common/NcConnectionMgrv2';
import getColumnUiType from '~/helpers/getColumnUiType'; import getColumnUiType from '~/helpers/getColumnUiType';
// todo: move to utils
class RequestQueue {
private runningCount: number;
private queue: any[];
private maxParallelRequests: number;
constructor(maxParallelRequests = 10) {
this.maxParallelRequests = maxParallelRequests;
this.queue = [];
this.runningCount = 0;
}
async enqueue(requestFunction) {
return new Promise((resolve, reject) => {
const execute = async () => {
this.runningCount++;
try {
const result = await requestFunction();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.runningCount--;
this.processQueue();
}
};
if (this.runningCount < this.maxParallelRequests) {
execute();
} else {
this.queue.push(execute);
}
});
}
processQueue() {
if (this.runningCount < this.maxParallelRequests && this.queue.length > 0) {
const nextRequest = this.queue.shift();
nextRequest();
}
}
}
// Example Usage:
// An upgrader for upgrading created_at and updated_at columns // An upgrader for upgrading created_at and updated_at columns
// to system column and convert to new uidt CreatedTime and LastModifiedTime // to system column and convert to new uidt CreatedTime and LastModifiedTime
@ -69,7 +113,9 @@ async function upgradeModels({
models.map(async (model: any) => { models.map(async (model: any) => {
if (model.mm) return; if (model.mm) return;
logger.log(`Upgrading model ${model.title} from base ${base.title}`); logger.log(
`Upgrading model '${model.title}'(${model.id}) from base '${base.title}'(${base.id}})`,
);
const columns = await model.getColumns(ncMeta); const columns = await model.getColumns(ncMeta);
const oldColumns = columns.map((c) => ({ ...c, cn: c.column_name })); const oldColumns = columns.map((c) => ({ ...c, cn: c.column_name }));
@ -131,6 +177,14 @@ async function upgradeModels({
}) })
)?.data?.list?.map((c) => ({ ...c, column_name: c.cn })) || []; )?.data?.list?.map((c) => ({ ...c, column_name: c.cn })) || [];
// if no columns found skip since table might not be there
if (!dbColumns.length) {
logger.log(
`Skipping upgrade of model '${model.title}'(${model.id}) from base '${base.title}'(${base.id}}) since columns not found`,
);
return;
}
// create created_at and updated_at columns // create created_at and updated_at columns
const newColumns = []; const newColumns = [];
const existingDbColumns = []; const existingDbColumns = [];
@ -211,7 +265,7 @@ async function upgradeModels({
// alter table and add new columns if any // alter table and add new columns if any
if (newColumns.length) { if (newColumns.length) {
logger.log( logger.log(
`Altering table ${model.title} from base ${base.title} for new columns`, `Altering table '${model.title}'(${model.id}) from base '${base.title}'(${base.id}}) for new columns`,
); );
// update column in db // update column in db
const tableUpdateBody = { const tableUpdateBody = {
@ -238,7 +292,9 @@ async function upgradeModels({
} }
} }
logger.log(`Upgraded model ${model.title} from base ${base.title}`); logger.log(
`Upgraded model '${model.title}'(${model.id}) from base '${base.title}'(${base.id}})`,
);
}), }),
); );
} }
@ -263,22 +319,40 @@ export default async function ({ ncMeta }: NcUpgraderCtx) {
}, },
}); });
const requestQueue = new RequestQueue();
// iterate and upgrade each base // iterate and upgrade each base
for (let i = 0; i < sources.length; i++) { await Promise.all(
const source = new Source(sources[i]); sources.map(async (_source, i) => {
const source = new Source(_source);
const base = await source.getProject(ncMeta); const base = await source.getProject(ncMeta);
// skip deleted base bases // skip deleted base bases
if (!base || base.deleted) { if (!base || base.deleted) {
logger.log(`Skipped deleted base source ${source.alias || source.id}`); logger.log(
continue; `Skipped deleted base source '${source.alias || source.id}' - ${
} base.id
}`,
);
return Promise.resolve();
}
logger.log(`Upgrading base ${base.title} (${i + 1}/${sources.length})`); // update the meta props
// update the meta props return requestQueue.enqueue(async () => {
await upgradeModels({ ncMeta, source, base }); logger.log(
`Upgrading base ${base.title}(${base.id},${source.id}) (${i + 1}/${
sources.length
})`,
);
logger.log(`Upgraded base ${base.title} (${i + 1}/${sources.length})`); return upgradeModels({ ncMeta, source, base }).then(() => {
} logger.log(
`Upgraded base '${base.title}'(${base.id},${source.id}) (${i + 1}/${
sources.length
})`,
);
});
});
}),
);
} }

Loading…
Cancel
Save