diff --git a/src/workers/parquet-exporter.ts b/src/workers/parquet-exporter.ts index 49d22c2e..a6f71c2b 100644 --- a/src/workers/parquet-exporter.ts +++ b/src/workers/parquet-exporter.ts @@ -22,11 +22,21 @@ import { Database } from 'duckdb-async'; export class ParquetExporter { private log: winston.Logger; private db: Database; + private duckDbPath: string; private isExporting = false; - constructor({ log, db }: { log: winston.Logger; db: Database }) { + constructor({ + log, + db, + duckDbPath, + }: { + log: winston.Logger; + db: Database; + duckDbPath: string; + }) { this.log = log; this.db = db; + this.duckDbPath = duckDbPath; } static async create({ @@ -78,6 +88,7 @@ export class ParquetExporter { return new ParquetExporter({ log: logger, db, + duckDbPath, }); } @@ -88,6 +99,7 @@ export class ParquetExporter { startHeight: number; endHeight: number; }) { + const log = this.log.child({ method: 'importBlocks' }); const query = ` SELECT indep_hash, @@ -108,7 +120,7 @@ export class ParquetExporter { try { await this.db.exec(`INSERT INTO blocks ${query}`); - this.log.info('Blocks inserted into DuckDB'); + log.info('Blocks inserted into DuckDB'); } catch (error) { throw `Error importing blocks: ${error}`; } @@ -121,43 +133,49 @@ export class ParquetExporter { startHeight: number; endHeight: number; }) { + const log = this.log.child({ method: 'importTransactions' }); const query = ` SELECT - id, + st.id, NULL AS indexed_at, - block_transaction_index, + st.block_transaction_index, 0 AS is_data_item, - target, - quantity, - reward, - last_tx as anchor, - data_size, - content_type, - format, - height, - owner_address, - data_root, + st.target, + st.quantity, + st.reward, + st.last_tx as anchor, + st.data_size, + st.content_type, + st.format, + st.height, + st.owner_address, + st.data_root, NULL AS parent, - "offset", + st."offset", NULL AS size, NULL AS data_offset, NULL AS owner_offset, NULL AS owner_size, - NULL AS owner, + CASE + WHEN octet_length(w.public_modulus) <= 64 THEN w.public_modulus + ELSE NULL + END AS owner, NULL AS signature_offset, NULL AS signature_size, NULL AS signature_type FROM - sqlite_core_db.stable_transactions + sqlite_core_db.stable_transactions st + LEFT JOIN + sqlite_core_db.wallets w ON st.owner_address = w.address WHERE - height BETWEEN ${startHeight} AND ${endHeight} + st.height BETWEEN ${startHeight} AND ${endHeight} ORDER BY - height ASC; + st.height ASC; `; try { await this.db.exec(`INSERT INTO transactions ${query}`); - this.log.info('Transactions inserted into DuckDB'); + log.info('Transactions inserted into DuckDB'); } catch (error) { throw `Error importing transactions: ${error}`; } @@ -170,43 +188,49 @@ export class ParquetExporter { startHeight: number; endHeight: number; }) { + const log = this.log.child({ method: 'importDataItems' }); const query = ` SELECT - id, - indexed_at, + sdi.id, + sdi.indexed_at, NULL AS block_transaction_index, 1 AS is_data_item, - target, + sdi.target, NULL AS quantity, NULL AS reward, - anchor, - data_size, - content_type, + sdi.anchor, + sdi.data_size, + sdi.content_type, NULL AS format, - height, - owner_address, + sdi.height, + sdi.owner_address, NULL AS data_root, - parent_id AS parent, - "offset", - size, - data_offset, - owner_offset, - owner_size, - NULL AS owner, - signature_offset, - signature_size, - signature_type + sdi.parent_id AS parent, + sdi."offset", + sdi.size, + sdi.data_offset, + sdi.owner_offset, + sdi.owner_size, + CASE + WHEN octet_length(w.public_modulus) <= 64 THEN w.public_modulus + ELSE NULL + END AS owner, + sdi.signature_offset, + sdi.signature_size, + sdi.signature_type FROM - sqlite_bundles_db.stable_data_items + sqlite_bundles_db.stable_data_items sdi + LEFT JOIN + sqlite_bundles_db.wallets w ON sdi.owner_address = w.address WHERE - height BETWEEN ${startHeight} AND ${endHeight} + sdi.height BETWEEN ${startHeight} AND ${endHeight} ORDER BY - height ASC; + sdi.height ASC; `; try { await this.db.exec(`INSERT INTO transactions ${query}`); - this.log.info('Data items inserted into DuckDB'); + log.info('Data items inserted into DuckDB'); } catch (error) { throw `Error importing data items: ${error}`; } @@ -219,6 +243,7 @@ export class ParquetExporter { startHeight: number; endHeight: number; }) { + const log = this.log.child({ method: 'importTransactionTags' }); const query = ` SELECT stt.height, @@ -242,7 +267,7 @@ export class ParquetExporter { try { await this.db.exec(`INSERT INTO tags ${query}`); - this.log.info('Transaction tags inserted into DuckDB'); + log.info('Transaction tags inserted into DuckDB'); } catch (error) { throw `Error importing transaction tags: ${error}`; } @@ -255,6 +280,7 @@ export class ParquetExporter { startHeight: number; endHeight: number; }) { + const log = this.log.child({ method: 'importDataItemTags' }); const query = ` SELECT sdit.height, @@ -280,7 +306,7 @@ export class ParquetExporter { try { await this.db.exec(`INSERT INTO tags ${query}`); - this.log.info('Data item tags inserted into DuckDB'); + log.info('Data item tags inserted into DuckDB'); } catch (error) { throw `Error importing data item tags: ${error}`; } @@ -299,6 +325,8 @@ export class ParquetExporter { endHeight: number; maxFileRows: number; }): Promise { + const log = this.log.child({ method: 'exportToParquet' }); + if (!existsSync(outputDir)) { mkdirSync(outputDir, { recursive: true }); } @@ -309,7 +337,7 @@ export class ParquetExporter { maxHeight = maxHeight < BigInt(endHeight) ? maxHeight : BigInt(endHeight); let rowCount = 0n; - this.log.info( + log.info( `Exporting Parquet file(s) for ${tableName} from height ${minHeight} to ${maxHeight}`, ); @@ -329,7 +357,7 @@ export class ParquetExporter { ) TO '${filePath}' (FORMAT PARQUET, COMPRESSION 'zstd') `); - this.log.info(`Exported Parquet file: ${fileName}`); + log.info(`Exported Parquet file: ${fileName}`); minHeight = height + 1n; rowCount = 0n; @@ -339,7 +367,7 @@ export class ParquetExporter { } } - this.log.info(`Parquet export for ${tableName} complete`); + log.info(`Parquet export for ${tableName} complete`); } async export({ @@ -353,19 +381,21 @@ export class ParquetExporter { endHeight: number; maxFileRows: number; }): Promise { + const log = this.log.child({ method: 'export' }); + if (this.isExporting) { - this.log.error('An export is already in progress'); + log.error('An export is already in progress'); return; } this.isExporting = true; if (startHeight > endHeight) { - this.log.error('startHeight must be less than or equal to endHeight'); + log.error('startHeight must be less than or equal to endHeight'); return; } if (maxFileRows <= 0) { - this.log.error('maxFileRows must be a positive number'); + log.error('maxFileRows must be a positive number'); return; } @@ -375,30 +405,11 @@ export class ParquetExporter { try { // Import data into DuckDB - await this.importBlocks({ - startHeight, - endHeight, - }); - - await this.importTransactions({ - startHeight, - endHeight, - }); - - await this.importDataItems({ - startHeight, - endHeight, - }); - - await this.importTransactionTags({ - startHeight, - endHeight, - }); - - await this.importDataItemTags({ - startHeight, - endHeight, - }); + await this.importBlocks({ startHeight, endHeight }); + await this.importTransactions({ startHeight, endHeight }); + await this.importDataItems({ startHeight, endHeight }); + await this.importTransactionTags({ startHeight, endHeight }); + await this.importDataItemTags({ startHeight, endHeight }); // Export data to Parquet files await this.exportToParquet({ @@ -424,16 +435,21 @@ export class ParquetExporter { endHeight, maxFileRows, }); + + log.info('Parquet export complete'); } catch (error) { - this.log.error('Error exporting Parquet files:', error); + log.error('Error exporting Parquet files:', { + error: error instanceof Error ? error.message : String(error), + stack: error instanceof Error ? error.stack : undefined, + }); } finally { await this.db.close(); - // Delete the output folder + // Delete the duckdb file try { - rmSync(outputDir, { recursive: true, force: true }); + rmSync(this.duckDbPath, { recursive: true, force: true }); } catch (error) { - this.log.error(`Error deleting duckdb folder ${outputDir}:`, error); + log.error(`Error deleting duckdb file ${this.duckDbPath}:`, error); } this.isExporting = false; }