diff --git a/Dockerfile b/Dockerfile
index 8828134..b80a72d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -23,8 +23,7 @@ RUN apt update && apt install -y libssl1.1 ca-certificates
 COPY --from=builder /app/target/release/relay-backend /usr/local/bin
 COPY --from=builder /app/target/release/phoenix-service /usr/local/bin
 COPY --from=builder /app/target/release/serve /usr/local/bin
-COPY --from=builder /app/target/release/ingest-chain /usr/local/bin
 COPY --from=builder /app/target/release/ingest-block-production /usr/local/bin
 EXPOSE 3002
-ENTRYPOINT ["/usr/local/bin/relay-backend"]
\ No newline at end of file
+ENTRYPOINT ["/usr/local/bin/relay-backend"]
diff --git a/sql/populate_tx_metadata.sql b/sql/populate_tx_metadata.sql
deleted file mode 100644
index 38052d9..0000000
--- a/sql/populate_tx_metadata.sql
+++ /dev/null
@@ -1,206 +0,0 @@
-WITH inserted_rows as (
-INSERT INTO
-   transactions_data
-   SELECT
-      "NEST2".transaction_hash,
-      min("NEST2".tbn) AS blocknumber,
-      min("NEST2".minertx) AS minertransaction,
-      max("NEST2".lowbasefee) AS lowbasefee,
-      max("NEST2".congestion) AS congested,
-      CASE
-         WHEN
-            count(DISTINCT "NEST2".block_number) > 0
-            AND max("NEST2".lowbasefee) < 1
-         THEN
-            max("NEST2".lowfeetip)
-         ELSE
-            0
-      END
-      AS lowtip, min("NEST2".blockts) AS mined, max("NEST2".delay) AS delay, array_agg(DISTINCT "NEST2".blacklist) AS blacklist, count(DISTINCT "NEST2".block_number) AS blocksdelay
-   FROM
-      (
-         SELECT
-            "NEST".minertx,
-            "NEST".tbn,
-            "NEST".blockts,
-            "NEST".prevts,
-            "NEST".transaction_hash,
-            blocks.block_number,
-            CASE
-               WHEN
-                  "NEST".bf < blocks.base_fee_per_gas
-               THEN
-                  1
-               ELSE
-                  0
-            END
-            AS lowbasefee,
-            CASE
-               WHEN
-                  (
-                     "NEST".bf - blocks.base_fee_per_gas
-                  )
-                  >= 1000000000 AND "NEST".pf>=1000000000 and tt=2
-               THEN
-                  0
-               WHEN
-                  (
-                     "NEST".bf - blocks.base_fee_per_gas
-                  )
-                  >= 1000000000 AND tt=0
-               THEN 0
-               ELSE
-                  1
-            END
-            AS lowfeetip, blocks.gas_limit AS gaslimit, blocks.gas_used AS gasused, "NEST".gas,
-            CASE
-               WHEN
-                  "NEST".gas > (blocks.gas_limit - blocks.gas_used)
-               THEN
-                  1
-               ELSE
-                  0
-            END
-            AS congestion, "NEST".delay, "NEST".blacklist,
-            9 as low_balance
-         FROM
-            (
-               SELECT
-                  transactions.transaction_hash,
-                  min(transactions.block_number) AS tbn,
-                  CASE
-                     WHEN
-                        min(transactions.max_fee_per_gas) IS NOT NULL
-                     THEN
-                        min(transactions.max_fee_per_gas)
-                     ELSE
-                        min(transactions.gas_price)
-                  END
-                  AS bf,
-                  min(transactions.max_priority_fee_per_gas) as pf,
-                  min (transactions.transaction_type) as tt,
-                  min(transactions.gas) AS gas, min(blocks_1."timestamp") AS blockts, min(mempool_timestamps."timestamp") AS memts,
-                  CASE
-                     WHEN
-                        (
-                           min(extract(epoch
-                           FROM
-                              blocks_1."timestamp")) - ((
-                           SELECT
-                              percentile_cont(0.5) WITHIN GROUP (
-                           ORDER BY -(mempool_timestamps.timestamp_unix)) AS percentile_cont))
-                        )
-                        <= 0
-                     THEN
-                        0
-                     ELSE
-                        min(extract(epoch
-                        FROM
-                           blocks_1."timestamp")) - greatest(extract(epoch
-                        FROM
-                           min(transactions.prev_nonce_timestamp)),
-                           ( -(
-                           SELECT
-                              percentile_cont(0.5) WITHIN GROUP (
-                           ORDER BY -(mempool_timestamps.timestamp_unix)) AS percentile_cont)
-                           )
-                        )
-                  END
-                  AS delay,
-                  CASE
-                     WHEN
-                        (
-                           min(extract(epoch
-                           FROM
-                              blocks_1."timestamp")) - ((
-                           SELECT
-                              percentile_cont(0.5) WITHIN GROUP (
-                           ORDER BY -(mempool_timestamps.timestamp_unix)) AS percentile_cont))
-                        )
-                        <= 0
-                     THEN
-                        1
-                     ELSE
-                        0
-                  END
-                  AS minertx, min(blocks_1.block_number) AS bn, max(transaction_blacklists.blacklist_id) AS blacklist, min(transactions.prev_nonce_timestamp) AS prevts
-               FROM
-                  transactions
-                  LEFT JOIN
-                     blocks blocks_1
-                     ON blocks_1.block_number = transactions.block_number
-                  LEFT JOIN
-                     (
-                        SELECT
-                           "NEST2_1".transaction_hash,
-                           min("NEST2_1".blacklist_id) AS blacklist_id
-                        FROM
-                           (
-                              SELECT
-                                 "NEST_1".transaction_hash,
-                                 "NEST_1".trace,
-                                 blacklist_entries.display_name,
-                                 blacklist_entries.blacklist_id
-                              FROM
-                                 (
-                                    SELECT
-                                       transactions_1.transaction_hash,
-                                       unnest(transactions_1.address_trace) AS trace,
-                                       transactions_1.block_timestamp
-                                    FROM
-                                       transactions transactions_1
-                                 )
-                                 "NEST_1"
-                                 LEFT JOIN
-                                    blacklist_entries
-                                    ON blacklist_entries.address = "NEST_1".trace
-                              WHERE
-                                 (
-                                    "NEST_1".trace IN
-                                    (
-                                       SELECT
-                                          blacklist_entries_1.address
-                                       FROM
-                                          blacklist_entries blacklist_entries_1
-                                    )
-                                 )
-                                 AND "NEST_1".block_timestamp > blacklist_entries.date_added
-                           )
-                           "NEST2_1"
-                        GROUP BY
-                           "NEST2_1".transaction_hash
-                     )
-                     transaction_blacklists
-                     ON transaction_blacklists.transaction_hash = transactions.transaction_hash
-                  INNER JOIN
-                     mempool_timestamps
-                     ON mempool_timestamps.transaction_hash = transactions.transaction_hash
-               WHERE
-                  transactions.block_timestamp > (
-                  SELECT
-                     GREATEST('2020-01-01', MAX(transactions_data.mined))
-                  from
-                     transactions_data)
-               GROUP BY
-                  transactions.transaction_hash,
-                  blocks_1.gas_limit,
-                  blocks_1.gas_used
-            )
-            "NEST"
-            LEFT JOIN
-               blocks
-               ON blocks."timestamp" > GREATEST ("NEST".memts, "NEST".prevts)
-               AND blocks."timestamp" < "NEST".blockts
-      )
-      "NEST2"
-   GROUP BY
-      "NEST2".transaction_hash
-   ON CONFLICT (transaction_hash) DO NOTHING
-   RETURNING 1
-)
-
-SELECT COUNT(*) FROM inserted_rows;
diff --git a/src/bin/ingest-chain.rs b/src/bin/ingest-chain.rs
deleted file mode 100644
index a2f0946..0000000
--- a/src/bin/ingest-chain.rs
+++ /dev/null
@@ -1,6 +0,0 @@
-use anyhow::Result;
-
-#[tokio::main]
-pub async fn main() -> Result<()> {
-    relay_backend::start_chain_data_ingest().await
-}
diff --git a/src/censorship.rs b/src/censorship.rs
index da47454..9c2ce4b 100644
--- a/src/censorship.rs
+++ b/src/censorship.rs
@@ -1,19 +1,14 @@
-mod archive_node;
-mod chain;
 mod db;
 mod env;
-mod mempool;
 mod relay;
 
 use anyhow::Result;
 use axum::{http::StatusCode, routing::get, Router};
-use chrono::{Duration, SubsecRound, Utc};
+use chrono::{Duration, Utc};
 use enum_iterator::all;
 use futures::future;
-use gcp_bigquery_client::Client;
 use itertools::Itertools;
 use sqlx::{Connection, PgConnection};
-use std::cmp;
 use std::collections::HashMap;
 use std::net::SocketAddr;
 use std::process;
@@ -21,47 +16,12 @@ use tracing::{error, info, warn};
 
 use crate::log;
 
-use self::archive_node::ArchiveNode;
 use self::db::{CensorshipDB, PostgresCensorshipDB};
 use self::env::APP_CONFIG;
 use self::relay::{DeliveredPayload, RelayApi};
-use self::{
-    archive_node::InfuraClient,
-    chain::ChainStore,
-    mempool::{MempoolStore, ZeroMev},
-};
 
 pub use self::relay::RelayId;
 
-pub async fn start_chain_data_ingest() -> Result<()> {
-    log::init();
-
-    let mut db_conn = PgConnection::connect(&APP_CONFIG.database_url).await?;
-    sqlx::migrate!().run(&mut db_conn).await?;
-    db_conn.close().await?;
-
-    let db = PostgresCensorshipDB::new().await?;
-    let mempool_store = ZeroMev::new().await;
-    let chain_store =
-        Client::from_service_account_key_file(&APP_CONFIG.bigquery_service_account).await?;
-    let archive_node = InfuraClient::new();
-
-    tokio::spawn(mount_health_route());
-
-    let result = ingest_chain_data(&db, &chain_store, &mempool_store, &archive_node).await;
-
-    match result {
-        Ok(_) => {
-            error!("chain data ingestion completed unexpectedly without an error");
-            process::exit(1);
-        }
-        Err(err) => {
-            error!("chain data ingestion failed: {}", err);
-            process::exit(1);
-        }
-    }
-}
-
 pub async fn start_block_production_ingest() -> Result<()> {
     log::init();
 
@@ -90,100 +50,6 @@ pub async fn start_block_production_ingest() -> Result<()> {
     }
 }
 
-async fn ingest_chain_data(
-    db: &impl CensorshipDB,
-    chain_store: &impl ChainStore,
-    mempool_store: &impl MempoolStore,
-    archive_node: &impl ArchiveNode,
-) -> Result<()> {
-    let fetch_interval = APP_CONFIG.chain_data_interval;
-
-    loop {
-        let checkpoint = db
-            .get_chain_checkpoint()
-            .await?
-            .unwrap_or(APP_CONFIG.backfill_until);
-
-        let begin = Utc::now();
-        let is_backfilling = begin - checkpoint > fetch_interval;
-
-        let start_time = checkpoint;
-        // Stay at least 10 minutes behind current time to make sure the data is available in BigQuery
-        let end_time = cmp::min(
-            start_time + APP_CONFIG.chain_data_batch_size,
-            (begin - Duration::minutes(10)).round_subsecs(0),
-        );
-
-        info!(
-            "starting chain data ingestion from {} until {}, interval: {} minutes",
-            start_time,
-            end_time,
-            fetch_interval.num_minutes()
-        );
-
-        let (blocks, txs) = tokio::try_join!(
-            chain_store.fetch_blocks(&start_time, &end_time),
-            chain_store.fetch_txs(&start_time, &end_time)
-        )?;
-
-        let block_count = blocks.len();
-        let tx_count = txs.len();
-
-        info!("received {} blocks and {} txs", &block_count, &tx_count);
-
-        let start_block = blocks.first().expect("no blocks received").block_number;
-        let end_block = blocks.last().expect("no blocks received").block_number;
-        let timestamped_txs = mempool_store
-            .fetch_tx_timestamps(txs, start_block, end_block)
-            .await?;
-
-        db.put_chain_data(blocks, timestamped_txs).await?;
-
-        info!(
-            "persisted chain data in {} seconds",
-            (Utc::now() - begin).num_seconds()
-        );
-
-        if !is_backfilling {
-            refresh_derived_data(db).await?;
-            run_low_balance_check(db, archive_node).await?;
-            info!(
-                "reached current time, sleeping for {} minutes",
-                APP_CONFIG.chain_data_interval.num_minutes()
-            );
-            tokio::time::sleep(fetch_interval.to_std().unwrap()).await;
-        }
-    }
-}
-/*
-    Check historical balance of addresses that experienced delayed transactions,
-    and tag them as "low balance" if they didn't have enough balance to cover the transaction.
-*/
-async fn run_low_balance_check(
-    db: &impl CensorshipDB,
-    archive_node: &impl ArchiveNode,
-) -> Result<()> {
-    let checks = db.get_tx_low_balance_checks().await?;
-
-    info!(
-        "running low balance check for {} transactions",
-        &checks.len()
-    );
-
-    for check in &checks {
-        let low_balance = archive_node.check_low_balance(check).await?;
-        db.update_tx_low_balance_status(&check.transaction_hash, &low_balance)
-            .await?;
-    }
-
-    info!(
-        "completed low balance check for {} transactions",
-        &checks.len()
-    );
-
-    Ok(())
-}
-
 /*
     The relay api has no way of providing both start and end slot for the payload request.
     This makes it difficult to deterministically fetch all the payloads for an interval.
@@ -334,30 +200,6 @@ pub async fn patch_block_production_interval(start_slot: i64, end_slot: i64) ->
     Ok(())
 }
 
-async fn refresh_derived_data(db: &impl CensorshipDB) -> Result<()> {
-    let start = Utc::now();
-
-    info!("refreshing derived data...");
-
-    db.populate_tx_metadata().await?;
-
-    info!(
-        "populated transaction metadata in {} seconds",
-        (Utc::now() - start).num_seconds()
-    );
-
-    let start_matviews = Utc::now();
-
-    db.refresh_matviews().await?;
-
-    info!(
-        "refreshed materialized views in {} seconds",
-        (Utc::now() - start_matviews).num_seconds()
-    );
-
-    Ok(())
-}
-
 type BlockProductionBatch = Vec<(RelayId, Vec<DeliveredPayload>)>;
 
 async fn fetch_block_production_batch(end_slot: &Option<i64>) -> Result<BlockProductionBatch> {
diff --git a/src/censorship/archive_node.rs b/src/censorship/archive_node.rs
deleted file mode 100644
index 9898756..0000000
--- a/src/censorship/archive_node.rs
+++ /dev/null
@@ -1,18 +0,0 @@
-mod infura;
-
-pub use infura::InfuraClient;
-
-use anyhow::Result;
-use async_trait::async_trait;
-
-pub struct TxLowBalanceCheck {
-    pub transaction_hash: String,
-    pub from_address: String,
-    pub block_number: i64,
-    pub total_value: i128, // wei
-}
-
-#[async_trait]
-pub trait ArchiveNode {
-    async fn check_low_balance(&self, check: &TxLowBalanceCheck) -> Result<bool>;
-}
diff --git a/src/censorship/archive_node/infura.rs b/src/censorship/archive_node/infura.rs
deleted file mode 100644
index 615e950..0000000
--- a/src/censorship/archive_node/infura.rs
+++ /dev/null
@@ -1,81 +0,0 @@
-use anyhow::Result;
-use async_trait::async_trait;
-use serde::Deserialize;
-use serde_json::json;
-use tracing::info;
-
-use crate::censorship::env::APP_CONFIG;
-
-use super::{ArchiveNode, TxLowBalanceCheck};
-
-pub struct InfuraClient {
-    client: reqwest::Client,
-}
-
-impl InfuraClient {
-    pub fn new() -> Self {
-        Self {
-            client: reqwest::Client::new(),
-        }
-    }
-}
-
-#[derive(Deserialize)]
-#[allow(dead_code)]
-struct InfuraResponse {
-    jsonrpc: String,
-    id: u32,
-    result: String,
-}
-
-fn encode_block_number(block_number: &i64) -> String {
-    format!(
-        "0x{}",
-        hex::encode(block_number.to_be_bytes()).trim_start_matches('0')
-    )
-}
-
-#[async_trait]
-impl ArchiveNode for InfuraClient {
-    async fn check_low_balance(
-        &self,
-        TxLowBalanceCheck {
-            from_address,
-            block_number,
-            total_value,
-            ..
-        }: &TxLowBalanceCheck,
-    ) -> Result<bool> {
-        let payload = json!({
-            "jsonrpc": "2.0",
-            "id": 1,
-            "method": "eth_getBalance",
-            "params": [from_address, encode_block_number(block_number)]
-        });
-
-        info!("{}, block_number {}", payload, block_number);
-
-        let response: InfuraResponse = self
-            .client
-            .post(format!(
-                "https://mainnet.infura.io/v3/{}",
-                &APP_CONFIG.infura_api_key
-            ))
-            .json(&payload)
-            .send()
-            .await?
-            .error_for_status()?
-            .json()
-            .await?;
-
-        let balance = i128::from_str_radix(&response.result[2..], 16)?;
-        let low_balance = balance < *total_value;
-
-        info!(
-            "balance: {}, required: {}, is_low {}",
-            balance, total_value, low_balance
-        );
-
-        Ok(low_balance)
-    }
-}
diff --git a/src/censorship/chain.rs b/src/censorship/chain.rs
deleted file mode 100644
index c05ff3c..0000000
--- a/src/censorship/chain.rs
+++ /dev/null
@@ -1,60 +0,0 @@
-mod bigquery;
-mod file;
-mod util;
-
-use anyhow::Result;
-use async_trait::async_trait;
-use chrono::{DateTime, Utc};
-use serde::Deserialize;
-
-#[derive(Debug, Deserialize)]
-pub struct Block {
-    pub base_fee_per_gas: i64,
-    pub block_hash: String,
-    pub block_number: i64,
-    pub extra_data: Option<String>,
-    pub fee_recipient: String,
-    pub gas_limit: i64,
-    pub gas_used: i64,
-    pub logs_bloom: String,
-    pub parent_hash: String,
-    pub receipts_root: String,
-    pub sha3_uncles: String,
-    pub size: i64,
-    pub state_root: String,
-    pub timestamp: DateTime<Utc>,
-    pub transaction_count: i64,
-    pub transactions_root: String,
-}
-
-#[derive(Clone, Debug)]
-pub struct Tx {
-    pub address_trace: Vec<String>,
-    pub block_number: i64,
-    pub block_timestamp: DateTime<Utc>,
-    pub from_address: String,
-    pub gas: i64,
-    pub gas_price: i64,
-    pub input: Option<String>,
-    pub max_fee_per_gas: Option<i64>,
-    pub max_priority_fee_per_gas: Option<i64>,
-    pub nonce: i64,
-    pub prev_nonce_timestamp: Option<DateTime<Utc>>,
-    pub receipt_contract_address: Option<String>,
-    pub receipt_cumulative_gas_used: i64,
-    pub receipt_effective_gas_price: Option<i64>,
-    pub receipt_gas_used: i64,
-    pub receipt_status: i64,
-    pub to_address: Option<String>,
-    pub transaction_hash: String,
-    pub transaction_index: i64,
-    pub transaction_type: i64,
-    pub value: String,
-}
-
-#[async_trait]
-pub trait ChainStore {
-    // start is our previous checkpoint, so start = exclusive, end = inclusive
-    async fn fetch_blocks(&self, start: &DateTime<Utc>, end: &DateTime<Utc>) -> Result<Vec<Block>>;
-    async fn fetch_txs(&self, start: &DateTime<Utc>, end: &DateTime<Utc>) -> Result<Vec<Tx>>;
-}
diff --git a/src/censorship/db.rs b/src/censorship/db.rs
index 88770b0..5847683 100644
--- a/src/censorship/db.rs
+++ b/src/censorship/db.rs
@@ -2,26 +2,15 @@ mod postgres;
 
 use anyhow::Result;
 use async_trait::async_trait;
-use chrono::{DateTime, Utc};
 
-use super::{
-    archive_node::TxLowBalanceCheck, chain::Block, mempool::TaggedTx, relay::DeliveredPayload,
-};
+use super::relay::DeliveredPayload;
 
 pub use postgres::PostgresCensorshipDB;
 
 #[async_trait]
 pub trait CensorshipDB {
-    // checkpoint for onchain data
-    async fn get_chain_checkpoint(&self) -> Result<Option<DateTime<Utc>>>;
-    // slot_number checkpoint for offchain (relay) data
+    // slot_number checkpoint for offchain relay data
     async fn get_block_production_checkpoint(&self) -> Result<Option<i64>>;
-    async fn get_tx_low_balance_checks(&self) -> Result<Vec<TxLowBalanceCheck>>;
-    #[allow(clippy::ptr_arg)]
-    async fn update_tx_low_balance_status(&self, tx_hash: &String, status: &bool) -> Result<()>;
-    async fn put_chain_data(&self, blocks: Vec<Block>, txs: Vec<TaggedTx>) -> Result<()>;
     // this method is idempotent for a given set of relays
     async fn upsert_delivered_payloads(&self, payloads: Vec<DeliveredPayload>) -> Result<()>;
-    async fn populate_tx_metadata(&self) -> Result<i64>;
-    async fn refresh_matviews(&self) -> Result<()>;
 }
diff --git a/src/censorship/db/postgres.rs b/src/censorship/db/postgres.rs
index c54aa12..cc566e7 100644
--- a/src/censorship/db/postgres.rs
+++ b/src/censorship/db/postgres.rs
@@ -2,20 +2,14 @@ use std::str::FromStr;
 
 use anyhow::Result;
 use async_trait::async_trait;
-use chrono::{DateTime, Utc};
-use itertools::Itertools;
 use sqlx::{
     postgres::{PgConnectOptions, PgPoolOptions},
     types::BigDecimal,
-    ConnectOptions, Pool, Postgres, QueryBuilder,
+    ConnectOptions, Pool, Postgres,
 };
-use tracing::warn;
 
 use super::CensorshipDB;
-use crate::censorship::{
-    archive_node::TxLowBalanceCheck, chain::Block, env::APP_CONFIG, mempool::TaggedTx,
-    relay::DeliveredPayload,
-};
+use crate::censorship::{env::APP_CONFIG, relay::DeliveredPayload};
 
 pub struct PostgresCensorshipDB {
     pool: Pool<Postgres>,
@@ -37,22 +31,8 @@ impl PostgresCensorshipDB {
     }
 }
 
-// It's not great but these are used to avoid the bind limit
-// when batch inserting rows, and have to be kept up to date
-const BIND_LIMIT: usize = 65535;
-const BLOCK_NUM_KEYS: usize = 17;
-const TX_NUM_KEYS: usize = 23;
-const TIMESTAMP_NUM_KEYS: usize = 4;
-
 #[async_trait]
 impl CensorshipDB for PostgresCensorshipDB {
-    async fn get_chain_checkpoint(&self) -> Result<Option<DateTime<Utc>>> {
-        sqlx::query_scalar!("SELECT timestamp FROM blocks ORDER BY timestamp DESC LIMIT 1")
-            .fetch_optional(&self.pool)
-            .await
-            .map_err(Into::into)
-    }
-
     async fn get_block_production_checkpoint(&self) -> Result<Option<i64>> {
         sqlx::query_scalar!(
             "
@@ -67,56 +47,6 @@ impl CensorshipDB for PostgresCensorshipDB {
         .map_err(Into::into)
     }
 
-    async fn get_tx_low_balance_checks(&self) -> Result<Vec<TxLowBalanceCheck>> {
-        sqlx::query!(
-            r#"
-            SELECT
-                tx.transaction_hash,
-                tx.from_address,
-                (tx.value + tx.receipt_effective_gas_price * tx.receipt_gas_used)::text AS total_value,
-                tx.block_number - txd.blocksdelay AS block_number
-            FROM
-                transactions_data txd
-            INNER JOIN
-                transactions tx
-            ON tx.transaction_hash = txd.transaction_hash
-            WHERE
-                txd.blocksdelay > 2
-                AND minertransaction + lowbasefee + congested + lowtip = 0
-                AND low_balance = 9
-            "#
-        )
-        .fetch_all(&self.pool)
-        .await
-        .map(|rows| rows.into_iter().map(|row| TxLowBalanceCheck {
-            transaction_hash: row.transaction_hash,
-            from_address: row.from_address,
-            total_value: row.total_value.unwrap().parse().unwrap(),
-            block_number: row.block_number.unwrap()
-        }).collect())
-        .map_err(Into::into)
-    }
-
-    async fn update_tx_low_balance_status(
-        &self,
-        tx_hash: &String,
-        low_balance: &bool,
-    ) -> Result<()> {
-        let status = if *low_balance { 1 } else { 0 };
-        sqlx::query!(
-            "
-            UPDATE transactions_data
-            SET low_balance = $1
-            WHERE transaction_hash = $2
-            ",
-            status,
-            tx_hash
-        )
-        .execute(&self.pool)
-        .await?;
-        Ok(())
-    }
-
     async fn upsert_delivered_payloads(&self, payloads: Vec<DeliveredPayload>) -> Result<()> {
         for DeliveredPayload {
             slot_number,
@@ -152,218 +82,4 @@ impl CensorshipDB for PostgresCensorshipDB {
         }
         Ok(())
     }
-
-    async fn put_chain_data(&self, blocks: Vec<Block>, txs: Vec<TaggedTx>) -> Result<()> {
-        let block_chunks = blocks
-            .into_iter()
-            .chunks(BIND_LIMIT / BLOCK_NUM_KEYS)
-            .into_iter()
-            .map(|chunk| chunk.collect_vec())
-            .collect_vec();
-
-        for chunk in block_chunks {
-            let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
-                "
-                INSERT INTO blocks (
-                    base_fee_per_gas,
-                    block_hash,
-                    block_number,
-                    extra_data,
-                    fee_recipient,
-                    gas_limit,
-                    gas_used,
-                    logs_bloom,
-                    parent_hash,
-                    receipts_root,
-                    sha3_uncles,
-                    size,
-                    state_root,
-                    timestamp,
-                    timestamp_unix,
-                    transaction_count,
-                    transactions_root
-                )
-                ",
-            );
-
-            query_builder.push_values(chunk, |mut builder, block| {
-                builder
-                    .push(block.base_fee_per_gas)
-                    .push_bind(block.block_hash)
-                    .push_bind(block.block_number)
-                    .push_bind(block.extra_data)
-                    .push_bind(block.fee_recipient)
-                    .push_bind(block.gas_limit)
-                    .push_bind(block.gas_used)
-                    .push_bind(block.logs_bloom)
-                    .push_bind(block.parent_hash)
-                    .push_bind(block.receipts_root)
-                    .push_bind(block.sha3_uncles)
-                    .push_bind(block.size)
-                    .push_bind(block.state_root)
-                    .push_bind(block.timestamp)
-                    .push_bind(block.timestamp.timestamp())
-                    .push_bind(block.transaction_count)
-                    .push_bind(block.transactions_root);
-            });
-
-            query_builder.push("ON CONFLICT (block_hash) DO NOTHING");
-
-            query_builder.build().execute(&self.pool).await?;
-        }
-
-        let tx_chunks = &txs
-            .iter()
-            .chunks(BIND_LIMIT / TX_NUM_KEYS)
-            .into_iter()
-            .map(|chunk| chunk.collect_vec())
-            .collect_vec();
-
-        for chunk in tx_chunks {
-            let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
-                "
-                INSERT INTO transactions (
-                    address_trace,
-                    block_number,
-                    block_timestamp,
-                    block_timestamp_unix,
-                    from_address,
-                    gas,
-                    gas_price,
-                    input,
-                    max_fee_per_gas,
-                    max_priority_fee_per_gas,
-                    nonce,
-                    receipt_contract_address,
-                    receipt_cumulative_gas_used,
-                    receipt_effective_gas_price,
-                    receipt_gas_used,
-                    receipt_status,
-                    to_address,
-                    transaction_hash,
-                    transaction_index,
-                    transaction_type,
-                    value,
-                    prev_nonce_timestamp,
-                    prev_nonce_timestamp_unix
-                )
-                ",
-            );
-
-            query_builder.push_values(chunk, |mut builder, TaggedTx { tx, .. }| {
-                builder
-                    .push_bind(tx.address_trace.clone())
-                    .push_bind(tx.block_number)
-                    .push_bind(tx.block_timestamp)
-                    .push_bind(tx.block_timestamp.timestamp())
-                    .push_bind(tx.from_address.clone())
-                    .push_bind(tx.gas)
-                    .push_bind(tx.gas_price)
-                    .push_bind(tx.input.clone())
-                    .push_bind(tx.max_fee_per_gas)
-                    .push_bind(tx.max_priority_fee_per_gas)
-                    .push_bind(tx.nonce)
-                    .push_bind(tx.receipt_contract_address.clone())
-                    .push_bind(tx.receipt_cumulative_gas_used)
-                    .push_bind(tx.receipt_effective_gas_price)
-                    .push_bind(tx.receipt_gas_used)
-                    .push_bind(tx.receipt_status)
-                    .push_bind(tx.to_address.clone())
-                    .push_bind(tx.transaction_hash.clone())
-                    .push_bind(tx.transaction_index)
-                    .push_bind(tx.transaction_type)
-                    .push(format_args!("{}::numeric", tx.value.clone()))
-                    .push_bind(tx.prev_nonce_timestamp)
-                    .push_bind(tx.prev_nonce_timestamp.map(|t| t.timestamp()));
-            });
-
-            query_builder.build().execute(&self.pool).await?;
-        }
-
-        let timestamp_chunks = txs
-            .into_iter()
-            .flat_map(|TaggedTx { timestamps, tx }| {
-                timestamps
-                    .into_iter()
-                    .map(move |ts| (tx.transaction_hash.clone(), ts))
-            })
-            .chunks(BIND_LIMIT / TIMESTAMP_NUM_KEYS)
-            .into_iter()
-            .map(|chunk| chunk.collect_vec())
-            .collect_vec();
-
-        for chunk in timestamp_chunks {
-            let mut query_builder: QueryBuilder<Postgres> = QueryBuilder::new(
-                "
-                INSERT INTO mempool_timestamps (
-                    transaction_hash,
-                    source_id,
-                    timestamp,
-                    timestamp_unix
-                )
-                ",
-            );
-
-            query_builder.push_values(chunk, |mut builder, (tx_hash, ts)| {
-                builder
-                    .push_bind(tx_hash)
-                    .push_bind(ts.id.to_string())
-                    .push_bind(ts.timestamp)
-                    .push_bind(ts.timestamp.timestamp());
-            });
-
-            query_builder.build().execute(&self.pool).await?;
-        }
-
-        Ok(())
-    }
-
-    async fn populate_tx_metadata(&self) -> Result<i64> {
-        let insert_count = sqlx::query_file_scalar!("sql/populate_tx_metadata.sql")
-            .fetch_one(&self.pool)
-            .await?;
-
-        Ok(insert_count.unwrap_or(0))
-    }
-
-    async fn refresh_matviews(&self) -> Result<()> {
-        let matviews = vec![
-            "builders_7d",
-            "builders_30d",
-            "builder_blocks_7d",
-            "builder_blocks_30d",
-            "censored_transactions_7d",
-            "censored_transactions_30d",
-            "inclusion_delay_7d",
-            "inclusion_delay_30d",
-            "operators_all",
-            "top_7d",
-            "top_30d",
-            "relay_censorship_7d",
-            "relay_censorship_30d",
-            "censorship_delay_7d",
-            "censorship_delay_30d",
-        ];
-
-        for matview in matviews {
-            let result = sqlx::query(&format!(
-                "REFRESH MATERIALIZED VIEW CONCURRENTLY {}",
-                matview
-            ))
-            .execute(&self.pool)
-            .await;
-
-            if let Err(err) = result {
-                warn!(
-                    "error refreshing matview {} concurrently: {}. retrying without concurrent",
-                    matview, err
-                );
-                sqlx::query(&format!("REFRESH MATERIALIZED VIEW {}", matview))
-                    .execute(&self.pool)
-                    .await?;
-            }
-        }
-
-        Ok(())
-    }
 }
diff --git a/src/censorship/env.rs b/src/censorship/env.rs
index d176f4b..a420739 100644
--- a/src/censorship/env.rs
+++ b/src/censorship/env.rs
@@ -1,22 +1,13 @@
-use chrono::{DateTime, Duration, Utc};
 use lazy_static::lazy_static;
 use serde::Deserialize;
 
-use crate::env::{deserialize_duration_minutes, get_app_config};
+use crate::env::get_app_config;
 
 #[derive(Deserialize)]
 pub struct AppConfig {
     pub port: u16,
     pub database_url: String,
-    pub zeromev_database_url: String,
-    pub bigquery_service_account: String,
-    pub infura_api_key: String,
-    pub backfill_until: DateTime<Utc>,
     pub backfill_until_slot: i64,
-    #[serde(deserialize_with = "deserialize_duration_minutes")]
-    pub chain_data_interval: Duration,
-    #[serde(deserialize_with = "deserialize_duration_minutes")]
-    pub chain_data_batch_size: Duration,
 }
 
 lazy_static! {
diff --git a/src/censorship/mempool.rs b/src/censorship/mempool.rs
deleted file mode 100644
index 929914f..0000000
--- a/src/censorship/mempool.rs
+++ /dev/null
@@ -1,76 +0,0 @@
-mod zeromev;
-
-use anyhow::Result;
-use async_trait::async_trait;
-use chrono::{DateTime, Utc};
-use std::{fmt, str::FromStr};
-
-use super::chain::Tx;
-pub use zeromev::ZeroMev;
-
-// service for timestamping transactions based on their appearance in the mempool(s)
-#[async_trait]
-pub trait MempoolStore {
-    async fn fetch_tx_timestamps(
-        &self,
-        mut txs: Vec<Tx>,
-        start_block: i64,
-        end_block: i64,
-    ) -> Result<Vec<TaggedTx>>;
-}
-
-#[derive(Clone)]
-pub struct MempoolTimestamp {
-    pub id: SourceId,
-    pub timestamp: DateTime<Utc>,
-}
-
-#[derive(Clone)]
-pub struct TaggedTx {
-    pub timestamps: Vec<MempoolTimestamp>,
-    pub tx: Tx,
-}
-
-#[derive(Debug, PartialEq, Clone, Copy)]
-#[allow(clippy::enum_variant_names)]
-pub enum SourceId {
-    ZeroMevInf,
-    ZeroMevQn,
-    ZeroMevUs,
-    ZeroMevEu,
-    ZeroMevAs,
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub struct ParseExtractorIdError(String);
-
-impl FromStr for SourceId {
-    type Err = ParseExtractorIdError;
-
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        match s {
-            "Inf" => Ok(SourceId::ZeroMevInf),
-            "Qn" => Ok(SourceId::ZeroMevQn),
-            "US" => Ok(SourceId::ZeroMevUs),
-            "EU" => Ok(SourceId::ZeroMevEu),
-            "AS" => Ok(SourceId::ZeroMevAs),
-            unknown => Err(ParseExtractorIdError(format!(
-                "unknown extractor id: {}",
-                unknown
-            ))),
-        }
-    }
-}
-
-impl fmt::Display for SourceId {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        let str = match &self {
-            SourceId::ZeroMevInf => "zeromev-inf",
-            SourceId::ZeroMevQn => "zeromev-qn",
-            SourceId::ZeroMevUs => "zeromev-us",
-            SourceId::ZeroMevEu => "zeromev-eu",
-            SourceId::ZeroMevAs => "zeromev-as",
-        };
-        write!(f, "{}", str)
-    }
-}
diff --git a/src/censorship/mempool/zeromev.rs b/src/censorship/mempool/zeromev.rs
deleted file mode 100644
index cd2a624..0000000
--- a/src/censorship/mempool/zeromev.rs
+++ /dev/null
@@ -1,192 +0,0 @@
-mod format;
-
-use anyhow::Result;
-use async_trait::async_trait;
-use chrono::{DateTime, Duration, Utc};
-use itertools::Itertools;
-use sqlx::{postgres::PgPoolOptions, Pool, Postgres, Row};
-use std::str::FromStr;
-use tracing::{debug, warn};
-
-use self::format::{parse_tx_data, TxTuple};
-use super::{MempoolStore, MempoolTimestamp, SourceId, TaggedTx, Tx};
-use crate::censorship::env::APP_CONFIG;
-
-pub struct ZeroMev {
-    db_pool: Pool<Postgres>,
-}
-
-impl ZeroMev {
-    pub async fn new() -> Self {
-        let db_pool = PgPoolOptions::new()
-            .max_connections(3)
-            .acquire_timeout(Duration::seconds(3).to_std().unwrap())
-            .connect(&APP_CONFIG.zeromev_database_url)
-            .await
-            .expect("can't connect to zeromev database");
-
-        Self { db_pool }
-    }
-}
-
-pub type BlockNumber = i64;
-
-#[derive(Clone)]
-struct BlockExtractorRow {
-    block_number: BlockNumber,
-    block_time: DateTime<Utc>,
-    extractor: SourceId,
-    tx_data: Vec<TxTuple>,
-}
-
-fn tag_transactions(
-    mut txs: Vec<Tx>,
-    mut rows: Vec<BlockExtractorRow>,
-    start_block: i64,
-    end_block: i64,
-) -> Vec<TaggedTx> {
-    // we rely on transaction index to associate timestamps from zeromev to transactions
-    // on our side, so we need to make sure everything is sorted
-    txs.sort_by_key(|tx| (tx.block_number, tx.transaction_index));
-    rows.sort_by_key(|row| row.block_number);
-
-    let block_numbers = start_block..=end_block;
-
-    let data_by_block = block_numbers
-        .map(|block_number| {
-            let block_txs = txs
-                .iter()
-                .skip_while(|tx| tx.block_number < block_number)
-                .take_while(|tx| tx.block_number == block_number)
-                .map(|tx| tx.to_owned())
-                .collect_vec();
-
-            let block_extractors = rows
-                .iter()
-                .skip_while(|row| row.block_number < block_number)
-                .take_while(|row| row.block_number == block_number)
-                .map(|row| row.to_owned())
-                .collect_vec();
-
-            let valid_extractors = filter_extractor_rows(block_txs.len(), block_extractors);
-
-            let missing_extractors = !block_txs.is_empty() && valid_extractors.is_empty();
-
-            if missing_extractors {
-                warn!(
-                    "no extractors for block {} ({} txs)",
-                    block_number,
-                    block_txs.len()
-                );
-            }
-
-            (block_number, block_txs, valid_extractors)
-        })
-        .collect_vec();
-
-    data_by_block
-        .iter()
-        .flat_map(|(_, txs, extractors)| {
-            txs.iter().map(|tx| {
-                let timestamps: Vec<MempoolTimestamp> = extractors
-                    .iter()
-                    .map(|ex| MempoolTimestamp {
-                        id: ex.extractor,
-                        timestamp: ex
-                            .tx_data
-                            .get(usize::try_from(tx.transaction_index).unwrap())
-                            .expect("expected extractor data to contain transaction index")
-                            .0,
-                    })
-                    .collect();
-
-                if timestamps.is_empty() {
-                    warn!("no mempool timestamps for tx {}", &tx.transaction_hash);
-                }
-
-                TaggedTx {
-                    timestamps,
-                    tx: tx.clone(),
-                }
-            })
-        })
-        .collect()
-}
-
-// First filter out extractors that don't have a tx count that matches what's on chain,
-// then pick the most recent extractor if there are multiple
-fn filter_extractor_rows(
-    expected_tx_count: usize,
-    rows: Vec<BlockExtractorRow>,
-) -> Vec<BlockExtractorRow> {
-    rows.into_iter()
-        .filter(|row| row.tx_data.len() == expected_tx_count)
-        .group_by(|row| row.extractor)
-        .into_iter()
-        .map(|(_, group)| group.max_by_key(|row| row.block_time).unwrap())
-        .collect()
-}
-
-// when there's an empty block, this is what tx_data will be set to
-const GZIP_HEADER_HEX: &str = "\\x1f8b080000000000000303000000000000000000";
-
-#[async_trait]
-impl MempoolStore for ZeroMev {
-    async fn fetch_tx_timestamps(
-        &self,
-        txs: Vec<Tx>,
-        start_block: i64,
-        end_block: i64,
-    ) -> Result<Vec<TaggedTx>> {
-        let query = format!(
-            "
-            SELECT
-                block_number,
-                block_time,
-                extractor.code AS extractor,
-                tx_data
-            FROM
-                extractor_block
-            INNER JOIN
-                extractor USING (extractor_index)
-            WHERE
-                block_number >= {}
-                AND block_number <= {}
-                AND tx_data != '{}'
-            ORDER BY
-                block_number, extractor
-            ",
-            start_block, end_block, &GZIP_HEADER_HEX
-        );
-
-        let rows: Vec<BlockExtractorRow> =
-            sqlx::query(&query)
-                .fetch_all(&self.db_pool)
-                .await
-                .map(|rows| {
-                    rows.iter()
-                        .map(|row| BlockExtractorRow {
-                            block_number: row.get("block_number"),
-                            block_time: row.get("block_time"),
-                            extractor: SourceId::from_str(&row.get::<String, _>("extractor"))
-                                .expect("failed to parse extractor id"),
-                            tx_data: parse_tx_data(row.get("tx_data")),
-                        })
-                        .collect()
-                })?;
-
-        Ok(tag_transactions(txs, rows, start_block, end_block))
-    }
-}
-
-#[allow(dead_code)]
-fn log_non_consecutive(id: &str, ns: Vec<&i64>) {
-    let first = ns.first().unwrap();
-    ns.iter().skip(1).fold(*first, |prev, n| {
-        if *n - prev > 1 {
-            debug!("{}: non-consecutive: {}", id, &n);
-            return n;
-        }
-        n
-    });
-}
diff --git a/src/censorship/mempool/zeromev/format.rs b/src/censorship/mempool/zeromev/format.rs
deleted file mode 100644
index a0cf7dc..0000000
--- a/src/censorship/mempool/zeromev/format.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::{io::Read, ops::Add};
-
-use chrono::{DateTime, Duration, Utc};
-use flate2::read::GzDecoder;
-use lazy_static::lazy_static;
-
-use super::BlockNumber;
-
-// this module exposes a helper for dealing with zeromev custom binary format
-
-// timestamp, block_number tuple. these are mapped to transactions by their index in the vector (equal to tx_index)
-pub type TxTuple = (DateTime<Utc>, BlockNumber);
-
-lazy_static! {
-    static ref CENTURY_START: DateTime<Utc> =
-        DateTime::parse_from_rfc3339("0001-01-01T00:00:00.00Z")
-            .unwrap()
-            .with_timezone(&Utc);
-}
-
-fn from_dotnet_ticks(ticks: i64) -> DateTime<Utc> {
-    CENTURY_START.add(Duration::microseconds(ticks / 10))
-}
-
-fn to_i64_vec(bytes: Vec<u8>) -> Vec<i64> {
-    assert!(
-        bytes.len() % 8 == 0,
-        "expected byte array to be evenly divisible by 8"
-    );
-
-    bytes
-        .chunks(8)
-        .map(|slice| i64::from_le_bytes(slice.try_into().unwrap()))
-        .collect()
-}
-
-pub fn parse_tx_data(bytes: Vec<u8>) -> Vec<TxTuple> {
-    let mut decomp_bytes = vec![];
-
-    GzDecoder::new(&bytes[..])
-        .read_to_end(&mut decomp_bytes)
-        .expect("failed to gzip decode zeromev tx_data bytes");
-
-    let i64_vec = to_i64_vec(decomp_bytes);
-
-    assert!(
-        i64_vec.len() % 2 == 0,
-        "expected i64 array to be evenly divisible by 2"
-    );
-
-    i64_vec
-        .chunks(2)
-        .map(|slice| (from_dotnet_ticks(slice[0]), slice[1]))
-        .collect()
-}
-
-#[cfg(test)]
-mod tests {
-    use super::to_i64_vec;
-
-    #[test]
-    fn test_bytes_to_i64_vec() {
-        let bytes: Vec<u8> = vec![1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0];
-        let result = to_i64_vec(bytes);
-
-        assert_eq!(result.len(), 2);
-        assert_eq!(result[0], 1);
-        assert_eq!(result[1], 1);
-    }
-}
diff --git a/src/env.rs b/src/env.rs
index a667d31..64f43fd 100644
--- a/src/env.rs
+++ b/src/env.rs
@@ -1,4 +1,3 @@
-use chrono::Duration;
 use reqwest::Url;
 use serde::{
     de::{DeserializeOwned, Error},
@@ -105,14 +104,6 @@ where
         .collect()
 }
 
-pub fn deserialize_duration_minutes<'de, D>(deserializer: D) -> Result<Duration, D::Error>
-where
-    D: Deserializer<'de>,
-{
-    let num: i64 = Deserialize::deserialize(deserializer)?;
-    Ok(Duration::minutes(num))
-}
-
 pub fn get_app_config<T: DeserializeOwned>() -> T {
     match envy::from_env::<T>() {
         Ok(config) => config,
diff --git a/src/lib.rs b/src/lib.rs
index 0082230..d962ec8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,6 +7,5 @@ mod serve;
 
 pub use censorship::patch_block_production_interval;
 pub use censorship::start_block_production_ingest;
-pub use censorship::start_chain_data_ingest;
 pub use phoenix::monitor_critical_services;
 pub use serve::start_server;