Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove unmaintained surf for reqwest #1448

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,362 changes: 412 additions & 950 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ portalnet.workspace = true
prometheus_exporter.workspace = true
rand.workspace = true
regex = "1.10.2"
reqwest.workspace = true
reth-ipc.workspace = true
rpc.workspace = true
serde_json = { workspace = true, features = ["preserve_order"]}
sha3.workspace = true
surf.workspace = true
tempfile.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down Expand Up @@ -111,6 +111,7 @@ quickcheck = "1.0.3"
r2d2 = "0.8.9"
r2d2_sqlite = "0.24.0"
rand = "0.8.5"
reqwest = { version = "0.12.7", default-features = false, features = ["rustls-tls", "json"] }
reth-ipc = { tag = "v0.2.0-beta.5", git = "https://github.com/paradigmxyz/reth.git"}
reth-rpc-types = { tag = "v1.0.6", git = "https://github.com/paradigmxyz/reth.git"}
revm = { version = "14.0.1", default-features = false, features = ["std", "secp256k1", "serde-json"] }
Expand All @@ -128,7 +129,6 @@ sha3 = "0.9.1"
snap = "1.1.1"
ssz_types = "0.8.0"
strum = { version = "0.26.1", features = ["derive"] }
surf = { version = "2.3.2", default-features = false, features = ["h1-client-rustls", "middleware-logger", "encoding"] } # we use rustls because OpenSSL cause issues compiling on aarch64
tempfile = "3.3.0"
test-log = { version = "0.2.11", features = ["trace"] }
thiserror = "1.0.57"
Expand Down
2 changes: 1 addition & 1 deletion e2store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ ethereum_ssz.workspace = true
ethereum_ssz_derive.workspace = true
ethportal-api.workspace = true
rand.workspace = true
reqwest.workspace = true
scraper.workspace = true
snap.workspace = true
surf.workspace = true

[dev-dependencies]
rstest.workspace = true
Expand Down
10 changes: 3 additions & 7 deletions e2store/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{collections::HashMap, io};

use anyhow::{anyhow, ensure, Error};
use anyhow::{ensure, Error};
use rand::{seq::SliceRandom, thread_rng};
use reqwest::Client;
use scraper::{Html, Selector};
use surf::Client;

const ERA_DIR_URL: &str = "https://mainnet.era.nimbus.team/";
const ERA1_DIR_URL: &str = "https://era1.ethportal.net/";
Expand All @@ -22,11 +22,7 @@ pub async fn download_era_links(
http_client: &Client,
url: &str,
) -> anyhow::Result<HashMap<u64, String>> {
let index_html = http_client
.get(url)
.recv_string()
.await
.map_err(|e| anyhow!("{e}"))?;
let index_html = http_client.get(url).send().await?.text().await?;
let index_html = Html::parse_document(&index_html);
let selector = Selector::parse("a[href*='mainnet-']").expect("to be able to parse selector");
let era_files: HashMap<u64, String> = index_html
Expand Down
6 changes: 2 additions & 4 deletions ethportal-api/src/types/execution/block_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,7 @@ impl ssz::Decode for BlockBodyLegacy {
))
})?;
let uncles: Vec<Header> = Decodable::decode(&mut uncles.as_slice()).map_err(|e| {
ssz::DecodeError::BytesInvalid(
format!("Legacy block body contains invalid txs: {e:?}",),
)
ssz::DecodeError::BytesInvalid(format!("Legacy block body contains invalid txs: {e:?}"))
})?;
Ok(Self { txs, uncles })
}
Expand Down Expand Up @@ -350,7 +348,7 @@ impl ssz::Decode for BlockBodyMerge {
))
})?;
let uncles: Vec<Header> = Decodable::decode(&mut uncles.as_slice()).map_err(|e| {
ssz::DecodeError::BytesInvalid(format!("Merge block body contains invalid txs: {e:?}",))
ssz::DecodeError::BytesInvalid(format!("Merge block body contains invalid txs: {e:?}"))
})?;
if !uncles.is_empty() {
return Err(ssz::DecodeError::BytesInvalid(
Expand Down
1 change: 0 additions & 1 deletion ethportal-peertest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ rpc.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
surf.workspace = true
tempfile.workspace = true
tokio.workspace = true
tracing.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jsonrpsee = { workspace = true, features = ["full"] }
log = "0.4.17"
milagro_bls = { package="snowbridge-milagro-bls", git = "https://github.com/Snowfork/milagro_bls" }
portalnet.workspace = true
reqwest = { version = "0.11.13", default-features = false, features = ["json", "rustls-tls"] }
reqwest.workspace = true
serde.workspace = true
serde-this-or-that.workspace = true
serde_json.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion portal-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ jsonrpsee = { workspace = true, features = [
portalnet.workspace = true
prometheus_exporter.workspace = true
rand.workspace = true
reqwest.workspace = true
reqwest-middleware = { version = "0.3.3", features = ["json"] }
reqwest-retry = "0.6.1"
revm.workspace = true
revm-primitives.workspace = true
scraper.workspace = true
serde = { workspace = true, features = ["rc"] }
serde_json.workspace = true
serde_yaml.workspace = true
ssz_types.workspace = true
surf.workspace = true
tokio.workspace = true
tracing.workspace = true
trin-execution.workspace = true
Expand Down
29 changes: 18 additions & 11 deletions portal-bridge/src/api/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::fmt::Display;

use anyhow::anyhow;
use surf::Client;
use tokio::time::sleep;
use tracing::{debug, warn};
use url::Url;

use crate::{cli::url_to_client, constants::FALLBACK_RETRY_AFTER};
use crate::{
cli::{url_to_client, ClientWithBaseUrl},
constants::FALLBACK_RETRY_AFTER,
};

/// Implements endpoints from the Beacon API to access data from the consensus layer.
#[derive(Clone, Debug)]
Expand All @@ -16,10 +18,9 @@ pub struct ConsensusApi {
}

impl ConsensusApi {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, surf::Error> {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
debug!(
"Starting ConsensusApi with primary provider: {} and fallback provider: {}",
primary, fallback
"Starting ConsensusApi with primary provider: {primary} and fallback provider: {fallback}",
);
let client = url_to_client(primary.clone()).map_err(|err| {
anyhow!("Unable to create primary client for consensus data provider: {err:?}")
Expand Down Expand Up @@ -90,26 +91,32 @@ impl ConsensusApi {
let client = url_to_client(self.primary.clone()).map_err(|err| {
anyhow!("Unable to create client for primary consensus data provider: {err:?}")
})?;
match client.get(&endpoint).recv_string().await {
match client.get(&endpoint)?.send().await?.text().await {
Ok(response) => Ok(response),
Err(err) => {
warn!("Error requesting consensus data from provider, retrying with fallback provider: {err:?}");
sleep(FALLBACK_RETRY_AFTER).await;
let client = url_to_client(self.fallback.clone()).map_err(|err| {
anyhow!("Unable to create client for fallback consensus data provider: {err:?}")
})?;
client.get(endpoint).recv_string().await.map_err(|err| {
anyhow!("Unable to request consensus data from fallback provider: {err:?}")
})
client
.get(endpoint)?
.send()
.await?
.text()
.await
.map_err(|err| {
anyhow!("Unable to request consensus data from fallback provider: {err:?}")
})
}
}
}
}

/// Check that provider is valid and accessible.
async fn check_provider(client: &Client) -> anyhow::Result<()> {
async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> {
let endpoint = "/eth/v1/node/version".to_string();
match client.get(endpoint).recv_string().await {
match client.get(endpoint)?.send().await?.text().await {
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(
"Unable to request consensus data from provider: {err:?}"
Expand Down
105 changes: 60 additions & 45 deletions portal-bridge/src/api/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@ use std::sync::Arc;

use alloy_primitives::B256;
use anyhow::{anyhow, bail};
use futures::future::join_all;
use serde_json::{json, Value};
use surf::Client;
use tokio::time::sleep;
use tracing::{debug, error, warn};
use url::Url;

use crate::{
cli::url_to_client,
constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
types::full_header::FullHeader,
};
use ethportal_api::{
types::{
execution::{
Expand All @@ -32,10 +20,21 @@ use ethportal_api::{
BlockBodyKey, BlockHeaderKey, BlockReceiptsKey, Header, HistoryContentKey, HistoryContentValue,
Receipts,
};
use futures::future::join_all;
use serde_json::{json, Value};
use tokio::time::sleep;
use tracing::{debug, error, warn};
use trin_validation::{
accumulator::PreMergeAccumulator, constants::MERGE_BLOCK_NUMBER,
header_validator::HeaderValidator,
};
use url::Url;

use crate::{
cli::{url_to_client, ClientWithBaseUrl},
constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
types::full_header::FullHeader,
};

/// Limit the number of requests in a single batch to avoid exceeding the
/// provider's batch size limit configuration of 100.
Expand All @@ -51,7 +50,7 @@ pub struct ExecutionApi {
}

impl ExecutionApi {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, surf::Error> {
pub async fn new(primary: Url, fallback: Url) -> Result<Self, reqwest_middleware::Error> {
// Only check that provider is connected & available if not using a test provider.
debug!(
"Starting ExecutionApi with primary provider: {primary} and fallback provider: {fallback}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on line 276 should be removed, and probably some testing to make sure that reqwest can handle the async load, though it's a very popular library so I expect it will

Expand Down Expand Up @@ -273,9 +272,6 @@ impl ExecutionApi {
Ok(header.number)
}

/// Used the "surf" library here instead of "ureq" since "surf" is much more capable of handling
/// multiple async requests. Using "ureq" consistently resulted in errors as soon as the number
/// of concurrent tasks increased significantly.
async fn batch_requests(&self, obj: Vec<JsonRequest>) -> anyhow::Result<String> {
let batched_request_futures = obj
.chunks(BATCH_LIMIT)
Expand Down Expand Up @@ -320,20 +316,19 @@ impl ExecutionApi {
}

async fn send_batch_request(
client: &Client,
client: &ClientWithBaseUrl,
requests: &Vec<JsonRequest>,
) -> anyhow::Result<Vec<Value>> {
let result = client
.post("")
.body_json(&json!(requests))
.map_err(|e| anyhow!("Unable to construct json post for batched requests: {e:?}"))?;
let response = result
.recv_string()
let response = client
.post("")?
.json(&requests)
.send()
.await
.map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
serde_json::from_str::<Vec<Value>>(&response).map_err(|err| {
anyhow!("Unable to parse execution batch from provider: {err:?} response: {response:?}")
})
response
.json::<Vec<Value>>()
.await
.map_err(|err| anyhow!("Unable to parse execution batch from provider: {err:?}"))
}

async fn try_request(&self, request: JsonRequest) -> anyhow::Result<Value> {
Expand All @@ -355,19 +350,27 @@ impl ExecutionApi {
}
}

async fn send_request(client: &Client, request: &JsonRequest) -> anyhow::Result<Value> {
let result = client
.post("")
.body_json(&request)
.map_err(|e| anyhow!("Unable to construct json post for single request: {e:?}"))?;
let response = result
.recv_string()
async fn send_request(
client: &ClientWithBaseUrl,
request: &JsonRequest,
) -> anyhow::Result<Value> {
let request = client
.post("")?
.json(&request)
.build()
.map_err(|e| anyhow!("Unable to construct JSON POST for single request: {e:?}"))?;
let response = client
.execute(request)
.await
.map_err(|err| anyhow!("Unable to request execution payload from provider: {err:?}"))?;
serde_json::from_str::<Value>(&response).map_err(|err| {
let response_text = response
.text()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
serde_json::from_str::<Value>(&response_text).map_err(|err| {
anyhow!(
"Unable to parse execution response from provider: {err:?} response: {response:?}"
)
"Unable to parse execution response from provider: {err:?} response: {response_text:?}",
)
})
}
}
Expand All @@ -383,19 +386,31 @@ pub async fn construct_proof(
}

/// Check that provider is valid and accessible.
async fn check_provider(client: &Client) -> anyhow::Result<()> {
async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> {
let request = client
.post("")
.body_json(
&json!({"jsonrpc": "2.0", "method": "web3_clientVersion", "params": [], "id": 1}),
)
.post("")?
.json(&json!({
"jsonrpc": "2.0",
"method": "web3_clientVersion",
"params": [],
"id": 1,
}))
.build()
.map_err(|e| anyhow!("Unable to construct json post request: {e:?}"))?;
let response = request
.recv_string()
let response = client
.execute(request)
.await
.map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
let response: Value = serde_json::from_str(&response).map_err(|err| anyhow!("Unable to parse json response from execution provider, it's likely unavailable/configured incorrectly: {err:?} response: {response:?}"))?;
if response["result"].as_str().is_none() {
let response_text = response
.text()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
let response_json: Value = serde_json::from_str(&response_text).map_err(|err| {
anyhow!(
"Unable to parse json response from execution provider, it's likely unavailable/configured incorrectly: {err:?} response: {response_text:?}",
)
})?;
if response_json["result"].as_str().is_none() {
bail!("Invalid response from execution provider check, it's likely unavailable/configured incorrectly");
}
Ok(())
Expand Down
Loading
Loading