Skip to content

Commit

Permalink
Merge pull request #12 from weaveVM/0.1.1
Browse files Browse the repository at this point in the history
0.1.1
  • Loading branch information
charmful0x authored Oct 8, 2024
2 parents d2bfb39 + 6cc98e2 commit 2335279
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 33 deletions.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "arweave-exex-backfill"
version = "0.0.1"
version = "0.1.1"
edition = "2021"

[dependencies]
Expand All @@ -16,6 +16,7 @@ serde_json = "1.0.128"
shuttle-axum = "0.47.0"
shuttle-runtime = "0.47.0"
bundlr-sdk = { git = "https://github.com/weaveVM/wvm-irys-rust-sdk.git", branch = "master" }
common = { git = "https://github.com/weaveVM/miscalleneous.git", branch = "main"}
eyre = "0.6.12"
borsh = "1.5.1"
brotli = "6.0.0"
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> shuttle_axum::ShuttleAxum {
.route("/block/hash/:hash", get(handle_get_block_by_hash));

task::spawn(async move {
let _ = backfill_blocks(3000).await;
let _ = backfill_blocks(5000).await;
});

Ok(router.into())
Expand Down
188 changes: 159 additions & 29 deletions src/utils/arweave_gql.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::utils::constants::GQL_START_TIMESTAMP;
use crate::utils::constants::IRYS_GQL_GATEWAY;
use crate::utils::constants::{ARWEAVE_GQL_GATEWAY, IRYS_GQL_GATEWAY};
use crate::utils::wvm_client::get_latest_block_number;
use anyhow::Error;
use common::address_book::ADDRESS_BOOK;
use reqwest::Client;
use serde_json::{json, Value};
use std::time::{SystemTime, UNIX_EPOCH};

async fn send_graphql(gateway: &str, query: Value) -> Result<Value, Error> {
let client = Client::new();
Expand All @@ -19,12 +18,11 @@ async fn send_graphql(gateway: &str, query: Value) -> Result<Value, Error> {
Ok(json_res)
}

async fn retrieve_all_transactions(scan_count: u32) -> Result<Vec<u32>, Error> {
async fn retrieve_all_transactions(scan_count: u32, address: &str) -> Result<Vec<u32>, Error> {
let mut block_numbers: Vec<u32> = Vec::new();
let mut cursor: Option<String> = None;
const PAGE_SIZE: u32 = 1000;
let mut page_count: u32 = 0;
let current_timestamp = get_timestamp();

loop {
page_count += 1;
Expand All @@ -35,16 +33,15 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result<Vec<u32>, Error> {

let query = json!({
"query": r#"
query GetTransactions($cursor: String, $pageSize: Int!, $startTimestamp: BigInt!, $endTimestamp: BigInt!) {
query GetTransactions($cursor: String, $address: String!, $pageSize: Int!) {
transactions(
first: $pageSize,
after: $cursor,
timestamp: {from: $startTimestamp, to: $endTimestamp},
order: DESC,
sort: HEIGHT_DESC,
owners: [$address],
tags: [
{ name: "Protocol", values: ["WeaveVM-ExEx"] }
],
owners: ["5JUE58yemNynRDeQDyVECKbGVCQbnX7unPrBRqCPVn5Z", "F8XVrMQzsHiWfn1CaKtUPxAgUkATXQjXULWw3oVXCiFV"]
) {
edges {
node {
Expand All @@ -57,20 +54,18 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result<Vec<u32>, Error> {
}
pageInfo {
hasNextPage
endCursor
}
}
}
"#,
"variables": {
"cursor": cursor,
"pageSize": PAGE_SIZE,
"startTimestamp": GQL_START_TIMESTAMP,
"endTimestamp": current_timestamp
"address": address
}
});

let res = send_graphql(IRYS_GQL_GATEWAY, query)
let res = send_graphql(ARWEAVE_GQL_GATEWAY, query)
.await
.unwrap_or(generate_empty_gql_server_res());

Expand Down Expand Up @@ -121,9 +116,12 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result<Vec<u32>, Error> {
break;
}

cursor = page_info
.get("endCursor")
.and_then(|ec| ec.as_str())
cursor = transactions
.get("edges")
.and_then(|edges| edges.as_array())
.and_then(|edges| edges.last())
.and_then(|last_edge| last_edge.get("cursor"))
.and_then(|cursor_val| cursor_val.as_str())
.map(String::from);

println!(
Expand All @@ -147,10 +145,25 @@ async fn retrieve_all_transactions(scan_count: u32) -> Result<Vec<u32>, Error> {
}

pub async fn detect_missing_blocks(scan_count: u32) -> Result<Vec<u32>, Error> {
// retrieve WeaveVM-ExEx data protocol blocks on Arweave
let blocks = retrieve_all_transactions(scan_count)
// load WeaveVM address book
let address_book: Value = serde_json::from_str(ADDRESS_BOOK).unwrap();
let exex_archiver_addr = address_book["ario_fmt_alphanet_exex_publisher"]
.as_str()
.unwrap();
let exex_backfill_addr = address_book["ario_fmt_alphanet_exex_backfiller"]
.as_str()
.unwrap();
let exex_archiver_blocks = retrieve_all_transactions(scan_count, exex_archiver_addr)
.await
.unwrap();
let exex_backfill_blocks = retrieve_all_transactions(scan_count, exex_backfill_addr)
.await
.unwrap_or(Vec::new());
.unwrap();
// concat archiver and backfill blocks
let mut blocks = [&exex_archiver_blocks[..], &exex_backfill_blocks[..]].concat();
// remove possible duplicates from both archiver & backfill
blocks.sort();
blocks.dedup();
// latest WeaveVM block number from the RPC
let latest_block = get_latest_block_number().await.unwrap_or(0) as u32;

Expand All @@ -170,25 +183,142 @@ pub async fn detect_missing_blocks(scan_count: u32) -> Result<Vec<u32>, Error> {

// util functions

fn get_timestamp() -> u128 {
let now = SystemTime::now();
let unix_timestamp_ms = now
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();
unix_timestamp_ms
}

fn generate_empty_gql_server_res() -> Value {
json!({
"data": {
"transactions": {
"edges": [],
"pageInfo": {
"hasNextPage": false,
"endCursor": null
}
}
}
})
}

// async fn irys_retrieve_all_transactions(scan_count: u32) -> Result<Vec<u32>, Error> {
// let mut block_numbers: Vec<u32> = Vec::new();
// let mut cursor: Option<String> = None;
// const PAGE_SIZE: u32 = 1000;
// let mut page_count: u32 = 0;
// let current_timestamp = get_timestamp();

// loop {
// page_count += 1;
// println!(
// "Fetching page {}. Current cursor: {:#?}",
// page_count, cursor
// );

// let query = json!({
// "query": r#"
// query GetTransactions($cursor: String, $pageSize: Int!, $startTimestamp: BigInt!, $endTimestamp: BigInt!) {
// transactions(
// first: $pageSize,
// after: $cursor,
// timestamp: {from: $startTimestamp, to: $endTimestamp},
// order: DESC,
// tags: [
// { name: "Protocol", values: ["WeaveVM-ExEx"] }
// ],
// owners: ["5JUE58yemNynRDeQDyVECKbGVCQbnX7unPrBRqCPVn5Z", "F8XVrMQzsHiWfn1CaKtUPxAgUkATXQjXULWw3oVXCiFV"]
// ) {
// edges {
// node {
// tags {
// name
// value
// }
// }
// cursor
// }
// pageInfo {
// hasNextPage
// endCursor
// }
// }
// }
// "#,
// "variables": {
// "cursor": cursor,
// "pageSize": PAGE_SIZE,
// "startTimestamp": GQL_START_TIMESTAMP,
// "endTimestamp": current_timestamp
// }
// });

// let res = send_graphql(IRYS_GQL_GATEWAY, query)
// .await
// .unwrap_or(generate_empty_gql_server_res());

// let transactions = res
// .get("data")
// .and_then(|data| data.get("transactions"))
// .ok_or_else(|| Error::msg("Invalid response structure"))?;

// let new_block_numbers: Vec<u32> = transactions
// .get("edges")
// .and_then(|edges| edges.as_array())
// .ok_or_else(|| Error::msg("Edges not found or not an array"))?
// .iter()
// .filter_map(|edge| {
// edge.get("node")
// .and_then(|node| node.get("tags"))
// .and_then(|tags| tags.as_array())
// .and_then(|tags| {
// tags.iter().find_map(|tag| {
// if tag.get("name")?.as_str()? == "Block-Number" {
// tag.get("value")?.as_str()?.parse::<u32>().ok()
// } else {
// None
// }
// })
// })
// })
// .collect();

// println!(
// "Fetched {} new block numbers on page {}",
// new_block_numbers.len(),
// page_count
// );
// block_numbers.extend(new_block_numbers);

// let page_info = transactions
// .get("pageInfo")
// .ok_or_else(|| Error::msg("PageInfo not found"))?;

// let has_next_page = page_info
// .get("hasNextPage")
// .and_then(|hnp| hnp.as_bool())
// .unwrap_or(false);

// if !has_next_page {
// println!("No more pages. Pagination complete.");
// break;
// }

// cursor = page_info
// .get("endCursor")
// .and_then(|ec| ec.as_str())
// .map(String::from);

// println!(
// "Page {} complete. Has next page: {}. Next cursor: {:#?}",
// page_count, has_next_page, cursor
// );
// if page_count > scan_count {
// break;
// }
// }

// println!("Pagination complete. Total pages fetched: {}", page_count);

// block_numbers.sort();
// block_numbers.dedup();

// println!("{}", "#".repeat(100));
// println!("Total scanned block numbers: {}", block_numbers.len());

// Ok(block_numbers)
// }
2 changes: 1 addition & 1 deletion src/utils/constants.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub const WVM_RPC_URL: &str = "https://testnet-rpc.wvm.dev";
pub const IRYS_GQL_GATEWAY: &str = "https://arweave.mainnet.irys.xyz";
pub const ARWEAVE_GQL_GATEWAY: &str = "https://arweave.net";
pub const IRYS_UPLOADER_URL: &str = "https://node1.bundlr.network";
pub const RETH_CLIENT_VERSION: &str = "reth/v1.0.6";
pub const WVM_NETWORK_TAG: &str = "Alphanet v0.1.0";
pub const GQL_START_TIMESTAMP: u128 = 1727828340;

0 comments on commit 2335279

Please sign in to comment.