Skip to content

Commit

Permalink
feat(parquet-exporter): export tags to parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Sep 17, 2024
1 parent 059cf4d commit 9b4ea88
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 104 deletions.
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ services:
- ${CONTIGUOUS_DATA_PATH:-./data/contiguous}:/app/data/contiguous
- ${HEADERS_DATA_PATH:-./data/headers}:/app/data/headers
- ${SQLITE_DATA_PATH:-./data/sqlite}:/app/data/sqlite
- ${DUCKDB_DATA_PATH:-./data/duckdb}:/app/data/duckdb
- ${TEMP_DATA_PATH:-./data/tmp}:/app/data/tmp
- ${LMDB_DATA_PATH:-./data/lmdb}:/app/data/lmdb
environment:
Expand Down
1 change: 1 addition & 0 deletions docs/envs.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ This document describes the environment variables that can be used to configure
| CONTIGUOUS_DATA_PATH | String | "./data/contiguous" | Sets the location for contiguous data to be saved. If omitted, contiguous data will be stored in the `data` directory |
| HEADERS_DATA_PATH | String | "./data/headers" | Sets the location for header data to be saved. If omitted, header data will be stored in the `data` directory |
| SQLITE_DATA_PATH | String | "./data/sqlite" | Sets the location for sqlite indexed data to be saved. If omitted, sqlite data will be stored in the `data` directory |
| DUCKDB_DATA_PATH | String | "./data/duckdb" | Sets the location for duckdb data to be saved. If omitted, duckdb data will be stored in the `data` directory |
| TEMP_DATA_PATH | String | "./data/tmp" | Sets the location for temporary data to be saved. If omitted, temporary data will be stored in the `data` directory |
| LMDB_DATA_PATH | String | "./data/lmdb" | Sets the location for LMDB data to be saved. If omitted, LMDB data will be stored in the `data` directory |
| CHAIN_CACHE_TYPE | String | "redis" | Sets the method for caching chain data. Defaults to redis if the gateway is started with docker-compose, otherwise defaults to LMDB |
Expand Down
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
yaml-language-server
openjdk
sqlite-interactive
python3
python311
duckdb
];
};
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"cors": "^2.8.5",
"crypto": "^1.0.1",
"dotenv": "^16.3.1",
"duckdb": "^1.0.0",
"duckdb-async": "^1.0.0",
"express": "^4.18.1",
"express-async-handler": "^1.2.0",
"express-openapi-validator": "^5.1.2",
Expand Down
6 changes: 3 additions & 3 deletions src/routes/ar-io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ arIoRouter.post(
return;
}

const parquetExporter = new ParquetExporter({
const parquetExporter = await ParquetExporter.create({
log,
duckDbPath: './data/duckdb/tags.duckdb',
sqliteDbPath: './data/sqlite/bundles.db',
duckDbPath: 'data/duckdb/tags.duckdb',
sqliteDbPath: 'data/sqlite/bundles.db',
});

await parquetExporter.exportDataItemTagsParquet({
Expand Down
207 changes: 109 additions & 98 deletions src/workers/parquet-exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@
*/
import { readFileSync, existsSync, mkdirSync } from 'node:fs';
import * as winston from 'winston';
import duckdb from 'duckdb';
// import duckdb, { RowData } from 'duckdb';
import { Database } from 'duckdb-async';

export class ParquetExporter {
private log: winston.Logger;
private db: duckdb.Database;
private db: Database;

/**
 * Wires in an already-initialized logger and DuckDB handle.
 *
 * Not meant to be called directly — use ParquetExporter.create(), which
 * builds the Database, loads the DuckDB schema, and attaches the SQLite
 * source database before delegating here. The duckDbPath/sqliteDbPath
 * options are accepted for signature compatibility with create() but are
 * not needed once the Database handle exists.
 */
constructor(options: {
  log: winston.Logger;
  db: Database;
  duckDbPath: string;
  sqliteDbPath: string;
}) {
  this.log = options.log;
  this.db = options.db;
}

static async create({
log,
duckDbPath,
sqliteDbPath,
Expand All @@ -33,20 +45,20 @@ export class ParquetExporter {
duckDbPath: string;
sqliteDbPath: string;
}) {
this.log = log.child({ class: this.constructor.name });
this.db = new duckdb.Database(duckDbPath);
const logger = log.child({ class: this.constructor.name });
const db = await Database.create(duckDbPath);

const duckDbSchema = readFileSync(
'./src/database/duckdb/schema.sql',
'utf8',
);
await db.exec(duckDbSchema);
logger.debug('DuckDB schema created');

this.db.run(duckDbSchema);
this.log.debug('DuckDB schema imported successfully');

this.db.exec(`INSTALL sqlite; LOAD sqlite;`);
this.db.exec(`ATTACH '${sqliteDbPath}' AS sqlite_db (TYPE sqlite);`);
this.log.debug('SQLite database attached successfully');
await db.exec(`INSTALL sqlite; LOAD sqlite;`);
await db.exec(`ATTACH '${sqliteDbPath}' AS sqlite_db (TYPE sqlite);`);
logger.debug('SQLite loaded');
return new ParquetExporter({ log: logger, db, duckDbPath, sqliteDbPath });
}

async exportDataItemTagsParquet({
Expand All @@ -60,10 +72,7 @@ export class ParquetExporter {
maxFileRows: number;
outputDir: string;
}) {
console.log(outputDir);

const sqliteQuery = `
SELECT * FROM (
SELECT
sdit.height,
sdit.data_item_id AS id,
Expand All @@ -81,106 +90,108 @@ export class ParquetExporter {
JOIN
sqlite_db.stable_data_items sdi ON sdit.data_item_id = sdi.id
WHERE
sdit.height BETWEEN ${startHeight} AND ${endHeight}
UNION ALL
SELECT
ndit.height,
ndit.data_item_id AS id,
ndit.data_item_tag_index AS tag_index,
ndi.indexed_at AS created_at,
tn.name AS tag_name,
tv.value AS tag_value,
1 AS is_data_item
FROM
sqlite_db.new_data_item_tags ndit
JOIN
sqlite_db.tag_names tn ON ndit.tag_name_hash = tn.hash
JOIN
sqlite_db.tag_values tv ON ndit.tag_value_hash = tv.hash
JOIN
sqlite_db.new_data_items ndi ON ndit.data_item_id = ndi.id
WHERE
ndit.height BETWEEN ${startHeight} AND ${endHeight}
) AS combined_results
LIMIT ${maxFileRows};
sdit.height BETWEEN ${startHeight} AND ${endHeight};
`;

try {
this.db.exec('BEGIN TRANSACTION;');

this.db.exec(`INSERT INTO tags ${sqliteQuery}`, (err: Error | null) => {
if (err) {
this.log.error('Error inserting data into DuckDB:', err);
throw err;
}
});

this.db.exec('COMMIT;');
} catch (err) {
this.db.exec('ROLLBACK;');
this.log.error('Error inserting data into DuckDB, rolling back:', err);
} finally {
this.log.info('Data imported into DuckDB tags table successfully');
this.exportToParquet({
outputDir,
tableName: 'tags',
minHeight: startHeight,
maxHeight: endHeight,
rowCount: maxFileRows,
});
this.log.info('Data exported to Parquet file successfully');
this.truncateTable('tags');
this.db.close();
}
await this.db.exec(`INSERT INTO tags ${sqliteQuery}`);

this.log.debug('Data inserted into DuckDB');

await this.exportToParquet({
outputDir,
tableName: 'tags',
startHeight,
endHeight,
maxFileRows,
});

await this.truncateTable('tags');

await this.db.close();
}

private async exportToParquet({
outputDir,
tableName,
minHeight,
maxHeight,
rowCount,
startHeight,
endHeight,
maxFileRows,
}: {
outputDir: string;
tableName: string;
minHeight: number;
maxHeight: number;
rowCount: number;
startHeight: number;
endHeight: number;
maxFileRows: number;
}): Promise<void> {
const fileName = `${tableName}-minHeight:${minHeight}-maxHeight:${maxHeight}-rowCount:${rowCount}.parquet`;
const filePath = `${outputDir}/${fileName}`;

if (!existsSync(outputDir)) {
mkdirSync(outputDir, { recursive: true });
}

return new Promise((resolve, reject) => {
this.db.exec(
`COPY tags TO '${filePath}' (FORMAT PARQUET);`,
(err: Error | null) => {
if (err) {
this.log.error(`Error exporting to Parquet file ${fileName}:`, err);
reject(err);
} else {
this.log.info(`Exported to Parquet file: ${fileName}`);
resolve();
}
},
);
});
let { minHeight, maxHeight } = await this.getHeightRange(tableName);
minHeight = Math.max(startHeight, minHeight);
maxHeight = Math.min(endHeight, maxHeight);
let rowCount = 0;

this.log.info(
`Exporting Parquet file(s) for ${tableName} from height ${minHeight} to ${maxHeight}`,
);

for (let height = minHeight; height <= maxHeight; height++) {
const heightRowCount = await this.getRowCountForHeight(tableName, height);
rowCount += heightRowCount;

if (rowCount >= maxFileRows || height === maxHeight) {
const fileName = `${tableName}-minHeight:${minHeight}-maxHeight:${height}-rowCount:${rowCount}.parquet`;
const filePath = `${outputDir}/${fileName}`;

await this.db.exec(`
COPY (
SELECT * FROM ${tableName}
WHERE height >= ${minHeight} AND height <= ${height}
) TO '${filePath}' (FORMAT PARQUET, COMPRESSION 'zstd')
`);

this.log.info(`Exported Parquet file: ${fileName}`);

minHeight = height + 1;
rowCount = 0;
}
}

this.log.info('Parquet export complete');
}

private async truncateTable(table: string): Promise<void> {
return new Promise((resolve, reject) => {
this.db.exec(`TRUNCATE TABLE ${table};`, (err: Error | null) => {
if (err) {
this.log.error(`Error truncating ${table} table:`, err);
reject(err);
} else {
this.log.info(`${table} table truncated`);
resolve();
}
});
});
/**
 * Returns the minimum and maximum block height present in the given table.
 *
 * The aggregate results are coerced with Number() for consistency with
 * getRowCountForHeight: depending on the driver, BIGINT columns can come
 * back as BigInt values, and a BigInt would throw in the Math.max/Math.min
 * arithmetic performed by exportToParquet. On an empty table MIN/MAX yield
 * NULL, which Number() maps to 0 — the same value the caller's
 * Math.max/Math.min coercion of null produced before, so callers observe
 * no behavior change.
 *
 * NOTE(review): tableName is interpolated directly into the SQL text; it is
 * only called with the internal constant 'tags' today — confirm it never
 * receives untrusted input.
 */
private async getHeightRange(
  tableName: string,
): Promise<{ minHeight: number; maxHeight: number }> {
  const query = `
    SELECT MIN(height) as min_height, MAX(height) as max_height
    FROM ${tableName}
  `;
  const result = await this.db.all(query);

  return {
    minHeight: Number(result[0].min_height),
    maxHeight: Number(result[0].max_height),
  };
}

/**
 * Counts the rows stored at one specific block height in the given table.
 * Used by exportToParquet to decide when the accumulated row count has
 * reached the per-file limit.
 *
 * The COUNT(*) column is coerced with Number() because the driver may
 * surface it as a BigInt.
 */
private async getRowCountForHeight(
  tableName: string,
  height: number,
): Promise<number> {
  const sql = `
    SELECT COUNT(*) as count
    FROM ${tableName}
    WHERE height = ${height}
  `;
  const rows = await this.db.all(sql);
  const [firstRow] = rows;

  return Number(firstRow.count);
}

/**
 * Removes all rows from the given DuckDB table after an export, then forces
 * a checkpoint so the truncated data is flushed and file space can be
 * reclaimed.
 *
 * NOTE(review): DuckDB's CHECKPOINT statement takes an optional *database*
 * name, not a table name. Passing the table name here only works because
 * the attached file data/duckdb/tags.duckdb produces a database named
 * "tags", which happens to match the table name — confirm against the
 * DuckDB CHECKPOINT docs, or drop the argument to checkpoint the default
 * database explicitly.
 */
private async truncateTable(tableName: string): Promise<void> {
await this.db.exec(`TRUNCATE TABLE ${tableName};`);
await this.db.exec(`CHECKPOINT ${tableName};`);
}
}
9 changes: 8 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4211,7 +4211,14 @@ dotenv@^16.3.1:
resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.4.5.tgz#cdd3b3b604cb327e286b4762e13502f717cb099f"
integrity sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==

duckdb@^1.0.0:
duckdb-async@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/duckdb-async/-/duckdb-async-1.0.0.tgz#0dd51f03404cec4a1ab0e8fa5941c756479122f4"
integrity sha512-1UTAZ2LVk9s4NPMiLnB9L2DI716I/LqtDYL8I7RFEP981/fo0SaqcRQFwtgPPz8MyEPcx8QPPmAWEsal5q68xQ==
dependencies:
duckdb "1.0.0"

[email protected]:
version "1.0.0"
resolved "https://registry.yarnpkg.com/duckdb/-/duckdb-1.0.0.tgz#de81d9b93311bb816901b582de27ae75fa0e20e2"
integrity sha512-QwpcIeN42A2lL19S70mUFibZgRcEcZpCkKHdzDgecHaYZhXj3+1i2cxSDyAk/RVg5CYnqj1Dp4jAuN4cc80udA==
Expand Down

0 comments on commit 9b4ea88

Please sign in to comment.