Skip to content

Commit

Permalink
chore(parquet-exporter): insert owner if it's smaller or equal to 64 …
Browse files Browse the repository at this point in the history
…bytes
  • Loading branch information
karlprieb committed Sep 26, 2024
1 parent 8637b1e commit 117dfda
Showing 1 changed file with 94 additions and 78 deletions.
172 changes: 94 additions & 78 deletions src/workers/parquet-exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ import { Database } from 'duckdb-async';
export class ParquetExporter {
private log: winston.Logger;
private db: Database;
private duckDbPath: string;
private isExporting = false;

constructor({ log, db }: { log: winston.Logger; db: Database }) {
constructor({
log,
db,
duckDbPath,
}: {
log: winston.Logger;
db: Database;
duckDbPath: string;
}) {
this.log = log;
this.db = db;
this.duckDbPath = duckDbPath;
}

static async create({
Expand Down Expand Up @@ -78,6 +88,7 @@ export class ParquetExporter {
return new ParquetExporter({
log: logger,
db,
duckDbPath,
});
}

Expand All @@ -88,6 +99,7 @@ export class ParquetExporter {
startHeight: number;
endHeight: number;
}) {
const log = this.log.child({ method: 'importBlocks' });
const query = `
SELECT
indep_hash,
Expand All @@ -108,7 +120,7 @@ export class ParquetExporter {

try {
await this.db.exec(`INSERT INTO blocks ${query}`);
this.log.info('Blocks inserted into DuckDB');
log.info('Blocks inserted into DuckDB');
} catch (error) {
throw `Error importing blocks: ${error}`;
}
Expand All @@ -121,43 +133,49 @@ export class ParquetExporter {
startHeight: number;
endHeight: number;
}) {
const log = this.log.child({ method: 'importTransactions' });
const query = `
SELECT
id,
st.id,
NULL AS indexed_at,
block_transaction_index,
st.block_transaction_index,
0 AS is_data_item,
target,
quantity,
reward,
last_tx as anchor,
data_size,
content_type,
format,
height,
owner_address,
data_root,
st.target,
st.quantity,
st.reward,
st.last_tx as anchor,
st.data_size,
st.content_type,
st.format,
st.height,
st.owner_address,
st.data_root,
NULL AS parent,
"offset",
st."offset",
NULL AS size,
NULL AS data_offset,
NULL AS owner_offset,
NULL AS owner_size,
NULL AS owner,
CASE
WHEN octet_length(w.public_modulus) <= 64 THEN w.public_modulus
ELSE NULL
END AS owner,
NULL AS signature_offset,
NULL AS signature_size,
NULL AS signature_type
FROM
sqlite_core_db.stable_transactions
sqlite_core_db.stable_transactions st
LEFT JOIN
sqlite_core_db.wallets w ON st.owner_address = w.address
WHERE
height BETWEEN ${startHeight} AND ${endHeight}
st.height BETWEEN ${startHeight} AND ${endHeight}
ORDER BY
height ASC;
st.height ASC;
`;

try {
await this.db.exec(`INSERT INTO transactions ${query}`);
this.log.info('Transactions inserted into DuckDB');
log.info('Transactions inserted into DuckDB');
} catch (error) {
throw `Error importing transactions: ${error}`;
}
Expand All @@ -170,43 +188,49 @@ export class ParquetExporter {
startHeight: number;
endHeight: number;
}) {
const log = this.log.child({ method: 'importDataItems' });
const query = `
SELECT
id,
indexed_at,
sdi.id,
sdi.indexed_at,
NULL AS block_transaction_index,
1 AS is_data_item,
target,
sdi.target,
NULL AS quantity,
NULL AS reward,
anchor,
data_size,
content_type,
sdi.anchor,
sdi.data_size,
sdi.content_type,
NULL AS format,
height,
owner_address,
sdi.height,
sdi.owner_address,
NULL AS data_root,
parent_id AS parent,
"offset",
size,
data_offset,
owner_offset,
owner_size,
NULL AS owner,
signature_offset,
signature_size,
signature_type
sdi.parent_id AS parent,
sdi."offset",
sdi.size,
sdi.data_offset,
sdi.owner_offset,
sdi.owner_size,
CASE
WHEN octet_length(w.public_modulus) <= 64 THEN w.public_modulus
ELSE NULL
END AS owner,
sdi.signature_offset,
sdi.signature_size,
sdi.signature_type
FROM
sqlite_bundles_db.stable_data_items
sqlite_bundles_db.stable_data_items sdi
LEFT JOIN
sqlite_bundles_db.wallets w ON sdi.owner_address = w.address
WHERE
height BETWEEN ${startHeight} AND ${endHeight}
sdi.height BETWEEN ${startHeight} AND ${endHeight}
ORDER BY
height ASC;
sdi.height ASC;
`;

try {
await this.db.exec(`INSERT INTO transactions ${query}`);
this.log.info('Data items inserted into DuckDB');
log.info('Data items inserted into DuckDB');
} catch (error) {
throw `Error importing data items: ${error}`;
}
Expand All @@ -219,6 +243,7 @@ export class ParquetExporter {
startHeight: number;
endHeight: number;
}) {
const log = this.log.child({ method: 'importTransactionTags' });
const query = `
SELECT
stt.height,
Expand All @@ -242,7 +267,7 @@ export class ParquetExporter {

try {
await this.db.exec(`INSERT INTO tags ${query}`);
this.log.info('Transaction tags inserted into DuckDB');
log.info('Transaction tags inserted into DuckDB');
} catch (error) {
throw `Error importing transaction tags: ${error}`;
}
Expand All @@ -255,6 +280,7 @@ export class ParquetExporter {
startHeight: number;
endHeight: number;
}) {
const log = this.log.child({ method: 'importDataItemTags' });
const query = `
SELECT
sdit.height,
Expand All @@ -280,7 +306,7 @@ export class ParquetExporter {

try {
await this.db.exec(`INSERT INTO tags ${query}`);
this.log.info('Data item tags inserted into DuckDB');
log.info('Data item tags inserted into DuckDB');
} catch (error) {
throw `Error importing data item tags: ${error}`;
}
Expand All @@ -299,6 +325,8 @@ export class ParquetExporter {
endHeight: number;
maxFileRows: number;
}): Promise<void> {
const log = this.log.child({ method: 'exportToParquet' });

if (!existsSync(outputDir)) {
mkdirSync(outputDir, { recursive: true });
}
Expand All @@ -309,7 +337,7 @@ export class ParquetExporter {
maxHeight = maxHeight < BigInt(endHeight) ? maxHeight : BigInt(endHeight);
let rowCount = 0n;

this.log.info(
log.info(
`Exporting Parquet file(s) for ${tableName} from height ${minHeight} to ${maxHeight}`,
);

Expand All @@ -329,7 +357,7 @@ export class ParquetExporter {
) TO '${filePath}' (FORMAT PARQUET, COMPRESSION 'zstd')
`);

this.log.info(`Exported Parquet file: ${fileName}`);
log.info(`Exported Parquet file: ${fileName}`);

minHeight = height + 1n;
rowCount = 0n;
Expand All @@ -339,7 +367,7 @@ export class ParquetExporter {
}
}

this.log.info(`Parquet export for ${tableName} complete`);
log.info(`Parquet export for ${tableName} complete`);
}

async export({
Expand All @@ -353,19 +381,21 @@ export class ParquetExporter {
endHeight: number;
maxFileRows: number;
}): Promise<void> {
const log = this.log.child({ method: 'export' });

if (this.isExporting) {
this.log.error('An export is already in progress');
log.error('An export is already in progress');
return;
}
this.isExporting = true;

if (startHeight > endHeight) {
this.log.error('startHeight must be less than or equal to endHeight');
log.error('startHeight must be less than or equal to endHeight');
return;
}

if (maxFileRows <= 0) {
this.log.error('maxFileRows must be a positive number');
log.error('maxFileRows must be a positive number');
return;
}

Expand All @@ -375,30 +405,11 @@ export class ParquetExporter {

try {
// Import data into DuckDB
await this.importBlocks({
startHeight,
endHeight,
});

await this.importTransactions({
startHeight,
endHeight,
});

await this.importDataItems({
startHeight,
endHeight,
});

await this.importTransactionTags({
startHeight,
endHeight,
});

await this.importDataItemTags({
startHeight,
endHeight,
});
await this.importBlocks({ startHeight, endHeight });
await this.importTransactions({ startHeight, endHeight });
await this.importDataItems({ startHeight, endHeight });
await this.importTransactionTags({ startHeight, endHeight });
await this.importDataItemTags({ startHeight, endHeight });

// Export data to Parquet files
await this.exportToParquet({
Expand All @@ -424,16 +435,21 @@ export class ParquetExporter {
endHeight,
maxFileRows,
});

log.info('Parquet export complete');
} catch (error) {
this.log.error('Error exporting Parquet files:', error);
log.error('Error exporting Parquet files:', {
error: error instanceof Error ? error.message : String(error),
stack: error instanceof Error ? error.stack : undefined,
});
} finally {
await this.db.close();

// Delete the output folder
// Delete the duckdb file
try {
rmSync(outputDir, { recursive: true, force: true });
rmSync(this.duckDbPath, { recursive: true, force: true });
} catch (error) {
this.log.error(`Error deleting duckdb folder ${outputDir}:`, error);
log.error(`Error deleting duckdb file ${this.duckDbPath}:`, error);
}
this.isExporting = false;
}
Expand Down

0 comments on commit 117dfda

Please sign in to comment.