Browse Source

refactor: sync api

Signed-off-by: Pranav C <pranavxc@gmail.com>
pull/5239/head
Pranav C 2 years ago
parent
commit
ed939174fe
  1. 142
      packages/nocodb/src/lib/controllers/syncController/importApis.ts
  2. 69
      packages/nocodb/src/lib/controllers/syncController/index.ts
  3. 462
      packages/nocodb/src/lib/controllers/userController/index.ts
  4. 462
      packages/nocodb/src/lib/controllers/userController/userApis.ts
  5. 1
      packages/nocodb/src/lib/services/index.ts
  6. 222
      packages/nocodb/src/lib/services/syncService/helpers/EntityMap.ts
  7. 6
      packages/nocodb/src/lib/services/syncService/helpers/NocoSyncDestAdapter.ts
  8. 7
      packages/nocodb/src/lib/services/syncService/helpers/NocoSyncSourceAdapter.ts
  9. 238
      packages/nocodb/src/lib/services/syncService/helpers/fetchAT.ts
  10. 2430
      packages/nocodb/src/lib/services/syncService/helpers/job.ts
  11. 338
      packages/nocodb/src/lib/services/syncService/helpers/readAndProcessData.ts
  12. 31
      packages/nocodb/src/lib/services/syncService/helpers/syncMap.ts
  13. 47
      packages/nocodb/src/lib/services/syncService/index.ts

142
packages/nocodb/src/lib/controllers/syncController/importApis.ts

@ -0,0 +1,142 @@
import { Request, Router } from 'express';
// import { Queue } from 'bullmq';
// import axios from 'axios';
import catchError, { NcError } from '../../meta/helpers/catchError';
import { Server } from 'socket.io';
import NocoJobs from '../../jobs/NocoJobs';
import { SyncSource } from '../../models';
import Noco from '../../Noco';
import { syncService, userService } from '../../services';
import { AirtableSyncConfig } from '../../services/syncService/helpers/job';
const AIRTABLE_IMPORT_JOB = 'AIRTABLE_IMPORT_JOB';
const AIRTABLE_PROGRESS_JOB = 'AIRTABLE_PROGRESS_JOB';
enum SyncStatus {
PROGRESS = 'PROGRESS',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
}
export default (
router: Router,
sv: Server,
jobs: { [id: string]: { last_message: any } }
) => {
// add importer job handler and progress notification job handler
NocoJobs.jobsMgr.addJobWorker(
AIRTABLE_IMPORT_JOB,
syncService.airtableImportJob
);
NocoJobs.jobsMgr.addJobWorker(
AIRTABLE_PROGRESS_JOB,
({ payload, progress }) => {
sv.to(payload?.id).emit('progress', {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
});
if (payload?.id in jobs) {
jobs[payload?.id].last_message = {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
};
}
}
);
NocoJobs.jobsMgr.addProgressCbk(AIRTABLE_IMPORT_JOB, (payload, progress) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
payload,
progress: {
msg: progress?.msg,
level: progress?.level,
status: progress?.status,
},
});
});
NocoJobs.jobsMgr.addSuccessCbk(AIRTABLE_IMPORT_JOB, (payload) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
payload,
progress: {
msg: 'Complete!',
status: SyncStatus.COMPLETED,
},
});
delete jobs[payload?.id];
});
NocoJobs.jobsMgr.addFailureCbk(AIRTABLE_IMPORT_JOB, (payload, error: any) => {
NocoJobs.jobsMgr.add(AIRTABLE_PROGRESS_JOB, {
payload,
progress: {
msg: error?.message || 'Failed due to some internal error',
status: SyncStatus.FAILED,
},
});
delete jobs[payload?.id];
});
router.post(
'/api/v1/db/meta/import/airtable',
catchError((req, res) => {
NocoJobs.jobsMgr.add(AIRTABLE_IMPORT_JOB, {
id: req.query.id,
...req.body,
});
res.json({});
})
);
router.post(
'/api/v1/db/meta/syncs/:syncId/trigger',
catchError(async (req: Request, res) => {
if (req.params.syncId in jobs) {
NcError.badRequest('Sync already in progress');
}
const syncSource = await SyncSource.get(req.params.syncId);
const user = await syncSource.getUser();
const token = userService.genJwt(user, Noco.getConfig());
// Treat default baseUrl as siteUrl from req object
let baseURL = (req as any).ncSiteUrl;
// if environment value avail use it
// or if it's docker construct using `PORT`
if (process.env.NC_BASEURL_INTERNAL) {
baseURL = process.env.NC_BASEURL_INTERNAL;
} else if (process.env.NC_DOCKER) {
baseURL = `http://localhost:${process.env.PORT || 8080}`;
}
setTimeout(() => {
NocoJobs.jobsMgr.add<AirtableSyncConfig>(AIRTABLE_IMPORT_JOB, {
id: req.params.syncId,
...(syncSource?.details || {}),
projectId: syncSource.project_id,
baseId: syncSource.base_id,
authToken: token,
baseURL,
});
}, 1000);
jobs[req.params.syncId] = {
last_message: {
msg: 'Sync started',
},
};
res.json({});
})
);
router.post(
'/api/v1/db/meta/syncs/:syncId/abort',
catchError(async (req: Request, res) => {
if (req.params.syncId in jobs) {
delete jobs[req.params.syncId];
}
res.json({});
})
);
};

69
packages/nocodb/src/lib/controllers/syncController/index.ts

@ -0,0 +1,69 @@
import { Request, Response, Router } from 'express';
import ncMetaAclMw from '../../meta/helpers/ncMetaAclMw';
import { syncService } from '../../services';
export async function syncSourceList(req: Request, res: Response) {
// todo: pagination
res.json(
syncService.syncSourceList({
projectId: req.params.projectId,
})
);
}
export async function syncCreate(req: Request, res: Response) {
res.json(
await syncService.syncCreate({
projectId: req.params.projectId,
baseId: req.params.baseId,
userId: (req as any).user.id,
syncPayload: req.body,
})
);
}
export async function syncDelete(req: Request, res: Response<any>) {
res.json(
await syncService.syncDelete({
syncId: req.params.syncId,
})
);
}
export async function syncUpdate(req: Request, res: Response) {
res.json(
await syncService.syncUpdate({
syncId: req.params.syncId,
syncPayload: req.body,
})
);
}
const router = Router({ mergeParams: true });
router.get(
'/api/v1/db/meta/projects/:projectId/syncs',
ncMetaAclMw(syncSourceList, 'syncSourceList')
);
router.post(
'/api/v1/db/meta/projects/:projectId/syncs',
ncMetaAclMw(syncCreate, 'syncSourceCreate')
);
router.get(
'/api/v1/db/meta/projects/:projectId/syncs/:baseId',
ncMetaAclMw(syncSourceList, 'syncSourceList')
);
router.post(
'/api/v1/db/meta/projects/:projectId/syncs/:baseId',
ncMetaAclMw(syncCreate, 'syncSourceCreate')
);
router.delete(
'/api/v1/db/meta/syncs/:syncId',
ncMetaAclMw(syncDelete, 'syncSourceDelete')
);
router.patch(
'/api/v1/db/meta/syncs/:syncId',
ncMetaAclMw(syncUpdate, 'syncSourceUpdate')
);
export default router;

462
packages/nocodb/src/lib/controllers/userController/index.ts

@ -1 +1,461 @@
export * from './userApis';
import { Request, Response } from 'express';
import { TableType, validatePassword } from 'nocodb-sdk';
import { Tele } from 'nc-help';
const { isEmail } = require('validator');
import * as ejs from 'ejs';
import bcrypt from 'bcryptjs';
import { promisify } from 'util';
const { v4: uuidv4 } = require('uuid');
import passport from 'passport';
import { getAjvValidatorMw } from '../../meta/api/helpers';
import catchError, { NcError } from '../../meta/helpers/catchError';
import extractProjectIdAndAuthenticate from '../../meta/helpers/extractProjectIdAndAuthenticate';
import ncMetaAclMw from '../../meta/helpers/ncMetaAclMw';
import NcPluginMgrv2 from '../../meta/helpers/NcPluginMgrv2';
import { Audit, User } from '../../models';
import Noco from '../../Noco';
import { userService } from '../../services';
export async function signup(req: Request, res: Response<TableType>) {
const {
email: _email,
firstname,
lastname,
token,
ignore_subscribe,
} = req.body;
let { password } = req.body;
// validate password and throw error if password is satisfying the conditions
const { valid, error } = validatePassword(password);
if (!valid) {
NcError.badRequest(`Password : ${error}`);
}
if (!isEmail(_email)) {
NcError.badRequest(`Invalid email`);
}
const email = _email.toLowerCase();
let user = await User.getByEmail(email);
if (user) {
if (token) {
if (token !== user.invite_token) {
NcError.badRequest(`Invalid invite url`);
} else if (user.invite_token_expires < new Date()) {
NcError.badRequest(
'Expired invite url, Please contact super admin to get a new invite url'
);
}
} else {
// todo : opening up signup for timebeing
// return next(new Error(`Email '${email}' already registered`));
}
}
const salt = await promisify(bcrypt.genSalt)(10);
password = await promisify(bcrypt.hash)(password, salt);
const email_verification_token = uuidv4();
if (!ignore_subscribe) {
Tele.emit('evt_subscribe', email);
}
if (user) {
if (token) {
await User.update(user.id, {
firstname,
lastname,
salt,
password,
email_verification_token,
invite_token: null,
invite_token_expires: null,
email: user.email,
});
} else {
NcError.badRequest('User already exist');
}
} else {
await userService.registerNewUserIfAllowed({
firstname,
lastname,
email,
salt,
password,
email_verification_token,
});
}
user = await User.getByEmail(email);
try {
const template = (await import('./ui/emailTemplates/verify')).default;
await (
await NcPluginMgrv2.emailAdapter()
).mailSend({
to: email,
subject: 'Verify email',
html: ejs.render(template, {
verifyLink:
(req as any).ncSiteUrl +
`/email/verify/${user.email_verification_token}`,
}),
});
} catch (e) {
console.log(
'Warning : `mailSend` failed, Please configure emailClient configuration.'
);
}
await promisify((req as any).login.bind(req))(user);
const refreshToken = userService.randomTokenString();
await User.update(user.id, {
refresh_token: refreshToken,
email: user.email,
});
setTokenCookie(res, refreshToken);
user = (req as any).user;
await Audit.insert({
op_type: 'AUTHENTICATION',
op_sub_type: 'SIGNUP',
user: user.email,
description: `signed up `,
ip: (req as any).clientIp,
});
res.json({
token: userService.genJwt(user, Noco.getConfig()),
} as any);
}
async function successfulSignIn({
user,
err,
info,
req,
res,
auditDescription,
}) {
try {
if (!user || !user.email) {
if (err) {
return res.status(400).send(err);
}
if (info) {
return res.status(400).send(info);
}
return res.status(400).send({ msg: 'Your signin has failed' });
}
await promisify((req as any).login.bind(req))(user);
const refreshToken = userService.randomTokenString();
if (!user.token_version) {
user.token_version = userService.randomTokenString();
}
await User.update(user.id, {
refresh_token: refreshToken,
email: user.email,
token_version: user.token_version,
});
setTokenCookie(res, refreshToken);
await Audit.insert({
op_type: 'AUTHENTICATION',
op_sub_type: 'SIGNIN',
user: user.email,
ip: req.clientIp,
description: auditDescription,
});
res.json({
token: userService.genJwt(user, Noco.getConfig()),
} as any);
} catch (e) {
console.log(e);
throw e;
}
}
async function signin(req, res, next) {
passport.authenticate(
'local',
{ session: false },
async (err, user, info): Promise<any> =>
await successfulSignIn({
user,
err,
info,
req,
res,
auditDescription: 'signed in',
})
)(req, res, next);
}
async function googleSignin(req, res, next) {
passport.authenticate(
'google',
{
session: false,
callbackURL: req.ncSiteUrl + Noco.getConfig().dashboardPath,
},
async (err, user, info): Promise<any> =>
await successfulSignIn({
user,
err,
info,
req,
res,
auditDescription: 'signed in using Google Auth',
})
)(req, res, next);
}
function setTokenCookie(res: Response, token): void {
// create http only cookie with refresh token that expires in 7 days
const cookieOptions = {
httpOnly: true,
expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
};
res.cookie('refresh_token', token, cookieOptions);
}
async function me(req, res): Promise<any> {
res.json(req?.session?.passport?.user ?? {});
}
async function passwordChange(req: Request<any, any>, res): Promise<any> {
if (!(req as any).isAuthenticated()) {
NcError.forbidden('Not allowed');
}
await userService.passwordChange({
user: req['user'],
req,
body: req.body,
});
res.json({ msg: 'Password updated successfully' });
}
async function passwordForgot(req: Request<any, any>, res): Promise<any> {
await userService.passwordForgot({
siteUrl: (req as any).ncSiteUrl,
body: req.body,
req,
});
res.json({ msg: 'Please check your email to reset the password' });
}
async function tokenValidate(req, res): Promise<any> {
await userService.tokenValidate({
token: req.params.tokenId,
});
res.json(true);
}
async function passwordReset(req, res): Promise<any> {
await userService.passwordReset({
token: req.params.tokenId,
body: req.body,
req,
});
res.json({ msg: 'Password reset successful' });
}
async function emailVerification(req, res): Promise<any> {
await userService.emailVerification({
token: req.params.tokenId,
req,
});
res.json({ msg: 'Email verified successfully' });
}
async function refreshToken(req, res): Promise<any> {
try {
if (!req?.cookies?.refresh_token) {
return res.status(400).json({ msg: 'Missing refresh token' });
}
const user = await User.getByRefreshToken(req.cookies.refresh_token);
if (!user) {
return res.status(400).json({ msg: 'Invalid refresh token' });
}
const refreshToken = userService.randomTokenString();
await User.update(user.id, {
email: user.email,
refresh_token: refreshToken,
});
setTokenCookie(res, refreshToken);
res.json({
token: userService.genJwt(user, Noco.getConfig()),
} as any);
} catch (e) {
return res.status(400).json({ msg: e.message });
}
}
async function renderPasswordReset(req, res): Promise<any> {
try {
res.send(
ejs.render((await import('./ui/auth/resetPassword')).default, {
ncPublicUrl: process.env.NC_PUBLIC_URL || '',
token: JSON.stringify(req.params.tokenId),
baseUrl: `/`,
})
);
} catch (e) {
return res.status(400).json({ msg: e.message });
}
}
const mapRoutes = (router) => {
// todo: old api - /auth/signup?tool=1
router.post(
'/auth/user/signup',
getAjvValidatorMw('swagger.json#/components/schemas/SignUpReq'),
catchError(signup)
);
router.post(
'/auth/user/signin',
getAjvValidatorMw('swagger.json#/components/schemas/SignInReq'),
catchError(signin)
);
router.get('/auth/user/me', extractProjectIdAndAuthenticate, catchError(me));
router.post(
'/auth/password/forgot',
getAjvValidatorMw('swagger.json#/components/schemas/ForgotPasswordReq'),
catchError(passwordForgot)
);
router.post('/auth/token/validate/:tokenId', catchError(tokenValidate));
router.post(
'/auth/password/reset/:tokenId',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordResetReq'),
catchError(passwordReset)
);
router.post('/auth/email/validate/:tokenId', catchError(emailVerification));
router.post(
'/user/password/change',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordChangeReq'),
ncMetaAclMw(passwordChange, 'passwordChange')
);
router.post('/auth/token/refresh', catchError(refreshToken));
/* Google auth apis */
router.post(`/auth/google/genTokenByCode`, catchError(googleSignin));
router.get('/auth/google', (req: any, res, next) =>
passport.authenticate('google', {
scope: ['profile', 'email'],
state: req.query.state,
callbackURL: req.ncSiteUrl + Noco.getConfig().dashboardPath,
})(req, res, next)
);
// deprecated APIs
router.post(
'/api/v1/db/auth/user/signup',
getAjvValidatorMw('swagger.json#/components/schemas/SignUpReq'),
catchError(signup)
);
router.post(
'/api/v1/db/auth/user/signin',
getAjvValidatorMw('swagger.json#/components/schemas/SignInReq'),
catchError(signin)
);
router.get(
'/api/v1/db/auth/user/me',
extractProjectIdAndAuthenticate,
catchError(me)
);
router.post(
'/api/v1/db/auth/password/forgot',
getAjvValidatorMw('swagger.json#/components/schemas/ForgotPasswordReq'),
catchError(passwordForgot)
);
router.post(
'/api/v1/db/auth/token/validate/:tokenId',
catchError(tokenValidate)
);
router.post(
'/api/v1/db/auth/password/reset/:tokenId',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordResetReq'),
catchError(passwordReset)
);
router.post(
'/api/v1/db/auth/email/validate/:tokenId',
catchError(emailVerification)
);
router.post(
'/api/v1/db/auth/password/change',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordChangeReq'),
ncMetaAclMw(passwordChange, 'passwordChange')
);
router.post('/api/v1/db/auth/token/refresh', catchError(refreshToken));
router.get(
'/api/v1/db/auth/password/reset/:tokenId',
catchError(renderPasswordReset)
);
// new API
router.post(
'/api/v1/auth/user/signup',
getAjvValidatorMw('swagger.json#/components/schemas/SignUpReq'),
catchError(signup)
);
router.post(
'/api/v1/auth/user/signin',
getAjvValidatorMw('swagger.json#/components/schemas/SignInReq'),
catchError(signin)
);
router.get(
'/api/v1/auth/user/me',
extractProjectIdAndAuthenticate,
catchError(me)
);
router.post(
'/api/v1/auth/password/forgot',
getAjvValidatorMw('swagger.json#/components/schemas/ForgotPasswordReq'),
catchError(passwordForgot)
);
router.post(
'/api/v1/auth/token/validate/:tokenId',
catchError(tokenValidate)
);
router.post(
'/api/v1/auth/password/reset/:tokenId',
catchError(passwordReset)
);
router.post(
'/api/v1/auth/email/validate/:tokenId',
catchError(emailVerification)
);
router.post(
'/api/v1/auth/password/change',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordChangeReq'),
ncMetaAclMw(passwordChange, 'passwordChange')
);
router.post('/api/v1/auth/token/refresh', catchError(refreshToken));
// respond with password reset page
router.get('/auth/password/reset/:tokenId', catchError(renderPasswordReset));
};
export { mapRoutes as userApis };

462
packages/nocodb/src/lib/controllers/userController/userApis.ts

@ -1,462 +0,0 @@
import { Request, Response } from 'express';
import { TableType, validatePassword } from 'nocodb-sdk';
import { Tele } from 'nc-help';
const { isEmail } = require('validator');
import * as ejs from 'ejs';
import bcrypt from 'bcryptjs';
import { promisify } from 'util';
const { v4: uuidv4 } = require('uuid');
import passport from 'passport';
import { getAjvValidatorMw } from '../../meta/api/helpers';
import catchError, { NcError } from '../../meta/helpers/catchError';
import extractProjectIdAndAuthenticate from '../../meta/helpers/extractProjectIdAndAuthenticate';
import ncMetaAclMw from '../../meta/helpers/ncMetaAclMw';
import NcPluginMgrv2 from '../../meta/helpers/NcPluginMgrv2';
import { Audit, User } from '../../models';
import Noco from '../../Noco';
import { genJwt } from './helpers';
import { userService } from '../../services';
export async function signup(req: Request, res: Response<TableType>) {
const {
email: _email,
firstname,
lastname,
token,
ignore_subscribe,
} = req.body;
let { password } = req.body;
// validate password and throw error if password is satisfying the conditions
const { valid, error } = validatePassword(password);
if (!valid) {
NcError.badRequest(`Password : ${error}`);
}
if (!isEmail(_email)) {
NcError.badRequest(`Invalid email`);
}
const email = _email.toLowerCase();
let user = await User.getByEmail(email);
if (user) {
if (token) {
if (token !== user.invite_token) {
NcError.badRequest(`Invalid invite url`);
} else if (user.invite_token_expires < new Date()) {
NcError.badRequest(
'Expired invite url, Please contact super admin to get a new invite url'
);
}
} else {
// todo : opening up signup for timebeing
// return next(new Error(`Email '${email}' already registered`));
}
}
const salt = await promisify(bcrypt.genSalt)(10);
password = await promisify(bcrypt.hash)(password, salt);
const email_verification_token = uuidv4();
if (!ignore_subscribe) {
Tele.emit('evt_subscribe', email);
}
if (user) {
if (token) {
await User.update(user.id, {
firstname,
lastname,
salt,
password,
email_verification_token,
invite_token: null,
invite_token_expires: null,
email: user.email,
});
} else {
NcError.badRequest('User already exist');
}
} else {
await userService.registerNewUserIfAllowed({
firstname,
lastname,
email,
salt,
password,
email_verification_token,
});
}
user = await User.getByEmail(email);
try {
const template = (await import('./ui/emailTemplates/verify')).default;
await (
await NcPluginMgrv2.emailAdapter()
).mailSend({
to: email,
subject: 'Verify email',
html: ejs.render(template, {
verifyLink:
(req as any).ncSiteUrl +
`/email/verify/${user.email_verification_token}`,
}),
});
} catch (e) {
console.log(
'Warning : `mailSend` failed, Please configure emailClient configuration.'
);
}
await promisify((req as any).login.bind(req))(user);
const refreshToken = userService.randomTokenString();
await User.update(user.id, {
refresh_token: refreshToken,
email: user.email,
});
setTokenCookie(res, refreshToken);
user = (req as any).user;
await Audit.insert({
op_type: 'AUTHENTICATION',
op_sub_type: 'SIGNUP',
user: user.email,
description: `signed up `,
ip: (req as any).clientIp,
});
res.json({
token: genJwt(user, Noco.getConfig()),
} as any);
}
async function successfulSignIn({
user,
err,
info,
req,
res,
auditDescription,
}) {
try {
if (!user || !user.email) {
if (err) {
return res.status(400).send(err);
}
if (info) {
return res.status(400).send(info);
}
return res.status(400).send({ msg: 'Your signin has failed' });
}
await promisify((req as any).login.bind(req))(user);
const refreshToken = userService.randomTokenString();
if (!user.token_version) {
user.token_version = userService.randomTokenString();
}
await User.update(user.id, {
refresh_token: refreshToken,
email: user.email,
token_version: user.token_version,
});
setTokenCookie(res, refreshToken);
await Audit.insert({
op_type: 'AUTHENTICATION',
op_sub_type: 'SIGNIN',
user: user.email,
ip: req.clientIp,
description: auditDescription,
});
res.json({
token: genJwt(user, Noco.getConfig()),
} as any);
} catch (e) {
console.log(e);
throw e;
}
}
async function signin(req, res, next) {
passport.authenticate(
'local',
{ session: false },
async (err, user, info): Promise<any> =>
await successfulSignIn({
user,
err,
info,
req,
res,
auditDescription: 'signed in',
})
)(req, res, next);
}
async function googleSignin(req, res, next) {
passport.authenticate(
'google',
{
session: false,
callbackURL: req.ncSiteUrl + Noco.getConfig().dashboardPath,
},
async (err, user, info): Promise<any> =>
await successfulSignIn({
user,
err,
info,
req,
res,
auditDescription: 'signed in using Google Auth',
})
)(req, res, next);
}
function setTokenCookie(res: Response, token): void {
// create http only cookie with refresh token that expires in 7 days
const cookieOptions = {
httpOnly: true,
expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000),
};
res.cookie('refresh_token', token, cookieOptions);
}
async function me(req, res): Promise<any> {
res.json(req?.session?.passport?.user ?? {});
}
async function passwordChange(req: Request<any, any>, res): Promise<any> {
if (!(req as any).isAuthenticated()) {
NcError.forbidden('Not allowed');
}
await userService.passwordChange({
user: req['user'],
req,
body: req.body,
});
res.json({ msg: 'Password updated successfully' });
}
async function passwordForgot(req: Request<any, any>, res): Promise<any> {
await userService.passwordForgot({
siteUrl: (req as any).ncSiteUrl,
body: req.body,
req,
});
res.json({ msg: 'Please check your email to reset the password' });
}
async function tokenValidate(req, res): Promise<any> {
await userService.tokenValidate({
token: req.params.tokenId,
});
res.json(true);
}
async function passwordReset(req, res): Promise<any> {
await userService.passwordReset({
token: req.params.tokenId,
body: req.body,
req,
});
res.json({ msg: 'Password reset successful' });
}
async function emailVerification(req, res): Promise<any> {
await userService.emailVerification({
token: req.params.tokenId,
req,
});
res.json({ msg: 'Email verified successfully' });
}
async function refreshToken(req, res): Promise<any> {
try {
if (!req?.cookies?.refresh_token) {
return res.status(400).json({ msg: 'Missing refresh token' });
}
const user = await User.getByRefreshToken(req.cookies.refresh_token);
if (!user) {
return res.status(400).json({ msg: 'Invalid refresh token' });
}
const refreshToken = userService.randomTokenString();
await User.update(user.id, {
email: user.email,
refresh_token: refreshToken,
});
setTokenCookie(res, refreshToken);
res.json({
token: genJwt(user, Noco.getConfig()),
} as any);
} catch (e) {
return res.status(400).json({ msg: e.message });
}
}
async function renderPasswordReset(req, res): Promise<any> {
try {
res.send(
ejs.render((await import('./ui/auth/resetPassword')).default, {
ncPublicUrl: process.env.NC_PUBLIC_URL || '',
token: JSON.stringify(req.params.tokenId),
baseUrl: `/`,
})
);
} catch (e) {
return res.status(400).json({ msg: e.message });
}
}
const mapRoutes = (router) => {
// todo: old api - /auth/signup?tool=1
router.post(
'/auth/user/signup',
getAjvValidatorMw('swagger.json#/components/schemas/SignUpReq'),
catchError(signup)
);
router.post(
'/auth/user/signin',
getAjvValidatorMw('swagger.json#/components/schemas/SignInReq'),
catchError(signin)
);
router.get('/auth/user/me', extractProjectIdAndAuthenticate, catchError(me));
router.post(
'/auth/password/forgot',
getAjvValidatorMw('swagger.json#/components/schemas/ForgotPasswordReq'),
catchError(passwordForgot)
);
router.post('/auth/token/validate/:tokenId', catchError(tokenValidate));
router.post(
'/auth/password/reset/:tokenId',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordResetReq'),
catchError(passwordReset)
);
router.post('/auth/email/validate/:tokenId', catchError(emailVerification));
router.post(
'/user/password/change',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordChangeReq'),
ncMetaAclMw(passwordChange, 'passwordChange')
);
router.post('/auth/token/refresh', catchError(refreshToken));
/* Google auth apis */
router.post(`/auth/google/genTokenByCode`, catchError(googleSignin));
router.get('/auth/google', (req: any, res, next) =>
passport.authenticate('google', {
scope: ['profile', 'email'],
state: req.query.state,
callbackURL: req.ncSiteUrl + Noco.getConfig().dashboardPath,
})(req, res, next)
);
// deprecated APIs
router.post(
'/api/v1/db/auth/user/signup',
getAjvValidatorMw('swagger.json#/components/schemas/SignUpReq'),
catchError(signup)
);
router.post(
'/api/v1/db/auth/user/signin',
getAjvValidatorMw('swagger.json#/components/schemas/SignInReq'),
catchError(signin)
);
router.get(
'/api/v1/db/auth/user/me',
extractProjectIdAndAuthenticate,
catchError(me)
);
router.post(
'/api/v1/db/auth/password/forgot',
getAjvValidatorMw('swagger.json#/components/schemas/ForgotPasswordReq'),
catchError(passwordForgot)
);
router.post(
'/api/v1/db/auth/token/validate/:tokenId',
catchError(tokenValidate)
);
router.post(
'/api/v1/db/auth/password/reset/:tokenId',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordResetReq'),
catchError(passwordReset)
);
router.post(
'/api/v1/db/auth/email/validate/:tokenId',
catchError(emailVerification)
);
router.post(
'/api/v1/db/auth/password/change',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordChangeReq'),
ncMetaAclMw(passwordChange, 'passwordChange')
);
router.post('/api/v1/db/auth/token/refresh', catchError(refreshToken));
router.get(
'/api/v1/db/auth/password/reset/:tokenId',
catchError(renderPasswordReset)
);
// new API
router.post(
'/api/v1/auth/user/signup',
getAjvValidatorMw('swagger.json#/components/schemas/SignUpReq'),
catchError(signup)
);
router.post(
'/api/v1/auth/user/signin',
getAjvValidatorMw('swagger.json#/components/schemas/SignInReq'),
catchError(signin)
);
router.get(
'/api/v1/auth/user/me',
extractProjectIdAndAuthenticate,
catchError(me)
);
router.post(
'/api/v1/auth/password/forgot',
getAjvValidatorMw('swagger.json#/components/schemas/ForgotPasswordReq'),
catchError(passwordForgot)
);
router.post(
'/api/v1/auth/token/validate/:tokenId',
catchError(tokenValidate)
);
router.post(
'/api/v1/auth/password/reset/:tokenId',
catchError(passwordReset)
);
router.post(
'/api/v1/auth/email/validate/:tokenId',
catchError(emailVerification)
);
router.post(
'/api/v1/auth/password/change',
getAjvValidatorMw('swagger.json#/components/schemas/PasswordChangeReq'),
ncMetaAclMw(passwordChange, 'passwordChange')
);
router.post('/api/v1/auth/token/refresh', catchError(refreshToken));
// respond with password reset page
router.get('/auth/password/reset/:tokenId', catchError(renderPasswordReset));
};
export { mapRoutes as userApis };

1
packages/nocodb/src/lib/services/index.ts

@ -32,3 +32,4 @@ export * as cacheService from './cacheService';
export * as auditService from './auditService';
export * as swaggerService from './swaggerService';
export * as userService from './userService';
export * as syncService from './syncService';

222
packages/nocodb/src/lib/services/syncService/helpers/EntityMap.ts

@ -0,0 +1,222 @@
import sqlite3 from 'sqlite3';
import { Readable } from 'stream';
class EntityMap {
initialized: boolean;
cols: string[];
db: any;
constructor(...args) {
this.initialized = false;
this.cols = args.map((arg) => processKey(arg));
this.db = new Promise((resolve, reject) => {
const db = new sqlite3.Database(':memory:');
const colStatement =
this.cols.length > 0
? this.cols.join(' TEXT, ') + ' TEXT'
: 'mappingPlaceholder TEXT';
db.run(`CREATE TABLE mapping (${colStatement})`, (err) => {
if (err) {
console.log(err);
reject(err);
}
resolve(db);
});
});
}
async init() {
if (!this.initialized) {
this.db = await this.db;
this.initialized = true;
}
}
destroy() {
if (this.initialized && this.db) {
this.db.close();
}
}
async addRow(row) {
if (!this.initialized) {
throw 'Please initialize first!';
}
const cols = Object.keys(row).map((key) => processKey(key));
const colStatement = cols.map((key) => `'${key}'`).join(', ');
const questionMarks = cols.map(() => '?').join(', ');
const promises = [];
for (const col of cols.filter((col) => !this.cols.includes(col))) {
promises.push(
new Promise((resolve, reject) => {
this.db.run(`ALTER TABLE mapping ADD '${col}' TEXT;`, (err) => {
if (err) {
console.log(err);
reject(err);
}
this.cols.push(col);
resolve(true);
});
})
);
}
await Promise.all(promises);
const values = Object.values(row).map((val) => {
if (typeof val === 'object') {
return `JSON::${JSON.stringify(val)}`;
}
return val;
});
return new Promise((resolve, reject) => {
this.db.run(
`INSERT INTO mapping (${colStatement}) VALUES (${questionMarks})`,
values,
(err) => {
if (err) {
console.log(err);
reject(err);
}
resolve(true);
}
);
});
}
getRow(col, val, res = []): Promise<Record<string, any>> {
if (!this.initialized) {
throw 'Please initialize first!';
}
return new Promise((resolve, reject) => {
col = processKey(col);
res = res.map((r) => processKey(r));
this.db.get(
`SELECT ${
res.length ? res.join(', ') : '*'
} FROM mapping WHERE ${col} = ?`,
[val],
(err, rs) => {
if (err) {
console.log(err);
reject(err);
}
if (rs) {
rs = processResponseRow(rs);
}
resolve(rs);
}
);
});
}
getCount(): Promise<number> {
if (!this.initialized) {
throw 'Please initialize first!';
}
return new Promise((resolve, reject) => {
this.db.get(`SELECT COUNT(*) as count FROM mapping`, (err, rs) => {
if (err) {
console.log(err);
reject(err);
}
resolve(rs.count);
});
});
}
getStream(res = []): DBStream {
if (!this.initialized) {
throw 'Please initialize first!';
}
res = res.map((r) => processKey(r));
return new DBStream(
this.db,
`SELECT ${res.length ? res.join(', ') : '*'} FROM mapping`
);
}
getLimit(limit, offset, res = []): Promise<Record<string, any>[]> {
if (!this.initialized) {
throw 'Please initialize first!';
}
return new Promise((resolve, reject) => {
res = res.map((r) => processKey(r));
this.db.all(
`SELECT ${
res.length ? res.join(', ') : '*'
} FROM mapping LIMIT ${limit} OFFSET ${offset}`,
(err, rs) => {
if (err) {
console.log(err);
reject(err);
}
for (let row of rs) {
row = processResponseRow(row);
}
resolve(rs);
}
);
});
}
}
class DBStream extends Readable {
db: any;
stmt: any;
sql: any;
constructor(db, sql) {
super({ objectMode: true });
this.db = db;
this.sql = sql;
this.stmt = this.db.prepare(this.sql);
this.on('end', () => this.stmt.finalize());
}
_read() {
let stream = this;
this.stmt.get(function (err, result) {
if (err) {
stream.emit('error', err);
} else {
if (result) {
result = processResponseRow(result);
}
stream.push(result || null);
}
});
}
}
function processResponseRow(res: any) {
for (const key of Object.keys(res)) {
if (res[key] && res[key].startsWith('JSON::')) {
try {
res[key] = JSON.parse(res[key].replace('JSON::', ''));
} catch (e) {
console.log(e);
}
}
if (revertKey(key) !== key) {
res[revertKey(key)] = res[key];
delete res[key];
}
}
return res;
}
function processKey(key) {
return key.replace(/'/g, "''").replace(/[A-Z]/g, (match) => `_${match}`);
}
function revertKey(key) {
return key.replace(/''/g, "'").replace(/_[A-Z]/g, (match) => match[1]);
}
export default EntityMap;

6
packages/nocodb/src/lib/services/syncService/helpers/NocoSyncDestAdapter.ts

@ -0,0 +1,6 @@
export abstract class NocoSyncSourceAdapter {
public abstract init(): Promise<void>;
public abstract destProjectWrite(): Promise<any>;
public abstract destSchemaWrite(): Promise<any>;
public abstract destDataWrite(): Promise<any>;
}

7
packages/nocodb/src/lib/services/syncService/helpers/NocoSyncSourceAdapter.ts

@ -0,0 +1,7 @@
export abstract class NocoSyncSourceAdapter {
public abstract init(): Promise<void>;
public abstract srcSchemaGet(): Promise<any>;
public abstract srcDataLoad(): Promise<any>;
public abstract srcDataListen(): Promise<any>;
public abstract srcDataPoll(): Promise<any>;
}

238
packages/nocodb/src/lib/services/syncService/helpers/fetchAT.ts

@ -0,0 +1,238 @@
const axios = require('axios').default;
const info: any = {
initialized: false,
};
async function initialize(shareId) {
info.cookie = '';
const url = `https://airtable.com/${shareId}`;
try {
const hreq = await axios
.get(url, {
headers: {
accept:
'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
},
referrerPolicy: 'strict-origin-when-cross-origin',
body: null,
method: 'GET',
})
.then((response) => {
for (const ck of response.headers['set-cookie']) {
info.cookie += ck.split(';')[0] + '; ';
}
return response.data;
})
.catch(() => {
throw {
message:
'Invalid Shared Base ID :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
});
info.headers = JSON.parse(
hreq.match(/(?<=var headers =)(.*)(?=;)/g)[0].trim()
);
info.link = unicodeToChar(hreq.match(/(?<=fetch\(")(.*)(?=")/g)[0].trim());
info.baseInfo = decodeURIComponent(info.link)
.match(/{(.*)}/g)[0]
.split('&')
.reduce((result, el) => {
try {
return Object.assign(
result,
JSON.parse(el.includes('=') ? el.split('=')[1] : el)
);
} catch (e) {
if (el.includes('=')) {
return Object.assign(result, {
[el.split('=')[0]]: el.split('=')[1],
});
}
}
}, {});
info.baseId = info.baseInfo.applicationId;
info.initialized = true;
} catch (e) {
console.log(e);
info.initialized = false;
if (e.message) {
throw e;
} else {
throw {
message:
'Error processing Shared Base :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
}
}
}
async function read() {
if (info.initialized) {
const resreq = await axios('https://airtable.com' + info.link, {
headers: {
accept: '*/*',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
'x-time-zone': 'Europe/Berlin',
cookie: info.cookie,
...info.headers,
},
referrerPolicy: 'no-referrer',
body: null,
method: 'GET',
})
.then((response) => {
return response.data;
})
.catch(() => {
throw {
message:
'Error Reading :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
});
return {
schema: resreq.data,
baseId: info.baseId,
baseInfo: info.baseInfo,
};
} else {
throw {
message: 'Error Initializing :: please try again !!',
};
}
}
async function readView(viewId) {
if (info.initialized) {
const resreq = await axios(
`https://airtable.com/v0.3/view/${viewId}/readData?` +
`stringifiedObjectParams=${encodeURIComponent('{}')}&requestId=${
info.baseInfo.requestId
}&accessPolicy=${encodeURIComponent(
JSON.stringify({
allowedActions: info.baseInfo.allowedActions,
shareId: info.baseInfo.shareId,
applicationId: info.baseInfo.applicationId,
generationNumber: info.baseInfo.generationNumber,
expires: info.baseInfo.expires,
signature: info.baseInfo.signature,
})
)}`,
{
headers: {
accept: '*/*',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
'x-time-zone': 'Europe/Berlin',
cookie: info.cookie,
...info.headers,
},
referrerPolicy: 'no-referrer',
body: null,
method: 'GET',
}
)
.then((response) => {
return response.data;
})
.catch(() => {
throw {
message:
'Error Reading View :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details',
};
});
return { view: resreq.data };
} else {
throw {
message: 'Error Initializing :: please try again !!',
};
}
}
async function readTemplate(templateId) {
if (!info.initialized) {
await initialize('shrO8aYf3ybwSdDKn');
}
const resreq = await axios(
`https://www.airtable.com/v0.3/exploreApplications/${templateId}`,
{
headers: {
accept: '*/*',
'accept-language': 'en-US,en;q=0.9',
'sec-ch-ua':
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Linux"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'User-Agent':
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36',
'x-time-zone': 'Europe/Berlin',
cookie: info.cookie,
...info.headers,
},
referrer: 'https://www.airtable.com/',
referrerPolicy: 'same-origin',
body: null,
method: 'GET',
mode: 'cors',
credentials: 'include',
}
)
.then((response) => {
return response.data;
})
.catch(() => {
throw {
message:
'Error Fetching :: Ensure www.airtable.com/templates/featured/<TemplateID> is accessible.',
};
});
return { template: resreq };
}
function unicodeToChar(text) {
return text.replace(/\\u[\dA-F]{4}/gi, function (match) {
return String.fromCharCode(parseInt(match.replace(/\\u/g, ''), 16));
});
}
export default {
initialize,
read,
readView,
readTemplate,
};

2430
packages/nocodb/src/lib/services/syncService/helpers/job.ts

File diff suppressed because it is too large Load Diff

338
packages/nocodb/src/lib/services/syncService/helpers/readAndProcessData.ts

@ -0,0 +1,338 @@
import { AirtableBase } from 'airtable/lib/airtable_base';
import { Api, RelationTypes, TableType, UITypes } from 'nocodb-sdk';
import EntityMap from './EntityMap';
const BULK_DATA_BATCH_SIZE = 500;
const ASSOC_BULK_DATA_BATCH_SIZE = 1000;
const BULK_PARALLEL_PROCESS = 5;
async function readAllData({
table,
fields,
base,
logBasic = (_str) => {},
}: {
table: { title?: string };
fields?;
base: AirtableBase;
logBasic?: (string) => void;
logDetailed?: (string) => void;
}): Promise<EntityMap> {
return new Promise((resolve, reject) => {
let data = null;
const selectParams: any = {
pageSize: 100,
};
if (fields) selectParams.fields = fields;
base(table.title)
.select(selectParams)
.eachPage(
async function page(records, fetchNextPage) {
if (!data) {
data = new EntityMap();
await data.init();
}
for await (const record of records) {
await data.addRow({ id: record.id, ...record.fields });
}
const tmpLength = await data.getCount();
logBasic(
`:: Reading '${table.title}' data :: ${Math.max(
1,
tmpLength - records.length
)} - ${tmpLength}`
);
// To fetch the next page of records, call `fetchNextPage`.
// If there are more records, `page` will get called again.
// If there are no more records, `done` will get called.
fetchNextPage();
},
async function done(err) {
if (err) {
console.error(err);
return reject(err);
}
resolve(data);
}
);
});
}
export async function importData({
projectName,
table,
base,
api,
nocoBaseDataProcessing_v2,
sDB,
logDetailed = (_str) => {},
logBasic = (_str) => {},
}: {
projectName: string;
table: { title?: string; id?: string };
fields?;
base: AirtableBase;
logBasic: (string) => void;
logDetailed: (string) => void;
api: Api<any>;
nocoBaseDataProcessing_v2;
sDB;
}): Promise<EntityMap> {
try {
// @ts-ignore
const records = await readAllData({
table,
base,
logDetailed,
logBasic,
});
await new Promise(async (resolve) => {
const readable = records.getStream();
const allRecordsCount = await records.getCount();
const promises = [];
let tempData = [];
let importedCount = 0;
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: rid, ...fields } = record;
const r = await nocoBaseDataProcessing_v2(sDB, table, {
id: rid,
fields,
});
tempData.push(r);
if (tempData.length >= BULK_DATA_BATCH_SIZE) {
let insertArray = tempData.splice(0, tempData.length);
await api.dbTableRow.bulkCreate(
'nc',
projectName,
table.id,
insertArray
);
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
allRecordsCount
)}`
);
importedCount += insertArray.length;
insertArray = [];
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
})
);
});
readable.on('end', async () => {
await Promise.all(promises);
if (tempData.length > 0) {
await api.dbTableRow.bulkCreate(
'nc',
projectName,
table.id,
tempData
);
logBasic(
`:: Importing '${
table.title
}' data :: ${importedCount} - ${Math.min(
importedCount + BULK_DATA_BATCH_SIZE,
allRecordsCount
)}`
);
importedCount += tempData.length;
tempData = [];
}
resolve(true);
});
});
return records;
} catch (e) {
console.log(e);
return null;
}
}
export async function importLTARData({
table,
fields,
base,
api,
projectName,
insertedAssocRef = {},
logDetailed = (_str) => {},
logBasic = (_str) => {},
records,
atNcAliasRef,
ncLinkMappingTable,
}: {
projectName: string;
table: { title?: string; id?: string };
fields;
base: AirtableBase;
logDetailed: (string) => void;
logBasic: (string) => void;
api: Api<any>;
insertedAssocRef: { [assocTableId: string]: boolean };
records?: EntityMap;
atNcAliasRef: {
[ncTableId: string]: {
[ncTitle: string]: string;
};
};
ncLinkMappingTable: Record<string, Record<string, any>>[];
}) {
const assocTableMetas: Array<{
modelMeta: { id?: string; title?: string };
colMeta: { title?: string };
curCol: { title?: string };
refCol: { title?: string };
}> = [];
const allData =
records ||
(await readAllData({
table,
fields,
base,
logDetailed,
logBasic,
}));
const modelMeta: any = await api.dbTable.read(table.id);
for (const colMeta of modelMeta.columns) {
// skip columns which are not LTAR and Many to many
if (
colMeta.uidt !== UITypes.LinkToAnotherRecord ||
colMeta.colOptions.type !== RelationTypes.MANY_TO_MANY
) {
continue;
}
// skip if already inserted
if (colMeta.colOptions.fk_mm_model_id in insertedAssocRef) continue;
// self links: skip if the column under consideration is the add-on column NocoDB creates
if (ncLinkMappingTable.every((a) => a.nc.title !== colMeta.title)) continue;
// mark as inserted
insertedAssocRef[colMeta.colOptions.fk_mm_model_id] = true;
const assocModelMeta: TableType = (await api.dbTable.read(
colMeta.colOptions.fk_mm_model_id
)) as any;
// extract associative table and columns meta
assocTableMetas.push({
modelMeta: assocModelMeta,
colMeta,
curCol: assocModelMeta.columns.find(
(c) => c.id === colMeta.colOptions.fk_mm_child_column_id
),
refCol: assocModelMeta.columns.find(
(c) => c.id === colMeta.colOptions.fk_mm_parent_column_id
),
});
}
let nestedLinkCnt = 0;
// Iterate over all related M2M associative table
for await (const assocMeta of assocTableMetas) {
let assocTableData = [];
let importedCount = 0;
// extract insert data from records
await new Promise((resolve) => {
const promises = [];
const readable = allData.getStream();
let activeProcess = 0;
readable.on('data', async (record) => {
promises.push(
new Promise(async (resolve) => {
activeProcess++;
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause();
const { id: _atId, ...rec } = record;
// todo: use actual alias instead of sanitized
assocTableData.push(
...(
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || []
).map((id) => ({
[assocMeta.curCol.title]: record.id,
[assocMeta.refCol.title]: id,
}))
);
if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) {
let insertArray = assocTableData.splice(0, assocTableData.length);
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
insertArray.length
)}`
);
await api.dbTableRow.bulkCreate(
'nc',
projectName,
assocMeta.modelMeta.id,
insertArray
);
importedCount += insertArray.length;
insertArray = [];
}
activeProcess--;
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume();
resolve(true);
})
);
});
readable.on('end', async () => {
await Promise.all(promises);
if (assocTableData.length >= 0) {
logBasic(
`:: Importing '${
table.title
}' LTAR data :: ${importedCount} - ${Math.min(
importedCount + ASSOC_BULK_DATA_BATCH_SIZE,
assocTableData.length
)}`
);
await api.dbTableRow.bulkCreate(
'nc',
projectName,
assocMeta.modelMeta.id,
assocTableData
);
importedCount += assocTableData.length;
assocTableData = [];
}
resolve(true);
});
});
nestedLinkCnt += importedCount;
}
return nestedLinkCnt;
}

31
packages/nocodb/src/lib/services/syncService/helpers/syncMap.ts

@ -0,0 +1,31 @@
export const mapTbl = {};
// static mapping records between aTblId && ncId
export const addToMappingTbl = function addToMappingTbl(
aTblId,
ncId,
ncName,
parent?
) {
mapTbl[aTblId] = {
ncId: ncId,
ncParent: parent,
// name added to assist in quick debug
ncName: ncName,
};
};
// get NcID from airtable ID
export const getNcIdFromAtId = function getNcIdFromAtId(aId) {
return mapTbl[aId]?.ncId;
};
// get nc Parent from airtable ID
export const getNcParentFromAtId = function getNcParentFromAtId(aId) {
return mapTbl[aId]?.ncParent;
};
// get nc-title from airtable ID
export const getNcNameFromAtId = function getNcNameFromAtId(aId) {
return mapTbl[aId]?.ncName;
};

47
packages/nocodb/src/lib/services/syncService/index.ts

@ -0,0 +1,47 @@
import { Tele } from 'nc-help';
import { PagedResponseImpl } from '../../meta/helpers/PagedResponse'
import { Project, SyncSource } from '../../models'
export async function syncSourceList(param: {
projectId: string;
baseId?: string;
}) {
return new PagedResponseImpl(
await SyncSource.list(param.projectId, param.baseId)
);
}
export async function syncCreate(param: {
projectId: string;
baseId?: string;
userId: string;
// todo: define type
syncPayload: Partial<SyncSource>;
}) {
Tele.emit('evt', { evt_type: 'webhooks:created' });
const project = await Project.getWithInfo(param.projectId);
const sync = await SyncSource.insert({
...param.syncPayload,
fk_user_id: param.userId,
base_id: param.baseId ? param.baseId : project.bases[0].id,
project_id: param.projectId,
});
return sync;
}
export async function syncDelete(param: { syncId: string }) {
Tele.emit('evt', { evt_type: 'webhooks:deleted' });
return await SyncSource.delete(param.syncId)
}
export async function syncUpdate(param:{
syncId: string;
syncPayload: Partial<SyncSource>;
}) {
Tele.emit('evt', { evt_type: 'webhooks:updated' });
return await SyncSource.update(param.syncId, param.syncPayload)
}
export { default as airtableImportJob } from './helpers/job';
Loading…
Cancel
Save