From db3d4d7903eec0441dc611c9a4ca0df94bdf99fc Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Fri, 28 Jun 2024 22:00:09 +0530 Subject: [PATCH 01/10] Allow to limit the number of concurrent requests made by the sdk --- crates/matrix-sdk/src/config/request.rs | 24 ++++++++++++++-- crates/matrix-sdk/src/http_client/mod.rs | 35 +++++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/config/request.rs b/crates/matrix-sdk/src/config/request.rs index 226d26a99b8..650bbf4758d 100644 --- a/crates/matrix-sdk/src/config/request.rs +++ b/crates/matrix-sdk/src/config/request.rs @@ -44,18 +44,21 @@ pub struct RequestConfig { pub(crate) timeout: Duration, pub(crate) retry_limit: Option, pub(crate) retry_timeout: Option, + pub(crate) max_concurrent_requests: usize, pub(crate) force_auth: bool, } #[cfg(not(tarpaulin_include))] impl Debug for RequestConfig { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - let Self { timeout, retry_limit, retry_timeout, force_auth } = self; + let Self { timeout, retry_limit, retry_timeout, force_auth, max_concurrent_requests } = + self; let mut res = fmt.debug_struct("RequestConfig"); res.field("timeout", timeout) .maybe_field("retry_limit", retry_limit) - .maybe_field("retry_timeout", retry_timeout); + .maybe_field("retry_timeout", retry_timeout) + .field("max_concurrent_requests", max_concurrent_requests); if *force_auth { res.field("force_auth", &true); @@ -71,6 +74,7 @@ impl Default for RequestConfig { timeout: DEFAULT_REQUEST_TIMEOUT, retry_limit: Default::default(), retry_timeout: Default::default(), + max_concurrent_requests: 0, force_auth: false, } } @@ -106,6 +110,22 @@ impl RequestConfig { self } + /// The total limit of request that are pending or run concurrently. + /// Any additional request beyond that number will be waiting until another + /// concurrent requests finished. Requests are queued fairly. + #[must_use] + pub fn max_concurrent_requests(mut self, limit: usize) -> Self { + self.max_concurrent_requests = limit; + self + } + + /// Disable the limit of concurrent requests. Setting the limit to 0 + /// has the same effect. + #[must_use] + pub fn disable_max_concurrent_requests(mut self) -> Self { + self.max_concurrent_requests = 0; + self + } /// Set the timeout duration for all HTTP requests. #[must_use] pub fn timeout(mut self, timeout: Duration) -> Self { diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 3b2a699b966..1aa99397119 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -30,6 +30,7 @@ use ruma::api::{ error::{FromHttpResponseError, IntoHttpError}, AuthScheme, MatrixVersion, OutgoingRequest, SendAccessToken, }; +use tokio::sync::{Semaphore, SemaphorePermit}; use tracing::{debug, field::debug, instrument, trace}; use crate::{config::RequestConfig, error::HttpError}; @@ -48,16 +49,45 @@ pub(crate) use native::HttpSettings; pub(crate) const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); +#[derive(Clone, Debug)] +struct MaybeSemaphore(Arc>); + +#[allow(dead_code)] // holding this until drop is all we are doing +struct MaybeSemaphorePermit<'a>(Option>); + +impl MaybeSemaphore { + fn new(max: usize) -> Self { + let inner = if max > 0 { Some(Semaphore::new(max)) } else { None }; + MaybeSemaphore(Arc::new(inner)) + } + + async fn acquire(&self) -> MaybeSemaphorePermit { + match self.0.as_ref() { + Some(inner) => { + // ignoring errors as we never close this + MaybeSemaphorePermit(inner.acquire().await.ok()) + } + None => MaybeSemaphorePermit(None), + } + } +} + #[derive(Clone, Debug)] pub(crate) struct HttpClient { pub(crate) inner: reqwest::Client, pub(crate) request_config: RequestConfig, + queue: MaybeSemaphore, next_request_id: Arc, } impl HttpClient { pub(crate) fn new(inner: reqwest::Client, request_config: RequestConfig) -> Self { - HttpClient { inner, request_config, next_request_id: AtomicU64::new(0).into() } + HttpClient { + inner, + request_config, + queue: MaybeSemaphore::new(request_config.max_concurrent_requests), + next_request_id: AtomicU64::new(0).into(), + } } fn get_request_id(&self) -> String { @@ -184,6 +214,9 @@ impl HttpClient { request }; + // will be automatically dropped at the end of this function + let _handle = self.queue.acquire().await; + debug!("Sending request"); // There's a bunch of state in send_request, factor out a pinned inner From 0a38cdf085be9b17ccca793ad8f2bd2fa1aebf18 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Fri, 28 Jun 2024 22:03:31 +0530 Subject: [PATCH 02/10] Add Changelog entry --- crates/matrix-sdk/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 6454a206811..f2bb58ac0e5 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -20,6 +20,7 @@ Breaking changes: Additions: +- new `RequestConfig.max_concurrent_requests` which allows to limit the maximum number of concurrent requests the internal HTTP client issues (all others have to wait until the number drops below that threshold again) - Expose new method `Client::Oidc::login_with_qr_code()`. ([#3466](https://github.com/matrix-org/matrix-rust-sdk/pull/3466)) - Add the `ClientBuilder::add_root_certificates()` method which re-exposes the From f8668101c4bbff4087d7abb5ccc2478226ac987e Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Fri, 28 Jun 2024 22:31:42 +0530 Subject: [PATCH 03/10] Add lifetime param to make linting happy --- crates/matrix-sdk/src/http_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 1aa99397119..e12844f51a5 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -61,7 +61,7 @@ impl MaybeSemaphore { MaybeSemaphore(Arc::new(inner)) } - async fn acquire(&self) -> MaybeSemaphorePermit { + async fn acquire(&self) -> MaybeSemaphorePermit<'_> { match self.0.as_ref() { Some(inner) => { // ignoring errors as we never close this From 5fe3b9410bef88962233fd6de365c1605b3c1ddd Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Mon, 1 Jul 2024 18:01:43 +0530 Subject: [PATCH 04/10] Switch concurrent_request_semaphore param to Option --- crates/matrix-sdk/src/config/request.rs | 16 +++++----------- crates/matrix-sdk/src/http_client/mod.rs | 13 ++++++++----- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/crates/matrix-sdk/src/config/request.rs b/crates/matrix-sdk/src/config/request.rs index 650bbf4758d..88b66e4ff05 100644 --- a/crates/matrix-sdk/src/config/request.rs +++ b/crates/matrix-sdk/src/config/request.rs @@ -14,6 +14,7 @@ use std::{ fmt::{self, Debug}, + num::NonZeroUsize, time::Duration, }; @@ -44,7 +45,7 @@ pub struct RequestConfig { pub(crate) timeout: Duration, pub(crate) retry_limit: Option, pub(crate) retry_timeout: Option, - pub(crate) max_concurrent_requests: usize, + pub(crate) max_concurrent_requests: Option, pub(crate) force_auth: bool, } @@ -58,7 +59,7 @@ impl Debug for RequestConfig { res.field("timeout", timeout) .maybe_field("retry_limit", retry_limit) .maybe_field("retry_timeout", retry_timeout) - .field("max_concurrent_requests", max_concurrent_requests); + .maybe_field("max_concurrent_requests", max_concurrent_requests); if *force_auth { res.field("force_auth", &true); @@ -74,7 +75,7 @@ impl Default for RequestConfig { timeout: DEFAULT_REQUEST_TIMEOUT, retry_limit: Default::default(), retry_timeout: Default::default(), - max_concurrent_requests: 0, + max_concurrent_requests: Default::default(), force_auth: false, } } @@ -114,18 +115,11 @@ impl RequestConfig { /// Any additional request beyond that number will be waiting until another /// concurrent requests finished. Requests are queued fairly. #[must_use] - pub fn max_concurrent_requests(mut self, limit: usize) -> Self { + pub fn max_concurrent_requests(mut self, limit: Option) -> Self { self.max_concurrent_requests = limit; self } - /// Disable the limit of concurrent requests. Setting the limit to 0 - /// has the same effect. - #[must_use] - pub fn disable_max_concurrent_requests(mut self) -> Self { - self.max_concurrent_requests = 0; - self - } /// Set the timeout duration for all HTTP requests. #[must_use] pub fn timeout(mut self, timeout: Duration) -> Self { diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index e12844f51a5..243b85c33d6 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -15,6 +15,7 @@ use std::{ any::type_name, fmt::Debug, + num::NonZeroUsize, sync::{ atomic::{AtomicU64, Ordering}, Arc, @@ -56,8 +57,8 @@ struct MaybeSemaphore(Arc>); struct MaybeSemaphorePermit<'a>(Option>); impl MaybeSemaphore { - fn new(max: usize) -> Self { - let inner = if max > 0 { Some(Semaphore::new(max)) } else { None }; + fn new(max: Option) -> Self { + let inner = max.map(|i| Semaphore::new(i.into())); MaybeSemaphore(Arc::new(inner)) } @@ -76,7 +77,7 @@ impl MaybeSemaphore { pub(crate) struct HttpClient { pub(crate) inner: reqwest::Client, pub(crate) request_config: RequestConfig, - queue: MaybeSemaphore, + concurrent_request_semaphore: MaybeSemaphore, next_request_id: Arc, } @@ -85,7 +86,9 @@ impl HttpClient { HttpClient { inner, request_config, - queue: MaybeSemaphore::new(request_config.max_concurrent_requests), + concurrent_request_semaphore: MaybeSemaphore::new( + request_config.max_concurrent_requests, + ), next_request_id: AtomicU64::new(0).into(), } } @@ -215,7 +218,7 @@ impl HttpClient { }; // will be automatically dropped at the end of this function - let _handle = self.queue.acquire().await; + let _handle = self.concurrent_request_semaphore.acquire().await; debug!("Sending request"); From 273ec086479199998553d6afcd75d973230f8bc1 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Mon, 1 Jul 2024 22:17:19 +0530 Subject: [PATCH 05/10] Add tests for concurrency limiting --- crates/matrix-sdk/src/http_client/mod.rs | 104 +++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 243b85c33d6..4451e40d653 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -295,3 +295,107 @@ impl tower::Service> for HttpClient { Box::pin(fut) } } + +#[cfg(test)] +mod tests { + use crate::{ + http_client::RequestConfig, + test_utils::{set_client_session, test_client_builder_with_server}, + }; + use matrix_sdk_test::async_test; + use std::{ + num::NonZeroUsize, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + time::Duration, + }; + use wiremock::{matchers::method, Mock, Request, ResponseTemplate}; + + #[async_test] + async fn ensure_concurrent_request_limit_is_observed() { + let (client_builder, server) = test_client_builder_with_server().await; + let mut client = client_builder + .request_config(RequestConfig::default().max_concurrent_requests(NonZeroUsize::new(5))) + .build() + .await + .unwrap(); + + set_client_session(&mut client).await; + + let counter = Arc::new(AtomicU8::new(0)); + let inner_counter = counter.clone(); + + Mock::given(method("GET")) + .respond_with(move |_req: &Request| { + inner_counter.fetch_add(1, Ordering::SeqCst); + // we stall the requests + ResponseTemplate::new(200).set_delay(Duration::from_secs(60)) + }) + .mount(&server) + .await; + + let _bg_task = tokio::spawn(async move { + let mut pollers = Vec::new(); + + for _n in 0..10 { + pollers.push(client.whoami()); + } + // issue parallel execution + futures_util::future::join_all(pollers).await + }); + + // give it a moment to issue the requests + tokio::time::sleep(Duration::from_secs(2)).await; + + assert_eq!( + counter.load(Ordering::SeqCst), + 5, + "More requests passed than the limit we configured" + ); + } + + #[async_test] + async fn ensure_no_max_concurrent_request_does_not_limit() { + let (client_builder, server) = test_client_builder_with_server().await; + let mut client = client_builder + .request_config(RequestConfig::default().max_concurrent_requests(None)) + .build() + .await + .unwrap(); + + set_client_session(&mut client).await; + + let counter = Arc::new(AtomicU8::new(0)); + let inner_counter = counter.clone(); + + Mock::given(method("GET")) + .respond_with(move |_req: &Request| { + inner_counter.fetch_add(1, Ordering::SeqCst); + // we stall the requests + ResponseTemplate::new(200).set_delay(Duration::from_secs(60)) + }) + .mount(&server) + .await; + + let _bg_task = tokio::spawn(async move { + let mut pollers = Vec::new(); + + for _n in 1..254 { + pollers.push(client.whoami()); + } + // issue parallel execution + futures_util::future::join_all(pollers).await + }); + + // give it a moment to issue the requests + tokio::time::sleep(Duration::from_secs(2)).await; + + assert_eq!( + counter.load(Ordering::SeqCst), + 254, + "More requests passed than the limit we configured" + ); + } +} From 63a5f06aa6b9c83463773a2570194f69a3b79429 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Tue, 2 Jul 2024 15:26:34 +0530 Subject: [PATCH 06/10] Improving the test as per review feedback --- crates/matrix-sdk/src/http_client/mod.rs | 68 +++++++++++++----------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 4451e40d653..c1b91bcd97f 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -298,11 +298,6 @@ impl tower::Service> for HttpClient { #[cfg(test)] mod tests { - use crate::{ - http_client::RequestConfig, - test_utils::{set_client_session, test_client_builder_with_server}, - }; - use matrix_sdk_test::async_test; use std::{ num::NonZeroUsize, sync::{ @@ -311,7 +306,17 @@ mod tests { }, time::Duration, }; - use wiremock::{matchers::method, Mock, Request, ResponseTemplate}; + + use matrix_sdk_test::{async_test, test_json}; + use wiremock::{ + matchers::{method, path}, + Mock, Request, ResponseTemplate, + }; + + use crate::{ + http_client::RequestConfig, + test_utils::{set_client_session, test_client_builder_with_server}, + }; #[async_test] async fn ensure_concurrent_request_limit_is_observed() { @@ -328,6 +333,13 @@ mod tests { let inner_counter = counter.clone(); Mock::given(method("GET")) + .and(path("/_matrix/client/versions")) + .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS)) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("_matrix/client/r0/account/whoami")) .respond_with(move |_req: &Request| { inner_counter.fetch_add(1, Ordering::SeqCst); // we stall the requests @@ -336,24 +348,19 @@ mod tests { .mount(&server) .await; - let _bg_task = tokio::spawn(async move { - let mut pollers = Vec::new(); - - for _n in 0..10 { - pollers.push(client.whoami()); - } - // issue parallel execution - futures_util::future::join_all(pollers).await + let bg_task = tokio::spawn(async move { + futures_util::future::join_all((0..10).map(|_| client.whoami())).await }); - // give it a moment to issue the requests - tokio::time::sleep(Duration::from_secs(2)).await; + // give it some time to issue the requests + tokio::time::sleep(Duration::from_millis(300)).await; assert_eq!( counter.load(Ordering::SeqCst), 5, "More requests passed than the limit we configured" ); + bg_task.abort(); } #[async_test] @@ -371,31 +378,28 @@ mod tests { let inner_counter = counter.clone(); Mock::given(method("GET")) + .and(path("/_matrix/client/versions")) + .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::VERSIONS)) + .mount(&server) + .await; + + Mock::given(method("GET")) + .and(path("_matrix/client/r0/account/whoami")) .respond_with(move |_req: &Request| { inner_counter.fetch_add(1, Ordering::SeqCst); - // we stall the requests ResponseTemplate::new(200).set_delay(Duration::from_secs(60)) }) .mount(&server) .await; - let _bg_task = tokio::spawn(async move { - let mut pollers = Vec::new(); - - for _n in 1..254 { - pollers.push(client.whoami()); - } - // issue parallel execution - futures_util::future::join_all(pollers).await + let bg_task = tokio::spawn(async move { + futures_util::future::join_all((0..254).map(|_| client.whoami())).await }); - // give it a moment to issue the requests - tokio::time::sleep(Duration::from_secs(2)).await; + // give it some time to issue the requests + tokio::time::sleep(Duration::from_secs(3)).await; - assert_eq!( - counter.load(Ordering::SeqCst), - 254, - "More requests passed than the limit we configured" - ); + assert_eq!(counter.load(Ordering::SeqCst), 254, "Not all requests passed through"); + bg_task.abort(); } } From 6d6f2ca74e0f95e8cbbe77d66ded0a1b2631ca01 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Tue, 2 Jul 2024 15:27:27 +0530 Subject: [PATCH 07/10] Missed one delay --- crates/matrix-sdk/src/http_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index c1b91bcd97f..17cd93bc1aa 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -397,7 +397,7 @@ mod tests { }); // give it some time to issue the requests - tokio::time::sleep(Duration::from_secs(3)).await; + tokio::time::sleep(Duration::from_millis(300)).await; assert_eq!(counter.load(Ordering::SeqCst), 254, "Not all requests passed through"); bg_task.abort(); From 0ce0a2a92183fa2ef92eb564a6bfc377ee591007 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Tue, 2 Jul 2024 16:30:35 +0530 Subject: [PATCH 08/10] limit test to non wasm --- crates/matrix-sdk/src/http_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index 17cd93bc1aa..f017d551184 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -296,7 +296,7 @@ impl tower::Service> for HttpClient { } } -#[cfg(test)] +#[cfg(all(test, not(target_arch = "wasm32")))] mod tests { use std::{ num::NonZeroUsize, From 2911fa488eeecd28a441d54833f9fbdbe12b0876 Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Tue, 2 Jul 2024 17:36:46 +0530 Subject: [PATCH 09/10] fix lints --- crates/matrix-sdk/src/http_client/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index f017d551184..af8b9ae3815 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -321,13 +321,13 @@ mod tests { #[async_test] async fn ensure_concurrent_request_limit_is_observed() { let (client_builder, server) = test_client_builder_with_server().await; - let mut client = client_builder + let client = client_builder .request_config(RequestConfig::default().max_concurrent_requests(NonZeroUsize::new(5))) .build() .await .unwrap(); - set_client_session(&mut client).await; + set_client_session(&client).await; let counter = Arc::new(AtomicU8::new(0)); let inner_counter = counter.clone(); @@ -366,13 +366,13 @@ mod tests { #[async_test] async fn ensure_no_max_concurrent_request_does_not_limit() { let (client_builder, server) = test_client_builder_with_server().await; - let mut client = client_builder + let client = client_builder .request_config(RequestConfig::default().max_concurrent_requests(None)) .build() .await .unwrap(); - set_client_session(&mut client).await; + set_client_session(&client).await; let counter = Arc::new(AtomicU8::new(0)); let inner_counter = counter.clone(); From 52c5387bce0a716fdc267c038db2963f6646e3eb Mon Sep 17 00:00:00 2001 From: Benjamin Kampmann Date: Thu, 4 Jul 2024 18:33:38 +0530 Subject: [PATCH 10/10] Fix broken CI, improve inline docs --- crates/matrix-sdk/src/http_client/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/matrix-sdk/src/http_client/mod.rs b/crates/matrix-sdk/src/http_client/mod.rs index af8b9ae3815..7521f532488 100644 --- a/crates/matrix-sdk/src/http_client/mod.rs +++ b/crates/matrix-sdk/src/http_client/mod.rs @@ -53,7 +53,7 @@ pub(crate) const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Clone, Debug)] struct MaybeSemaphore(Arc>); -#[allow(dead_code)] // holding this until drop is all we are doing +#[allow(dead_code)] // false-positive lint: we never use it but only hold it for the drop struct MaybeSemaphorePermit<'a>(Option>); impl MaybeSemaphore { @@ -65,7 +65,8 @@ impl MaybeSemaphore { async fn acquire(&self) -> MaybeSemaphorePermit<'_> { match self.0.as_ref() { Some(inner) => { - // ignoring errors as we never close this + // This can only ever error if the semaphore was closed, + // which we never do, so we can safely ignore any error case MaybeSemaphorePermit(inner.acquire().await.ok()) } None => MaybeSemaphorePermit(None), @@ -397,7 +398,7 @@ mod tests { }); // give it some time to issue the requests - tokio::time::sleep(Duration::from_millis(300)).await; + tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!(counter.load(Ordering::SeqCst), 254, "Not all requests passed through"); bg_task.abort();