Skip to content

Commit

Permalink
Add a request pool (#1137)
Browse files Browse the repository at this point in the history
* initial changes

* fix wasm

* add rand dep to client

* Rewrite to be a much simpler request pool

* cleanup

* Add builder fn and changelog

* timeout cleanup

* fix wasm

* refactor

* add config to bindings

* changelogs

* Update sdk/CHANGELOG.md

* Update release date

* SDK release date

---------

Co-authored-by: Thibault Martinez <[email protected]>
  • Loading branch information
Alexandcoats and thibault-martinez authored Sep 7, 2023
1 parent a3260a4 commit 7d89459
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 253 deletions.
6 changes: 5 additions & 1 deletion bindings/nodejs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Security -->

## 1.0.9 - 2023-09-06
## 1.0.9 - 2023-09-07

### Added

- `IClientOptions::maxParallelApiRequests`;

### Fixed

Expand Down
2 changes: 2 additions & 0 deletions bindings/nodejs/lib/types/client/client-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export interface IClientOptions {
powWorkerCount?: number;
/** Whether the PoW should be done locally or remotely. */
localPow?: boolean;
/** The maximum parallel API requests. */
maxParallelApiRequests?: number;
}

/** Time duration */
Expand Down
2 changes: 1 addition & 1 deletion bindings/nodejs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@iota/sdk",
"version": "1.0.8",
"version": "1.0.9",
"description": "Node.js binding to the IOTA SDK library",
"main": "out/index.js",
"types": "out/index.d.ts",
Expand Down
6 changes: 6 additions & 0 deletions bindings/python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Security -->

## 1.0.2 - 2023-MM-DD

### Added

- `ClientOptions::maxParallelApiRequests`;

## 1.0.1 - 2023-08-23

### Fixed
Expand Down
3 changes: 3 additions & 0 deletions bindings/python/iota_sdk/types/client_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class ClientOptions:
Timeout when sending a block that requires remote proof of work.
powWorkerCount (int):
The amount of threads to be used for proof of work.
maxParallelApiRequests (int):
The maximum parallel API requests.
"""
primaryNode: Optional[str] = None
primaryPowNode: Optional[str] = None
Expand All @@ -103,6 +105,7 @@ class ClientOptions:
apiTimeout: Optional[Duration] = None
remotePowTimeout: Optional[Duration] = None
powWorkerCount: Optional[int] = None
maxParallelApiRequests: Optional[int] = None

def as_dict(self):
config = {k: v for k, v in self.__dict__.items() if v is not None}
Expand Down
3 changes: 2 additions & 1 deletion sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Security -->

## 1.0.3 - 2023-MM-DD
## 1.0.3 - 2023-09-07

### Added

- `migrate_db_chrysalis_to_stardust()` function;
- `Wallet::get_chrysalis_data()` method;
- `PrivateKeySecretManager` and `SecretManager::PrivateKey`;
- `SecretManager::from` impl for variants;
- `Client` requests now obey a maximum concurrency using a request pool (set via `ClientBuilder::with_max_parallel_api_requests`);

### Fixed

Expand Down
21 changes: 21 additions & 0 deletions sdk/src/client/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ pub struct ClientBuilder {
#[cfg(not(target_family = "wasm"))]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pow_worker_count: Option<usize>,
/// The maximum parallel API requests
#[cfg(not(target_family = "wasm"))]
#[serde(default = "default_max_parallel_api_requests")]
pub max_parallel_api_requests: usize,
}

fn default_api_timeout() -> Duration {
Expand All @@ -58,6 +62,11 @@ fn default_remote_pow_timeout() -> Duration {
DEFAULT_REMOTE_POW_API_TIMEOUT
}

/// Serde default for `ClientBuilder::max_parallel_api_requests`: the crate-wide
/// `MAX_PARALLEL_API_REQUESTS` constant. Only compiled for non-wasm targets,
/// matching the `cfg` on the field it backs.
#[cfg(not(target_family = "wasm"))]
fn default_max_parallel_api_requests() -> usize {
    super::constants::MAX_PARALLEL_API_REQUESTS
}

impl Default for NetworkInfo {
fn default() -> Self {
Self {
Expand All @@ -82,6 +91,8 @@ impl Default for ClientBuilder {
remote_pow_timeout: DEFAULT_REMOTE_POW_API_TIMEOUT,
#[cfg(not(target_family = "wasm"))]
pow_worker_count: None,
#[cfg(not(target_family = "wasm"))]
max_parallel_api_requests: super::constants::MAX_PARALLEL_API_REQUESTS,
}
}
}
Expand Down Expand Up @@ -237,6 +248,13 @@ impl ClientBuilder {
self
}

/// Sets the maximum number of API requests the client performs in parallel.
/// The value sizes the client's internal `RequestPool` when `finish` builds the
/// [`Client`]. Not available on wasm targets, where no request pool is used.
#[cfg(not(target_family = "wasm"))]
pub fn with_max_parallel_api_requests(mut self, max_parallel_api_requests: usize) -> Self {
    self.max_parallel_api_requests = max_parallel_api_requests;
    self
}

/// Build the Client instance.
#[cfg(not(target_family = "wasm"))]
pub async fn finish(self) -> Result<Client> {
Expand Down Expand Up @@ -269,6 +287,7 @@ impl ClientBuilder {
sender: RwLock::new(mqtt_event_tx),
receiver: RwLock::new(mqtt_event_rx),
},
request_pool: crate::client::request_pool::RequestPool::new(self.max_parallel_api_requests),
});

client_inner.sync_nodes(&nodes, ignore_node_health).await?;
Expand Down Expand Up @@ -327,6 +346,8 @@ impl ClientBuilder {
remote_pow_timeout: client.get_remote_pow_timeout().await,
#[cfg(not(target_family = "wasm"))]
pow_worker_count: *client.pow_worker_count.read().await,
#[cfg(not(target_family = "wasm"))]
max_parallel_api_requests: client.request_pool.size().await,
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions sdk/src/client/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use {
tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender},
};

#[cfg(not(target_family = "wasm"))]
use super::request_pool::RequestPool;
#[cfg(target_family = "wasm")]
use crate::client::constants::CACHE_NETWORK_INFO_TIMEOUT_IN_SECONDS;
use crate::{
Expand Down Expand Up @@ -56,6 +58,8 @@ pub struct ClientInner {
pub(crate) mqtt: MqttInner,
#[cfg(target_family = "wasm")]
pub(crate) last_sync: tokio::sync::Mutex<Option<u32>>,
#[cfg(not(target_family = "wasm"))]
pub(crate) request_pool: RequestPool,
}

#[derive(Default)]
Expand Down Expand Up @@ -83,10 +87,13 @@ pub(crate) struct MqttInner {
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut d = f.debug_struct("Client");
d.field("node_manager", &self.inner.node_manager);
d.field("node_manager", &self.node_manager);
#[cfg(feature = "mqtt")]
d.field("broker_options", &self.inner.mqtt.broker_options);
d.field("network_info", &self.inner.network_info).finish()
d.field("broker_options", &self.mqtt.broker_options);
d.field("network_info", &self.network_info);
#[cfg(not(target_family = "wasm"))]
d.field("request_pool", &self.request_pool);
d.finish()
}
}

Expand Down
2 changes: 2 additions & 0 deletions sdk/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub mod core;
pub mod error;
pub mod node_api;
pub mod node_manager;
#[cfg(not(target_family = "wasm"))]
pub(crate) mod request_pool;
pub mod secret;
pub mod storage;
#[cfg(feature = "stronghold")]
Expand Down
90 changes: 15 additions & 75 deletions sdk/src/client/node_api/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

pub mod routes;

#[cfg(not(target_family = "wasm"))]
use crate::client::constants::MAX_PARALLEL_API_REQUESTS;
use crate::{
client::{Client, Result},
types::block::output::{OutputId, OutputMetadata, OutputWithMetadata},
Expand All @@ -15,87 +13,29 @@ use crate::{
impl Client {
/// Request outputs by their output ID in parallel
pub async fn get_outputs(&self, output_ids: &[OutputId]) -> Result<Vec<OutputWithMetadata>> {
#[cfg(target_family = "wasm")]
let outputs = futures::future::try_join_all(output_ids.iter().map(|id| self.get_output(id))).await?;

#[cfg(not(target_family = "wasm"))]
let outputs =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
async move {
tokio::spawn(async move {
futures::future::try_join_all(output_ids_chunk.iter().map(|id| client.get_output(id))).await
})
.await?
}
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(outputs)
futures::future::try_join_all(output_ids.iter().map(|id| self.get_output(id))).await
}

/// Request outputs by their output ID in parallel, ignoring failed requests
/// Useful to get data about spent outputs, that might not be pruned yet
pub async fn get_outputs_ignore_errors(&self, output_ids: &[OutputId]) -> Result<Vec<OutputWithMetadata>> {
#[cfg(target_family = "wasm")]
let outputs = futures::future::join_all(output_ids.iter().map(|id| self.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect();

#[cfg(not(target_family = "wasm"))]
let outputs =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
tokio::spawn(async move {
futures::future::join_all(output_ids_chunk.iter().map(|id| client.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect::<Vec<_>>()
})
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(outputs)
Ok(
futures::future::join_all(output_ids.iter().map(|id| self.get_output(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect(),
)
}

/// Requests metadata for outputs by their output ID in parallel, ignoring failed requests
pub async fn get_outputs_metadata_ignore_errors(&self, output_ids: &[OutputId]) -> Result<Vec<OutputMetadata>> {
#[cfg(target_family = "wasm")]
let metadata = futures::future::join_all(output_ids.iter().map(|id| self.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect();

#[cfg(not(target_family = "wasm"))]
let metadata =
futures::future::try_join_all(output_ids.chunks(MAX_PARALLEL_API_REQUESTS).map(|output_ids_chunk| {
let client = self.clone();
let output_ids_chunk = output_ids_chunk.to_vec();
tokio::spawn(async move {
futures::future::join_all(output_ids_chunk.iter().map(|id| client.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect::<Vec<_>>()
})
}))
.await?
.into_iter()
.flatten()
.collect();

Ok(metadata)
Ok(
futures::future::join_all(output_ids.iter().map(|id| self.get_output_metadata(id)))
.await
.into_iter()
.filter_map(Result::ok)
.collect(),
)
}
}
Loading

0 comments on commit 7d89459

Please sign in to comment.