Browse Source

fix: duplicate base (#8678)

* fix: duplicate from source

* fix: pubsub & duplicate

Signed-off-by: mertmit <mertmit99@gmail.com>

---------

Signed-off-by: mertmit <mertmit99@gmail.com>
pull/8676/head
Mert E 6 months ago committed by GitHub
parent
commit
bc5094e742
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      packages/nocodb/src/models/Base.ts
  2. 4
      packages/nocodb/src/modules/jobs/jobs.controller.ts
  3. 14
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts
  4. 63
      packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts
  5. 60
      packages/nocodb/src/modules/jobs/redis/jobs-redis.ts
  6. 17
      packages/nocodb/src/redis/pubsub-redis.ts

5
packages/nocodb/src/models/Base.ts

@ -560,10 +560,9 @@ export default class Base implements BaseType {
) { ) {
const base = await this.getByTitleOrId(context, titleOrId, ncMeta); const base = await this.getByTitleOrId(context, titleOrId, ncMeta);
// parse meta
base.meta = parseMetaProp(base);
if (base) { if (base) {
// parse meta
base.meta = parseMetaProp(base);
await base.getSources(ncMeta); await base.getSources(ncMeta);
} }

4
packages/nocodb/src/modules/jobs/jobs.controller.ts

@ -8,7 +8,6 @@ import {
Res, Res,
UseGuards, UseGuards,
} from '@nestjs/common'; } from '@nestjs/common';
import { Request } from 'express';
import { OnEvent } from '@nestjs/event-emitter'; import { OnEvent } from '@nestjs/event-emitter';
import { customAlphabet } from 'nanoid'; import { customAlphabet } from 'nanoid';
import type { Response } from 'express'; import type { Response } from 'express';
@ -20,8 +19,7 @@ import { CacheGetType, CacheScope } from '~/utils/globals';
import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard'; import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface'; import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { JobsRedis } from '~/modules/jobs/redis/jobs-redis'; import { JobsRedis } from '~/modules/jobs/redis/jobs-redis';
import { TenantContext } from '~/decorators/tenant-context.decorator'; import { NcRequest } from '~/interface/config';
import { NcContext, NcRequest } from '~/interface/config';
const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14); const nanoidv2 = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 14);
const POLLING_INTERVAL = 30000; const POLLING_INTERVAL = 30000;

14
packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.controller.ts

@ -19,6 +19,7 @@ import { MetaApiLimiterGuard } from '~/guards/meta-api-limiter.guard';
import { IJobsService } from '~/modules/jobs/jobs-service.interface'; import { IJobsService } from '~/modules/jobs/jobs-service.interface';
import { TenantContext } from '~/decorators/tenant-context.decorator'; import { TenantContext } from '~/decorators/tenant-context.decorator';
import { NcContext, NcRequest } from '~/interface/config'; import { NcContext, NcRequest } from '~/interface/config';
import { RootScopes } from '~/utils/globals';
@Controller() @Controller()
@UseGuards(MetaApiLimiterGuard, GlobalGuard) @UseGuards(MetaApiLimiterGuard, GlobalGuard)
@ -50,7 +51,13 @@ export class DuplicateController {
base?: any; base?: any;
}, },
) { ) {
const base = await Base.getByUuid(context, sharedBaseId); const base = await Base.getByUuid(
{
workspace_id: RootScopes.BASE,
base_id: RootScopes.BASE,
},
sharedBaseId,
);
if (!base) { if (!base) {
throw new Error(`Base not found for id '${sharedBaseId}'`); throw new Error(`Base not found for id '${sharedBaseId}'`);
@ -80,7 +87,10 @@ export class DuplicateController {
}); });
const job = await this.jobsService.add(JobTypes.DuplicateBase, { const job = await this.jobsService.add(JobTypes.DuplicateBase, {
context, context: {
workspace_id: base.fk_workspace_id,
base_id: base.id,
},
baseId: base.id, baseId: base.id,
sourceId: source.id, sourceId: source.id,
dupProjectId: dupProject.id, dupProjectId: dupProject.id,

63
packages/nocodb/src/modules/jobs/jobs/export-import/duplicate.processor.ts

@ -49,7 +49,7 @@ export class DuplicateProcessor {
const source = await Source.get(context, sourceId); const source = await Source.get(context, sourceId);
const targetContext = { const targetContext = {
...context, workspace_id: dupProject.fk_workspace_id,
base_id: dupProject.id, base_id: dupProject.id,
}; };
@ -101,7 +101,7 @@ export class DuplicateProcessor {
} }
if (!excludeData) { if (!excludeData) {
await this.importModelsData(targetContext, { await this.importModelsData(targetContext, context, {
idMap, idMap,
sourceProject: base, sourceProject: base,
sourceModels: models, sourceModels: models,
@ -225,7 +225,7 @@ export class DuplicateProcessor {
} }
} }
await this.importModelsData(context, { await this.importModelsData(context, context, {
idMap, idMap,
sourceProject: base, sourceProject: base,
sourceModels: [sourceModel], sourceModels: [sourceModel],
@ -362,7 +362,7 @@ export class DuplicateProcessor {
} }
} }
await this.importModelsData(context, { await this.importModelsData(context, context, {
idMap, idMap,
sourceProject: base, sourceProject: base,
sourceModels: [], sourceModels: [],
@ -405,7 +405,8 @@ export class DuplicateProcessor {
} }
async importModelsData( async importModelsData(
context: NcContext, targetContext: NcContext,
sourceContext: NcContext,
param: { param: {
idMap: Map<string, string>; idMap: Map<string, string>;
sourceProject: Base; sourceProject: Base;
@ -444,7 +445,7 @@ export class DuplicateProcessor {
}); });
this.exportService this.exportService
.streamModelDataAsCsv(context, { .streamModelDataAsCsv(sourceContext, {
dataStream, dataStream,
linkStream, linkStream,
baseId: sourceProject.id, baseId: sourceProject.id,
@ -459,11 +460,11 @@ export class DuplicateProcessor {
}); });
const model = await Model.get( const model = await Model.get(
context, targetContext,
findWithIdentifier(idMap, sourceModel.id), findWithIdentifier(idMap, sourceModel.id),
); );
await this.importService.importDataFromCsvStream(context, { await this.importService.importDataFromCsvStream(targetContext, {
idMap, idMap,
dataStream, dataStream,
destProject, destProject,
@ -471,13 +472,16 @@ export class DuplicateProcessor {
destModel: model, destModel: model,
}); });
handledLinks = await this.importService.importLinkFromCsvStream(context, { handledLinks = await this.importService.importLinkFromCsvStream(
idMap, targetContext,
linkStream, {
destProject, idMap,
destBase, linkStream,
handledLinks, destProject,
}); destBase,
handledLinks,
},
);
elapsedTime( elapsedTime(
hrTime, hrTime,
@ -506,7 +510,7 @@ export class DuplicateProcessor {
let error = null; let error = null;
this.exportService this.exportService
.streamModelDataAsCsv(context, { .streamModelDataAsCsv(targetContext, {
dataStream, dataStream,
linkStream, linkStream,
baseId: sourceProject.id, baseId: sourceProject.id,
@ -524,7 +528,7 @@ export class DuplicateProcessor {
const headers: string[] = []; const headers: string[] = [];
let chunk = []; let chunk = [];
const model = await Model.get(context, sourceModel.id); const model = await Model.get(targetContext, sourceModel.id);
await new Promise((resolve) => { await new Promise((resolve) => {
papaparse.parse(dataStream, { papaparse.parse(dataStream, {
@ -535,7 +539,7 @@ export class DuplicateProcessor {
for (const header of results.data as any) { for (const header of results.data as any) {
const id = idMap.get(header); const id = idMap.get(header);
if (id) { if (id) {
const col = await Column.get(context, { const col = await Column.get(targetContext, {
source_id: destBase.id, source_id: destBase.id,
colId: id, colId: id,
}); });
@ -545,7 +549,7 @@ export class DuplicateProcessor {
(col.colOptions?.type === RelationTypes.ONE_TO_ONE && (col.colOptions?.type === RelationTypes.ONE_TO_ONE &&
col.meta?.bt) col.meta?.bt)
) { ) {
const childCol = await Column.get(context, { const childCol = await Column.get(targetContext, {
source_id: destBase.id, source_id: destBase.id,
colId: col.colOptions.fk_child_column_id, colId: col.colOptions.fk_child_column_id,
}); });
@ -585,13 +589,16 @@ export class DuplicateProcessor {
// remove empty rows (only pk is present) // remove empty rows (only pk is present)
chunk = chunk.filter((r) => Object.keys(r).length > 1); chunk = chunk.filter((r) => Object.keys(r).length > 1);
if (chunk.length > 0) { if (chunk.length > 0) {
await this.bulkDataService.bulkDataUpdate(context, { await this.bulkDataService.bulkDataUpdate(
baseName: destProject.id, targetContext,
tableName: model.id, {
body: chunk, baseName: destProject.id,
cookie: null, tableName: model.id,
raw: true, body: chunk,
}); cookie: null,
raw: true,
},
);
} }
} catch (e) { } catch (e) {
this.debugLog(e); this.debugLog(e);
@ -608,7 +615,7 @@ export class DuplicateProcessor {
// remove empty rows (only pk is present) // remove empty rows (only pk is present)
chunk = chunk.filter((r) => Object.keys(r).length > 1); chunk = chunk.filter((r) => Object.keys(r).length > 1);
if (chunk.length > 0) { if (chunk.length > 0) {
await this.bulkDataService.bulkDataUpdate(context, { await this.bulkDataService.bulkDataUpdate(targetContext, {
baseName: destProject.id, baseName: destProject.id,
tableName: model.id, tableName: model.id,
body: chunk, body: chunk,
@ -629,7 +636,7 @@ export class DuplicateProcessor {
if (error) throw error; if (error) throw error;
handledLinks = await this.importService.importLinkFromCsvStream( handledLinks = await this.importService.importLinkFromCsvStream(
context, targetContext,
{ {
idMap, idMap,
linkStream, linkStream,

60
packages/nocodb/src/modules/jobs/redis/jobs-redis.ts

@ -3,9 +3,11 @@ import type { InstanceCommands } from '~/interface/Jobs';
import { PubSubRedis } from '~/redis/pubsub-redis'; import { PubSubRedis } from '~/redis/pubsub-redis';
import { InstanceTypes } from '~/interface/Jobs'; import { InstanceTypes } from '~/interface/Jobs';
export class JobsRedis extends PubSubRedis { export class JobsRedis {
protected static logger = new Logger(JobsRedis.name); protected static logger = new Logger(JobsRedis.name);
public static available = PubSubRedis.available;
public static primaryCallbacks: { public static primaryCallbacks: {
[key: string]: (...args) => Promise<void>; [key: string]: (...args) => Promise<void>;
} = {}; } = {};
@ -13,47 +15,61 @@ export class JobsRedis extends PubSubRedis {
{}; {};
static async initJobs() { static async initJobs() {
if (!this.initialized) { if (!PubSubRedis.initialized) {
if (!this.available) { if (!PubSubRedis.available) {
return; return;
} }
await this.init(); await PubSubRedis.init();
} }
const onMessage = async (channel, message) => { const onMessage = async (channel, message) => {
const args = message.split(':'); try {
const command = args.shift(); if (!message) {
if (channel === InstanceTypes.WORKER) { return;
this.workerCallbacks[command] && }
(await this.workerCallbacks[command](...args)); const args = message.split(':');
} else if (channel === InstanceTypes.PRIMARY) { const command = args.shift();
this.primaryCallbacks[command] &&
(await this.primaryCallbacks[command](...args)); if (channel === InstanceTypes.WORKER) {
this.workerCallbacks[command] &&
(await this.workerCallbacks[command](...args));
} else if (channel === InstanceTypes.PRIMARY) {
this.primaryCallbacks[command] &&
(await this.primaryCallbacks[command](...args));
}
} catch (error) {
this.logger.error({
message: `Error processing redis pub-sub message ${message}`,
});
} }
}; };
PubSubRedis.redisSubscriber.on('message', onMessage);
if (process.env.NC_WORKER_CONTAINER === 'true') { if (process.env.NC_WORKER_CONTAINER === 'true') {
await this.subscribe(InstanceTypes.WORKER, async (message) => { await PubSubRedis.subscribe(InstanceTypes.WORKER, async (message) => {
await onMessage(InstanceTypes.WORKER, message); await onMessage(InstanceTypes.WORKER, message);
}); });
} else { } else {
await this.subscribe(InstanceTypes.PRIMARY, async (message) => { await PubSubRedis.subscribe(InstanceTypes.PRIMARY, async (message) => {
await onMessage(InstanceTypes.PRIMARY, message); await onMessage(InstanceTypes.PRIMARY, message);
}); });
} }
} }
static async workerCount(): Promise<number> { static async workerCount(): Promise<number> {
if (!this.initialized) { if (!PubSubRedis.initialized) {
if (!this.available) { if (!PubSubRedis.available) {
return; return;
} }
await this.init(); await PubSubRedis.init();
await this.initJobs(); await this.initJobs();
} }
return new Promise((resolve) => { return new Promise((resolve) => {
this.redisClient.publish( PubSubRedis.redisClient.publish(
InstanceTypes.WORKER, InstanceTypes.WORKER,
'count', 'count',
(error, numberOfSubscribers) => { (error, numberOfSubscribers) => {
@ -70,11 +86,15 @@ export class JobsRedis extends PubSubRedis {
static async emitWorkerCommand(command: InstanceCommands, ...args: any[]) { static async emitWorkerCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await this.publish(InstanceTypes.WORKER, data); await PubSubRedis.publish(InstanceTypes.WORKER, data);
} }
static async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) { static async emitPrimaryCommand(command: InstanceCommands, ...args: any[]) {
const data = `${command}${args.length ? `:${args.join(':')}` : ''}`; const data = `${command}${args.length ? `:${args.join(':')}` : ''}`;
await this.publish(InstanceTypes.PRIMARY, data); await PubSubRedis.publish(InstanceTypes.PRIMARY, data);
} }
static publish = PubSubRedis.publish;
static subscribe = PubSubRedis.subscribe;
static unsubscribe = PubSubRedis.unsubscribe;
} }

17
packages/nocodb/src/redis/pubsub-redis.ts

@ -8,11 +8,10 @@ export class PubSubRedis {
protected static logger = new Logger(PubSubRedis.name); protected static logger = new Logger(PubSubRedis.name);
static redisClient: Redis; public static redisClient: Redis;
private static redisSubscriber: Redis; public static redisSubscriber: Redis;
private static unsubscribeCallbacks: { [key: string]: () => Promise<void> } = private static unsubscribeCallbacks: { [key: string]: () => Promise<void> } =
{}; {};
private static callbacks: Record<string, (...args) => Promise<void>> = {};
public static async init() { public static async init() {
if (!PubSubRedis.available) { if (!PubSubRedis.available) {
@ -22,12 +21,6 @@ export class PubSubRedis {
PubSubRedis.redisClient = new Redis(process.env.NC_REDIS_JOB_URL); PubSubRedis.redisClient = new Redis(process.env.NC_REDIS_JOB_URL);
PubSubRedis.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL); PubSubRedis.redisSubscriber = new Redis(process.env.NC_REDIS_JOB_URL);
PubSubRedis.redisSubscriber.on('message', async (channel, message) => {
const [command, ...args] = message.split(':');
const callback = PubSubRedis.callbacks[command];
if (callback) await callback(...args);
});
PubSubRedis.initialized = true; PubSubRedis.initialized = true;
} }
@ -78,7 +71,11 @@ export class PubSubRedis {
await PubSubRedis.redisSubscriber.subscribe(channel); await PubSubRedis.redisSubscriber.subscribe(channel);
const onMessage = async (_channel, message) => { const onMessage = async (messageChannel, message) => {
if (channel !== messageChannel) {
return;
}
try { try {
message = JSON.parse(message); message = JSON.parse(message);
} catch (e) {} } catch (e) {}

Loading…
Cancel
Save