Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a request pool #1137

Merged
merged 20 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Wallet::get_chrysalis_data()` method;
- `PrivateKeySecretManager` and `SecretManager::PrivateKey`;
- `SecretManager::from` impl for variants;
- `Client` requests now obey a maximum concurrency using a request pool (set via `ClientBuilder::with_max_parallel_api_requests`);
thibault-martinez marked this conversation as resolved.
Show resolved Hide resolved

### Fixed

Expand Down
20 changes: 20 additions & 0 deletions sdk/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub struct ClientBuilder {
#[cfg(not(target_family = "wasm"))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pow_worker_count: Option<usize>,
#[cfg(not(target_family = "wasm"))]
#[serde(default = "default_max_parallel_api_requests")]
pub max_parallel_api_requests: usize,
}

fn default_api_timeout() -> Duration {
Expand All @@ -58,6 +61,11 @@ fn default_remote_pow_timeout() -> Duration {
DEFAULT_REMOTE_POW_API_TIMEOUT
}

/// Serde default for `ClientBuilder::max_parallel_api_requests`: the
/// library-wide cap on concurrent node API requests (non-wasm targets only,
/// since the request pool is not compiled for wasm).
#[cfg(not(target_family = "wasm"))]
fn default_max_parallel_api_requests() -> usize {
super::constants::MAX_PARALLEL_API_REQUESTS
}

impl Default for NetworkInfo {
fn default() -> Self {
Self {
Expand All @@ -82,6 +90,8 @@ impl Default for ClientBuilder {
remote_pow_timeout: DEFAULT_REMOTE_POW_API_TIMEOUT,
#[cfg(not(target_family = "wasm"))]
pow_worker_count: None,
#[cfg(not(target_family = "wasm"))]
max_parallel_api_requests: super::constants::MAX_PARALLEL_API_REQUESTS,
}
}
}
Expand Down Expand Up @@ -237,6 +247,13 @@ impl ClientBuilder {
self
}

/// Set maximum parallel API requests.
#[cfg(not(target_family = "wasm"))]
pub fn with_max_parallel_api_requests(mut self, max_parallel_api_requests: usize) -> Self {
self.max_parallel_api_requests = max_parallel_api_requests;
self
}

/// Build the Client instance.
#[cfg(not(target_family = "wasm"))]
pub async fn finish(self) -> Result<Client> {
Expand Down Expand Up @@ -269,6 +286,7 @@ impl ClientBuilder {
sender: RwLock::new(mqtt_event_tx),
receiver: RwLock::new(mqtt_event_rx),
},
request_pool: crate::client::request_pool::RequestPool::new(self.max_parallel_api_requests),
});

client_inner.sync_nodes(&nodes, ignore_node_health).await?;
Expand Down Expand Up @@ -327,6 +345,8 @@ impl ClientBuilder {
remote_pow_timeout: client.get_remote_pow_timeout().await,
#[cfg(not(target_family = "wasm"))]
pow_worker_count: *client.pow_worker_count.read().await,
#[cfg(not(target_family = "wasm"))]
max_parallel_api_requests: client.request_pool.size().await,
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions sdk/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use {
tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender},
};

#[cfg(not(target_family = "wasm"))]
use super::request_pool::RequestPool;
#[cfg(target_family = "wasm")]
use crate::client::constants::CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS;
use crate::{
Expand Down Expand Up @@ -56,6 +58,8 @@ pub struct ClientInner {
pub(crate) mqtt: MqttInner,
#[cfg(target_family = "wasm")]
pub(crate) last_sync: tokio::sync::Mutex<Option<u32>>,
#[cfg(not(target_family = "wasm"))]
pub(crate) request_pool: RequestPool,
}

#[derive(Default)]
Expand Down Expand Up @@ -83,10 +87,13 @@ pub(crate) struct MqttInner {
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Client");
d.field("node_manager", &self.inner.node_manager);
d.field("node_manager", &self.node_manager);
#[cfg(feature = "mqtt")]
d.field("broker_options", &self.inner.mqtt.broker_options);
d.field("network_info", &self.inner.network_info).finish()
d.field("broker_options", &self.mqtt.broker_options);
d.field("network_info", &self.network_info);
#[cfg(not(target_family = "wasm"))]
d.field("request_pool", &self.request_pool);
d.finish()
}
}

Expand Down
2 changes: 2 additions & 0 deletions sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub mod core;
pub mod error;
pub mod node_api;
pub mod node_manager;
#[cfg(not(target_family = "wasm"))]
pub(crate) mod request_pool;
pub mod secret;
pub mod storage;
#[cfg(feature = "stronghold")]
Expand Down
90 changes: 15 additions & 75 deletions sdk/src/client/node_api/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

pub mod routes;

#[cfg(not(target_family = "wasm"))]
use crate::client::constants::MAX_PARALLEL_API_REQUESTS;
use crate::{
client::{Client, Result},
types::block::output::{OutputId, OutputMetadata, OutputWithMetadata},
Expand All @@ -15,87 +13,29 @@ use crate::{
impl Client {
/// Request outputs by their output ID in parallel
pub async fn get_outputs(&self, output_ids: &[OutputId]) -> Result<Vec<OutputWithMetadata>> {
#[cfg(target_family = "wasm")]
let outputs = futures::future::try_join_all(output_ids.iter().map(|id| self.get_output(id))).await?;

#[cfg(not(target_family = "wasm"))]
let outputs =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
async move {
tokio::spawn(async move {
futures::future::try_join_all(output_ids_chunk.iter().map(|id| client.get_output(id))).await
})
.await?
}
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(outputs)
futures::future::try_join_all(output_ids.iter().map(|id| self.get_output(id))).await
thibault-martinez marked this conversation as resolved.
Show resolved Hide resolved
}

/// Request outputs by their output ID in parallel, ignoring failed requests
/// Useful to get data about spent outputs, that might not be pruned yet
pub async fn get_outputs_ignore_errors(&self, output_ids: &[OutputId]) -> Result<Vec<OutputWithMetadata>> {
#[cfg(target_family = "wasm")]
let outputs = futures::future::join_all(output_ids.iter().map(|id| self.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect();

#[cfg(not(target_family = "wasm"))]
let outputs =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
tokio::spawn(async move {
futures::future::join_all(output_ids_chunk.iter().map(|id| client.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect::<Vec<_>>()
})
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(outputs)
Ok(
futures::future::join_all(output_ids.iter().map(|id| self.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect(),
)
}

/// Requests metadata for outputs by their output ID in parallel, ignoring failed requests
pub async fn get_outputs_metadata_ignore_errors(&self, output_ids: &[OutputId]) -> Result<Vec<OutputMetadata>> {
#[cfg(target_family = "wasm")]
let metadata = futures::future::join_all(output_ids.iter().map(|id| self.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect();

#[cfg(not(target_family = "wasm"))]
let metadata =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
tokio::spawn(async move {
futures::future::join_all(output_ids_chunk.iter().map(|id| client.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect::<Vec<_>>()
})
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(metadata)
Ok(
futures::future::join_all(output_ids.iter().map(|id| self.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect(),
)
}
}
Loading
Loading