Add a worker pool to the client to limit parallel requests #1107

Closed

wants to merge 6 commits

Changes from 5 commits
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions sdk/Cargo.toml
@@ -86,6 +86,7 @@ rocksdb = { version = "0.21.0", default-features = false, features = [
rumqttc = { version = "0.22.0", default-features = false, features = [
"websocket",
], optional = true }
pin-project = { version = "1.1.3", default-features = false, optional = true }
serde_repr = { version = "0.1.16", default-features = false, optional = true }
thiserror = { version = "1.0.46", default-features = false, optional = true }
time = { version = "0.3.25", default-features = false, features = [
@@ -206,6 +207,8 @@ client = [
"iota-crypto/keccak",
"iota-crypto/bip44",
"iota-crypto/random",
"dep:pin-project",
"rand"
]
wallet = ["client"]

4 changes: 2 additions & 2 deletions sdk/src/client/api/block_builder/pow.rs
@@ -8,11 +8,11 @@ use crate::pow::miner::{Miner, MinerBuilder, MinerCancel};
#[cfg(target_family = "wasm")]
use crate::pow::wasm_miner::{SingleThreadedMiner, SingleThreadedMinerBuilder};
use crate::{
client::{ClientInner, Error, Result},
client::{Client, Error, Result},
types::block::{parent::Parents, payload::Payload, Block, BlockBuilder, Error as BlockError},
};

impl ClientInner {
impl Client {
Member

Just making sure, this is not breaking in any way since Client derefs to ClientInner, right?

Member

But wait, ClientInner is public, so this is a breaking change?

Author

Mmmm, yes I suppose so. Technically. But if you used the client this way you probably deserve to have your code break 😆

Member

Is this change (Client/ClientInner) really required for this PR? Otherwise we just do it in 2.0.

Author

Unfortunately it is most definitely required

Author

I will think about alternative impls though. I'm not 100% with this one.

/// Finishes the block with local PoW if needed.
/// Without local PoW, it will finish the block with a 0 nonce.
pub async fn finish_block_builder(&self, parents: Option<Parents>, payload: Option<Payload>) -> Result<Block> {
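To illustrate the compatibility question from the thread above: assuming `Client` keeps its `Deref<Target = ClientInner>` impl, callers that go through `Client` are unaffected by moving these methods, while code that named `ClientInner` directly stops compiling. A minimal sketch (the helper functions and import paths are hypothetical):

```rust
use iota_sdk::client::{Client, ClientInner, NetworkInfo, Result};

// Unaffected: the method is now inherent on `Client`, so this keeps compiling.
async fn info(client: &Client) -> Result<NetworkInfo> {
    client.get_network_info().await
}

// Breaking: after this PR the method no longer exists on `ClientInner`,
// so a signature like this stops compiling.
async fn info_via_inner(inner: &ClientInner) -> Result<NetworkInfo> {
    inner.get_network_info().await
}
```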
13 changes: 13 additions & 0 deletions sdk/src/client/builder.rs
@@ -48,6 +48,9 @@ pub struct ClientBuilder {
#[cfg(not(target_family = "wasm"))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pow_worker_count: Option<usize>,
#[cfg(not(target_family = "wasm"))]
#[serde(default = "default_max_parallel_api_requests")]
pub max_parallel_api_requests: usize,
}

fn default_api_timeout() -> Duration {
@@ -58,6 +61,11 @@ fn default_remote_pow_timeout() -> Duration {
DEFAULT_REMOTE_POW_API_TIMEOUT
}

#[cfg(not(target_family = "wasm"))]
fn default_max_parallel_api_requests() -> usize {
super::constants::MAX_PARALLEL_API_REQUESTS
}

impl Default for NetworkInfo {
fn default() -> Self {
Self {
@@ -82,6 +90,8 @@ impl Default for ClientBuilder {
remote_pow_timeout: DEFAULT_REMOTE_POW_API_TIMEOUT,
#[cfg(not(target_family = "wasm"))]
pow_worker_count: None,
#[cfg(not(target_family = "wasm"))]
max_parallel_api_requests: super::constants::MAX_PARALLEL_API_REQUESTS,
}
}
}
@@ -269,6 +279,7 @@ impl ClientBuilder {
sender: RwLock::new(mqtt_event_tx),
receiver: RwLock::new(mqtt_event_rx),
},
worker_pool: crate::client::worker::WorkerPool::new(self.max_parallel_api_requests),
});

client_inner.sync_nodes(&nodes, ignore_node_health).await?;
@@ -327,6 +338,8 @@ impl ClientBuilder {
remote_pow_timeout: client.get_remote_pow_timeout().await,
#[cfg(not(target_family = "wasm"))]
pow_worker_count: *client.pow_worker_count.read().await,
#[cfg(not(target_family = "wasm"))]
max_parallel_api_requests: client.worker_pool.size().await,
}
}
}
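For reference, a sketch of how the new limit could be configured. The field added above is public and has a serde default, so it can be set on the builder directly; the node URL and the `with_node`/`finish` calls are just the usual builder flow and not part of this PR:

```rust
use iota_sdk::client::ClientBuilder;

#[tokio::main]
async fn main() -> iota_sdk::client::Result<()> {
    let mut builder = ClientBuilder::new().with_node("https://node.example.org")?;
    // Non-wasm targets only; defaults to constants::MAX_PARALLEL_API_REQUESTS.
    builder.max_parallel_api_requests = 32;
    let _client = builder.finish().await?;
    Ok(())
}
```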
41 changes: 37 additions & 4 deletions sdk/src/client/core.rs
@@ -13,6 +13,10 @@ use {
tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender},
};

#[cfg(not(target_family = "wasm"))]
pub use super::worker::TaskPriority;
#[cfg(not(target_family = "wasm"))]
use super::worker::WorkerPool;
#[cfg(target_family = "wasm")]
use crate::client::constants::CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS;
use crate::{
@@ -56,6 +60,8 @@ pub struct ClientInner {
pub(crate) mqtt: MqttInner,
#[cfg(target_family = "wasm")]
pub(crate) last_sync: tokio::sync::Mutex<Option<u32>>,
#[cfg(not(target_family = "wasm"))]
pub(crate) worker_pool: WorkerPool,
}

#[derive(Default)]
@@ -83,10 +89,13 @@ pub(crate) struct MqttInner {
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Client");
d.field("node_manager", &self.inner.node_manager);
d.field("node_manager", &self.node_manager);
#[cfg(feature = "mqtt")]
d.field("broker_options", &self.inner.mqtt.broker_options);
d.field("network_info", &self.inner.network_info).finish()
d.field("broker_options", &self.mqtt.broker_options);
d.field("network_info", &self.network_info);
#[cfg(not(target_family = "wasm"))]
d.field("worker_pool", &self.worker_pool);
d.finish()
}
}

@@ -95,9 +104,33 @@ impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}

#[cfg(not(target_family = "wasm"))]
pub async fn rate_limit<F, Fut>(&self, f: F) -> Fut::Output
where
F: 'static + Send + Sync + FnOnce(Self) -> Fut,
Fut: futures::Future + Send,
Fut::Output: Send,
{
self.prioritized_rate_limit(TaskPriority::Medium, f).await
}

#[cfg(not(target_family = "wasm"))]
pub async fn prioritized_rate_limit<F, Fut>(&self, priority: TaskPriority, f: F) -> Fut::Output
where
F: 'static + Send + Sync + FnOnce(Self) -> Fut,
Fut: futures::Future + Send,
Fut::Output: Send,
{
let client = self.clone();
self.worker_pool
.process_task(priority, async move { f(client).await })
.await
.unwrap() // TODO
Contributor

todo 👀

}
}

impl ClientInner {
impl Client {
/// Gets the network related information such as network_id and min_pow_score
/// and if it's the default one, sync it first and set the NetworkInfo.
pub async fn get_network_info(&self) -> Result<NetworkInfo> {
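Based on the signatures above, usage would look roughly like this: the closure receives an owned `Client` clone and its future is queued on the worker pool at the default (medium) priority, with `prioritized_rate_limit` taking an explicit `TaskPriority` instead. The helper function and import paths are illustrative:

```rust
use iota_sdk::{
    client::{Client, Result},
    types::block::output::{OutputId, OutputWithMetadata},
};

async fn fetch_limited(client: &Client, id: OutputId) -> Result<OutputWithMetadata> {
    client
        // The request only starts once the pool has capacity.
        .rate_limit(move |client| async move { client.get_output(&id).await })
        .await
}
```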
2 changes: 2 additions & 0 deletions sdk/src/client/mod.rs
@@ -48,6 +48,8 @@ pub mod storage;
#[cfg_attr(docsrs, doc(cfg(feature = "stronghold")))]
pub mod stronghold;
pub mod utils;
#[cfg(not(target_family = "wasm"))]
pub(crate) mod worker;

#[cfg(feature = "mqtt")]
pub use self::node_api::mqtt;
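The new `worker` module itself is not part of the diff shown here. As a rough mental model only (the PR's `WorkerPool` also handles `TaskPriority`, reports its `size()`, and returns a `Result` from `process_task`, so it is certainly more involved), a pool that merely caps in-flight work can be built on a semaphore:

```rust
use std::sync::Arc;

use tokio::sync::Semaphore;

/// Hypothetical stand-in, not the PR's implementation.
#[derive(Debug, Clone)]
struct SimplePool {
    permits: Arc<Semaphore>,
}

impl SimplePool {
    fn new(max_parallel: usize) -> Self {
        Self {
            permits: Arc::new(Semaphore::new(max_parallel)),
        }
    }

    /// Waits for a permit before polling `fut`, so at most `max_parallel`
    /// tasks run concurrently.
    async fn process<F: std::future::Future>(&self, fut: F) -> F::Output {
        let _permit = self.permits.acquire().await.expect("semaphore is never closed");
        fut.await
    }
}
```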
90 changes: 15 additions & 75 deletions sdk/src/client/node_api/core/mod.rs
@@ -5,8 +5,6 @@

pub mod routes;

#[cfg(not(target_family = "wasm"))]
use crate::client::constants::MAX_PARALLEL_API_REQUESTS;
use crate::{
client::{Client, Result},
types::block::output::{OutputId, OutputMetadata, OutputWithMetadata},
@@ -15,87 +13,29 @@ use crate::{
impl Client {
/// Request outputs by their output ID in parallel
pub async fn get_outputs(&self, output_ids: &[OutputId]) -> Result<Vec<OutputWithMetadata>> {
#[cfg(target_family = "wasm")]
let outputs = futures::future::try_join_all(output_ids.iter().map(|id| self.get_output(id))).await?;

#[cfg(not(target_family = "wasm"))]
let outputs =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
async move {
tokio::spawn(async move {
futures::future::try_join_all(output_ids_chunk.iter().map(|id| client.get_output(id))).await
})
.await?
}
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(outputs)
futures::future::try_join_all(output_ids.iter().map(|id| self.get_output(id))).await
}

/// Request outputs by their output ID in parallel, ignoring failed requests
/// Useful to get data about spent outputs, that might not be pruned yet
pub async fn get_outputs_ignore_errors(&self, output_ids: &[OutputId]) -> Result<Vec<OutputWithMetadata>> {
#[cfg(target_family = "wasm")]
let outputs = futures::future::join_all(output_ids.iter().map(|id| self.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect();

#[cfg(not(target_family = "wasm"))]
let outputs =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
tokio::spawn(async move {
futures::future::join_all(output_ids_chunk.iter().map(|id| client.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect::<Vec<_>>()
})
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(outputs)
Ok(
futures::future::join_all(output_ids.iter().map(|id| self.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect(),
)
}

/// Requests metadata for outputs by their output ID in parallel, ignoring failed requests
pub async fn get_outputs_metadata_ignore_errors(&self, output_ids: &[OutputId]) -> Result<Vec<OutputMetadata>> {
#[cfg(target_family = "wasm")]
let metadata = futures::future::join_all(output_ids.iter().map(|id| self.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect();

#[cfg(not(target_family = "wasm"))]
let metadata =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
tokio::spawn(async move {
futures::future::join_all(output_ids_chunk.iter().map(|id| client.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect::<Vec<_>>()
})
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(metadata)
Ok(
futures::future::join_all(output_ids.iter().map(|id| self.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect(),
)
}
}
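The per-call chunking into `MAX_PARALLEL_API_REQUESTS` batches (and the `tokio::spawn` per chunk) is removed here, presumably because the concurrency bound now lives in the client's worker pool rather than in each helper. A caller who still wants an explicit local bound, independent of the pool, could use `buffered`, e.g. (a sketch, not part of the PR):

```rust
use futures::{stream, StreamExt, TryStreamExt};
use iota_sdk::{
    client::{Client, Result},
    types::block::output::{OutputId, OutputWithMetadata},
};

async fn get_outputs_bounded(
    client: &Client,
    output_ids: &[OutputId],
    limit: usize,
) -> Result<Vec<OutputWithMetadata>> {
    stream::iter(output_ids)
        .map(|id| client.get_output(id))
        .buffered(limit) // at most `limit` requests in flight, results keep input order
        .try_collect()
        .await
}
```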