Browse Source

feat: move meta diff sync to jobs

pull/6528/head
mertmit 1 year ago
parent
commit
2e32c86c17
  1. 32
      packages/nocodb/src/controllers/meta-diffs.controller.ts
  2. 1
      packages/nocodb/src/interface/Jobs.ts
  3. 6
      packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts
  4. 9
      packages/nocodb/src/modules/jobs/jobs.module.ts
  5. 70
      packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts
  6. 30
      packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts
  7. 1
      packages/nocodb/src/modules/metas/metas.module.ts
  8. 287
      packages/nocodb/src/services/meta-diffs.service.ts

32
packages/nocodb/src/controllers/meta-diffs.controller.ts

@ -1,11 +1,4 @@
import { import { Controller, Get, Param, UseGuards } from '@nestjs/common';
Controller,
Get,
HttpCode,
Param,
Post,
UseGuards,
} from '@nestjs/common';
import { GlobalGuard } from '~/guards/global/global.guard'; import { GlobalGuard } from '~/guards/global/global.guard';
import { MetaDiffsService } from '~/services/meta-diffs.service'; import { MetaDiffsService } from '~/services/meta-diffs.service';
import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware'; import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
@ -32,27 +25,4 @@ export class MetaDiffsController {
projectId, projectId,
}); });
} }
@Post('/api/v1/db/meta/projects/:projectId/meta-diff')
@HttpCode(200)
@Acl('metaDiffSync')
async metaDiffSync(@Param('projectId') projectId: string) {
await this.metaDiffsService.metaDiffSync({ projectId });
return { msg: 'The meta has been synchronized successfully' };
}
@Post('/api/v1/db/meta/projects/:projectId/meta-diff/:baseId')
@HttpCode(200)
@Acl('baseMetaDiffSync')
async baseMetaDiffSync(
@Param('projectId') projectId: string,
@Param('baseId') baseId: string,
) {
await this.metaDiffsService.baseMetaDiffSync({
projectId,
baseId,
});
return { msg: 'The base meta has been synchronized successfully' };
}
} }

1
packages/nocodb/src/interface/Jobs.ts

@ -4,6 +4,7 @@ export enum JobTypes {
DuplicateBase = 'duplicate-base', DuplicateBase = 'duplicate-base',
DuplicateModel = 'duplicate-model', DuplicateModel = 'duplicate-model',
AtImport = 'at-import', AtImport = 'at-import',
MetaSync = 'meta-sync',
} }
export enum JobStatus { export enum JobStatus {

6
packages/nocodb/src/modules/jobs/fallback/fallback-queue.service.ts

@ -3,6 +3,7 @@ import PQueue from 'p-queue';
import Emittery from 'emittery'; import Emittery from 'emittery';
import { DuplicateProcessor } from '../jobs/export-import/duplicate.processor'; import { DuplicateProcessor } from '../jobs/export-import/duplicate.processor';
import { AtImportProcessor } from '../jobs/at-import/at-import.processor'; import { AtImportProcessor } from '../jobs/at-import/at-import.processor';
import { MetaSyncProcessor } from '../jobs/meta-sync/meta-sync.processor';
import { JobsEventService } from './jobs-event.service'; import { JobsEventService } from './jobs-event.service';
import { JobStatus, JobTypes } from '~/interface/Jobs'; import { JobStatus, JobTypes } from '~/interface/Jobs';
@ -25,6 +26,7 @@ export class QueueService {
private readonly jobsEventService: JobsEventService, private readonly jobsEventService: JobsEventService,
private readonly duplicateProcessor: DuplicateProcessor, private readonly duplicateProcessor: DuplicateProcessor,
private readonly atImportProcessor: AtImportProcessor, private readonly atImportProcessor: AtImportProcessor,
private readonly metaSyncProcessor: MetaSyncProcessor,
) { ) {
this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => { this.emitter.on(JobStatus.ACTIVE, (data: { job: Job }) => {
const job = this.queueMemory.find((job) => job.id === data.job.id); const job = this.queueMemory.find((job) => job.id === data.job.id);
@ -66,6 +68,10 @@ export class QueueService {
this: this.atImportProcessor, this: this.atImportProcessor,
fn: this.atImportProcessor.job, fn: this.atImportProcessor.job,
}, },
[JobTypes.MetaSync]: {
this: this.metaSyncProcessor,
fn: this.metaSyncProcessor.job,
},
}; };
async jobWrapper(job: Job) { async jobWrapper(job: Job) {

9
packages/nocodb/src/modules/jobs/jobs.module.ts

@ -1,11 +1,17 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull'; import { BullModule } from '@nestjs/bull';
// Jobs
import { ExportService } from './jobs/export-import/export.service'; import { ExportService } from './jobs/export-import/export.service';
import { ImportService } from './jobs/export-import/import.service'; import { ImportService } from './jobs/export-import/import.service';
import { AtImportController } from './jobs/at-import/at-import.controller'; import { AtImportController } from './jobs/at-import/at-import.controller';
import { AtImportProcessor } from './jobs/at-import/at-import.processor'; import { AtImportProcessor } from './jobs/at-import/at-import.processor';
import { DuplicateController } from './jobs/export-import/duplicate.controller'; import { DuplicateController } from './jobs/export-import/duplicate.controller';
import { DuplicateProcessor } from './jobs/export-import/duplicate.processor'; import { DuplicateProcessor } from './jobs/export-import/duplicate.processor';
import { MetaSyncController } from './jobs/meta-sync/meta-sync.controller';
import { MetaSyncProcessor } from './jobs/meta-sync/meta-sync.processor';
// Jobs Module Related
import { JobsLogService } from './jobs/jobs-log.service'; import { JobsLogService } from './jobs/jobs-log.service';
import { JobsGateway } from './jobs.gateway'; import { JobsGateway } from './jobs.gateway';
@ -41,7 +47,7 @@ import { GlobalModule } from '~/modules/global/global.module';
], ],
controllers: [ controllers: [
...(process.env.NC_WORKER_CONTAINER !== 'true' ...(process.env.NC_WORKER_CONTAINER !== 'true'
? [DuplicateController, AtImportController] ? [DuplicateController, AtImportController, MetaSyncController]
: []), : []),
], ],
providers: [ providers: [
@ -60,6 +66,7 @@ import { GlobalModule } from '~/modules/global/global.module';
ImportService, ImportService,
DuplicateProcessor, DuplicateProcessor,
AtImportProcessor, AtImportProcessor,
MetaSyncProcessor,
], ],
}) })
export class JobsModule {} export class JobsModule {}

70
packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.controller.ts

@ -0,0 +1,70 @@
import {
Controller,
HttpCode,
Inject,
Param,
Post,
Request,
UseGuards,
} from '@nestjs/common';
import { GlobalGuard } from '~/guards/global/global.guard';
import { Acl } from '~/middlewares/extract-ids/extract-ids.middleware';
import { NcError } from '~/helpers/catchError';
import { JobTypes } from '~/interface/Jobs';
@Controller()
@UseGuards(GlobalGuard)
export class MetaSyncController {
constructor(@Inject('JobsService') private readonly jobsService) {}
@Post('/api/v1/db/meta/projects/:projectId/meta-diff')
@HttpCode(200)
@Acl('metaDiffSync')
async metaDiffSync(@Param('projectId') projectId: string, @Request() req) {
const jobs = await this.jobsService.jobList();
const fnd = jobs.find(
(j) => j.name === JobTypes.MetaSync && j.data.projectId === projectId,
);
if (fnd) {
NcError.badRequest('Meta sync already in progress for this project');
}
const job = await this.jobsService.add(JobTypes.MetaSync, {
projectId,
baseId: 'all',
user: req.user,
});
return { id: job.id };
}
@Post('/api/v1/db/meta/projects/:projectId/meta-diff/:baseId')
@HttpCode(200)
@Acl('baseMetaDiffSync')
async baseMetaDiffSync(
@Param('projectId') projectId: string,
@Param('baseId') baseId: string,
@Request() req,
) {
const jobs = await this.jobsService.jobList();
const fnd = jobs.find(
(j) =>
j.name === JobTypes.MetaSync &&
j.data.projectId === projectId &&
(j.data.baseId === baseId || j.data.baseId === 'all'),
);
if (fnd) {
NcError.badRequest('Meta sync already in progress for this project');
}
const job = await this.jobsService.add(JobTypes.MetaSync, {
projectId,
baseId,
user: req.user,
});
return { id: job.id };
}
}

30
packages/nocodb/src/modules/jobs/jobs/meta-sync/meta-sync.processor.ts

@ -0,0 +1,30 @@
import debug from 'debug';
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { JOBS_QUEUE, JobTypes } from '~/interface/Jobs';
import { MetaDiffsService } from '~/services/meta-diffs.service';
@Processor(JOBS_QUEUE)
export class MetaSyncProcessor {
private readonly debugLog = debug('nc:meta-sync:processor');
constructor(private readonly metaDiffsService: MetaDiffsService) {}
@Process(JobTypes.MetaSync)
async job(job: Job) {
const info: {
projectId: string;
baseId: string;
user: any;
} = job.data;
if (info.baseId === 'all') {
await this.metaDiffsService.metaDiffSync({ projectId: info.projectId });
} else {
await this.metaDiffsService.baseMetaDiffSync({
projectId: info.projectId,
baseId: info.baseId,
});
}
}
}

1
packages/nocodb/src/modules/metas/metas.module.ts

@ -179,6 +179,7 @@ export const metaModuleMetadata = {
AttachmentsService, AttachmentsService,
ProjectUsersService, ProjectUsersService,
HooksService, HooksService,
MetaDiffsService,
], ],
}; };

287
packages/nocodb/src/services/meta-diffs.service.ts

@ -642,254 +642,10 @@ export class MetaDiffsService {
return changes; return changes;
} }
async metaDiffSync(param: { projectId: string }) { async syncBaseMeta(project: Project, base: Base, throwOnFail = false) {
const project = await Project.getWithInfo(param.projectId);
for (const base of project.bases) {
// skip if metadb base
if (base.is_meta) continue;
const virtualColumnInsert: Array<() => Promise<void>> = [];
// @ts-ignore
const sqlClient = await NcConnectionMgrv2.getSqlClient(base);
const changes = await this.getMetaDiff(sqlClient, project, base);
/* Get all relations */
// const relations = (await sqlClient.relationListAll())?.data?.list;
for (const { table_name, detectedChanges } of changes) {
// reorder changes to apply relation remove changes
// before column remove to avoid foreign key constraint error
detectedChanges.sort((a, b) => {
return (
applyChangesPriorityOrder.indexOf(b.type) -
applyChangesPriorityOrder.indexOf(a.type)
);
});
for (const change of detectedChanges) {
switch (change.type) {
case MetaDiffType.TABLE_NEW:
{
const columns = (
await sqlClient.columnList({
tn: table_name,
schema: base.getConfig()?.schema,
})
)?.data?.list?.map((c) => ({ ...c, column_name: c.cn }));
mapDefaultDisplayValue(columns);
const model = await Model.insert(project.id, base.id, {
table_name: table_name,
title: getTableNameAlias(
table_name,
base.is_meta ? project.prefix : '',
base,
),
type: ModelTypes.TABLE,
});
for (const column of columns) {
await Column.insert({
uidt: getColumnUiType(base, column),
fk_model_id: model.id,
...column,
title: getColumnNameAlias(column.column_name, base),
});
}
}
break;
case MetaDiffType.VIEW_NEW:
{
const columns = (
await sqlClient.columnList({
tn: table_name,
schema: base.getConfig()?.schema,
})
)?.data?.list?.map((c) => ({ ...c, column_name: c.cn }));
mapDefaultDisplayValue(columns);
const model = await Model.insert(project.id, base.id, {
table_name: table_name,
title: getTableNameAlias(table_name, project.prefix, base),
type: ModelTypes.VIEW,
});
for (const column of columns) {
await Column.insert({
uidt: getColumnUiType(base, column),
fk_model_id: model.id,
...column,
title: getColumnNameAlias(column.column_name, base),
});
}
}
break;
case MetaDiffType.TABLE_REMOVE:
case MetaDiffType.VIEW_REMOVE:
{
await change.model.delete();
}
break;
case MetaDiffType.TABLE_COLUMN_ADD:
case MetaDiffType.VIEW_COLUMN_ADD:
{
const columns = (
await sqlClient.columnList({
tn: table_name,
schema: base.getConfig()?.schema,
})
)?.data?.list?.map((c) => ({ ...c, column_name: c.cn }));
const column = columns.find((c) => c.cn === change.cn);
column.uidt = getColumnUiType(base, column);
//todo: inflection
column.title = getColumnNameAlias(column.cn, base);
await Column.insert({ fk_model_id: change.id, ...column });
}
// update old
// populateParams.tableNames.push({ tn });
// populateParams.oldMetas[tn] = oldMetas.find(m => m.tn === tn);
break;
case MetaDiffType.TABLE_COLUMN_TYPE_CHANGE:
case MetaDiffType.VIEW_COLUMN_TYPE_CHANGE:
{
const columns = (
await sqlClient.columnList({
tn: table_name,
schema: base.getConfig()?.schema,
})
)?.data?.list?.map((c) => ({ ...c, column_name: c.cn }));
const column = columns.find((c) => c.cn === change.cn);
const metaFact = ModelXcMetaFactory.create(
{ client: base.type },
{},
);
column.uidt = metaFact.getUIDataType(column);
column.title = change.column.title;
await Column.update(change.column.id, column);
}
break;
case MetaDiffType.TABLE_COLUMN_PROPS_CHANGED:
{
const columns = (
await sqlClient.columnList({ tn: table_name })
)?.data?.list?.map((c) => ({ ...c, column_name: c.cn }));
const colMeta = columns.find((c) => c.cn === change.cn);
if (!colMeta) break;
const { pk, ai, rqd, un, unique } = colMeta;
await Column.update(change.column.id, {
pk,
ai,
rqd,
un,
unique,
});
}
break;
case MetaDiffType.TABLE_COLUMN_REMOVE:
case MetaDiffType.VIEW_COLUMN_REMOVE:
await change.column.delete();
break;
case MetaDiffType.TABLE_RELATION_REMOVE:
case MetaDiffType.TABLE_VIRTUAL_M2M_REMOVE:
await change.column.delete();
break;
case MetaDiffType.TABLE_RELATION_ADD:
{
virtualColumnInsert.push(async () => {
const parentModel = await Model.getByIdOrName({
project_id: base.project_id,
base_id: base.id,
table_name: change.rtn,
});
const childModel = await Model.getByIdOrName({
project_id: base.project_id,
base_id: base.id,
table_name: change.tn,
});
const parentCol = await parentModel
.getColumns()
.then((cols) =>
cols.find((c) => c.column_name === change.rcn),
);
const childCol = await childModel
.getColumns()
.then((cols) =>
cols.find((c) => c.column_name === change.cn),
);
await Column.update(childCol.id, {
...childCol,
uidt: UITypes.ForeignKey,
system: true,
});
if (change.relationType === RelationTypes.BELONGS_TO) {
const title = getUniqueColumnAliasName(
childModel.columns,
`${parentModel.title || parentModel.table_name}`,
);
await Column.insert<LinkToAnotherRecordColumn>({
uidt: UITypes.LinkToAnotherRecord,
title,
fk_model_id: childModel.id,
fk_related_model_id: parentModel.id,
type: RelationTypes.BELONGS_TO,
fk_parent_column_id: parentCol.id,
fk_child_column_id: childCol.id,
virtual: false,
fk_index_name: change.cstn,
});
} else if (change.relationType === RelationTypes.HAS_MANY) {
const title = getUniqueColumnAliasName(
childModel.columns,
pluralize(childModel.title || childModel.table_name),
);
await Column.insert<LinkToAnotherRecordColumn>({
uidt: UITypes.Links,
title,
fk_model_id: parentModel.id,
fk_related_model_id: childModel.id,
type: RelationTypes.HAS_MANY,
fk_parent_column_id: parentCol.id,
fk_child_column_id: childCol.id,
virtual: false,
fk_index_name: change.cstn,
meta: {
plural: pluralize(childModel.title),
singular: singularize(childModel.title),
},
});
}
});
}
break;
}
}
}
await NcHelp.executeOperations(virtualColumnInsert, base.type);
// populate m2m relations
await this.extractAndGenerateManyToManyRelations(await base.getModels());
}
this.appHooksService.emit(AppEvents.META_DIFF_SYNC, {
project,
});
return true;
}
async baseMetaDiffSync(param: { projectId: string; baseId: string }) {
const project = await Project.getWithInfo(param.projectId);
const base = await Base.get(param.baseId);
if (base.is_meta) { if (base.is_meta) {
NcError.badRequest('Cannot sync meta base'); if (throwOnFail) NcError.badRequest('Cannot sync meta base');
return;
} }
const virtualColumnInsert: Array<() => Promise<void>> = []; const virtualColumnInsert: Array<() => Promise<void>> = [];
@ -902,6 +658,15 @@ export class MetaDiffsService {
// const relations = (await sqlClient.relationListAll())?.data?.list; // const relations = (await sqlClient.relationListAll())?.data?.list;
for (const { table_name, detectedChanges } of changes) { for (const { table_name, detectedChanges } of changes) {
// reorder changes to apply relation remove changes
// before column remove to avoid foreign key constraint error
detectedChanges.sort((a, b) => {
return (
applyChangesPriorityOrder.indexOf(b.type) -
applyChangesPriorityOrder.indexOf(a.type)
);
});
for (const change of detectedChanges) { for (const change of detectedChanges) {
switch (change.type) { switch (change.type) {
case MetaDiffType.TABLE_NEW: case MetaDiffType.TABLE_NEW:
@ -1076,13 +841,14 @@ export class MetaDiffsService {
fk_parent_column_id: parentCol.id, fk_parent_column_id: parentCol.id,
fk_child_column_id: childCol.id, fk_child_column_id: childCol.id,
virtual: false, virtual: false,
fk_index_name: change.cstn,
}); });
} else if (change.relationType === RelationTypes.HAS_MANY) { } else if (change.relationType === RelationTypes.HAS_MANY) {
const title = getUniqueColumnAliasName( const title = getUniqueColumnAliasName(
childModel.columns, childModel.columns,
pluralize(childModel.title || childModel.table_name), pluralize(childModel.title || childModel.table_name),
); );
await Column.insert<LinksColumn>({ await Column.insert<LinkToAnotherRecordColumn>({
uidt: UITypes.Links, uidt: UITypes.Links,
title, title,
fk_model_id: parentModel.id, fk_model_id: parentModel.id,
@ -1091,6 +857,11 @@ export class MetaDiffsService {
fk_parent_column_id: parentCol.id, fk_parent_column_id: parentCol.id,
fk_child_column_id: childCol.id, fk_child_column_id: childCol.id,
virtual: false, virtual: false,
fk_index_name: change.cstn,
meta: {
plural: pluralize(childModel.title),
singular: singularize(childModel.title),
},
}); });
} }
}); });
@ -1104,6 +875,26 @@ export class MetaDiffsService {
// populate m2m relations // populate m2m relations
await this.extractAndGenerateManyToManyRelations(await base.getModels()); await this.extractAndGenerateManyToManyRelations(await base.getModels());
}
async metaDiffSync(param: { projectId: string }) {
const project = await Project.getWithInfo(param.projectId);
for (const base of project.bases) {
await this.syncBaseMeta(project, base);
}
this.appHooksService.emit(AppEvents.META_DIFF_SYNC, {
project,
});
return true;
}
async baseMetaDiffSync(param: { projectId: string; baseId: string }) {
const project = await Project.getWithInfo(param.projectId);
const base = await Base.get(param.baseId);
await this.syncBaseMeta(project, base, true);
this.appHooksService.emit(AppEvents.META_DIFF_SYNC, { this.appHooksService.emit(AppEvents.META_DIFF_SYNC, {
project, project,

Loading…
Cancel
Save