Browse Source

feat: parallel processing

nc-feat/attachment-clean-up
mertmit 4 months ago
parent
commit
6deb4f03f3
  1. 74
      packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts

74
packages/nocodb/src/modules/jobs/migration-jobs/nc_job_001_attachment.ts

@ -1,5 +1,6 @@
import path from 'path';
import debug from 'debug';
import PQueue from 'p-queue';
import { UITypes } from 'nocodb-sdk';
import { Injectable } from '@nestjs/common';
import { FileReference, Source } from '~/models';
@ -205,7 +206,7 @@ export class AttachmentMigration {
this.log(
`External source ${source_id} (${source.alias}) is not accessible`,
);
return;
throw e;
}
}
@ -230,6 +231,12 @@ export class AttachmentMigration {
dataOffset = processedModel.offset;
}
const numRecords = await baseModel.count();
this.log(
`Processing model "${model.title}" with ${numRecords} records and ${attachmentColumns.length} attachment columns`,
);
// eslint-disable-next-line no-constant-condition
while (true) {
const data = await baseModel.list(
@ -460,12 +467,51 @@ export class AttachmentMigration {
.first()
)?.count;
const processModelLimit = 100;
const skipModels = new Set(['placeholder']);
let processingModels = [{ fk_model_id: 'placeholder', processing: true }];
const parallelLimit = 5;
const queue = new PQueue({ concurrency: parallelLimit });
const wrapper = async (model) => {
try {
processingModels.push({
fk_model_id: model.fk_model_id,
processing: true,
});
await processModel(model);
this.log(
`Processed ${processedModelsCount} of ${numberOfModelsToBeProcessed} models`,
);
} catch (e) {
this.log(`Error processing model ${model.fk_model_id}`);
this.log(e);
skipModels.add(model.fk_model_id);
} finally {
const item = processingModels.find(
(m) => m.fk_model_id === model.fk_model_id,
);
if (item) {
item.processing = false;
}
}
};
// get models that have at least one attachment column, and not processed
// eslint-disable-next-line no-constant-condition
while (true) {
if (queue.pending > parallelLimit) {
await new Promise((resolve) => setTimeout(resolve, 1000));
continue;
}
processingModels = processingModels.filter((m) => m.processing);
// this will return until all models marked as processed
const models = await ncMeta
.knexConnection(MetaTable.COLUMNS)
@ -478,23 +524,27 @@ export class AttachmentMigration {
.select('fk_model_id')
.where('completed', true),
)
.whereNotIn(
'fk_model_id',
Array.from(skipModels)
.map((m) => m)
.concat(processingModels.map((m) => m.fk_model_id)),
)
.groupBy(selectFields)
.limit(processModelLimit);
.limit(100);
if (!models?.length) {
break;
}
for (const model of models) {
try {
await processModel(model);
this.log(
`Processed ${processedModelsCount} of ${numberOfModelsToBeProcessed} models`,
);
} catch (e) {
this.log(`Error processing model ${model.fk_model_id}`);
this.log(e);
}
queue
.add(() => wrapper(model))
.catch((e) => {
this.log(`Error processing model ${model.fk_model_id}`);
this.log(e);
skipModels.add(model.fk_model_id);
});
}
}

Loading…
Cancel
Save