
refactor: comments and spacing

Signed-off-by: mertmit <mertmit99@gmail.com>
commit 98762eecd2 (branch: pull/5954/head)
mertmit, 1 year ago
1 changed file, 35 changes:
packages/nocodb/src/modules/jobs/jobs/at-import/helpers/readAndProcessData.ts

@@ -9,7 +9,7 @@ import type { AirtableBase } from 'airtable/lib/airtable_base';
 import type { TableType } from 'nocodb-sdk';

 const BULK_DATA_BATCH_COUNT = 20; // check size for every 100 records
-const BULK_DATA_BATCH_SIZE = 51200; // in bytes
+const BULK_DATA_BATCH_SIZE = 50 * 1024; // in bytes
 const BULK_PARALLEL_PROCESS = 5;

 interface AirtableImportContext {
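For context on how the two batch constants interact: BULK_DATA_BATCH_SIZE caps the serialized size of one bulk insert, while BULK_DATA_BATCH_COUNT throttles how often that size is measured, since JSON.stringify over the pending batch is itself costly. A minimal sketch of that size-capped batching pattern, with a hypothetical `batchBySize`/`flush` pair standing in for the import loop:

```ts
const BULK_DATA_BATCH_COUNT = 20;
const BULK_DATA_BATCH_SIZE = 50 * 1024; // 50 KiB

// Illustrative only, not the exact nocodb implementation: buffer rows and
// flush once the serialized payload crosses the byte threshold, checking
// the size only every BULK_DATA_BATCH_COUNT rows to keep stringify cost down.
async function batchBySize<T>(
  rows: AsyncIterable<T>,
  flush: (batch: T[]) => Promise<void>,
): Promise<void> {
  let batch: T[] = [];
  let sinceLastCheck = 0;
  for await (const row of rows) {
    batch.push(row);
    if (++sinceLastCheck >= BULK_DATA_BATCH_COUNT) {
      sinceLastCheck = 0;
      if (JSON.stringify(batch).length >= BULK_DATA_BATCH_SIZE) {
        await flush(batch);
        batch = [];
      }
    }
  }
  if (batch.length > 0) await flush(batch); // flush the remainder
}
```

Writing the threshold as 50 * 1024 rather than 51200 makes the 50 KiB intent readable at a glance, which is the point of this hunk.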
@@ -43,6 +43,12 @@ async function readAllData({
       .eachPage(
         async function page(records, fetchNextPage) {
           if (!data) {
+            /*
+              EntityMap is a sqlite3 table dynamically populated based on json data provided
+              It is used to store data temporarily and then stream it in bulk to import
+              This is done to avoid memory issues - heap out of memory - while importing large data
+            */
             data = new EntityMap();
             await data.init();
           }
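The shape of EntityMap, as far as this file exercises it: init, getStream, and getCount appear in the diff; the row-writing method below is an assumption, labeled as such.

```ts
import type { Readable } from 'stream';

// Interface implied by the usage in this file; sqlite3-backed per the
// comment above. addRow is a hypothetical name for however pages of
// Airtable records get buffered into the map.
interface EntityMapLike {
  init(): Promise<void>; // create the temporary sqlite3 table
  addRow(row: Record<string, unknown>): Promise<void>; // hypothetical writer
  getStream(): Readable; // replay buffered rows one by one
  getCount(): Promise<number>; // total rows buffered
}
```

Spilling the fetched pages to sqlite3 keeps the heap flat however large the Airtable base is; only the row currently on the stream lives in memory.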
@@ -97,8 +103,8 @@ export async function importData({
   services: AirtableImportContext;
 }): Promise<EntityMap> {
   try {
-    // @ts-ignore
-    const records = await readAllData({
+    // returns EntityMap which allows us to stream data
+    const records: EntityMap = await readAllData({
       table,
       base,
       logDetailed,
@@ -109,10 +115,14 @@ export async function importData({
     const readable = records.getStream();
     const allRecordsCount = await records.getCount();
     const promises = [];
     let tempData = [];
     let importedCount = 0;
-    let activeProcess = 0;
     let tempCount = 0;
+
+    // we keep track of active process to pause and resume the stream as we have async calls within the stream and we don't want to load all data in memory
+    let activeProcess = 0;
+
     readable.on('data', async (record) => {
       promises.push(
         new Promise(async (resolve) => {
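The activeProcess counter the new comment describes implements stream backpressure by hand. A sketch of the pattern under the same assumptions, with a hypothetical `handleRecord` standing in for the per-record batching and insert logic:

```ts
import type { Readable } from 'stream';

// Pause once `limit` handlers are in flight, resume as they drain, and
// only resolve after every handler has finished (mirroring the
// `await Promise.all(promises)` in the 'end' handler).
function processWithBackpressure(
  readable: Readable,
  handleRecord: (record: unknown) => Promise<void>,
  limit = 5, // BULK_PARALLEL_PROCESS
): Promise<void> {
  return new Promise((resolve, reject) => {
    let active = 0;
    const inFlight: Promise<void>[] = [];
    readable.on('data', (record) => {
      if (++active >= limit) readable.pause();
      inFlight.push(
        handleRecord(record)
          .then(() => {
            if (--active < limit) readable.resume();
          })
          .catch(reject),
      );
    });
    readable.on('end', () => {
      Promise.all(inFlight).then(() => resolve());
    });
  });
}
```

Without the pause, Node would keep emitting 'data' events while the async inserts lag behind, re-buffering the whole table in memory and defeating the point of the EntityMap.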
@@ -148,6 +158,7 @@ export async function importData({
               allRecordsCount,
             )}`,
           );
+
           importedCount += insertArray.length;
           insertArray = [];
@@ -162,7 +173,10 @@ export async function importData({
         );
       });
       readable.on('end', async () => {
+        // ensure all chunks are processed
         await Promise.all(promises);
+
+        // insert remaining data
         if (tempData.length > 0) {
           await services.bulkDataService.bulkDataInsert({
             projectName,
@@ -182,6 +196,7 @@ export async function importData({
           importedCount += tempData.length;
           tempData = [];
         }
+
         resolve(true);
       });
     });
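The two comments added in this pair of hunks mark the subtle part of the 'end' handler: async 'data' callbacks do not block 'end', so it can fire while batch inserts are still in flight. A self-contained sketch of the same ordering, with a hypothetical `insertBatch` in place of services.bulkDataService.bulkDataInsert:

```ts
import type { Readable } from 'stream';

// Wait for every in-flight full batch, then flush the final partial batch,
// so nothing is inserted out of order and no rows are dropped.
function finishImport(
  readable: Readable,
  promises: Promise<void>[],
  tempData: unknown[],
  insertBatch: (rows: unknown[]) => Promise<void>,
): Promise<boolean> {
  return new Promise((resolve) => {
    readable.on('end', async () => {
      await Promise.all(promises); // all full batches are done
      if (tempData.length > 0) {
        await insertBatch(tempData); // remaining rows below the size threshold
      }
      resolve(true);
    });
  });
}
```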
@@ -230,7 +245,7 @@ export async function importLTARData({
     curCol: { title?: string };
     refCol: { title?: string };
   }> = [];
-  const allData =
+  const allData: EntityMap =
     records ||
     (await readAllData({
       table,
@@ -288,12 +303,13 @@ export async function importLTARData({
   for await (const assocMeta of assocTableMetas) {
     let assocTableData = [];
     let importedCount = 0;
+    let tempCount = 0;

-    // extract insert data from records
+    // extract link data from records
     await new Promise((resolve) => {
       const promises = [];
       const readable = allData.getStream();
-      let tempCount = 0;

       readable.on('data', async (record) => {
         promises.push(
           new Promise(async (resolve) => {
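Where importData inserts row data, importLTARData walks the same stream again to build rows for the association (junction) tables behind linked-record fields, which is what the reworded "extract link data" comment refers to. A hedged sketch of that step; the column names are illustrative, not nocodb's actual foreign-key naming:

```ts
// For a many-to-many field, each Airtable cell holds an array of linked
// record ids, and each id becomes one row in the association table.
interface LinkRow {
  fk_current_record_id: string; // hypothetical column name
  fk_related_record_id: string; // hypothetical column name
}

function extractLinkRows(
  recordId: string,
  linkedIds: string[] | undefined,
): LinkRow[] {
  return (linkedIds ?? []).map((relatedId) => ({
    fk_current_record_id: recordId,
    fk_related_record_id: relatedId,
  }));
}
```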
@@ -318,6 +334,7 @@ export async function importLTARData({
                 0,
                 assocTableData.length,
               );
+
               logBasic(
                 `:: Importing '${
                   table.title
@@ -346,7 +363,10 @@ export async function importLTARData({
           );
         });
         readable.on('end', async () => {
+          // ensure all chunks are processed
           await Promise.all(promises);
+
+          // insert remaining data
           if (assocTableData.length >= 0) {
             logBasic(
               `:: Importing '${
@@ -367,6 +387,7 @@ export async function importLTARData({
             importedCount += assocTableData.length;
             assocTableData = [];
           }
+
           resolve(true);
         });
       });
