From 25bdfd14201dbc8f25f5bcead2f1f7a086551dfe Mon Sep 17 00:00:00 2001 From: Kolby Moroz Liebl <31669092+KolbyML@users.noreply.github.com> Date: Fri, 13 Sep 2024 09:46:35 -0600 Subject: [PATCH] fix: resolve PR concerns --- Cargo.lock | 2 +- light-client/Cargo.toml | 2 +- portal-bridge/src/api/consensus.rs | 14 +++++---- portal-bridge/src/api/execution.rs | 33 ++++++++------------- portal-bridge/src/bridge/era1.rs | 9 ++++-- portal-bridge/src/cli.rs | 47 ++++++++++++++++++++++++++---- src/bin/test_providers.rs | 41 +++++++++++++------------- trin-execution/src/era/manager.rs | 10 ++++--- 8 files changed, 95 insertions(+), 63 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee4892fa6..0aa1e179c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3980,7 +3980,7 @@ dependencies = [ "jsonrpsee", "log", "portalnet", - "reqwest 0.11.27", + "reqwest 0.12.7", "serde", "serde-this-or-that", "serde_json", diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml index e0603baa0..c3a320289 100644 --- a/light-client/Cargo.toml +++ b/light-client/Cargo.toml @@ -22,7 +22,7 @@ jsonrpsee = { workspace = true, features = ["full"] } log = "0.4.17" milagro_bls = { package="snowbridge-milagro-bls", git = "https://github.com/Snowfork/milagro_bls" } portalnet.workspace = true -reqwest = { version = "0.11.13", default-features = false, features = ["json", "rustls-tls"] } +reqwest.workspace = true serde.workspace = true serde-this-or-that.workspace = true serde_json.workspace = true diff --git a/portal-bridge/src/api/consensus.rs b/portal-bridge/src/api/consensus.rs index e69c89053..23a350e4f 100644 --- a/portal-bridge/src/api/consensus.rs +++ b/portal-bridge/src/api/consensus.rs @@ -1,12 +1,14 @@ use std::fmt::Display; use anyhow::anyhow; -use reqwest_middleware::ClientWithMiddleware; use tokio::time::sleep; use tracing::{debug, warn}; use url::Url; -use crate::{cli::url_to_client, constants::FALLBACK_RETRY_AFTER}; +use crate::{ + cli::{url_to_client, ClientWithBaseUrl}, + constants::FALLBACK_RETRY_AFTER, +}; /// Implements endpoints from the Beacon API to access data from the consensus layer. #[derive(Clone, Debug)] @@ -89,7 +91,7 @@ impl ConsensusApi { let client = url_to_client(self.primary.clone()).map_err(|err| { anyhow!("Unable to create client for primary consensus data provider: {err:?}") })?; - match client.get(&endpoint).send().await?.text().await { + match client.get(&endpoint)?.send().await?.text().await { Ok(response) => Ok(response), Err(err) => { warn!("Error requesting consensus data from provider, retrying with fallback provider: {err:?}"); @@ -98,7 +100,7 @@ impl ConsensusApi { anyhow!("Unable to create client for fallback consensus data provider: {err:?}") })?; client - .get(endpoint) + .get(endpoint)? .send() .await? .text() @@ -112,9 +114,9 @@ impl ConsensusApi { } /// Check that provider is valid and accessible. -async fn check_provider(client: &ClientWithMiddleware) -> anyhow::Result<()> { +async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> { let endpoint = "/eth/v1/node/version".to_string(); - match client.get(endpoint).send().await?.text().await { + match client.get(endpoint)?.send().await?.text().await { Ok(_) => Ok(()), Err(err) => Err(anyhow!( "Unable to request consensus data from provider: {err:?}" diff --git a/portal-bridge/src/api/execution.rs b/portal-bridge/src/api/execution.rs index d0dcdba16..61e8ee19d 100644 --- a/portal-bridge/src/api/execution.rs +++ b/portal-bridge/src/api/execution.rs @@ -21,7 +21,6 @@ use ethportal_api::{ Receipts, }; use futures::future::join_all; -use reqwest_middleware::ClientWithMiddleware; use serde_json::{json, Value}; use tokio::time::sleep; use tracing::{debug, error, warn}; @@ -32,7 +31,7 @@ use trin_validation::{ use url::Url; use crate::{ - cli::url_to_client, + cli::{url_to_client, ClientWithBaseUrl}, constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER}, types::full_header::FullHeader, }; @@ -273,9 +272,6 @@ impl ExecutionApi { Ok(header.number) } - /// Used the "surf" library here instead of "ureq" since "surf" is much more capable of handling - /// multiple async requests. Using "ureq" consistently resulted in errors as soon as the number - /// of concurrent tasks increased significantly. async fn batch_requests(&self, obj: Vec) -> anyhow::Result { let batched_request_futures = obj .chunks(BATCH_LIMIT) @@ -320,24 +316,19 @@ impl ExecutionApi { } async fn send_batch_request( - client: &ClientWithMiddleware, + client: &ClientWithBaseUrl, requests: &Vec, ) -> anyhow::Result> { - let request = - client.post("").json(&requests).build().map_err(|e| { - anyhow!("Unable to construct JSON POST for batched requests: {e:?}") - })?; let response = client - .execute(request) + .post("")? + .json(&requests) + .send() .await .map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?; - let response_text = response - .text() + response + .json::>() .await - .map_err(|e| anyhow!("Unable to read response body: {e:?}"))?; - serde_json::from_str::>(&response_text).map_err(|err| { - anyhow!("Unable to parse execution batch from provider: {err:?} response: {response_text:?}") - }) + .map_err(|err| anyhow!("Unable to parse execution batch from provider: {err:?}")) } async fn try_request(&self, request: JsonRequest) -> anyhow::Result { @@ -360,11 +351,11 @@ impl ExecutionApi { } async fn send_request( - client: &ClientWithMiddleware, + client: &ClientWithBaseUrl, request: &JsonRequest, ) -> anyhow::Result { let request = client - .post("") + .post("")? .json(&request) .build() .map_err(|e| anyhow!("Unable to construct JSON POST for single request: {e:?}"))?; @@ -395,9 +386,9 @@ pub async fn construct_proof( } /// Check that provider is valid and accessible. -async fn check_provider(client: &ClientWithMiddleware) -> anyhow::Result<()> { +async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> { let request = client - .post("") + .post("")? .json(&json!({ "jsonrpc": "2.0", "method": "web3_clientVersion", diff --git a/portal-bridge/src/bridge/era1.rs b/portal-bridge/src/bridge/era1.rs index f7c0e847d..38faabfca 100644 --- a/portal-bridge/src/bridge/era1.rs +++ b/portal-bridge/src/bridge/era1.rs @@ -68,9 +68,12 @@ impl Era1Bridge { gossip_limit: usize, execution_api: ExecutionApi, ) -> anyhow::Result { - let mut headers = HeaderMap::new(); - headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/xml")); - let http_client: Client = Client::builder().default_headers(headers).build()?; + let http_client = Client::builder() + .default_headers(HeaderMap::from_iter([( + CONTENT_TYPE, + HeaderValue::from_static("application/xml"), + )])) + .build()?; let era1_files = get_shuffled_era1_files(&http_client).await?; let metrics = BridgeMetricsReporter::new("era1".to_string(), &format!("{mode:?}")); let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit)); diff --git a/portal-bridge/src/cli.rs b/portal-bridge/src/cli.rs index 4c06e1c30..f83982ff4 100644 --- a/portal-bridge/src/cli.rs +++ b/portal-bridge/src/cli.rs @@ -5,9 +5,9 @@ use anyhow::anyhow; use clap::{Parser, Subcommand}; use reqwest::{ header::{HeaderMap, HeaderValue, CONTENT_TYPE}, - Client, + Client, IntoUrl, Request, Response, }; -use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, RequestBuilder}; use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio::process::Child; use tracing::error; @@ -154,7 +154,7 @@ pub struct BridgeConfig { pub gossip_limit: usize, } -pub fn url_to_client(url: Url) -> Result { +pub fn url_to_client(url: Url) -> Result { let mut headers = HeaderMap::new(); headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); @@ -191,12 +191,47 @@ pub fn url_to_client(url: Url) -> Result { .timeout(HTTP_REQUEST_TIMEOUT) .build() .map_err(|_| "Failed to build HTTP client")?; - let retry_policy: ExponentialBackoff = ExponentialBackoff::builder().build_with_max_retries(3); let client = ClientBuilder::new(reqwest_client) - .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .with(RetryTransientMiddleware::new_with_policy( + ExponentialBackoff::builder().build_with_max_retries(3), + )) .build(); + let client_with_base_url = ClientWithBaseUrl::new(client, url); - Ok(client) + Ok(client_with_base_url) +} + +pub struct ClientWithBaseUrl { + client: ClientWithMiddleware, + base_url: Url, +} + +impl ClientWithBaseUrl { + pub fn new(client: ClientWithMiddleware, base_url: Url) -> Self { + Self { client, base_url } + } + + pub fn client(&self) -> &ClientWithMiddleware { + &self.client + } + + pub fn base_url(&self) -> &Url { + &self.base_url + } + + pub fn get(&self, url: U) -> anyhow::Result { + let url = self.base_url.join(url.as_str())?; + Ok(self.client.get(url)) + } + + pub fn post(&self, url: U) -> anyhow::Result { + let url = self.base_url.join(url.as_str())?; + Ok(self.client.post(url)) + } + + pub async fn execute(&self, request: Request) -> Result { + self.client.execute(request).await + } } // parser for subnetworks, makes sure that the state network is not ran alongside other subnetworks diff --git a/src/bin/test_providers.rs b/src/bin/test_providers.rs index 9cb58be63..8e2222e7e 100644 --- a/src/bin/test_providers.rs +++ b/src/bin/test_providers.rs @@ -2,6 +2,15 @@ use std::{fmt, fs, ops::Range, sync::Arc}; use anyhow::{anyhow, Result}; use clap::Parser; +use ethportal_api::{ + types::{ + execution::accumulator::EpochAccumulator, + jsonrpc::{params::Params, request::JsonRequest}, + }, + utils::bytes::hex_encode, + Header, +}; +use portal_bridge::api::execution::ExecutionApi; use rand::{ distributions::{Distribution, Uniform}, thread_rng, @@ -13,12 +22,6 @@ use reqwest::{ use serde_json::json; use ssz::Decode; use tracing::{debug, info, warn}; -use url::Url; - -use ethportal_api::{ - types::execution::accumulator::EpochAccumulator, utils::bytes::hex_encode, Header, -}; -use portal_bridge::api::execution::ExecutionApi; use trin_utils::log::init_tracing_logger; use trin_validation::{ accumulator::PreMergeAccumulator, @@ -29,6 +32,7 @@ use trin_validation::{ }, header_validator::HeaderValidator, }; +use url::Url; lazy_static::lazy_static! { static ref PANDAOPS_CLIENT_ID: String = std::env::var("PANDAOPS_CLIENT_ID").unwrap(); @@ -435,27 +439,22 @@ async fn get_latest_block_number() -> Result { HeaderValue::from_str(&PANDAOPS_CLIENT_SECRET) .map_err(|_| anyhow!("Invalid CF-Access-Client-Secret header value"))?, ); - let client = Client::builder() - .default_headers(headers) - .build() - .map_err(|e| anyhow!("Failed to build HTTP client: {:?}", e))?; - let request_body = json!({ - "jsonrpc": "2.0", - "method": "eth_getBlockByNumber", - "params": ["latest", false], - "id": 1 - }); - let response = client + let request = JsonRequest::new( + "eth_getBlockByNumber".to_string(), + Params::Array(vec![json!("latest"), json!(false)]), + /* id= */ 1, + ); + let response = Client::new() .post("https://geth-lighthouse.mainnet.eu1.ethpandaops.io/") - .json(&request_body) + .headers(headers) + .json(&request) .send() .await .map_err(|e| anyhow!("Request failed: {:?}", e))?; - let response_text = response - .text() + let response = response + .json::() .await .map_err(|e| anyhow!("Failed to read response text: {:?}", e))?; - let response = serde_json::from_str::(&response_text)?; let result = response .get("result") .ok_or_else(|| anyhow!("Unable to fetch latest block"))?; diff --git a/trin-execution/src/era/manager.rs b/trin-execution/src/era/manager.rs index 48bc234b4..f2e750eae 100644 --- a/trin-execution/src/era/manager.rs +++ b/trin-execution/src/era/manager.rs @@ -33,10 +33,12 @@ pub struct EraManager { impl EraManager { pub async fn new(next_block_number: u64) -> anyhow::Result { - let mut headers = HeaderMap::new(); - headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/xml")); - let http_client: Client = Client::builder().default_headers(headers).build()?; - + let http_client = Client::builder() + .default_headers(HeaderMap::from_iter([( + CONTENT_TYPE, + HeaderValue::from_static("application/xml"), + )])) + .build()?; let era1_files = get_era1_files(&http_client).await?; let era_files = get_era_files(&http_client).await?;