feat(parquet-exporter): init parquet exporter
Initial implementation of the parquet exporter.
Currently only supports data item tags.
karlprieb committed Sep 13, 2024
1 parent f765a76 commit 059cf4d
Showing 7 changed files with 322 additions and 10 deletions.
6 changes: 3 additions & 3 deletions flake.lock


1 change: 1 addition & 0 deletions flake.nix
@@ -23,6 +23,7 @@
openjdk
sqlite-interactive
python3
duckdb
];
};
};
3 changes: 2 additions & 1 deletion package.json
@@ -11,8 +11,8 @@
"@ar.io/sdk": "^2.0.0",
"@aws-lite/client": "^0.21.7",
"@aws-lite/s3": "^0.1.21",
"@permaweb/aoconnect": "^0.0.56",
"@clickhouse/client": "^1.3.0",
"@permaweb/aoconnect": "^0.0.56",
"apollo-server-express": "^3.13.0",
"arbundles": "^0.11.1",
"arweave": "^1.14.4",
@@ -21,6 +21,7 @@
"cors": "^2.8.5",
"crypto": "^1.0.1",
"dotenv": "^16.3.1",
"duckdb": "^1.0.0",
"express": "^4.18.1",
"express-async-handler": "^1.2.0",
"express-openapi-validator": "^5.1.2",
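The new duckdb dependency provides the callback-style Node API that the exporter below relies on. For reference, a minimal sketch of those calls; the paths and table name here are placeholders, not part of this commit:

import duckdb from 'duckdb';

// Open (or create) a DuckDB database file; ':memory:' also works.
const db = new duckdb.Database('./data/duckdb/example.duckdb');

// exec runs one or more SQL statements and reports failures via the callback.
db.exec('CREATE TABLE IF NOT EXISTS example (id INTEGER);', (err) => {
  if (err) throw err;
});

// all runs a query and passes the result rows to the callback.
db.all('SELECT 42 AS answer;', (err, rows) => {
  if (err) throw err;
  console.log(rows); // [ { answer: 42 } ]
});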
9 changes: 9 additions & 0 deletions src/database/duckdb/schema.sql
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS tags (
height UINTEGER NOT NULL,
id TEXT NOT NULL,
tag_index USMALLINT NOT NULL,
created_at INTEGER NOT NULL,
tag_name TEXT NOT NULL,
tag_value TEXT NOT NULL,
is_data_item BOOLEAN NOT NULL
);
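Once files are exported, they can be sanity-checked against this schema by querying them directly with DuckDB. A minimal sketch using the duckdb Node bindings; the Parquet path is a placeholder that follows the exporter's file-naming scheme:

import duckdb from 'duckdb';

// In-memory instance; DuckDB can query Parquet files directly by path.
const db = new duckdb.Database(':memory:');

db.all(
  `SELECT tag_name, COUNT(*) AS tag_count
   FROM 'data/parquet/tags-minHeight:1000000-maxHeight:1001000-rowCount:1000000.parquet'
   GROUP BY tag_name
   ORDER BY tag_count DESC
   LIMIT 10;`,
  (err, rows) => {
    if (err) throw err;
    console.log(rows);
  },
);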
34 changes: 34 additions & 0 deletions src/routes/ar-io.ts
@@ -23,6 +23,8 @@ import * as system from '../system.js';
import * as events from '../events.js';
import { release } from '../version.js';
import { signatureStore } from '../system.js';
import log from '../log.js';
import { ParquetExporter } from '../workers/parquet-exporter.js';

export const arIoRouter = Router();

@@ -219,3 +221,35 @@
}
},
);

arIoRouter.post(
'/ar-io/admin/export-parquet',
express.json(),
async (req, res) => {
try {
const { outputDir, startHeight, endHeight, maxFileRows } = req.body;

if (!outputDir || !startHeight || !endHeight || !maxFileRows) {
res.status(400).send('Missing required parameters');
return;
}

const parquetExporter = new ParquetExporter({
log,
duckDbPath: './data/duckdb/tags.duckdb',
sqliteDbPath: './data/sqlite/bundles.db',
});

await parquetExporter.exportDataItemTagsParquet({
outputDir,
startHeight,
endHeight,
maxFileRows,
});

res.json({ message: 'Parquet export started' });
} catch (error: any) {
res.status(500).send(error?.message);
}
},
);
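For reference, a minimal sketch of invoking the new admin endpoint; the host, port, and any admin authorization the gateway applies are assumptions, not part of this commit:

// Hypothetical client call; adjust host, port, and auth to your gateway setup.
const res = await fetch('http://localhost:4000/ar-io/admin/export-parquet', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    outputDir: './data/parquet',
    startHeight: 1000000,
    endHeight: 1001000,
    maxFileRows: 1000000,
  }),
});

// The route responds with JSON on success and plain text on errors.
console.log(res.status, await res.text());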
186 changes: 186 additions & 0 deletions src/workers/parquet-exporter.ts
@@ -0,0 +1,186 @@
/**
* AR.IO Gateway
* Copyright (C) 2022-2023 Permanent Data Solutions, Inc. All Rights Reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { readFileSync, existsSync, mkdirSync } from 'node:fs';
import * as winston from 'winston';
import duckdb from 'duckdb';

export class ParquetExporter {
private log: winston.Logger;
private db: duckdb.Database;

constructor({
log,
duckDbPath,
sqliteDbPath,
}: {
log: winston.Logger;
duckDbPath: string;
sqliteDbPath: string;
}) {
this.log = log.child({ class: this.constructor.name });
this.db = new duckdb.Database(duckDbPath);

const duckDbSchema = readFileSync(
'./src/database/duckdb/schema.sql',
'utf8',
);

this.db.run(duckDbSchema);
this.log.debug('DuckDB schema imported successfully');

this.db.exec(`INSTALL sqlite; LOAD sqlite;`);
this.db.exec(`ATTACH '${sqliteDbPath}' AS sqlite_db (TYPE sqlite);`);
this.log.debug('SQLite database attached successfully');
}

async exportDataItemTagsParquet({
startHeight,
endHeight,
maxFileRows,
outputDir,
}: {
startHeight: number;
endHeight: number;
maxFileRows: number;
outputDir: string;
}) {
    this.log.debug('Exporting data item tags to Parquet', { outputDir, startHeight, endHeight, maxFileRows });

const sqliteQuery = `
SELECT * FROM (
SELECT
sdit.height,
sdit.data_item_id AS id,
sdit.data_item_tag_index AS tag_index,
sdi.indexed_at AS created_at,
tn.name AS tag_name,
tv.value AS tag_value,
1 AS is_data_item
FROM
sqlite_db.stable_data_item_tags sdit
JOIN
sqlite_db.tag_names tn ON sdit.tag_name_hash = tn.hash
JOIN
sqlite_db.tag_values tv ON sdit.tag_value_hash = tv.hash
JOIN
sqlite_db.stable_data_items sdi ON sdit.data_item_id = sdi.id
WHERE
sdit.height BETWEEN ${startHeight} AND ${endHeight}
UNION ALL
SELECT
ndit.height,
ndit.data_item_id AS id,
ndit.data_item_tag_index AS tag_index,
ndi.indexed_at AS created_at,
tn.name AS tag_name,
tv.value AS tag_value,
1 AS is_data_item
FROM
sqlite_db.new_data_item_tags ndit
JOIN
sqlite_db.tag_names tn ON ndit.tag_name_hash = tn.hash
JOIN
sqlite_db.tag_values tv ON ndit.tag_value_hash = tv.hash
JOIN
sqlite_db.new_data_items ndi ON ndit.data_item_id = ndi.id
WHERE
ndit.height BETWEEN ${startHeight} AND ${endHeight}
) AS combined_results
LIMIT ${maxFileRows};
`;

    try {
      this.db.exec('BEGIN TRANSACTION;');

      // Wrap the callback-based exec in a promise so that insert failures are
      // awaited here and propagate to the catch block below.
      await new Promise<void>((resolve, reject) => {
        this.db.exec(`INSERT INTO tags ${sqliteQuery}`, (err: Error | null) =>
          err ? reject(err) : resolve(),
        );
      });

      this.db.exec('COMMIT;');
      this.log.info('Data imported into DuckDB tags table successfully');

      await this.exportToParquet({
        outputDir,
        tableName: 'tags',
        minHeight: startHeight,
        maxHeight: endHeight,
        rowCount: maxFileRows,
      });
      this.log.info('Data exported to Parquet file successfully');
    } catch (err) {
      this.db.exec('ROLLBACK;');
      this.log.error('Error exporting data item tags to Parquet:', err);
    } finally {
      await this.truncateTable('tags');
      this.db.close();
    }
}

private async exportToParquet({
outputDir,
tableName,
minHeight,
maxHeight,
rowCount,
}: {
outputDir: string;
tableName: string;
minHeight: number;
maxHeight: number;
rowCount: number;
}): Promise<void> {
const fileName = `${tableName}-minHeight:${minHeight}-maxHeight:${maxHeight}-rowCount:${rowCount}.parquet`;
const filePath = `${outputDir}/${fileName}`;

if (!existsSync(outputDir)) {
mkdirSync(outputDir, { recursive: true });
}

return new Promise((resolve, reject) => {
this.db.exec(
        `COPY ${tableName} TO '${filePath}' (FORMAT PARQUET);`,
(err: Error | null) => {
if (err) {
this.log.error(`Error exporting to Parquet file ${fileName}:`, err);
reject(err);
} else {
this.log.info(`Exported to Parquet file: ${fileName}`);
resolve();
}
},
);
});
}

private async truncateTable(table: string): Promise<void> {
return new Promise((resolve, reject) => {
this.db.exec(`TRUNCATE TABLE ${table};`, (err: Error | null) => {
if (err) {
this.log.error(`Error truncating ${table} table:`, err);
reject(err);
} else {
this.log.info(`${table} table truncated`);
resolve();
}
});
});
}
}
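The exporter can also be driven directly, outside the admin route. A minimal sketch; the paths, heights, and logger setup are placeholders rather than part of this commit:

import * as winston from 'winston';
import { ParquetExporter } from './src/workers/parquet-exporter.js';

const log = winston.createLogger({
  transports: [new winston.transports.Console()],
});

const exporter = new ParquetExporter({
  log,
  duckDbPath: './data/duckdb/tags.duckdb',
  sqliteDbPath: './data/sqlite/bundles.db',
});

await exporter.exportDataItemTagsParquet({
  outputDir: './data/parquet',
  startHeight: 1000000,
  endHeight: 1001000,
  maxFileRows: 1000000,
});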