Skip to content

Commit

Permalink
cli entry point to load txs to remote
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 7, 2024
1 parent fd7a244 commit 65f0b24
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 85 deletions.
9 changes: 8 additions & 1 deletion warehouse/src/cypher_templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ pub fn write_batch_tx_string(list_str: String) -> String {
UNWIND tx_data AS tx
MERGE (from:Account {{address: tx.sender}})
MERGE (to:Account {{address: tx.recipient}})
MERGE (from)-[:Tx {{tx_hash: tx.tx_hash}}]->(to)
MERGE (from)-[rel:Tx {{tx_hash: tx.tx_hash}}]->(to)
ON CREATE SET rel.created = true
ON MATCH SET rel.created = false
WITH tx, rel
RETURN
COUNT(CASE WHEN rel.created = true THEN 1 END) AS merged_tx_count,
COUNT(CASE WHEN rel.created = false THEN 1 END) AS ignored_tx_count
"#
)
}
10 changes: 7 additions & 3 deletions warehouse/src/extract_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,22 @@ pub fn make_master_tx(
let p = raw.clone().into_payload().clone();
let ef = p.into_entry_function();

let tx = WarehouseTxMaster {
let mut tx = WarehouseTxMaster {
tx_hash,
expiration_timestamp: user_tx.expiration_timestamp_secs(),
sender: user_tx.sender().to_hex_literal(),
epoch,
round,
block_timestamp,
function: format!("{}::{}", ef.module().short_str_lossless(), ef.function()),
recipients: None,
recipient: None,
args: function_args_to_json(user_tx)?,
};

if let Some(deposit) = try_decode_deposit_tx(user_tx).ok() {

Check failure on line 121 in warehouse/src/extract_transactions.rs

View workflow job for this annotation

GitHub Actions / clippy

matching on `Some` with `ok()` is redundant
tx.recipient = Some(deposit.to.to_hex_literal());
}

Ok(tx)
}

Expand Down Expand Up @@ -178,7 +182,7 @@ pub fn function_args_to_json(user_tx: &SignedTransaction) -> Result<serde_json::
}

// TODO: unsure if this needs to happen on Rust side
fn _try_decode_deposit_tx(user_tx: &SignedTransaction) -> Result<WarehouseDepositTx> {
fn try_decode_deposit_tx(user_tx: &SignedTransaction) -> Result<WarehouseDepositTx> {
let (to, amount) = match EntryFunctionCall::decode(user_tx.payload()) {
Some(EntryFunctionCall::OlAccountTransfer { to, amount }) => (to, amount),
// many variants
Expand Down
1 change: 1 addition & 0 deletions warehouse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod extract_snapshot;
pub mod extract_transactions;
pub mod load_account;
pub mod load_coin;
pub mod load_entrypoint;
pub mod load_tx_cypher;
pub mod migrate;
pub mod neo4j_init;
Expand Down
39 changes: 39 additions & 0 deletions warehouse/src/load_entrypoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::{
extract_transactions::extract_current_transactions,
load_tx_cypher,
scan::{ArchiveMap, ManifestInfo},
};

use anyhow::Result;
use neo4rs::Graph;

/// takes all the archives from a map, and tries to load them sequentially
pub async fn ingest_all(archive_map: &ArchiveMap, pool: &Graph) -> Result<()> {
for (_p, m) in archive_map.0.iter() {
let (merged, ignored) = try_load_one_archive(m, pool).await?;
println!(
"TOTAL transactions updated: {}, ignored: {}",
merged, ignored
);
}

Ok(())
}

pub async fn try_load_one_archive(man: &ManifestInfo, pool: &Graph) -> Result<(u64, u64)> {
let mut records_updated = 0u64;
let mut records_ignored = 0u64;
match man.contents {
crate::scan::BundleContent::Unknown => todo!(),
crate::scan::BundleContent::StateSnapshot => todo!(),
crate::scan::BundleContent::Transaction => {
let (txs, _) = extract_current_transactions(&man.archive_dir).await?;
let (merged, ignored) = load_tx_cypher::tx_batch(&txs, pool, 100).await?;
records_updated += merged;
records_ignored += ignored;
println!("transactions updated: {}, ignored: {}", merged, ignored);
}
crate::scan::BundleContent::EpochEnding => todo!(),
}
Ok((records_updated, records_ignored))
}
47 changes: 23 additions & 24 deletions warehouse/src/load_tx_cypher.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,40 @@
use anyhow::Result;
use neo4rs::{query, Graph};

use crate::table_structs::WarehouseTxMaster;
use crate::{cypher_templates::write_batch_tx_string, table_structs::WarehouseTxMaster};

pub async fn load_tx_cypher(
pub async fn tx_batch(
txs: &[WarehouseTxMaster],
pool: &Graph,
batch_len: usize,
) -> Result<()> {
) -> Result<(u64, u64)> {
let chunks: Vec<&[WarehouseTxMaster]> = txs.chunks(batch_len).collect();
let mut merged_count = 0u64;
let mut ignored_count = 0u64;

for c in chunks {
impl_batch_tx_insert(pool, c).await?;
let (m, ig) = impl_batch_tx_insert(pool, c).await?;
merged_count += m;
ignored_count += ig;
}

Ok(())
Ok((merged_count, ignored_count))
}

pub async fn impl_batch_tx_insert(pool: &Graph, batch_txs: &[WarehouseTxMaster]) -> Result<u64> {
let transactions = WarehouseTxMaster::slice_to_bolt_list(batch_txs);

// for tx in batch_txs {
// let mut this_query = tx.to_hashmap();
// transactions.push(this_query);
// }

let mut txn = pool.start_txn().await?;
pub async fn impl_batch_tx_insert(
pool: &Graph,
batch_txs: &[WarehouseTxMaster],
) -> Result<(u64, u64)> {
let list_str = WarehouseTxMaster::slice_to_template(batch_txs);
let cypher_string = write_batch_tx_string(list_str);

let q = query(
"UNWIND $transactions AS tx
MERGE (from:Account {address: tx.sender})
MERGE (to:Account {address: tx.recipient})
MERGE (from)-[:Tx {tx_hash: tx.tx_hash}]->(to)",
)
.param("transactions", transactions);
// Execute the query
let cypher_query = query(&cypher_string);
let mut res = pool.execute(cypher_query).await?;

txn.run(q).await?;
txn.commit().await?;
let row = res.next().await?.unwrap();
let merged: i64 = row.get("merged_tx_count").unwrap();
let ignored: i64 = row.get("ignored_tx_count").unwrap();

Ok(0)
Ok((merged as u64, ignored as u64))
}
2 changes: 1 addition & 1 deletion warehouse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ use libra_warehouse::warehouse_cli::WarehouseCli;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
WarehouseCli::parse().run();
WarehouseCli::parse().run().await?;
Ok(())
}
25 changes: 22 additions & 3 deletions warehouse/src/neo4j_init.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use anyhow::Result;
use anyhow::{Context, Result};
use neo4rs::Graph;

pub static URI_ENV: &str = "LIBRA_GRAPH_DB_URI";
pub static USER_ENV: &str = "LIBRA_GRAPH_DB_USER";
pub static PASS_ENV: &str = "LIBRA_GRAPH_DB_PASS";

pub static ACCOUNT_UNIQUE: &str =
"CREATE CONSTRAINT unique_address FOR (n:Account) REQUIRE n.address IS UNIQUE";

Expand All @@ -21,14 +25,29 @@ pub static INDEX_TX_TIMESTAMP: &str =
pub static INDEX_TX_FUNCTION: &str =
"CREATE INDEX tx_function IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.function)";

/// get the driver connection object
pub async fn get_neo4j_pool(port: u16) -> Result<Graph> {
/// get the testing neo4j connection
pub async fn get_neo4j_localhost_pool(port: u16) -> Result<Graph> {
let uri = format!("127.0.0.1:{port}");
let user = "neo4j";
let pass = "neo";
Ok(Graph::new(uri, user, pass).await?)
}

/// get the driver connection object
pub async fn get_neo4j_remote_pool(uri: &str, user: &str, pass: &str) -> Result<Graph> {
Ok(Graph::new(uri, user, pass).await?)
}

pub fn get_credentials_from_env() -> Result<(String, String, String)> {
let uri = std::env::var(URI_ENV).context(format!("could not get env var {}", URI_ENV))?;
let user = std::env::var(USER_ENV).context(format!("could not get env var {}", USER_ENV))?;
let pass = std::env::var(PASS_ENV).context(format!("could not get env var {}", PASS_ENV))?;

Ok((uri, user, pass))
}

// get_neo4j_localhost_pool

pub async fn create_indexes(graph: &Graph) -> Result<()> {
let mut txn = graph.start_txn().await.unwrap();

Expand Down
13 changes: 9 additions & 4 deletions warehouse/src/table_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct WarehouseTxMaster {
pub block_timestamp: u64,
pub expiration_timestamp: u64,
// maybe there are counter parties
pub recipients: Option<Vec<AccountAddress>>,
pub recipient: Option<String>,
pub args: serde_json::Value,
}

Expand All @@ -69,7 +69,7 @@ impl Default for WarehouseTxMaster {
round: 0,
block_timestamp: 0,
expiration_timestamp: 0,
recipients: None,
recipient: None,
args: json!(""),
}
}
Expand All @@ -82,9 +82,11 @@ impl WarehouseTxMaster {
/// JSON5 but the last time someone updated
/// that crate was 3 years ago.
pub fn to_cypher_object_template(&self) -> String {
let recipient = self.recipient.as_ref().unwrap_or(&self.sender);

format!(
r#"{{tx_hash: "{}", sender: "{}", recipient: "{}"}}"#,
self.tx_hash, self.sender, self.sender,
self.tx_hash, self.sender, recipient,
)
}

Expand All @@ -93,8 +95,11 @@ impl WarehouseTxMaster {
let mut list_literal = "".to_owned();
for el in txs {
let s = el.to_cypher_object_template();
list_literal = format!("{}\n", s);
list_literal.push_str(&s);
list_literal.push(',');
// list_literal = format!("{},\n{}", list_literal, s);
}
list_literal.pop(); // need to drop last comma ","
format!("[{}]", list_literal)
}

Expand Down
91 changes: 82 additions & 9 deletions warehouse/src/warehouse_cli.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,108 @@
use anyhow::{bail, Result};
use clap::{Parser, Subcommand};
use neo4rs::Graph;
use std::path::PathBuf;

// use crate::{read_snapshot, restore, restore_bundle::RestoreBundle};
use crate::{
load_entrypoint::{ingest_all, try_load_one_archive},
neo4j_init::{get_credentials_from_env, PASS_ENV, URI_ENV, USER_ENV},
scan::scan_dir_archive,
};

#[derive(Parser)]
#[clap(author, version, about, long_about = None)]
#[clap(arg_required_else_help(true))]
/// DB tools e.g.: backup, restore, export to json
pub struct WarehouseCli {
#[clap(long, short('d'))]
/// URI of graphDB e.g. neo4j+s://localhost:port
db_uri: Option<String>,
/// username of db
db_username: Option<String>,
/// db password
db_password: Option<String>,

#[clap(subcommand)]
command: Sub,
}

#[derive(Subcommand)]
#[allow(clippy::large_enum_variant)]
pub enum Sub {
/// scans directory and subdirectory for manifests
/// Tries to identify the libra version they belong to (v5 etc.)
// #[clap(su)]
Scan {
/// scans sub directories for archive bundles
IngestAll {
#[clap(long, short('d'))]
start_path: PathBuf,
},
/// process and load a single archive
LoadOne {
#[clap(long, short('d'))]
archive_dir: PathBuf,
},
/// check archive is valid and can be decoded
Check {
#[clap(long, short('d'))]
dir_archive: PathBuf,
archive_dir: PathBuf,
},
}

impl WarehouseCli {
pub fn run(&self) {
pub async fn run(&self) -> anyhow::Result<()> {
match &self.command {
Sub::Scan { dir_archive } => {
dbg!(&dir_archive)
Sub::IngestAll { start_path } => {
let map = scan_dir_archive(start_path)?;
let pool = try_db_connection_pool(self).await?;

ingest_all(&map, &pool).await?;
}
Sub::LoadOne { archive_dir } => match scan_dir_archive(archive_dir)?.0.get(archive_dir)
{
Some(man) => {
let pool = try_db_connection_pool(self).await?;
try_load_one_archive(man, &pool).await?;
}
None => {
bail!(format!(
"ERROR: cannot find .manifest file under {}",
archive_dir.display()
));
}
},
Sub::Check { archive_dir } => match scan_dir_archive(archive_dir)?.0.get(archive_dir) {
Some(_) => todo!(),
None => {
bail!(format!(
"ERROR: cannot find .manifest file under {}",
archive_dir.display()
));
}
},
};
Ok(())
}
}

pub async fn try_db_connection_pool(cli: &WarehouseCli) -> Result<Graph> {
let db = match get_credentials_from_env() {
Ok((uri, user, password)) => Graph::new(uri, user, password).await?,
Err(_) => {
if cli.db_uri.is_some() && cli.db_username.is_some() && cli.db_password.is_some() {
Graph::new(
cli.db_uri.as_ref().unwrap(),
cli.db_username.as_ref().unwrap(),
cli.db_password.as_ref().unwrap(),
)
.await?
} else {
println!("Must pass DB credentials, either with CLI args or environment variable");
println!("call with --db-uri, --db-user, and --db-password");
println!(
"Alternatively export credentials to env variables: {}, {}, {}",
URI_ENV, USER_ENV, PASS_ENV
);
bail!("could not get a db instance with credentials");
}
}
};
Ok(db)
}
Loading

0 comments on commit 65f0b24

Please sign in to comment.