Skip to content

Commit

Permalink
neo4j indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Nov 6, 2024
1 parent ccef148 commit 5482239
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
1 change: 1 addition & 0 deletions warehouse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ pub mod table_structs;
pub mod warehouse_cli;
pub mod unzip_temp;
pub mod restaurant;
pub mod neo4j_init;
33 changes: 33 additions & 0 deletions warehouse/src/neo4j_init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use anyhow::Result;
use neo4rs::Graph;

pub static ACCOUNT_CONSTRAINT: &str =
"CREATE CONSTRAINT unique_address FOR (n:Account) REQUIRE n.address IS UNIQUE";

pub static TX_CONSTRAINT: &str =
"CREATE CONSTRAINT unique_tx_hash FOR ()-[r:Tx]-() REQUIRE r.txs_hash IS UNIQUE";

// assumes the Account.address is stored as a hex string
// NOTE: hex numericals may query faster but will be hard to use in user interface
pub static INDEX_HEX_ADDR: &str =
"CREATE TEXT INDEX hex_addr IF NOT EXISTS FOR (n:Account) ON (n.address)";

pub static INDEX_TX_TIMESTAMP: &str =
"CREATE INDEX tx_timestamp IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.block_timestamp)";

pub static INDEX_TX_FUNCTION: &str =
"CREATE INDEX tx_function IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.function)";

pub async fn init_neo4j(graph: Graph) -> Result<()>{
let mut txn = graph.start_txn().await.unwrap();

txn.run_queries([
ACCOUNT_CONSTRAINT,
TX_CONSTRAINT,
INDEX_HEX_ADDR,
INDEX_TX_TIMESTAMP,
INDEX_TX_FUNCTION,
]).await?;
txn.commit().await?;
Ok(())
}
54 changes: 18 additions & 36 deletions warehouse/tests/test_neo4j_meta.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,24 @@
mod support;

use anyhow::Result;
use libra_warehouse::neo4j_init::init_neo4j;
// use libra_warehouse::table_structs::WarehouseTxMaster;
use neo4rs::{query, Graph, Node};
use support::neo4j_testcontainer::start_neo4j_container;

/// get a
pub async fn make_driver(port: u16) -> Result<Graph> {
pub async fn get_neo4j_pool(port: u16) -> Result<Graph> {
let uri = format!("127.0.0.1:{port}");
let user = "neo4j";
let pass = "neo";
Ok(Graph::new(uri, user, pass).await?)
}

// pub async fn connect_neo4j(port: u16) {
// let graph = make_driver(port).await;

// let mut txn = graph.start_txn().await.unwrap();

// txn.run_queries([
// "MERGE (p:Person {name: 'alice', id: 123 })",
// "MERGE (p:Person {name: 'bob', id: 456 })",
// "MERGE (p:Person {name: 'carol', id: 789 })",
// ])
// .await
// .unwrap();
// txn.commit().await.unwrap();

// let mut result = graph
// .execute(query("MATCH (p:Person {name: $this_name}) RETURN p").param("this_name", "alice"))
// .await
// .unwrap();
// while let Ok(Some(row)) = result.next().await {
// let node: Node = row.get("p").unwrap();
// let id: u64 = node.get("id").unwrap();
// assert!(id == 123);
// }

// }

#[tokio::test]
async fn test_neo4j_connect() -> Result<()> {
let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = make_driver(port).await?;
let graph = get_neo4j_pool(port).await?;

let mut txn = graph.start_txn().await.unwrap();

Expand Down Expand Up @@ -73,34 +48,41 @@ async fn test_neo4j_connect() -> Result<()> {
async fn test_tx_insert() -> Result<()> {
let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = make_driver(port).await?;
let graph = get_neo4j_pool(port).await?;

let mut txn = graph.start_txn().await.unwrap();

txn.run_queries([
"MERGE (from:Account {address: 'alice'})-[r:Tx {txs_hash: '0000000'}]->(to:Account {address: 'bob'})"
"MERGE (from:Account {address: '0xa11ce'})-[r:Tx {txs_hash: '0000000'}]->(to:Account {address: '0x808'})"
]).await.unwrap();
txn.commit().await.unwrap();

let mut result = graph
.execute(query(
"MATCH p=()-[:Tx {txs_hash: '0000000'}]->() RETURN p",
))
.execute(query("MATCH p=()-[:Tx {txs_hash: '0000000'}]->() RETURN p"))
.await?;
let mut found_rows = 0;
while let Ok(Some(row)) = result.next().await {
while let Ok(Some(_row)) = result.next().await {
found_rows += 1;
}
assert!(found_rows == 1);

let mut result = graph
.execute(query("MATCH (p:Account {address: 'alice'}) RETURN p"))
.execute(query("MATCH (p:Account {address: '0xa11ce'}) RETURN p"))
.await?;
while let Ok(Some(row)) = result.next().await {
let node: Node = row.get("p").unwrap();
let id: String = node.get("address").unwrap();
assert!(id == "alice".to_owned());
dbg!(&id);
assert!(id == "0xa11ce".to_owned());
}

Ok(())
}

#[tokio::test]
async fn test_init_indices() {
let c = start_neo4j_container();
let port = c.get_host_port_ipv4(7687);
let graph = get_neo4j_pool(port).await.expect("could not get n2o4j connection pool");
init_neo4j(graph).await.expect("could start index");
}

0 comments on commit 5482239

Please sign in to comment.