diff --git a/Cargo.lock b/Cargo.lock index 3ae7310..b659664 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,7 +108,7 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] name = "arweave-exex-backfill" -version = "0.0.1" +version = "0.1.1" dependencies = [ "anyhow", "axum 0.7.6", @@ -116,6 +116,7 @@ dependencies = [ "borsh-derive 1.5.1", "brotli", "bundlr-sdk", + "common", "dotenv", "ethers", "ethers-providers", @@ -849,6 +850,14 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "common" +version = "0.1.0" +source = "git+https://github.com/weaveVM/miscalleneous.git?branch=main#8ed3bf038a727c9cfb52f7d706dca10c8e206e63" +dependencies = [ + "paste", +] + [[package]] name = "const-hex" version = "1.12.0" @@ -3201,6 +3210,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "path-slash" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 2e93861..d0dfc92 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "arweave-exex-backfill" -version = "0.0.1" +version = "0.1.1" edition = "2021" [dependencies] @@ -16,6 +16,7 @@ serde_json = "1.0.128" shuttle-axum = "0.47.0" shuttle-runtime = "0.47.0" bundlr-sdk = { git = "https://github.com/weaveVM/wvm-irys-rust-sdk.git", branch = "master" } +common = { git = "https://github.com/weaveVM/miscalleneous.git", branch = "main"} eyre = "0.6.12" borsh = "1.5.1" brotli = "6.0.0" diff --git a/src/main.rs b/src/main.rs index c721772..4820959 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ async fn main() -> shuttle_axum::ShuttleAxum { .route("/block/hash/:hash", get(handle_get_block_by_hash)); task::spawn(async move { - let _ = backfill_blocks(3000).await; + let _ = backfill_blocks(5000).await; }); Ok(router.into()) diff --git a/src/utils/arweave_gql.rs b/src/utils/arweave_gql.rs index d0d66b6..6e490b1 100644 --- a/src/utils/arweave_gql.rs +++ b/src/utils/arweave_gql.rs @@ -1,10 +1,9 @@ -use crate::utils::constants::GQL_START_TIMESTAMP; -use crate::utils::constants::IRYS_GQL_GATEWAY; +use crate::utils::constants::{ARWEAVE_GQL_GATEWAY, IRYS_GQL_GATEWAY}; use crate::utils::wvm_client::get_latest_block_number; use anyhow::Error; +use common::address_book::ADDRESS_BOOK; use reqwest::Client; use serde_json::{json, Value}; -use std::time::{SystemTime, UNIX_EPOCH}; async fn send_graphql(gateway: &str, query: Value) -> Result { let client = Client::new(); @@ -19,12 +18,11 @@ async fn send_graphql(gateway: &str, query: Value) -> Result { Ok(json_res) } -async fn retrieve_all_transactions(scan_count: u32) -> Result, Error> { +async fn retrieve_all_transactions(scan_count: u32, address: &str) -> Result, Error> { let mut block_numbers: Vec = Vec::new(); let mut cursor: Option = None; const PAGE_SIZE: u32 = 1000; let mut page_count: u32 = 0; - let current_timestamp = get_timestamp(); loop { page_count += 1; @@ -35,16 +33,15 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result, Error> { let query = json!({ "query": r#" - query GetTransactions($cursor: String, $pageSize: Int!, $startTimestamp: BigInt!, $endTimestamp: BigInt!) { + query GetTransactions($cursor: String, $address: String!, $pageSize: Int!) { transactions( first: $pageSize, after: $cursor, - timestamp: {from: $startTimestamp, to: $endTimestamp}, - order: DESC, + sort: HEIGHT_DESC, + owners: [$address], tags: [ { name: "Protocol", values: ["WeaveVM-ExEx"] } ], - owners: ["5JUE58yemNynRDeQDyVECKbGVCQbnX7unPrBRqCPVn5Z", "F8XVrMQzsHiWfn1CaKtUPxAgUkATXQjXULWw3oVXCiFV"] ) { edges { node { @@ -57,7 +54,6 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result, Error> { } pageInfo { hasNextPage - endCursor } } } @@ -65,12 +61,11 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result, Error> { "variables": { "cursor": cursor, "pageSize": PAGE_SIZE, - "startTimestamp": GQL_START_TIMESTAMP, - "endTimestamp": current_timestamp + "address": address } }); - let res = send_graphql(IRYS_GQL_GATEWAY, query) + let res = send_graphql(ARWEAVE_GQL_GATEWAY, query) .await .unwrap_or(generate_empty_gql_server_res()); @@ -121,9 +116,12 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result, Error> { break; } - cursor = page_info - .get("endCursor") - .and_then(|ec| ec.as_str()) + cursor = transactions + .get("edges") + .and_then(|edges| edges.as_array()) + .and_then(|edges| edges.last()) + .and_then(|last_edge| last_edge.get("cursor")) + .and_then(|cursor_val| cursor_val.as_str()) .map(String::from); println!( @@ -147,10 +145,25 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result, Error> { } pub async fn detect_missing_blocks(scan_count: u32) -> Result, Error> { - // retrieve WeaveVM-ExEx data protocol blocks on Arweave - let blocks = retrieve_all_transactions(scan_count) + // load WeaveVM address book + let address_book: Value = serde_json::from_str(ADDRESS_BOOK).unwrap(); + let exex_archiver_addr = address_book["ario_fmt_alphanet_exex_publisher"] + .as_str() + .unwrap(); + let exex_backfill_addr = address_book["ario_fmt_alphanet_exex_backfiller"] + .as_str() + .unwrap(); + let exex_archiver_blocks = retrieve_all_transactions(scan_count, exex_archiver_addr) + .await + .unwrap(); + let exex_backfill_blocks = retrieve_all_transactions(scan_count, exex_backfill_addr) .await - .unwrap_or(Vec::new()); + .unwrap(); + // concat archiver and backfill blocks + let mut blocks = [&exex_archiver_blocks[..], &exex_backfill_blocks[..]].concat(); + // remove possible duplicates from both archiver & backfill + blocks.sort(); + blocks.dedup(); // latest WeaveVM block number from the RPC let latest_block = get_latest_block_number().await.unwrap_or(0) as u32; @@ -170,15 +183,6 @@ pub async fn detect_missing_blocks(scan_count: u32) -> Result, Error> { // util functions -fn get_timestamp() -> u128 { - let now = SystemTime::now(); - let unix_timestamp_ms = now - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis(); - unix_timestamp_ms -} - fn generate_empty_gql_server_res() -> Value { json!({ "data": { @@ -186,9 +190,135 @@ fn generate_empty_gql_server_res() -> Value { "edges": [], "pageInfo": { "hasNextPage": false, - "endCursor": null } } } }) } + +// async fn irys_retrieve_all_transactions(scan_count: u32) -> Result, Error> { +// let mut block_numbers: Vec = Vec::new(); +// let mut cursor: Option = None; +// const PAGE_SIZE: u32 = 1000; +// let mut page_count: u32 = 0; +// let current_timestamp = get_timestamp(); + +// loop { +// page_count += 1; +// println!( +// "Fetching page {}. Current cursor: {:#?}", +// page_count, cursor +// ); + +// let query = json!({ +// "query": r#" +// query GetTransactions($cursor: String, $pageSize: Int!, $startTimestamp: BigInt!, $endTimestamp: BigInt!) { +// transactions( +// first: $pageSize, +// after: $cursor, +// timestamp: {from: $startTimestamp, to: $endTimestamp}, +// order: DESC, +// tags: [ +// { name: "Protocol", values: ["WeaveVM-ExEx"] } +// ], +// owners: ["5JUE58yemNynRDeQDyVECKbGVCQbnX7unPrBRqCPVn5Z", "F8XVrMQzsHiWfn1CaKtUPxAgUkATXQjXULWw3oVXCiFV"] +// ) { +// edges { +// node { +// tags { +// name +// value +// } +// } +// cursor +// } +// pageInfo { +// hasNextPage +// endCursor +// } +// } +// } +// "#, +// "variables": { +// "cursor": cursor, +// "pageSize": PAGE_SIZE, +// "startTimestamp": GQL_START_TIMESTAMP, +// "endTimestamp": current_timestamp +// } +// }); + +// let res = send_graphql(IRYS_GQL_GATEWAY, query) +// .await +// .unwrap_or(generate_empty_gql_server_res()); + +// let transactions = res +// .get("data") +// .and_then(|data| data.get("transactions")) +// .ok_or_else(|| Error::msg("Invalid response structure"))?; + +// let new_block_numbers: Vec = transactions +// .get("edges") +// .and_then(|edges| edges.as_array()) +// .ok_or_else(|| Error::msg("Edges not found or not an array"))? +// .iter() +// .filter_map(|edge| { +// edge.get("node") +// .and_then(|node| node.get("tags")) +// .and_then(|tags| tags.as_array()) +// .and_then(|tags| { +// tags.iter().find_map(|tag| { +// if tag.get("name")?.as_str()? == "Block-Number" { +// tag.get("value")?.as_str()?.parse::().ok() +// } else { +// None +// } +// }) +// }) +// }) +// .collect(); + +// println!( +// "Fetched {} new block numbers on page {}", +// new_block_numbers.len(), +// page_count +// ); +// block_numbers.extend(new_block_numbers); + +// let page_info = transactions +// .get("pageInfo") +// .ok_or_else(|| Error::msg("PageInfo not found"))?; + +// let has_next_page = page_info +// .get("hasNextPage") +// .and_then(|hnp| hnp.as_bool()) +// .unwrap_or(false); + +// if !has_next_page { +// println!("No more pages. Pagination complete."); +// break; +// } + +// cursor = page_info +// .get("endCursor") +// .and_then(|ec| ec.as_str()) +// .map(String::from); + +// println!( +// "Page {} complete. Has next page: {}. Next cursor: {:#?}", +// page_count, has_next_page, cursor +// ); +// if page_count > scan_count { +// break; +// } +// } + +// println!("Pagination complete. Total pages fetched: {}", page_count); + +// block_numbers.sort(); +// block_numbers.dedup(); + +// println!("{}", "#".repeat(100)); +// println!("Total scanned block numbers: {}", block_numbers.len()); + +// Ok(block_numbers) +// } diff --git a/src/utils/constants.rs b/src/utils/constants.rs index 427b77d..518dc04 100644 --- a/src/utils/constants.rs +++ b/src/utils/constants.rs @@ -1,6 +1,6 @@ pub const WVM_RPC_URL: &str = "https://testnet-rpc.wvm.dev"; pub const IRYS_GQL_GATEWAY: &str = "https://arweave.mainnet.irys.xyz"; +pub const ARWEAVE_GQL_GATEWAY: &str = "https://arweave.net"; pub const IRYS_UPLOADER_URL: &str = "https://node1.bundlr.network"; pub const RETH_CLIENT_VERSION: &str = "reth/v1.0.6"; pub const WVM_NETWORK_TAG: &str = "Alphanet v0.1.0"; -pub const GQL_START_TIMESTAMP: u128 = 1727828340;