mirror of https://github.com/nocodb/nocodb
mertmit
2 years ago
8 changed files with 0 additions and 1035 deletions
@ -1,222 +0,0 @@ |
|||||||
import { Readable } from 'stream'; |
|
||||||
import sqlite3 from 'sqlite3'; |
|
||||||
|
|
||||||
class EntityMap { |
|
||||||
initialized: boolean; |
|
||||||
cols: string[]; |
|
||||||
db: any; |
|
||||||
|
|
||||||
constructor(...args) { |
|
||||||
this.initialized = false; |
|
||||||
this.cols = args.map((arg) => processKey(arg)); |
|
||||||
this.db = new Promise((resolve, reject) => { |
|
||||||
const db = new sqlite3.Database(':memory:'); |
|
||||||
|
|
||||||
const colStatement = |
|
||||||
this.cols.length > 0 |
|
||||||
? this.cols.join(' TEXT, ') + ' TEXT' |
|
||||||
: 'mappingPlaceholder TEXT'; |
|
||||||
db.run(`CREATE TABLE mapping (${colStatement})`, (err) => { |
|
||||||
if (err) { |
|
||||||
console.log(err); |
|
||||||
reject(err); |
|
||||||
} |
|
||||||
resolve(db); |
|
||||||
}); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
async init() { |
|
||||||
if (!this.initialized) { |
|
||||||
this.db = await this.db; |
|
||||||
this.initialized = true; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
destroy() { |
|
||||||
if (this.initialized && this.db) { |
|
||||||
this.db.close(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async addRow(row) { |
|
||||||
if (!this.initialized) { |
|
||||||
throw 'Please initialize first!'; |
|
||||||
} |
|
||||||
|
|
||||||
const cols = Object.keys(row).map((key) => processKey(key)); |
|
||||||
const colStatement = cols.map((key) => `'${key}'`).join(', '); |
|
||||||
const questionMarks = cols.map(() => '?').join(', '); |
|
||||||
|
|
||||||
const promises = []; |
|
||||||
|
|
||||||
for (const col of cols.filter((col) => !this.cols.includes(col))) { |
|
||||||
promises.push( |
|
||||||
new Promise((resolve, reject) => { |
|
||||||
this.db.run(`ALTER TABLE mapping ADD '${col}' TEXT;`, (err) => { |
|
||||||
if (err) { |
|
||||||
console.log(err); |
|
||||||
reject(err); |
|
||||||
} |
|
||||||
this.cols.push(col); |
|
||||||
resolve(true); |
|
||||||
}); |
|
||||||
}), |
|
||||||
); |
|
||||||
} |
|
||||||
|
|
||||||
await Promise.all(promises); |
|
||||||
|
|
||||||
const values = Object.values(row).map((val) => { |
|
||||||
if (typeof val === 'object') { |
|
||||||
return `JSON::${JSON.stringify(val)}`; |
|
||||||
} |
|
||||||
return val; |
|
||||||
}); |
|
||||||
|
|
||||||
return new Promise((resolve, reject) => { |
|
||||||
this.db.run( |
|
||||||
`INSERT INTO mapping (${colStatement}) VALUES (${questionMarks})`, |
|
||||||
values, |
|
||||||
(err) => { |
|
||||||
if (err) { |
|
||||||
console.log(err); |
|
||||||
reject(err); |
|
||||||
} |
|
||||||
resolve(true); |
|
||||||
}, |
|
||||||
); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
getRow(col, val, res = []): Promise<Record<string, any>> { |
|
||||||
if (!this.initialized) { |
|
||||||
throw 'Please initialize first!'; |
|
||||||
} |
|
||||||
return new Promise((resolve, reject) => { |
|
||||||
col = processKey(col); |
|
||||||
res = res.map((r) => processKey(r)); |
|
||||||
this.db.get( |
|
||||||
`SELECT ${ |
|
||||||
res.length ? res.join(', ') : '*' |
|
||||||
} FROM mapping WHERE ${col} = ?`,
|
|
||||||
[val], |
|
||||||
(err, rs) => { |
|
||||||
if (err) { |
|
||||||
console.log(err); |
|
||||||
reject(err); |
|
||||||
} |
|
||||||
if (rs) { |
|
||||||
rs = processResponseRow(rs); |
|
||||||
} |
|
||||||
resolve(rs); |
|
||||||
}, |
|
||||||
); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
getCount(): Promise<number> { |
|
||||||
if (!this.initialized) { |
|
||||||
throw 'Please initialize first!'; |
|
||||||
} |
|
||||||
return new Promise((resolve, reject) => { |
|
||||||
this.db.get(`SELECT COUNT(*) as count FROM mapping`, (err, rs) => { |
|
||||||
if (err) { |
|
||||||
console.log(err); |
|
||||||
reject(err); |
|
||||||
} |
|
||||||
resolve(rs.count); |
|
||||||
}); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
getStream(res = []): DBStream { |
|
||||||
if (!this.initialized) { |
|
||||||
throw 'Please initialize first!'; |
|
||||||
} |
|
||||||
res = res.map((r) => processKey(r)); |
|
||||||
return new DBStream( |
|
||||||
this.db, |
|
||||||
`SELECT ${res.length ? res.join(', ') : '*'} FROM mapping`, |
|
||||||
); |
|
||||||
} |
|
||||||
|
|
||||||
getLimit(limit, offset, res = []): Promise<Record<string, any>[]> { |
|
||||||
if (!this.initialized) { |
|
||||||
throw 'Please initialize first!'; |
|
||||||
} |
|
||||||
return new Promise((resolve, reject) => { |
|
||||||
res = res.map((r) => processKey(r)); |
|
||||||
this.db.all( |
|
||||||
`SELECT ${ |
|
||||||
res.length ? res.join(', ') : '*' |
|
||||||
} FROM mapping LIMIT ${limit} OFFSET ${offset}`,
|
|
||||||
(err, rs) => { |
|
||||||
if (err) { |
|
||||||
console.log(err); |
|
||||||
reject(err); |
|
||||||
} |
|
||||||
for (let row of rs) { |
|
||||||
row = processResponseRow(row); |
|
||||||
} |
|
||||||
resolve(rs); |
|
||||||
}, |
|
||||||
); |
|
||||||
}); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
class DBStream extends Readable { |
|
||||||
db: any; |
|
||||||
stmt: any; |
|
||||||
sql: any; |
|
||||||
|
|
||||||
constructor(db, sql) { |
|
||||||
super({ objectMode: true }); |
|
||||||
this.db = db; |
|
||||||
this.sql = sql; |
|
||||||
this.stmt = this.db.prepare(this.sql); |
|
||||||
this.on('end', () => this.stmt.finalize()); |
|
||||||
} |
|
||||||
|
|
||||||
_read() { |
|
||||||
const stream = this; |
|
||||||
this.stmt.get(function (err, result) { |
|
||||||
if (err) { |
|
||||||
stream.emit('error', err); |
|
||||||
} else { |
|
||||||
if (result) { |
|
||||||
result = processResponseRow(result); |
|
||||||
} |
|
||||||
stream.push(result || null); |
|
||||||
} |
|
||||||
}); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
function processResponseRow(res: any) { |
|
||||||
for (const key of Object.keys(res)) { |
|
||||||
if (res[key] && res[key].startsWith('JSON::')) { |
|
||||||
try { |
|
||||||
res[key] = JSON.parse(res[key].replace('JSON::', '')); |
|
||||||
} catch (e) { |
|
||||||
console.log(e); |
|
||||||
} |
|
||||||
} |
|
||||||
if (revertKey(key) !== key) { |
|
||||||
res[revertKey(key)] = res[key]; |
|
||||||
delete res[key]; |
|
||||||
} |
|
||||||
} |
|
||||||
return res; |
|
||||||
} |
|
||||||
|
|
||||||
function processKey(key) { |
|
||||||
return key.replace(/'/g, "''").replace(/[A-Z]/g, (match) => `_${match}`); |
|
||||||
} |
|
||||||
|
|
||||||
function revertKey(key) { |
|
||||||
return key.replace(/''/g, "'").replace(/_[A-Z]/g, (match) => match[1]); |
|
||||||
} |
|
||||||
|
|
||||||
export default EntityMap; |
|
@ -1,242 +0,0 @@ |
|||||||
import axios from 'axios'; |
|
||||||
|
|
||||||
const info: any = { |
|
||||||
initialized: false, |
|
||||||
}; |
|
||||||
|
|
||||||
async function initialize(shareId) { |
|
||||||
info.cookie = ''; |
|
||||||
const url = `https://airtable.com/${shareId}`; |
|
||||||
|
|
||||||
try { |
|
||||||
const hreq = await axios |
|
||||||
.get(url, { |
|
||||||
headers: { |
|
||||||
accept: |
|
||||||
'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', |
|
||||||
'accept-language': 'en-US,en;q=0.9', |
|
||||||
'sec-ch-ua': |
|
||||||
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', |
|
||||||
'sec-ch-ua-mobile': '?0', |
|
||||||
'sec-ch-ua-platform': '"Linux"', |
|
||||||
'sec-fetch-dest': 'document', |
|
||||||
'sec-fetch-mode': 'navigate', |
|
||||||
'sec-fetch-site': 'none', |
|
||||||
'sec-fetch-user': '?1', |
|
||||||
'upgrade-insecure-requests': '1', |
|
||||||
'User-Agent': |
|
||||||
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36', |
|
||||||
}, |
|
||||||
// @ts-ignore
|
|
||||||
referrerPolicy: 'strict-origin-when-cross-origin', |
|
||||||
body: null, |
|
||||||
method: 'GET', |
|
||||||
}) |
|
||||||
.then((response) => { |
|
||||||
for (const ck of response.headers['set-cookie']) { |
|
||||||
info.cookie += ck.split(';')[0] + '; '; |
|
||||||
} |
|
||||||
return response.data; |
|
||||||
}) |
|
||||||
.catch(() => { |
|
||||||
throw { |
|
||||||
message: |
|
||||||
'Invalid Shared Base ID :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details', |
|
||||||
}; |
|
||||||
}); |
|
||||||
|
|
||||||
info.headers = JSON.parse( |
|
||||||
hreq.match(/(?<=var headers =)(.*)(?=;)/g)[0].trim(), |
|
||||||
); |
|
||||||
info.link = unicodeToChar(hreq.match(/(?<=fetch\(")(.*)(?=")/g)[0].trim()); |
|
||||||
info.baseInfo = decodeURIComponent(info.link) |
|
||||||
.match(/{(.*)}/g)[0] |
|
||||||
.split('&') |
|
||||||
.reduce((result, el) => { |
|
||||||
try { |
|
||||||
return Object.assign( |
|
||||||
result, |
|
||||||
JSON.parse(el.includes('=') ? el.split('=')[1] : el), |
|
||||||
); |
|
||||||
} catch (e) { |
|
||||||
if (el.includes('=')) { |
|
||||||
return Object.assign(result, { |
|
||||||
[el.split('=')[0]]: el.split('=')[1], |
|
||||||
}); |
|
||||||
} |
|
||||||
} |
|
||||||
}, {}); |
|
||||||
info.baseId = info.baseInfo.applicationId; |
|
||||||
info.initialized = true; |
|
||||||
} catch (e) { |
|
||||||
console.log(e); |
|
||||||
info.initialized = false; |
|
||||||
if (e.message) { |
|
||||||
throw e; |
|
||||||
} else { |
|
||||||
throw { |
|
||||||
message: |
|
||||||
'Error processing Shared Base :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details', |
|
||||||
}; |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async function read() { |
|
||||||
if (info.initialized) { |
|
||||||
const resreq = await axios('https://airtable.com' + info.link, { |
|
||||||
headers: { |
|
||||||
accept: '*/*', |
|
||||||
'accept-language': 'en-US,en;q=0.9', |
|
||||||
'sec-ch-ua': |
|
||||||
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', |
|
||||||
'sec-ch-ua-mobile': '?0', |
|
||||||
'sec-ch-ua-platform': '"Linux"', |
|
||||||
'sec-fetch-dest': 'empty', |
|
||||||
'sec-fetch-mode': 'cors', |
|
||||||
'sec-fetch-site': 'same-origin', |
|
||||||
'User-Agent': |
|
||||||
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36', |
|
||||||
'x-time-zone': 'Europe/Berlin', |
|
||||||
cookie: info.cookie, |
|
||||||
...info.headers, |
|
||||||
}, |
|
||||||
// @ts-ignore
|
|
||||||
referrerPolicy: 'no-referrer', |
|
||||||
body: null, |
|
||||||
method: 'GET', |
|
||||||
}) |
|
||||||
.then((response) => { |
|
||||||
return response.data; |
|
||||||
}) |
|
||||||
.catch(() => { |
|
||||||
throw { |
|
||||||
message: |
|
||||||
'Error Reading :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details', |
|
||||||
}; |
|
||||||
}); |
|
||||||
|
|
||||||
return { |
|
||||||
schema: resreq.data, |
|
||||||
baseId: info.baseId, |
|
||||||
baseInfo: info.baseInfo, |
|
||||||
}; |
|
||||||
} else { |
|
||||||
throw { |
|
||||||
message: 'Error Initializing :: please try again !!', |
|
||||||
}; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async function readView(viewId) { |
|
||||||
if (info.initialized) { |
|
||||||
const resreq = await axios( |
|
||||||
`https://airtable.com/v0.3/view/${viewId}/readData?` + |
|
||||||
`stringifiedObjectParams=${encodeURIComponent('{}')}&requestId=${ |
|
||||||
info.baseInfo.requestId |
|
||||||
}&accessPolicy=${encodeURIComponent( |
|
||||||
JSON.stringify({ |
|
||||||
allowedActions: info.baseInfo.allowedActions, |
|
||||||
shareId: info.baseInfo.shareId, |
|
||||||
applicationId: info.baseInfo.applicationId, |
|
||||||
generationNumber: info.baseInfo.generationNumber, |
|
||||||
expires: info.baseInfo.expires, |
|
||||||
signature: info.baseInfo.signature, |
|
||||||
}), |
|
||||||
)}`,
|
|
||||||
{ |
|
||||||
headers: { |
|
||||||
accept: '*/*', |
|
||||||
'accept-language': 'en-US,en;q=0.9', |
|
||||||
'sec-ch-ua': |
|
||||||
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', |
|
||||||
'sec-ch-ua-mobile': '?0', |
|
||||||
'sec-ch-ua-platform': '"Linux"', |
|
||||||
'sec-fetch-dest': 'empty', |
|
||||||
'sec-fetch-mode': 'cors', |
|
||||||
'sec-fetch-site': 'same-origin', |
|
||||||
'User-Agent': |
|
||||||
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36', |
|
||||||
'x-time-zone': 'Europe/Berlin', |
|
||||||
cookie: info.cookie, |
|
||||||
...info.headers, |
|
||||||
}, |
|
||||||
// @ts-ignore
|
|
||||||
referrerPolicy: 'no-referrer', |
|
||||||
body: null, |
|
||||||
method: 'GET', |
|
||||||
}, |
|
||||||
) |
|
||||||
.then((response) => { |
|
||||||
return response.data; |
|
||||||
}) |
|
||||||
.catch(() => { |
|
||||||
throw { |
|
||||||
message: |
|
||||||
'Error Reading View :: Ensure www.airtable.com/<SharedBaseID> is accessible. Refer https://bit.ly/3x0OdXI for details', |
|
||||||
}; |
|
||||||
}); |
|
||||||
return { view: resreq.data }; |
|
||||||
} else { |
|
||||||
throw { |
|
||||||
message: 'Error Initializing :: please try again !!', |
|
||||||
}; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async function readTemplate(templateId) { |
|
||||||
if (!info.initialized) { |
|
||||||
await initialize('shrO8aYf3ybwSdDKn'); |
|
||||||
} |
|
||||||
const resreq = await axios( |
|
||||||
`https://www.airtable.com/v0.3/exploreApplications/${templateId}`, |
|
||||||
{ |
|
||||||
headers: { |
|
||||||
accept: '*/*', |
|
||||||
'accept-language': 'en-US,en;q=0.9', |
|
||||||
'sec-ch-ua': |
|
||||||
'" Not A;Brand";v="99", "Chromium";v="100", "Google Chrome";v="100"', |
|
||||||
'sec-ch-ua-mobile': '?0', |
|
||||||
'sec-ch-ua-platform': '"Linux"', |
|
||||||
'sec-fetch-dest': 'empty', |
|
||||||
'sec-fetch-mode': 'cors', |
|
||||||
'sec-fetch-site': 'same-origin', |
|
||||||
'User-Agent': |
|
||||||
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.88 Safari/537.36', |
|
||||||
'x-time-zone': 'Europe/Berlin', |
|
||||||
cookie: info.cookie, |
|
||||||
...info.headers, |
|
||||||
}, |
|
||||||
// @ts-ignore
|
|
||||||
referrer: 'https://www.airtable.com/', |
|
||||||
referrerPolicy: 'same-origin', |
|
||||||
body: null, |
|
||||||
method: 'GET', |
|
||||||
mode: 'cors', |
|
||||||
credentials: 'include', |
|
||||||
}, |
|
||||||
) |
|
||||||
.then((response) => { |
|
||||||
return response.data; |
|
||||||
}) |
|
||||||
.catch(() => { |
|
||||||
throw { |
|
||||||
message: |
|
||||||
'Error Fetching :: Ensure www.airtable.com/templates/featured/<TemplateID> is accessible.', |
|
||||||
}; |
|
||||||
}); |
|
||||||
return { template: resreq }; |
|
||||||
} |
|
||||||
|
|
||||||
function unicodeToChar(text) { |
|
||||||
return text.replace(/\\u[\dA-F]{4}/gi, function (match) { |
|
||||||
return String.fromCharCode(parseInt(match.replace(/\\u/g, ''), 16)); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
export default { |
|
||||||
initialize, |
|
||||||
read, |
|
||||||
readView, |
|
||||||
readTemplate, |
|
||||||
}; |
|
@ -1,362 +0,0 @@ |
|||||||
/* eslint-disable no-async-promise-executor */ |
|
||||||
import { RelationTypes, UITypes } from 'nocodb-sdk'; |
|
||||||
import EntityMap from './EntityMap'; |
|
||||||
import type { BulkDataAliasService } from '../../../../services/bulk-data-alias.service'; |
|
||||||
import type { TablesService } from '../../../../services/tables.service'; |
|
||||||
// @ts-ignore
|
|
||||||
import type { AirtableBase } from 'airtable/lib/airtable_base'; |
|
||||||
import type { TableType } from 'nocodb-sdk'; |
|
||||||
|
|
||||||
const BULK_DATA_BATCH_SIZE = 500; |
|
||||||
const ASSOC_BULK_DATA_BATCH_SIZE = 1000; |
|
||||||
const BULK_PARALLEL_PROCESS = 5; |
|
||||||
|
|
||||||
interface AirtableImportContext { |
|
||||||
bulkDataService: BulkDataAliasService; |
|
||||||
tableService: TablesService; |
|
||||||
} |
|
||||||
|
|
||||||
async function readAllData({ |
|
||||||
table, |
|
||||||
fields, |
|
||||||
base, |
|
||||||
logBasic = (_str) => {}, |
|
||||||
services, |
|
||||||
}: { |
|
||||||
table: { title?: string }; |
|
||||||
fields?; |
|
||||||
base: AirtableBase; |
|
||||||
logBasic?: (string) => void; |
|
||||||
logDetailed?: (string) => void; |
|
||||||
services: AirtableImportContext; |
|
||||||
}): Promise<EntityMap> { |
|
||||||
return new Promise((resolve, reject) => { |
|
||||||
let data = null; |
|
||||||
|
|
||||||
const selectParams: any = { |
|
||||||
pageSize: 100, |
|
||||||
}; |
|
||||||
|
|
||||||
if (fields) selectParams.fields = fields; |
|
||||||
|
|
||||||
base(table.title) |
|
||||||
.select(selectParams) |
|
||||||
.eachPage( |
|
||||||
async function page(records, fetchNextPage) { |
|
||||||
if (!data) { |
|
||||||
data = new EntityMap(); |
|
||||||
await data.init(); |
|
||||||
} |
|
||||||
|
|
||||||
for await (const record of records) { |
|
||||||
await data.addRow({ id: record.id, ...record.fields }); |
|
||||||
} |
|
||||||
|
|
||||||
const tmpLength = await data.getCount(); |
|
||||||
|
|
||||||
logBasic( |
|
||||||
`:: Reading '${table.title}' data :: ${Math.max( |
|
||||||
1, |
|
||||||
tmpLength - records.length, |
|
||||||
)} - ${tmpLength}`,
|
|
||||||
); |
|
||||||
|
|
||||||
// To fetch the next page of records, call `fetchNextPage`.
|
|
||||||
// If there are more records, `page` will get called again.
|
|
||||||
// If there are no more records, `done` will get called.
|
|
||||||
fetchNextPage(); |
|
||||||
}, |
|
||||||
async function done(err) { |
|
||||||
if (err) { |
|
||||||
console.error(err); |
|
||||||
return reject(err); |
|
||||||
} |
|
||||||
resolve(data); |
|
||||||
}, |
|
||||||
); |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
export async function importData({ |
|
||||||
projectName, |
|
||||||
table, |
|
||||||
base, |
|
||||||
nocoBaseDataProcessing_v2, |
|
||||||
sDB, |
|
||||||
logDetailed = (_str) => {}, |
|
||||||
logBasic = (_str) => {}, |
|
||||||
services, |
|
||||||
}: { |
|
||||||
projectName: string; |
|
||||||
table: { title?: string; id?: string }; |
|
||||||
fields?; |
|
||||||
base: AirtableBase; |
|
||||||
logBasic: (string) => void; |
|
||||||
logDetailed: (string) => void; |
|
||||||
nocoBaseDataProcessing_v2; |
|
||||||
sDB; |
|
||||||
services: AirtableImportContext; |
|
||||||
}): Promise<EntityMap> { |
|
||||||
try { |
|
||||||
// @ts-ignore
|
|
||||||
const records = await readAllData({ |
|
||||||
table, |
|
||||||
base, |
|
||||||
logDetailed, |
|
||||||
logBasic, |
|
||||||
}); |
|
||||||
|
|
||||||
await new Promise(async (resolve) => { |
|
||||||
const readable = records.getStream(); |
|
||||||
const allRecordsCount = await records.getCount(); |
|
||||||
const promises = []; |
|
||||||
let tempData = []; |
|
||||||
let importedCount = 0; |
|
||||||
let activeProcess = 0; |
|
||||||
readable.on('data', async (record) => { |
|
||||||
promises.push( |
|
||||||
new Promise(async (resolve) => { |
|
||||||
activeProcess++; |
|
||||||
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause(); |
|
||||||
const { id: rid, ...fields } = record; |
|
||||||
const r = await nocoBaseDataProcessing_v2(sDB, table, { |
|
||||||
id: rid, |
|
||||||
fields, |
|
||||||
}); |
|
||||||
tempData.push(r); |
|
||||||
|
|
||||||
if (tempData.length >= BULK_DATA_BATCH_SIZE) { |
|
||||||
let insertArray = tempData.splice(0, tempData.length); |
|
||||||
|
|
||||||
await services.bulkDataService.bulkDataInsert({ |
|
||||||
projectName, |
|
||||||
tableName: table.title, |
|
||||||
body: insertArray, |
|
||||||
cookie: {}, |
|
||||||
}); |
|
||||||
|
|
||||||
logBasic( |
|
||||||
`:: Importing '${ |
|
||||||
table.title |
|
||||||
}' data :: ${importedCount} - ${Math.min( |
|
||||||
importedCount + BULK_DATA_BATCH_SIZE, |
|
||||||
allRecordsCount, |
|
||||||
)}`,
|
|
||||||
); |
|
||||||
importedCount += insertArray.length; |
|
||||||
insertArray = []; |
|
||||||
} |
|
||||||
activeProcess--; |
|
||||||
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume(); |
|
||||||
resolve(true); |
|
||||||
}), |
|
||||||
); |
|
||||||
}); |
|
||||||
readable.on('end', async () => { |
|
||||||
await Promise.all(promises); |
|
||||||
if (tempData.length > 0) { |
|
||||||
await services.bulkDataService.bulkDataInsert({ |
|
||||||
projectName, |
|
||||||
tableName: table.title, |
|
||||||
body: tempData, |
|
||||||
cookie: {}, |
|
||||||
}); |
|
||||||
|
|
||||||
logBasic( |
|
||||||
`:: Importing '${ |
|
||||||
table.title |
|
||||||
}' data :: ${importedCount} - ${Math.min( |
|
||||||
importedCount + BULK_DATA_BATCH_SIZE, |
|
||||||
allRecordsCount, |
|
||||||
)}`,
|
|
||||||
); |
|
||||||
importedCount += tempData.length; |
|
||||||
tempData = []; |
|
||||||
} |
|
||||||
resolve(true); |
|
||||||
}); |
|
||||||
}); |
|
||||||
|
|
||||||
return records; |
|
||||||
} catch (e) { |
|
||||||
console.log(e); |
|
||||||
return null; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
export async function importLTARData({ |
|
||||||
table, |
|
||||||
fields, |
|
||||||
base, |
|
||||||
projectName, |
|
||||||
insertedAssocRef = {}, |
|
||||||
logDetailed = (_str) => {}, |
|
||||||
logBasic = (_str) => {}, |
|
||||||
records, |
|
||||||
atNcAliasRef, |
|
||||||
ncLinkMappingTable, |
|
||||||
syncDB, |
|
||||||
services, |
|
||||||
}: { |
|
||||||
projectName: string; |
|
||||||
table: { title?: string; id?: string }; |
|
||||||
fields; |
|
||||||
base: AirtableBase; |
|
||||||
logDetailed: (string) => void; |
|
||||||
logBasic: (string) => void; |
|
||||||
insertedAssocRef: { [assocTableId: string]: boolean }; |
|
||||||
records?: EntityMap; |
|
||||||
atNcAliasRef: { |
|
||||||
[ncTableId: string]: { |
|
||||||
[ncTitle: string]: string; |
|
||||||
}; |
|
||||||
}; |
|
||||||
ncLinkMappingTable: Record<string, Record<string, any>>[]; |
|
||||||
syncDB; |
|
||||||
services: AirtableImportContext; |
|
||||||
}) { |
|
||||||
const assocTableMetas: Array<{ |
|
||||||
modelMeta: { id?: string; title?: string }; |
|
||||||
colMeta: { title?: string }; |
|
||||||
curCol: { title?: string }; |
|
||||||
refCol: { title?: string }; |
|
||||||
}> = []; |
|
||||||
const allData = |
|
||||||
records || |
|
||||||
(await readAllData({ |
|
||||||
table, |
|
||||||
fields, |
|
||||||
base, |
|
||||||
logDetailed, |
|
||||||
logBasic, |
|
||||||
services, |
|
||||||
})); |
|
||||||
|
|
||||||
const modelMeta: any = |
|
||||||
await services.tableService.getTableWithAccessibleViews({ |
|
||||||
tableId: table.id, |
|
||||||
user: syncDB.user, |
|
||||||
}); |
|
||||||
|
|
||||||
for (const colMeta of modelMeta.columns) { |
|
||||||
// skip columns which are not LTAR and Many to many
|
|
||||||
if ( |
|
||||||
colMeta.uidt !== UITypes.LinkToAnotherRecord || |
|
||||||
colMeta.colOptions.type !== RelationTypes.MANY_TO_MANY |
|
||||||
) { |
|
||||||
continue; |
|
||||||
} |
|
||||||
|
|
||||||
// skip if already inserted
|
|
||||||
if (colMeta.colOptions.fk_mm_model_id in insertedAssocRef) continue; |
|
||||||
|
|
||||||
// self links: skip if the column under consideration is the add-on column NocoDB creates
|
|
||||||
if (ncLinkMappingTable.every((a) => a.nc.title !== colMeta.title)) continue; |
|
||||||
|
|
||||||
// mark as inserted
|
|
||||||
insertedAssocRef[colMeta.colOptions.fk_mm_model_id] = true; |
|
||||||
|
|
||||||
const assocModelMeta: TableType = |
|
||||||
(await services.tableService.getTableWithAccessibleViews({ |
|
||||||
tableId: colMeta.colOptions.fk_mm_model_id, |
|
||||||
user: syncDB.user, |
|
||||||
})) as any; |
|
||||||
|
|
||||||
// extract associative table and columns meta
|
|
||||||
assocTableMetas.push({ |
|
||||||
modelMeta: assocModelMeta, |
|
||||||
colMeta, |
|
||||||
curCol: assocModelMeta.columns.find( |
|
||||||
(c) => c.id === colMeta.colOptions.fk_mm_child_column_id, |
|
||||||
), |
|
||||||
refCol: assocModelMeta.columns.find( |
|
||||||
(c) => c.id === colMeta.colOptions.fk_mm_parent_column_id, |
|
||||||
), |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
let nestedLinkCnt = 0; |
|
||||||
// Iterate over all related M2M associative table
|
|
||||||
for await (const assocMeta of assocTableMetas) { |
|
||||||
let assocTableData = []; |
|
||||||
let importedCount = 0; |
|
||||||
|
|
||||||
// extract insert data from records
|
|
||||||
await new Promise((resolve) => { |
|
||||||
const promises = []; |
|
||||||
const readable = allData.getStream(); |
|
||||||
let activeProcess = 0; |
|
||||||
readable.on('data', async (record) => { |
|
||||||
promises.push( |
|
||||||
new Promise(async (resolve) => { |
|
||||||
activeProcess++; |
|
||||||
if (activeProcess >= BULK_PARALLEL_PROCESS) readable.pause(); |
|
||||||
const { id: _atId, ...rec } = record; |
|
||||||
|
|
||||||
// todo: use actual alias instead of sanitized
|
|
||||||
assocTableData.push( |
|
||||||
...( |
|
||||||
rec?.[atNcAliasRef[table.id][assocMeta.colMeta.title]] || [] |
|
||||||
).map((id) => ({ |
|
||||||
[assocMeta.curCol.title]: record.id, |
|
||||||
[assocMeta.refCol.title]: id, |
|
||||||
})), |
|
||||||
); |
|
||||||
|
|
||||||
if (assocTableData.length >= ASSOC_BULK_DATA_BATCH_SIZE) { |
|
||||||
let insertArray = assocTableData.splice(0, assocTableData.length); |
|
||||||
logBasic( |
|
||||||
`:: Importing '${ |
|
||||||
table.title |
|
||||||
}' LTAR data :: ${importedCount} - ${Math.min( |
|
||||||
importedCount + ASSOC_BULK_DATA_BATCH_SIZE, |
|
||||||
insertArray.length, |
|
||||||
)}`,
|
|
||||||
); |
|
||||||
|
|
||||||
await services.bulkDataService.bulkDataInsert({ |
|
||||||
projectName, |
|
||||||
tableName: assocMeta.modelMeta.title, |
|
||||||
body: insertArray, |
|
||||||
cookie: {}, |
|
||||||
}); |
|
||||||
|
|
||||||
importedCount += insertArray.length; |
|
||||||
insertArray = []; |
|
||||||
} |
|
||||||
activeProcess--; |
|
||||||
if (activeProcess < BULK_PARALLEL_PROCESS) readable.resume(); |
|
||||||
resolve(true); |
|
||||||
}), |
|
||||||
); |
|
||||||
}); |
|
||||||
readable.on('end', async () => { |
|
||||||
await Promise.all(promises); |
|
||||||
if (assocTableData.length >= 0) { |
|
||||||
logBasic( |
|
||||||
`:: Importing '${ |
|
||||||
table.title |
|
||||||
}' LTAR data :: ${importedCount} - ${Math.min( |
|
||||||
importedCount + ASSOC_BULK_DATA_BATCH_SIZE, |
|
||||||
assocTableData.length, |
|
||||||
)}`,
|
|
||||||
); |
|
||||||
|
|
||||||
await services.bulkDataService.bulkDataInsert({ |
|
||||||
projectName, |
|
||||||
tableName: assocMeta.modelMeta.title, |
|
||||||
body: assocTableData, |
|
||||||
cookie: {}, |
|
||||||
}); |
|
||||||
|
|
||||||
importedCount += assocTableData.length; |
|
||||||
assocTableData = []; |
|
||||||
} |
|
||||||
resolve(true); |
|
||||||
}); |
|
||||||
}); |
|
||||||
|
|
||||||
nestedLinkCnt += importedCount; |
|
||||||
} |
|
||||||
return nestedLinkCnt; |
|
||||||
} |
|
@ -1,31 +0,0 @@ |
|||||||
export const mapTbl = {}; |
|
||||||
|
|
||||||
// static mapping records between aTblId && ncId
|
|
||||||
export const addToMappingTbl = function addToMappingTbl( |
|
||||||
aTblId, |
|
||||||
ncId, |
|
||||||
ncName, |
|
||||||
parent?, |
|
||||||
) { |
|
||||||
mapTbl[aTblId] = { |
|
||||||
ncId: ncId, |
|
||||||
ncParent: parent, |
|
||||||
// name added to assist in quick debug
|
|
||||||
ncName: ncName, |
|
||||||
}; |
|
||||||
}; |
|
||||||
|
|
||||||
// get NcID from airtable ID
|
|
||||||
export const getNcIdFromAtId = function getNcIdFromAtId(aId) { |
|
||||||
return mapTbl[aId]?.ncId; |
|
||||||
}; |
|
||||||
|
|
||||||
// get nc Parent from airtable ID
|
|
||||||
export const getNcParentFromAtId = function getNcParentFromAtId(aId) { |
|
||||||
return mapTbl[aId]?.ncParent; |
|
||||||
}; |
|
||||||
|
|
||||||
// get nc-title from airtable ID
|
|
||||||
export const getNcNameFromAtId = function getNcNameFromAtId(aId) { |
|
||||||
return mapTbl[aId]?.ncName; |
|
||||||
}; |
|
@ -1,35 +0,0 @@ |
|||||||
import Emittery from 'emittery'; |
|
||||||
import JobsMgr from './JobsMgr'; |
|
||||||
|
|
||||||
export default class EmitteryJobsMgr extends JobsMgr { |
|
||||||
emitter: Emittery; |
|
||||||
|
|
||||||
constructor() { |
|
||||||
super(); |
|
||||||
this.emitter = new Emittery(); |
|
||||||
} |
|
||||||
|
|
||||||
add(jobName: string, payload: any): Promise<any> { |
|
||||||
return this.emitter.emit(jobName, payload); |
|
||||||
} |
|
||||||
|
|
||||||
addJobWorker( |
|
||||||
jobName: string, |
|
||||||
workerFn: ( |
|
||||||
payload: any, |
|
||||||
progressCbk?: (payload: any, msg?: string) => void, |
|
||||||
) => void, |
|
||||||
) { |
|
||||||
this.emitter.on(jobName, async (payload) => { |
|
||||||
try { |
|
||||||
await workerFn(payload, (msg) => |
|
||||||
this.invokeProgressCbks(jobName, payload, msg), |
|
||||||
); |
|
||||||
await this.invokeSuccessCbks(jobName, payload); |
|
||||||
} catch (e) { |
|
||||||
console.log(e); |
|
||||||
await this.invokeFailureCbks(jobName, payload, e); |
|
||||||
} |
|
||||||
}); |
|
||||||
} |
|
||||||
} |
|
@ -1,67 +0,0 @@ |
|||||||
export default abstract class JobsMgr { |
|
||||||
protected successCbks: Array<{ |
|
||||||
[jobName: string]: (payload: any) => void; |
|
||||||
}> = []; |
|
||||||
protected failureCbks: Array<{ |
|
||||||
[jobName: string]: (payload: any, error: Error) => void; |
|
||||||
}> = []; |
|
||||||
protected progressCbks: Array<{ |
|
||||||
[jobName: string]: (payload: any, msg?: string) => void; |
|
||||||
}> = []; |
|
||||||
|
|
||||||
public abstract add<T>(jobName: string, payload: T): Promise<any>; |
|
||||||
|
|
||||||
public abstract addJobWorker( |
|
||||||
jobName: string, |
|
||||||
workerFn: ( |
|
||||||
payload: any, |
|
||||||
progressCbk?: (payload: any, msg?: string) => void, |
|
||||||
) => void, |
|
||||||
options?: { |
|
||||||
onSuccess?: (payload: any) => void; |
|
||||||
onFailure?: (payload: any, errorData: any) => void; |
|
||||||
onProgress?: (payload: any, progressData: any) => void; |
|
||||||
}, |
|
||||||
); |
|
||||||
|
|
||||||
addSuccessCbk(jobName: string, cbk: (payload: any) => void) { |
|
||||||
this.successCbks[jobName] = this.successCbks[jobName] || []; |
|
||||||
this.successCbks[jobName].push(cbk); |
|
||||||
} |
|
||||||
|
|
||||||
addFailureCbk(jobName: string, cbk: (payload: any, errorData: any) => void) { |
|
||||||
this.failureCbks[jobName] = this.failureCbks[jobName] || []; |
|
||||||
this.failureCbks[jobName].push(cbk); |
|
||||||
} |
|
||||||
addProgressCbk( |
|
||||||
jobName: string, |
|
||||||
cbk: (payload: any, progressData: any) => void, |
|
||||||
) { |
|
||||||
this.progressCbks[jobName] = this.progressCbks[jobName] || []; |
|
||||||
this.progressCbks[jobName].push(cbk); |
|
||||||
} |
|
||||||
|
|
||||||
protected async invokeSuccessCbks(jobName: string, payload: any) { |
|
||||||
await Promise.all( |
|
||||||
this.successCbks?.[jobName]?.map((cb) => cb(payload)) || [], |
|
||||||
); |
|
||||||
} |
|
||||||
protected async invokeFailureCbks( |
|
||||||
jobName: string, |
|
||||||
payload: any, |
|
||||||
error?: Error, |
|
||||||
) { |
|
||||||
await Promise.all( |
|
||||||
this.failureCbks?.[jobName]?.map((cb) => cb(payload, error)) || [], |
|
||||||
); |
|
||||||
} |
|
||||||
protected async invokeProgressCbks( |
|
||||||
jobName: string, |
|
||||||
payload: any, |
|
||||||
data?: any, |
|
||||||
) { |
|
||||||
await Promise.all( |
|
||||||
this.progressCbks?.[jobName]?.map((cb) => cb(payload, data)) || [], |
|
||||||
); |
|
||||||
} |
|
||||||
} |
|
@ -1,20 +0,0 @@ |
|||||||
import EmitteryJobsMgr from './EmitteryJobsMgr'; |
|
||||||
import RedisJobsMgr from './RedisJobsMgr'; |
|
||||||
import type JobsMgr from './JobsMgr'; |
|
||||||
|
|
||||||
export default class NocoJobs { |
|
||||||
private static client: JobsMgr; |
|
||||||
|
|
||||||
private static init() { |
|
||||||
if (process.env.NC_REDIS_URL) { |
|
||||||
this.client = new RedisJobsMgr(process.env.NC_REDIS_URL); |
|
||||||
} else { |
|
||||||
this.client = new EmitteryJobsMgr(); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
public static get jobsMgr(): JobsMgr { |
|
||||||
if (!this.client) this.init(); |
|
||||||
return this.client; |
|
||||||
} |
|
||||||
} |
|
@ -1,56 +0,0 @@ |
|||||||
import { Queue, Worker } from 'bullmq'; |
|
||||||
import Redis from 'ioredis'; |
|
||||||
import JobsMgr from './JobsMgr'; |
|
||||||
|
|
||||||
export default class RedisJobsMgr extends JobsMgr { |
|
||||||
queue: { [jobName: string]: Queue }; |
|
||||||
workers: { [jobName: string]: Worker }; |
|
||||||
connection: Redis; |
|
||||||
|
|
||||||
constructor(config: any) { |
|
||||||
super(); |
|
||||||
this.queue = {}; |
|
||||||
this.workers = {}; |
|
||||||
this.connection = new Redis(config, { |
|
||||||
maxRetriesPerRequest: null, |
|
||||||
}); |
|
||||||
} |
|
||||||
|
|
||||||
async add( |
|
||||||
jobName: string, |
|
||||||
payload: any, |
|
||||||
// options?: {
|
|
||||||
// onSuccess?: (payload: any) => void;
|
|
||||||
// onFailure?: (payload: any, msg: string) => void;
|
|
||||||
// onProgress?: (payload: any, msgOrData: any) => void;
|
|
||||||
// }
|
|
||||||
): Promise<any> { |
|
||||||
this.queue[jobName] = |
|
||||||
this.queue[jobName] || |
|
||||||
new Queue(jobName, { connection: this.connection }); |
|
||||||
this.queue[jobName].add(jobName, payload); |
|
||||||
} |
|
||||||
|
|
||||||
addJobWorker( |
|
||||||
jobName: string, |
|
||||||
workerFn: ( |
|
||||||
payload: any, |
|
||||||
progressCbk?: (payload: any, msg?: string) => void, |
|
||||||
) => void, |
|
||||||
) { |
|
||||||
this.workers[jobName] = new Worker( |
|
||||||
jobName, |
|
||||||
async (payload) => { |
|
||||||
try { |
|
||||||
await workerFn(payload.data, (...args) => |
|
||||||
this.invokeProgressCbks(jobName, ...args), |
|
||||||
); |
|
||||||
await this.invokeFailureCbks(jobName, payload.data); |
|
||||||
} catch (e) { |
|
||||||
await this.invokeFailureCbks(jobName, payload.data); |
|
||||||
} |
|
||||||
}, |
|
||||||
{ connection: this.connection }, |
|
||||||
); |
|
||||||
} |
|
||||||
} |
|
Loading…
Reference in new issue