From 9b4ea8864f731cbe7a1d3a6b139981a96ff8b2b7 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Tue, 17 Sep 2024 12:43:08 -0300 Subject: [PATCH] feat(parquet-exporter): export tags to parquet --- docker-compose.yaml | 1 + docs/envs.md | 1 + flake.nix | 2 +- package.json | 2 +- src/routes/ar-io.ts | 6 +- src/workers/parquet-exporter.ts | 207 +++++++++++++++++--------------- yarn.lock | 9 +- 7 files changed, 124 insertions(+), 104 deletions(-) diff --git a/docker-compose.yaml b/docker-compose.yaml index cd8c4510..246aa354 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -36,6 +36,7 @@ services: - ${CONTIGUOUS_DATA_PATH:-./data/contiguous}:/app/data/contiguous - ${HEADERS_DATA_PATH:-./data/headers}:/app/data/headers - ${SQLITE_DATA_PATH:-./data/sqlite}:/app/data/sqlite + - ${DUCKDB_DATA_PATH:-./data/duckdb}:/app/data/duckdb - ${TEMP_DATA_PATH:-./data/tmp}:/app/data/tmp - ${LMDB_DATA_PATH:-./data/lmdb}:/app/data/lmdb environment: diff --git a/docs/envs.md b/docs/envs.md index 2dc43158..3b565300 100644 --- a/docs/envs.md +++ b/docs/envs.md @@ -37,6 +37,7 @@ This document describes the environment variables that can be used to configure | CONTIGUOUS_DATA_PATH | String | "./data/contiguous" | Sets the location for contiguous data to be saved. If omitted, contiguous data will be stored in the `data` directory | | HEADERS_DATA_PATH | String | "./data/headers" | Sets the location for header data to be saved. If omitted, header data will be stored in the `data` directory | | SQLITE_DATA_PATH | String | "./data/sqlite" | Sets the location for sqlite indexed data to be saved. If omitted, sqlite data will be stored in the `data` directory | +| DUCKDB_DATA_PATH | String | "./data/duckdb" | Sets the location for duckdb data to be saved. If omitted, duckdb data will be stored in the `data` directory | | TEMP_DATA_PATH | String | "./data/tmp" | Sets the location for temporary data to be saved. If omitted, temporary data will be stored in the `data` directory | | LMDB_DATA_PATH | String | "./data/LMDB" | Sets the location for LMDB data to be saved. If omitted, LMDB data will be stored in the `data` directory | | CHAIN_CACHE_TYPE | String | "redis" | Sets the method for caching chain data, defaults redis if gateway is started with docker-compose, otherwise defaults to LMDB | diff --git a/flake.nix b/flake.nix index a6567968..c91fb488 100644 --- a/flake.nix +++ b/flake.nix @@ -22,7 +22,7 @@ yaml-language-server openjdk sqlite-interactive - python3 + python311 duckdb ]; }; diff --git a/package.json b/package.json index 630aa928..b35efd18 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "cors": "^2.8.5", "crypto": "^1.0.1", "dotenv": "^16.3.1", - "duckdb": "^1.0.0", + "duckdb-async": "^1.0.0", "express": "^4.18.1", "express-async-handler": "^1.2.0", "express-openapi-validator": "^5.1.2", diff --git a/src/routes/ar-io.ts b/src/routes/ar-io.ts index 3013ba67..7053f25a 100644 --- a/src/routes/ar-io.ts +++ b/src/routes/ar-io.ts @@ -234,10 +234,10 @@ arIoRouter.post( return; } - const parquetExporter = new ParquetExporter({ + const parquetExporter = await ParquetExporter.create({ log, - duckDbPath: './data/duckdb/tags.duckdb', - sqliteDbPath: './data/sqlite/bundles.db', + duckDbPath: 'data/duckdb/tags.duckdb', + sqliteDbPath: 'data/sqlite/bundles.db', }); await parquetExporter.exportDataItemTagsParquet({ diff --git a/src/workers/parquet-exporter.ts b/src/workers/parquet-exporter.ts index 3746a01b..e2cb7ce9 100644 --- a/src/workers/parquet-exporter.ts +++ b/src/workers/parquet-exporter.ts @@ -17,14 +17,26 @@ */ import { readFileSync, existsSync, mkdirSync } from 'node:fs'; import * as winston from 'winston'; -import duckdb from 'duckdb'; -// import duckdb, { RowData } from 'duckdb'; +import { Database } from 'duckdb-async'; export class ParquetExporter { private log: winston.Logger; - private db: duckdb.Database; + private db: Database; constructor({ + log, + db, + }: { + log: winston.Logger; + db: Database; + duckDbPath: string; + sqliteDbPath: string; + }) { + this.log = log; + this.db = db; + } + + static async create({ log, duckDbPath, sqliteDbPath, @@ -33,20 +45,20 @@ export class ParquetExporter { duckDbPath: string; sqliteDbPath: string; }) { - this.log = log.child({ class: this.constructor.name }); - this.db = new duckdb.Database(duckDbPath); + const logger = log.child({ class: this.constructor.name }); + const db = await Database.create(duckDbPath); const duckDbSchema = readFileSync( './src/database/duckdb/schema.sql', 'utf8', ); + await db.exec(duckDbSchema); + logger.debug('DuckDB schema created'); - this.db.run(duckDbSchema); - this.log.debug('DuckDB schema imported successfully'); - - this.db.exec(`INSTALL sqlite; LOAD sqlite;`); - this.db.exec(`ATTACH '${sqliteDbPath}' AS sqlite_db (TYPE sqlite);`); - this.log.debug('SQLite database attached successfully'); + await db.exec(`INSTALL sqlite; LOAD sqlite;`); + await db.exec(`ATTACH '${sqliteDbPath}' AS sqlite_db (TYPE sqlite);`); + logger.debug('SQLite loaded'); + return new ParquetExporter({ log: logger, db, duckDbPath, sqliteDbPath }); } async exportDataItemTagsParquet({ @@ -60,10 +72,7 @@ export class ParquetExporter { maxFileRows: number; outputDir: string; }) { - console.log(outputDir); - const sqliteQuery = ` - SELECT * FROM ( SELECT sdit.height, sdit.data_item_id AS id, @@ -81,106 +90,108 @@ export class ParquetExporter { JOIN sqlite_db.stable_data_items sdi ON sdit.data_item_id = sdi.id WHERE - sdit.height BETWEEN ${startHeight} AND ${endHeight} - UNION ALL - SELECT - ndit.height, - ndit.data_item_id AS id, - ndit.data_item_tag_index AS tag_index, - ndi.indexed_at AS created_at, - tn.name AS tag_name, - tv.value AS tag_value, - 1 AS is_data_item - FROM - sqlite_db.new_data_item_tags ndit - JOIN - sqlite_db.tag_names tn ON ndit.tag_name_hash = tn.hash - JOIN - sqlite_db.tag_values tv ON ndit.tag_value_hash = tv.hash - JOIN - sqlite_db.new_data_items ndi ON ndit.data_item_id = ndi.id - WHERE - ndit.height BETWEEN ${startHeight} AND ${endHeight} - ) AS combined_results - LIMIT ${maxFileRows}; + sdit.height BETWEEN ${startHeight} AND ${endHeight}; `; - try { - this.db.exec('BEGIN TRANSACTION;'); - - this.db.exec(`INSERT INTO tags ${sqliteQuery}`, (err: Error | null) => { - if (err) { - this.log.error('Error inserting data into DuckDB:', err); - throw err; - } - }); - - this.db.exec('COMMIT;'); - } catch (err) { - this.db.exec('ROLLBACK;'); - this.log.error('Error inserting data into DuckDB, rolling back:', err); - } finally { - this.log.info('Data imported into DuckDB tags table successfully'); - this.exportToParquet({ - outputDir, - tableName: 'tags', - minHeight: startHeight, - maxHeight: endHeight, - rowCount: maxFileRows, - }); - this.log.info('Data exported to Parquet file successfully'); - this.truncateTable('tags'); - this.db.close(); - } + await this.db.exec(`INSERT INTO tags ${sqliteQuery}`); + + this.log.debug('Data inserted into DuckDB'); + + await this.exportToParquet({ + outputDir, + tableName: 'tags', + startHeight, + endHeight, + maxFileRows, + }); + + await this.truncateTable('tags'); + + await this.db.close(); } private async exportToParquet({ outputDir, tableName, - minHeight, - maxHeight, - rowCount, + startHeight, + endHeight, + maxFileRows, }: { outputDir: string; tableName: string; - minHeight: number; - maxHeight: number; - rowCount: number; + startHeight: number; + endHeight: number; + maxFileRows: number; }): Promise { - const fileName = `${tableName}-minHeight:${minHeight}-maxHeight:${maxHeight}-rowCount:${rowCount}.parquet`; - const filePath = `${outputDir}/${fileName}`; - if (!existsSync(outputDir)) { mkdirSync(outputDir, { recursive: true }); } - return new Promise((resolve, reject) => { - this.db.exec( - `COPY tags TO '${filePath}' (FORMAT PARQUET);`, - (err: Error | null) => { - if (err) { - this.log.error(`Error exporting to Parquet file ${fileName}:`, err); - reject(err); - } else { - this.log.info(`Exported to Parquet file: ${fileName}`); - resolve(); - } - }, - ); - }); + let { minHeight, maxHeight } = await this.getHeightRange(tableName); + minHeight = Math.max(startHeight, minHeight); + maxHeight = Math.min(endHeight, maxHeight); + let rowCount = 0; + + this.log.info( + `Exporting Parquet file(s) for ${tableName} from height ${minHeight} to ${maxHeight}`, + ); + + for (let height = minHeight; height <= maxHeight; height++) { + const heightRowCount = await this.getRowCountForHeight(tableName, height); + rowCount += heightRowCount; + + if (rowCount >= maxFileRows || height === maxHeight) { + const fileName = `${tableName}-minHeight:${minHeight}-maxHeight:${height}-rowCount:${rowCount}.parquet`; + const filePath = `${outputDir}/${fileName}`; + + await this.db.exec(` + COPY ( + SELECT * FROM ${tableName} + WHERE height >= ${minHeight} AND height <= ${height} + ) TO '${filePath}' (FORMAT PARQUET, COMPRESSION 'zstd') + `); + + this.log.info(`Exported Parquet file: ${fileName}`); + + minHeight = height + 1; + rowCount = 0; + } + } + + this.log.info('Parquet export complete'); } - private async truncateTable(table: string): Promise { - return new Promise((resolve, reject) => { - this.db.exec(`TRUNCATE TABLE ${table};`, (err: Error | null) => { - if (err) { - this.log.error(`Error truncating ${table} table:`, err); - reject(err); - } else { - this.log.info(`${table} table truncated`); - resolve(); - } - }); - }); + private async getHeightRange( + tableName: string, + ): Promise<{ minHeight: number; maxHeight: number }> { + const query = ` + SELECT MIN(height) as min_height, MAX(height) as max_height + FROM ${tableName} + `; + const result = await this.db.all(query); + + return { + minHeight: result[0].min_height, + maxHeight: result[0].max_height, + }; + } + + private async getRowCountForHeight( + tableName: string, + height: number, + ): Promise { + const query = ` + SELECT COUNT(*) as count + FROM ${tableName} + WHERE height = ${height} + `; + const result = await this.db.all(query); + + return Number(result[0].count); + } + + private async truncateTable(tableName: string): Promise { + await this.db.exec(`TRUNCATE TABLE ${tableName};`); + await this.db.exec(`CHECKPOINT ${tableName};`); } } diff --git a/yarn.lock b/yarn.lock index 30dbd228..fab12a87 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4211,7 +4211,14 @@ dotenv@^16.3.1: resolved "https://registry.yarnpkg.com/dotenv/-/dotenv-16.4.5.tgz#cdd3b3b604cb327e286b4762e13502f717cb099f" integrity sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg== -duckdb@^1.0.0: +duckdb-async@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/duckdb-async/-/duckdb-async-1.0.0.tgz#0dd51f03404cec4a1ab0e8fa5941c756479122f4" + integrity sha512-1UTAZ2LVk9s4NPMiLnB9L2DI716I/LqtDYL8I7RFEP981/fo0SaqcRQFwtgPPz8MyEPcx8QPPmAWEsal5q68xQ== + dependencies: + duckdb "1.0.0" + +duckdb@1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/duckdb/-/duckdb-1.0.0.tgz#de81d9b93311bb816901b582de27ae75fa0e20e2" integrity sha512-QwpcIeN42A2lL19S70mUFibZgRcEcZpCkKHdzDgecHaYZhXj3+1i2cxSDyAk/RVg5CYnqj1Dp4jAuN4cc80udA==