From 2b4937ba0cbbdae5f9ad320e777f61d4f0fea77a Mon Sep 17 00:00:00 2001 From: Daniel Xifra Date: Wed, 30 Oct 2024 17:28:24 -0300 Subject: [PATCH] new block processor with signed protocol. --- Cargo.lock | 11 ++ Cargo.toml | 14 ++- config-live-example.toml | 1 + src/blocks_processor.rs | 156 ++++++++++++++++--------- src/flashbots_config.rs | 92 +++++++++++++-- src/flashbots_signer.rs | 244 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 7 files changed, 452 insertions(+), 67 deletions(-) create mode 100644 src/flashbots_signer.rs diff --git a/Cargo.lock b/Cargo.lock index f9e41af..f749b59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7208,6 +7208,11 @@ dependencies = [ "alloy-json-rpc", "alloy-primitives 0.8.0", "alloy-provider", + "alloy-rpc-client", + "alloy-signer", + "alloy-signer-local", + "alloy-transport", + "alloy-transport-http", "built", "clap", "clickhouse", @@ -7216,6 +7221,9 @@ dependencies = [ "eyre", "flume", "futures", + "futures-util", + "http 0.2.12", + "hyper 0.14.30", "itertools 0.11.0", "jsonrpsee 0.20.3", "lazy_static", @@ -7227,9 +7235,11 @@ dependencies = [ "rand 0.8.5", "rbuilder", "redis", + "reqwest 0.11.27", "reth", "reth-db", "reth-payload-builder", + "secp256k1", "serde", "serde_json", "serde_with", @@ -7244,6 +7254,7 @@ dependencies = [ "tower", "tracing", "tungstenite 0.23.0", + "url", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index c8e9b4f..b09c276 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,9 +41,17 @@ prost = "0.11" tokio-stream = { version = "0.1", features = ["net"] } futures = "0.3" tower = "0.4" - - - +reqwest = { version = "0.11.20", features = ["blocking"] } +secp256k1 = { version = "0.29" } +alloy-signer-local = { version = "0.3.0" } +alloy-signer = { version = "0.3.0" } +alloy-transport-http = { version = "0.3.0" } +alloy-transport = { version = "0.3.0" } +alloy-rpc-client = { version = "0.3.0" } +url = "2.4.1" +http = "0.2.9" +hyper = "0.14" +futures-util = "0.3" metrics_macros = { git = "https://github.com/flashbots/rbuilder.git", rev = "d96e7215483bac0ab145459f4ddaa811d99459d6"} diff --git a/config-live-example.toml b/config-live-example.toml index 149ef4f..8a144e8 100644 --- a/config-live-example.toml +++ b/config-live-example.toml @@ -22,6 +22,7 @@ dry_run = true dry_run_validation_url = "http://localhost:8545" blocks_processor_url = "http://block_processor.internal" +key_registration_url = "http://127.0.0.1:8090" ignore_cancellable_orders = true sbundle_mergeabe_signers = [] diff --git a/src/blocks_processor.rs b/src/blocks_processor.rs index 5a4b0fe..b13c8e7 100644 --- a/src/blocks_processor.rs +++ b/src/blocks_processor.rs @@ -1,6 +1,8 @@ -use alloy_json_rpc::RpcError; use alloy_primitives::{BlockHash, U256}; -use alloy_provider::Provider; +use jsonrpsee::{ + core::{client::ClientT, traits::ToRpcParams}, + http_client::{HttpClient, HttpClientBuilder}, +}; use rbuilder::{ building::BuiltBlockTrace, live_builder::block_output::bid_observer::BidObserver, @@ -9,10 +11,11 @@ use rbuilder::{ serialize::{RawBundle, RawShareBundle}, Order, }, - utils::{error_storage::store_error_event, http_provider, BoxedProvider}, + utils::error_storage::store_error_event, }; use reth::primitives::SealedBlock; use serde::{Deserialize, Serialize}; +use serde_json::value::RawValue; use serde_with::{serde_as, DisplayFromStr}; use std::sync::Arc; use time::format_description::well_known; @@ -21,11 +24,8 @@ use tracing::{debug, error, warn, Span}; use crate::metrics::inc_blocks_api_errors; const BLOCK_PROCESSOR_ERROR_CATEGORY: &str = "block_processor"; - -#[derive(Debug, Clone)] -pub struct BlocksProcessorClient { - client: Arc, -} +const DEFAULT_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "block_consumeBuiltBlockV2"; +pub const SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD: &str = "flashbots_consumeBuiltBlockV2"; #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -64,13 +64,68 @@ struct BlocksProcessorHeader { pub number: Option, } -impl BlocksProcessorClient { +type ConsumeBuiltBlockRequest = ( + BlocksProcessorHeader, + String, + String, + Vec, + Vec, + Vec, + reth::rpc::types::beacon::relay::BidTrace, + String, + U256, + U256, +); + +/// Struct to avoid copying ConsumeBuiltBlockRequest since HttpClient::request eats the parameter. +#[derive(Clone)] +struct ConsumeBuiltBlockRequestArc { + inner: Arc, +} + +impl ConsumeBuiltBlockRequestArc { + fn new(request: ConsumeBuiltBlockRequest) -> Self { + Self { + inner: Arc::new(request), + } + } + fn as_ref(&self) -> &ConsumeBuiltBlockRequest { + self.inner.as_ref() + } +} + +impl ToRpcParams for ConsumeBuiltBlockRequestArc { + fn to_rpc_params(self) -> Result>, jsonrpsee::core::Error> { + let json = serde_json::to_string(self.inner.as_ref()) + .map_err(jsonrpsee::core::Error::ParseError)?; + RawValue::from_string(json) + .map(Some) + .map_err(jsonrpsee::core::Error::ParseError) + } +} + +#[derive(Debug, Clone)] +pub struct BlocksProcessorClient { + client: HttpClientType, + consume_built_block_method: &'static str, +} + +impl BlocksProcessorClient { pub fn try_from(url: &str) -> eyre::Result { Ok(Self { - client: Arc::new(http_provider(url.parse()?)), + client: HttpClientBuilder::default().build(url)?, + consume_built_block_method: DEFAULT_BLOCK_CONSUME_BUILT_BLOCK_METHOD, }) } +} +impl BlocksProcessorClient { + pub fn new(client: HttpClientType, consume_built_block_method: &'static str) -> Self { + Self { + client, + consume_built_block_method, + } + } pub async fn submit_built_block( &self, sealed_block: &SealedBlock, @@ -115,7 +170,7 @@ impl BlocksProcessorClient { let used_share_bundles = Self::get_used_sbundles(built_block_trace); - let params = ( + let params: ConsumeBuiltBlockRequest = ( header, closed_at, sealed_at, @@ -127,51 +182,15 @@ impl BlocksProcessorClient { built_block_trace.true_bid_value, best_bid_value, ); - + let request = ConsumeBuiltBlockRequestArc::new(params); match self .client - .raw_request("block_consumeBuiltBlockV2".into(), ¶ms) + .request(self.consume_built_block_method, request.clone()) .await { Ok(()) => {} Err(err) => { - match &err { - RpcError::ErrorResp(err) => { - error!(err = ?err, "Block processor returned error"); - store_error_event( - BLOCK_PROCESSOR_ERROR_CATEGORY, - &err.to_string(), - ¶ms, - ); - } - RpcError::SerError(err) => { - error!(err = ?err, "Failed to serialize block processor request"); - } - RpcError::DeserError { err, text } => { - if !(text.contains("504 Gateway Time-out") - || text.contains("502 Bad Gateway")) - { - error!(err = ?err, "Failed to deserialize block processor response"); - store_error_event( - BLOCK_PROCESSOR_ERROR_CATEGORY, - &err.to_string(), - ¶ms, - ); - } - } - RpcError::Transport(err) => { - debug!(err = ?err, "Failed to send block processor request"); - } - RpcError::NullResp => { - error!("Block processor returned null response"); - } - RpcError::UnsupportedFeature(err) => { - error!(err = ?err, "Unsupported feature"); - } - RpcError::LocalUsageError(err) => { - error!(err = ?err, "Local usage error"); - } - } + Self::handle_rpc_error(&err, request.as_ref()); return Err(err.into()); } } @@ -179,6 +198,31 @@ impl BlocksProcessorClient { Ok(()) } + /// T is parametric just because it too BIG to type + fn handle_rpc_error(err: &jsonrpsee::core::Error, request: &ConsumeBuiltBlockRequest) { + match err { + jsonrpsee::core::Error::Call(error_object) => { + error!(err = ?error_object, "Block processor returned error"); + store_error_event(BLOCK_PROCESSOR_ERROR_CATEGORY, &err.to_string(), request); + } + jsonrpsee::core::Error::Transport(_) => { + debug!(err = ?err, "Failed to send block processor request"); + } + jsonrpsee::core::Error::ParseError(error) => { + error!(err = ?err, "Failed to deserialize block processor response"); + let error_txt = error.to_string(); + if !(error_txt.contains("504 Gateway Time-out") + || error_txt.contains("502 Bad Gateway")) + { + store_error_event(BLOCK_PROCESSOR_ERROR_CATEGORY, &err.to_string(), request); + } + } + _ => { + error!(err = ?err, "Block processor error"); + } + } + } + /// Gets the UsedSbundle carefully considering virtual orders formed by other original orders. fn get_used_sbundles(built_block_trace: &BuiltBlockTrace) -> Vec { built_block_trace @@ -229,17 +273,19 @@ impl BlocksProcessorClient { /// BidObserver sending all data to a BlocksProcessorClient #[derive(Debug)] -pub struct BlocksProcessorClientBidObserver { - client: BlocksProcessorClient, +pub struct BlocksProcessorClientBidObserver { + client: BlocksProcessorClient, } -impl BlocksProcessorClientBidObserver { - pub fn new(client: BlocksProcessorClient) -> Self { +impl BlocksProcessorClientBidObserver { + pub fn new(client: BlocksProcessorClient) -> Self { Self { client } } } -impl BidObserver for BlocksProcessorClientBidObserver { +impl BidObserver + for BlocksProcessorClientBidObserver +{ fn block_submitted( &self, sealed_block: SealedBlock, diff --git a/src/flashbots_config.rs b/src/flashbots_config.rs index c82cde7..ebe7608 100644 --- a/src/flashbots_config.rs +++ b/src/flashbots_config.rs @@ -1,7 +1,10 @@ //! Config should always be deserializable, default values should be used //! This code has lots of copy/paste from the example config but it's not really copy/paste since we use our own private types. //! @Pending make this copy/paste generic code on the library +use alloy_signer_local::PrivateKeySigner; use eyre::Context; +use http::StatusCode; +use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::RpcModule; use rbuilder::building::builders::merging_builder::merging_build_backtest; use rbuilder::building::builders::UnfinishedBlockBuildingSinkFactory; @@ -33,13 +36,19 @@ use reth_db::DatabaseEnv; use serde::Deserialize; use serde_with::serde_as; use tokio_util::sync::CancellationToken; +use tower::ServiceBuilder; use tracing::{error, warn}; +use url::Url; use crate::best_bid_ws::BestBidWSConnector; use crate::bidding_service_wrapper::client::bidding_service_client_adapter::BiddingServiceClientAdapter; use crate::block_descriptor_bidding::bidding_service_adapter::BiddingServiceAdapter; -use crate::blocks_processor::{BlocksProcessorClient, BlocksProcessorClientBidObserver}; +use crate::blocks_processor::{ + BlocksProcessorClient, BlocksProcessorClientBidObserver, + SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD, +}; use crate::build_info::rbuilder_version; +use crate::flashbots_signer::FlashbotsSignerLayer; use crate::true_block_value_push::unfinished_block_building_sink_factory_wrapper::UnfinishedBlockBuildingSinkFactoryWrapper; use clickhouse::Client; @@ -94,9 +103,12 @@ pub struct FlashbotsConfig { /// selected builder configurations pub builders: Vec, + /// If this is Some then signed mode is used for blocks_processor and tbv_push is done via blocks_processor_url (signed block-processor also handles flashbots_reportBestTrueValue) + pub key_registration_url: Option, + pub blocks_processor_url: Option, - /// For production should always be Some since it's used by smart-multiplexing. + /// For production: Some <=> key_registration_url is Some since it's used by smart-multiplexing. tbv_push_redis: Option, } @@ -184,6 +196,16 @@ async fn handle_subsidise_block( }; } +#[derive(thiserror::Error, Debug)] +enum RegisterKeyError { + #[error("Register key error parsing url: {0:?}")] + UrlParse(#[from] url::ParseError), + #[error("Register key network error: {0:?}")] + Network(#[from] reqwest::Error), + #[error("Register key service error: {0:?}")] + Service(StatusCode), +} + impl FlashbotsConfig { /// Returns the BiddingService + an optional FlashbotsBlockSubsidySelector so smart multiplexing can force blocks. /// FlashbotsBlockSubsidySelector can be None if subcidy is disabled. @@ -200,6 +222,59 @@ impl FlashbotsConfig { Ok(Box::new(BiddingServiceAdapter::new(client))) } + /// Creates a new PrivateKeySigner and registers the associated address on key_registration_url + async fn register_key( + &self, + key_registration_url: &str, + ) -> Result { + let signer = PrivateKeySigner::random(); + let client = reqwest::Client::new(); + let url = { + let mut url = Url::parse(key_registration_url)?; + url.set_path("/api/l1-builder/v1/register_credentials/rbuilder"); + url + }; + let body = format!("{{ \"ecdsa_pubkey_address\": \"{}\" }}", signer.address()); + let res = client.post(url).body(body).send().await?; + if res.status().is_success() { + Ok(signer) + } else { + Err(RegisterKeyError::Service(res.status())) + } + } + + /// Depending on the cfg may create: + /// - Dummy sink (no blocks_processor_url) + /// - Standard block processor client + /// - Secure block processor client (using block_processor_key to sign) + fn create_block_processor_client( + &self, + block_processor_key: Option, + ) -> eyre::Result> { + let bid_observer: Box = + if let Some(url) = &self.blocks_processor_url { + if let Some(block_processor_key) = block_processor_key { + let signing_middleware = FlashbotsSignerLayer::new(block_processor_key); + let service_builder = ServiceBuilder::new() + // map signer errors to http errors + .map_err(jsonrpsee::http_client::transport::Error::Http) + .layer(signing_middleware); + let client = HttpClientBuilder::default() + .set_middleware(service_builder) + .build(url)?; + let block_processor = + BlocksProcessorClient::new(client, SIGNED_BLOCK_CONSUME_BUILT_BLOCK_METHOD); + Box::new(BlocksProcessorClientBidObserver::new(block_processor)) + } else { + let client = BlocksProcessorClient::try_from(url)?; + Box::new(BlocksProcessorClientBidObserver::new(client)) + } + } else { + Box::new(NullBidObserver {}) + }; + Ok(bid_observer) + } + /// Connects (UnfinishedBlockBuildingSinkFactoryWrapper->BlockSealingBidderFactory)->RelaySubmitSinkFactory /// RelaySubmitSinkFactory: submits final blocks to relays /// BlockSealingBidderFactory: performs sealing/bidding. Sends bids to the RelaySubmitSinkFactory @@ -214,14 +289,13 @@ impl FlashbotsConfig { Vec, Arc, )> { + let block_processor_key = if let Some(key_registration_url) = &self.key_registration_url { + Some(self.register_key(key_registration_url).await?) + } else { + None + }; // RelaySubmitSinkFactory - let bid_observer: Box = - if let Some(url) = &self.blocks_processor_url { - let client = BlocksProcessorClient::try_from(url)?; - Box::new(BlocksProcessorClientBidObserver::new(client)) - } else { - Box::new(NullBidObserver {}) - }; + let bid_observer = self.create_block_processor_client(block_processor_key)?; let (sink_sealed_factory, relays) = self .l1_config .create_relays_sealed_sink_factory(self.base_config.chain_spec()?, bid_observer)?; diff --git a/src/flashbots_signer.rs b/src/flashbots_signer.rs new file mode 100644 index 0000000..9673120 --- /dev/null +++ b/src/flashbots_signer.rs @@ -0,0 +1,244 @@ +//! A layer responsible for implementing flashbots-style authentication +//! by signing the request body with a private key and adding the signature +//! to the request headers. +//! Based on https://github.com/paradigmxyz/mev-share-rs/tree/a75c5959e98a79031a89f8893c97528e8f726826 but upgraded to alloy + +use std::{ + error::Error, + task::{Context, Poll}, +}; + +use alloy_primitives::{hex, keccak256}; +use alloy_signer::Signer; +use futures_util::future::BoxFuture; + +use http::{header::HeaderValue, HeaderName, Request}; +use hyper::Body; + +use tower::{Layer, Service}; + +static FLASHBOTS_HEADER: HeaderName = HeaderName::from_static("x-flashbots-signature"); + +/// Layer that applies [`FlashbotsSigner`] which adds a request header with a signed payload. +#[derive(Clone, Debug)] +pub struct FlashbotsSignerLayer { + signer: S, +} + +impl FlashbotsSignerLayer { + /// Creates a new [`FlashbotsSignerLayer`] with the given signer. + pub fn new(signer: S) -> Self { + FlashbotsSignerLayer { signer } + } +} + +impl Layer for FlashbotsSignerLayer { + type Service = FlashbotsSigner; + + fn layer(&self, inner: I) -> Self::Service { + FlashbotsSigner { + signer: self.signer.clone(), + inner, + } + } +} + +/// Middleware that signs the request body and adds the signature to the x-flashbots-signature +/// header. For more info, see +#[derive(Clone, Debug)] +pub struct FlashbotsSigner { + signer: S, + inner: I, +} + +impl Service> for FlashbotsSigner +where + I: Service> + Clone + Send + 'static, + I::Future: Send, + I::Error: Into> + 'static, + S: Signer + Clone + Send + Sync + 'static, +{ + type Response = I::Response; + type Error = Box; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, request: Request) -> Self::Future { + let clone = self.inner.clone(); + // wait for service to be ready + let mut inner = std::mem::replace(&mut self.inner, clone); + let signer = self.signer.clone(); + + let (mut parts, body) = request.into_parts(); + + // if method is not POST, return an error. + if parts.method != http::Method::POST { + return Box::pin(async move { + Err(format!("Invalid method: {}", parts.method.as_str()).into()) + }); + } + + // if content-type is not json, or signature already exists, just pass through the request + let is_json = parts + .headers + .get(http::header::CONTENT_TYPE) + .map(|v| v == HeaderValue::from_static("application/json")) + .unwrap_or(false); + let has_sig = parts.headers.contains_key(FLASHBOTS_HEADER.clone()); + + if !is_json || has_sig { + return Box::pin(async move { + let request = Request::from_parts(parts, body); + inner.call(request).await.map_err(Into::into) + }); + } + + // otherwise, sign the request body and add the signature to the header + Box::pin(async move { + let body_bytes = hyper::body::to_bytes(body).await?; + + // sign request body and insert header + let signature = signer + .sign_message(format!("{:?}", keccak256(&body_bytes)).as_bytes()) + .await?; + + let header_val = HeaderValue::from_str(&format!( + "{:?}:0x{}", + signer.address(), + hex::encode(signature.as_bytes()) + )) + .expect("Header contains invalid characters"); + parts.headers.insert(FLASHBOTS_HEADER.clone(), header_val); + + let request = Request::from_parts(parts, Body::from(body_bytes.clone())); + inner.call(request).await.map_err(Into::into) + }) + } +} +/* +#[cfg(test)] +mod tests { + use super::*; + use http::Response; + use hyper::Body; + use std::convert::Infallible; + use tower::{service_fn, ServiceExt}; + + #[tokio::test] + async fn test_signature() { + let fb_signer = LocalWallet::new(&mut thread_rng()); + + // mock service that returns the request headers + let svc = FlashbotsSigner { + signer: fb_signer.clone(), + inner: service_fn(|_req: Request| async { + let (parts, _) = _req.into_parts(); + + let mut res = Response::builder(); + for (k, v) in parts.headers.iter() { + res = res.header(k, v); + } + let res = res.body(Body::empty()).unwrap(); + Ok::<_, Infallible>(res) + }), + }; + + // build request + let bytes = vec![1u8; 32]; + let req = Request::builder() + .method(http::Method::POST) + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(bytes.clone())) + .unwrap(); + + let res = svc.oneshot(req).await.unwrap(); + + let header = res.headers().get("x-flashbots-signature").unwrap(); + let header = header.to_str().unwrap(); + let header = header.split(":0x").collect::>(); + let header_address = header[0]; + let header_signature = header[1]; + + let signer_address = format!("{:?}", fb_signer.address()); + let expected_signature = fb_signer + .sign_message(format!("0x{:x}", H256::from(keccak256(bytes.clone())))) + .await + .unwrap() + .to_string(); + + // verify that the header contains expected address and signature + assert_eq!(header_address, signer_address); + assert_eq!(header_signature, expected_signature); + } + + #[tokio::test] + async fn test_skips_non_json() { + let fb_signer = LocalWallet::new(&mut thread_rng()); + + // mock service that returns the request headers + let svc = FlashbotsSigner { + signer: fb_signer.clone(), + inner: service_fn(|_req: Request| async { + let (parts, _) = _req.into_parts(); + + let mut res = Response::builder(); + for (k, v) in parts.headers.iter() { + res = res.header(k, v); + } + let res = res.body(Body::empty()).unwrap(); + Ok::<_, Infallible>(res) + }), + }; + + // build plain text request + let bytes = vec![1u8; 32]; + let req = Request::builder() + .method(http::Method::POST) + .header(http::header::CONTENT_TYPE, "text/plain") + .body(Body::from(bytes.clone())) + .unwrap(); + + let res = svc.oneshot(req).await.unwrap(); + + // response should not contain a signature header + let header = res.headers().get("x-flashbots-signature"); + assert!(header.is_none()); + } + + #[tokio::test] + async fn test_returns_error_when_not_post() { + let fb_signer = LocalWallet::new(&mut thread_rng()); + + // mock service that returns the request headers + let svc = FlashbotsSigner { + signer: fb_signer.clone(), + inner: service_fn(|_req: Request| async { + let (parts, _) = _req.into_parts(); + + let mut res = Response::builder(); + for (k, v) in parts.headers.iter() { + res = res.header(k, v); + } + let res = res.body(Body::empty()).unwrap(); + Ok::<_, Infallible>(res) + }), + }; + + // build plain text request + let bytes = vec![1u8; 32]; + let req = Request::builder() + .method(http::Method::GET) + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(bytes.clone())) + .unwrap(); + + let res = svc.oneshot(req).await; + + // should be an error + assert!(res.is_err()); + } +} +*/ diff --git a/src/lib.rs b/src/lib.rs index b7ac92c..00e89f1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,5 +4,6 @@ pub mod block_descriptor_bidding; pub mod blocks_processor; pub mod build_info; pub mod flashbots_config; +pub mod flashbots_signer; pub mod metrics; mod true_block_value_push;