Skip to content

Commit

Permalink
fix: resolve PR concerns
Browse files Browse the repository at this point in the history
  • Loading branch information
KolbyML committed Sep 13, 2024
1 parent d91992c commit 10acc88
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 63 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jsonrpsee = { workspace = true, features = ["full"] }
log = "0.4.17"
milagro_bls = { package="snowbridge-milagro-bls", git = "https://github.com/Snowfork/milagro_bls" }
portalnet.workspace = true
reqwest = { version = "0.11.13", default-features = false, features = ["json", "rustls-tls"] }
reqwest.workspace = true
serde.workspace = true
serde-this-or-that.workspace = true
serde_json.workspace = true
Expand Down
14 changes: 8 additions & 6 deletions portal-bridge/src/api/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::fmt::Display;

use anyhow::anyhow;
use reqwest_middleware::ClientWithMiddleware;
use tokio::time::sleep;
use tracing::{debug, warn};
use url::Url;

use crate::{cli::url_to_client, constants::FALLBACK_RETRY_AFTER};
use crate::{
cli::{url_to_client, ClientWithBaseUrl},
constants::FALLBACK_RETRY_AFTER,
};

/// Implements endpoints from the Beacon API to access data from the consensus layer.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -89,7 +91,7 @@ impl ConsensusApi {
let client = url_to_client(self.primary.clone()).map_err(|err| {
anyhow!("Unable to create client for primary consensus data provider: {err:?}")
})?;
match client.get(&endpoint).send().await?.text().await {
match client.get(&endpoint)?.send().await?.text().await {
Ok(response) => Ok(response),
Err(err) => {
warn!("Error requesting consensus data from provider, retrying with fallback provider: {err:?}");
Expand All @@ -98,7 +100,7 @@ impl ConsensusApi {
anyhow!("Unable to create client for fallback consensus data provider: {err:?}")
})?;
client
.get(endpoint)
.get(endpoint)?
.send()
.await?
.text()
Expand All @@ -112,9 +114,9 @@ impl ConsensusApi {
}

/// Check that provider is valid and accessible.
async fn check_provider(client: &ClientWithMiddleware) -> anyhow::Result<()> {
async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> {
let endpoint = "/eth/v1/node/version".to_string();
match client.get(endpoint).send().await?.text().await {
match client.get(endpoint)?.send().await?.text().await {
Ok(_) => Ok(()),
Err(err) => Err(anyhow!(
"Unable to request consensus data from provider: {err:?}"
Expand Down
33 changes: 12 additions & 21 deletions portal-bridge/src/api/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use ethportal_api::{
Receipts,
};
use futures::future::join_all;
use reqwest_middleware::ClientWithMiddleware;
use serde_json::{json, Value};
use tokio::time::sleep;
use tracing::{debug, error, warn};
Expand All @@ -32,7 +31,7 @@ use trin_validation::{
use url::Url;

use crate::{
cli::url_to_client,
cli::{url_to_client, ClientWithBaseUrl},
constants::{FALLBACK_RETRY_AFTER, GET_RECEIPTS_RETRY_AFTER},
types::full_header::FullHeader,
};
Expand Down Expand Up @@ -273,9 +272,6 @@ impl ExecutionApi {
Ok(header.number)
}

/// Used the "surf" library here instead of "ureq" since "surf" is much more capable of handling
/// multiple async requests. Using "ureq" consistently resulted in errors as soon as the number
/// of concurrent tasks increased significantly.
async fn batch_requests(&self, obj: Vec<JsonRequest>) -> anyhow::Result<String> {
let batched_request_futures = obj
.chunks(BATCH_LIMIT)
Expand Down Expand Up @@ -320,24 +316,19 @@ impl ExecutionApi {
}

async fn send_batch_request(
client: &ClientWithMiddleware,
client: &ClientWithBaseUrl,
requests: &Vec<JsonRequest>,
) -> anyhow::Result<Vec<Value>> {
let request =
client.post("").json(&requests).build().map_err(|e| {
anyhow!("Unable to construct JSON POST for batched requests: {e:?}")
})?;
let response = client
.execute(request)
.post("")?
.json(&requests)
.send()
.await
.map_err(|err| anyhow!("Unable to request execution batch from provider: {err:?}"))?;
let response_text = response
.text()
response
.json::<Vec<Value>>()
.await
.map_err(|e| anyhow!("Unable to read response body: {e:?}"))?;
serde_json::from_str::<Vec<Value>>(&response_text).map_err(|err| {
anyhow!("Unable to parse execution batch from provider: {err:?} response: {response_text:?}")
})
.map_err(|err| anyhow!("Unable to parse execution batch from provider: {err:?}"))
}

async fn try_request(&self, request: JsonRequest) -> anyhow::Result<Value> {
Expand All @@ -360,11 +351,11 @@ impl ExecutionApi {
}

async fn send_request(
client: &ClientWithMiddleware,
client: &ClientWithBaseUrl,
request: &JsonRequest,
) -> anyhow::Result<Value> {
let request = client
.post("")
.post("")?
.json(&request)
.build()
.map_err(|e| anyhow!("Unable to construct JSON POST for single request: {e:?}"))?;
Expand Down Expand Up @@ -395,9 +386,9 @@ pub async fn construct_proof(
}

/// Check that provider is valid and accessible.
async fn check_provider(client: &ClientWithMiddleware) -> anyhow::Result<()> {
async fn check_provider(client: &ClientWithBaseUrl) -> anyhow::Result<()> {
let request = client
.post("")
.post("")?
.json(&json!({
"jsonrpc": "2.0",
"method": "web3_clientVersion",
Expand Down
9 changes: 6 additions & 3 deletions portal-bridge/src/bridge/era1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,12 @@ impl Era1Bridge {
gossip_limit: usize,
execution_api: ExecutionApi,
) -> anyhow::Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/xml"));
let http_client: Client = Client::builder().default_headers(headers).build()?;
let http_client = Client::builder()
.default_headers(HeaderMap::from_iter([(
CONTENT_TYPE,
HeaderValue::from_static("application/xml"),
)]))
.build()?;
let era1_files = get_shuffled_era1_files(&http_client).await?;
let metrics = BridgeMetricsReporter::new("era1".to_string(), &format!("{mode:?}"));
let gossip_semaphore = Arc::new(Semaphore::new(gossip_limit));
Expand Down
47 changes: 41 additions & 6 deletions portal-bridge/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use anyhow::anyhow;
use clap::{Parser, Subcommand};
use reqwest::{
header::{HeaderMap, HeaderValue, CONTENT_TYPE},
Client,
Client, IntoUrl, Request, Response,
};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware, RequestBuilder};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::process::Child;
use tracing::error;
Expand Down Expand Up @@ -154,7 +154,7 @@ pub struct BridgeConfig {
pub gossip_limit: usize,
}

pub fn url_to_client(url: Url) -> Result<ClientWithMiddleware, String> {
pub fn url_to_client(url: Url) -> Result<ClientWithBaseUrl, String> {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));

Expand Down Expand Up @@ -191,12 +191,47 @@ pub fn url_to_client(url: Url) -> Result<ClientWithMiddleware, String> {
.timeout(HTTP_REQUEST_TIMEOUT)
.build()
.map_err(|_| "Failed to build HTTP client")?;
let retry_policy: ExponentialBackoff = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(reqwest_client)
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.with(RetryTransientMiddleware::new_with_policy(
ExponentialBackoff::builder().build_with_max_retries(3),
))
.build();
let client_with_base_url = ClientWithBaseUrl::new(client, url);

Ok(client)
Ok(client_with_base_url)
}

pub struct ClientWithBaseUrl {
client: ClientWithMiddleware,
base_url: Url,
}

impl ClientWithBaseUrl {
pub fn new(client: ClientWithMiddleware, base_url: Url) -> Self {
Self { client, base_url }
}

pub fn client(&self) -> &ClientWithMiddleware {
&self.client
}

pub fn base_url(&self) -> &Url {
&self.base_url
}

pub fn get<U: IntoUrl>(&self, url: U) -> anyhow::Result<RequestBuilder> {
let url = self.base_url.join(url.as_str())?;
Ok(self.client.get(url))
}

pub fn post<U: IntoUrl>(&self, url: U) -> anyhow::Result<RequestBuilder> {
let url = self.base_url.join(url.as_str())?;
Ok(self.client.post(url))
}

pub async fn execute(&self, request: Request) -> Result<Response, reqwest_middleware::Error> {
self.client.execute(request).await
}
}

// parser for subnetworks, makes sure that the state network is not ran alongside other subnetworks
Expand Down
41 changes: 20 additions & 21 deletions src/bin/test_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ use std::{fmt, fs, ops::Range, sync::Arc};

use anyhow::{anyhow, Result};
use clap::Parser;
use ethportal_api::{
types::{
execution::accumulator::EpochAccumulator,
jsonrpc::{params::Params, request::JsonRequest},
},
utils::bytes::hex_encode,
Header,
};
use portal_bridge::api::execution::ExecutionApi;
use rand::{
distributions::{Distribution, Uniform},
thread_rng,
Expand All @@ -13,12 +22,6 @@ use reqwest::{
use serde_json::json;
use ssz::Decode;
use tracing::{debug, info, warn};
use url::Url;

use ethportal_api::{
types::execution::accumulator::EpochAccumulator, utils::bytes::hex_encode, Header,
};
use portal_bridge::api::execution::ExecutionApi;
use trin_utils::log::init_tracing_logger;
use trin_validation::{
accumulator::PreMergeAccumulator,
Expand All @@ -29,6 +32,7 @@ use trin_validation::{
},
header_validator::HeaderValidator,
};
use url::Url;

lazy_static::lazy_static! {
static ref PANDAOPS_CLIENT_ID: String = std::env::var("PANDAOPS_CLIENT_ID").unwrap();
Expand Down Expand Up @@ -435,27 +439,22 @@ async fn get_latest_block_number() -> Result<u64> {
HeaderValue::from_str(&PANDAOPS_CLIENT_SECRET)
.map_err(|_| anyhow!("Invalid CF-Access-Client-Secret header value"))?,
);
let client = Client::builder()
.default_headers(headers)
.build()
.map_err(|e| anyhow!("Failed to build HTTP client: {:?}", e))?;
let request_body = json!({
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": ["latest", false],
"id": 1
});
let response = client
let request = JsonRequest::new(
"eth_getBlockByNumber".to_string(),
Params::Array(vec![json!("latest"), json!(false)]),
/* id= */ 1,
);
let response = Client::new()
.post("https://geth-lighthouse.mainnet.eu1.ethpandaops.io/")
.json(&request_body)
.headers(headers)
.json(&request)
.send()
.await
.map_err(|e| anyhow!("Request failed: {:?}", e))?;
let response_text = response
.text()
let response = response
.json::<serde_json::Value>()
.await
.map_err(|e| anyhow!("Failed to read response text: {:?}", e))?;
let response = serde_json::from_str::<serde_json::Value>(&response_text)?;
let result = response
.get("result")
.ok_or_else(|| anyhow!("Unable to fetch latest block"))?;
Expand Down
10 changes: 6 additions & 4 deletions trin-execution/src/era/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ pub struct EraManager {

impl EraManager {
pub async fn new(next_block_number: u64) -> anyhow::Result<Self> {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/xml"));
let http_client: Client = Client::builder().default_headers(headers).build()?;

let http_client = Client::builder()
.default_headers(HeaderMap::from_iter([(
CONTENT_TYPE,
HeaderValue::from_static("application/xml"),
)]))
.build()?;
let era1_files = get_era1_files(&http_client).await?;
let era_files = get_era_files(&http_client).await?;

Expand Down

0 comments on commit 10acc88

Please sign in to comment.