From bfba299e6b0509f3764a96dd4a1633805629d7ca Mon Sep 17 00:00:00 2001 From: mdecimus Date: Thu, 16 Jan 2025 18:52:46 +0100 Subject: [PATCH] Increased concurrency for local message delivery --- CHANGELOG.md | 2 ++ crates/common/src/config/smtp/queue.rs | 9 +++-- crates/common/src/listener/acme/order.rs | 2 +- crates/jmap/src/services/delivery.rs | 46 +++++++++++++++++++----- crates/store/src/backend/s3/mod.rs | 6 ++-- tests/Cargo.toml | 2 +- tests/src/jmap/mod.rs | 4 +-- tests/src/smtp/queue/concurrent.rs | 4 +-- 8 files changed, 56 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index de8365672..9d0b3a9d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,12 @@ To upgrade update the webadmin and then replace the `stalwart-mail` binary. ### Added - Automatic revoking of access tokens when secrets, permissions, ACLs or group memberships change (#649). +- Increased concurrency for local message delivery (configurable via `queue.threads.local`). - Cluster node roles. - `config_get` expression function. ### Changed +- `queue.outbound.concurrency` is now `queue.threads.remote`. - `lookup.default.hostname` is now `server.hostname`. - `lookup.default.domain` is now `report.domain`. diff --git a/crates/common/src/config/smtp/queue.rs b/crates/common/src/config/smtp/queue.rs index 57cc69d41..60499d76a 100644 --- a/crates/common/src/config/smtp/queue.rs +++ b/crates/common/src/config/smtp/queue.rs @@ -84,7 +84,8 @@ pub struct QueueOutboundTimeout { #[derive(Debug, Clone)] pub struct QueueThrottle { - pub outbound_concurrency: u64, + pub outbound_concurrency: usize, + pub local_concurrency: usize, pub sender: Vec, pub rcpt: Vec, pub host: Vec, @@ -203,6 +204,7 @@ impl Default for QueueConfig { }, throttle: QueueThrottle { outbound_concurrency: 25, + local_concurrency: 10, sender: Default::default(), rcpt: Default::default(), host: Default::default(), @@ -387,8 +389,11 @@ fn parse_queue_throttle(config: &mut Config) -> QueueThrottle { rcpt: Vec::new(), host: Vec::new(), outbound_concurrency: config - .property_or_default::("queue.outbound.concurrency", "25") + .property_or_default::("queue.threads.remote", "25") .unwrap_or(25), + local_concurrency: config + .property_or_default::("queue.threads.local", "10") + .unwrap_or(10), }; let all_throttles = parse_throttle( diff --git a/crates/common/src/listener/acme/order.rs b/crates/common/src/listener/acme/order.rs index 2bfefb26f..21b28c9a8 100644 --- a/crates/common/src/listener/acme/order.rs +++ b/crates/common/src/listener/acme/order.rs @@ -59,7 +59,7 @@ impl Server { match self.order(provider).await { Ok(pem) => return self.process_cert(provider, pem, false).await, Err(err) - if !err.matches(EventType::Acme(AcmeEvent::OrderInvalid)) && backoff < 16 => + if !err.matches(EventType::Acme(AcmeEvent::OrderInvalid)) && backoff < 9 => { trc::event!( Acme(AcmeEvent::RenewBackoff), diff --git a/crates/jmap/src/services/delivery.rs b/crates/jmap/src/services/delivery.rs index b3f8b6749..17b9b5423 100644 --- a/crates/jmap/src/services/delivery.rs +++ b/crates/jmap/src/services/delivery.rs @@ -7,20 +7,50 @@ use std::sync::Arc; use common::{core::BuildServer, ipc::DeliveryEvent, Inner}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Semaphore}; use super::ingest::MailDelivery; pub fn spawn_delivery_manager(inner: Arc, mut delivery_rx: mpsc::Receiver) { tokio::spawn(async move { - while let Some(event) = delivery_rx.recv().await { - match event { - DeliveryEvent::Ingest { message, result_tx } => { - result_tx - .send(inner.build_server().deliver_message(message).await) - .ok(); + let semaphore = Arc::new(Semaphore::new( + inner + .shared_core + .load() + .smtp + .queue + .throttle + .local_concurrency, + )); + + loop { + let permit = match semaphore.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + trc::error!(trc::StoreEvent::UnexpectedError + .into_err() + .details("Semaphore error") + .caused_by(trc::location!())); + break; + } + }; + + match delivery_rx.recv().await { + Some(event) => match event { + DeliveryEvent::Ingest { message, result_tx } => { + let server = inner.build_server(); + + tokio::spawn(async move { + result_tx.send(server.deliver_message(message).await).ok(); + + drop(permit); + }); + } + DeliveryEvent::Stop => break, + }, + None => { + break; } - DeliveryEvent::Stop => break, } } }); diff --git a/crates/store/src/backend/s3/mod.rs b/crates/store/src/backend/s3/mod.rs index f519380d0..df07ba4e2 100644 --- a/crates/store/src/backend/s3/mod.rs +++ b/crates/store/src/backend/s3/mod.rs @@ -100,7 +100,7 @@ impl S3Store { 500..=599 if retries_left > 0 => { // wait backoff tokio::time::sleep(Duration::from_secs( - 1 << (self.max_retries - retries_left).max(16), + 1 << (self.max_retries - retries_left).max(6), )) .await; @@ -130,7 +130,7 @@ impl S3Store { 500..=599 if retries_left > 0 => { // wait backoff tokio::time::sleep(Duration::from_secs( - 1 << (self.max_retries - retries_left).max(16), + 1 << (self.max_retries - retries_left).max(6), )) .await; @@ -161,7 +161,7 @@ impl S3Store { 500..=599 if retries_left > 0 => { // wait backoff tokio::time::sleep(Duration::from_secs( - 1 << (self.max_retries - retries_left).max(16), + 1 << (self.max_retries - retries_left).max(6), )) .await; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 5e54d7773..160214c83 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -7,7 +7,7 @@ resolver = "2" [features] #default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"] #default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "foundationdb"] -default = ["rocks", "redis"] +default = ["rocks", "redis", "s3"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation", "common/foundation"] postgres = ["store/postgres"] diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index bb48b1acb..ef7d01d02 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -381,9 +381,9 @@ pub async fn jmap_tests() { email_query_changes::test(&mut params).await; email_copy::test(&mut params).await; thread_get::test(&mut params).await; - thread_merge::test(&mut params).await; + thread_merge::test(&mut params).await;*/ mailbox::test(&mut params).await; - delivery::test(&mut params).await;*/ + delivery::test(&mut params).await; auth_acl::test(&mut params).await; auth_limits::test(&mut params).await; auth_oauth::test(&mut params).await; diff --git a/tests/src/smtp/queue/concurrent.rs b/tests/src/smtp/queue/concurrent.rs index 405551a0f..3ab9cdecd 100644 --- a/tests/src/smtp/queue/concurrent.rs +++ b/tests/src/smtp/queue/concurrent.rs @@ -22,8 +22,8 @@ relay = true [session.data.limits] messages = 2000 -[queue.outbound] -concurrency = 4 +[queue.threads] +remote = 4 [queue.schedule] retry = "1s"