Skip to content

Commit

Permalink
Increased concurrency for local message delivery
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 16, 2025
1 parent ef72141 commit bfba299
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ To upgrade update the webadmin and then replace the `stalwart-mail` binary.

### Added
- Automatic revoking of access tokens when secrets, permissions, ACLs or group memberships change (#649).
- Increased concurrency for local message delivery (configurable via `queue.threads.local`).
- Cluster node roles.
- `config_get` expression function.

### Changed
- `queue.outbound.concurrency` is now `queue.threads.remote`.
- `lookup.default.hostname` is now `server.hostname`.
- `lookup.default.domain` is now `report.domain`.

Expand Down
9 changes: 7 additions & 2 deletions crates/common/src/config/smtp/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ pub struct QueueOutboundTimeout {

#[derive(Debug, Clone)]
pub struct QueueThrottle {
pub outbound_concurrency: u64,
pub outbound_concurrency: usize,
pub local_concurrency: usize,
pub sender: Vec<Throttle>,
pub rcpt: Vec<Throttle>,
pub host: Vec<Throttle>,
Expand Down Expand Up @@ -203,6 +204,7 @@ impl Default for QueueConfig {
},
throttle: QueueThrottle {
outbound_concurrency: 25,
local_concurrency: 10,
sender: Default::default(),
rcpt: Default::default(),
host: Default::default(),
Expand Down Expand Up @@ -387,8 +389,11 @@ fn parse_queue_throttle(config: &mut Config) -> QueueThrottle {
rcpt: Vec::new(),
host: Vec::new(),
outbound_concurrency: config
.property_or_default::<u64>("queue.outbound.concurrency", "25")
.property_or_default::<usize>("queue.threads.remote", "25")
.unwrap_or(25),
local_concurrency: config
.property_or_default::<usize>("queue.threads.local", "10")
.unwrap_or(10),
};

let all_throttles = parse_throttle(
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/listener/acme/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Server {
match self.order(provider).await {
Ok(pem) => return self.process_cert(provider, pem, false).await,
Err(err)
if !err.matches(EventType::Acme(AcmeEvent::OrderInvalid)) && backoff < 16 =>
if !err.matches(EventType::Acme(AcmeEvent::OrderInvalid)) && backoff < 9 =>
{
trc::event!(
Acme(AcmeEvent::RenewBackoff),
Expand Down
46 changes: 38 additions & 8 deletions crates/jmap/src/services/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,50 @@
use std::sync::Arc;

use common::{core::BuildServer, ipc::DeliveryEvent, Inner};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Semaphore};

use super::ingest::MailDelivery;

pub fn spawn_delivery_manager(inner: Arc<Inner>, mut delivery_rx: mpsc::Receiver<DeliveryEvent>) {
tokio::spawn(async move {
while let Some(event) = delivery_rx.recv().await {
match event {
DeliveryEvent::Ingest { message, result_tx } => {
result_tx
.send(inner.build_server().deliver_message(message).await)
.ok();
let semaphore = Arc::new(Semaphore::new(
inner
.shared_core
.load()
.smtp
.queue
.throttle
.local_concurrency,
));

loop {
let permit = match semaphore.clone().acquire_owned().await {
Ok(permit) => permit,
Err(_) => {
trc::error!(trc::StoreEvent::UnexpectedError
.into_err()
.details("Semaphore error")
.caused_by(trc::location!()));
break;
}
};

match delivery_rx.recv().await {
Some(event) => match event {
DeliveryEvent::Ingest { message, result_tx } => {
let server = inner.build_server();

tokio::spawn(async move {
result_tx.send(server.deliver_message(message).await).ok();

drop(permit);
});
}
DeliveryEvent::Stop => break,
},
None => {
break;
}
DeliveryEvent::Stop => break,
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions crates/store/src/backend/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl S3Store {
500..=599 if retries_left > 0 => {
// wait backoff
tokio::time::sleep(Duration::from_secs(
1 << (self.max_retries - retries_left).max(16),
1 << (self.max_retries - retries_left).max(6),
))
.await;

Expand Down Expand Up @@ -130,7 +130,7 @@ impl S3Store {
500..=599 if retries_left > 0 => {
// wait backoff
tokio::time::sleep(Duration::from_secs(
1 << (self.max_retries - retries_left).max(16),
1 << (self.max_retries - retries_left).max(6),
))
.await;

Expand Down Expand Up @@ -161,7 +161,7 @@ impl S3Store {
500..=599 if retries_left > 0 => {
// wait backoff
tokio::time::sleep(Duration::from_secs(
1 << (self.max_retries - retries_left).max(16),
1 << (self.max_retries - retries_left).max(6),
))
.await;

Expand Down
2 changes: 1 addition & 1 deletion tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ resolver = "2"
[features]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "azure", "foundationdb"]
#default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "foundationdb"]
default = ["rocks", "redis"]
default = ["rocks", "redis", "s3"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
Expand Down
4 changes: 2 additions & 2 deletions tests/src/jmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,9 @@ pub async fn jmap_tests() {
email_query_changes::test(&mut params).await;
email_copy::test(&mut params).await;
thread_get::test(&mut params).await;
thread_merge::test(&mut params).await;
thread_merge::test(&mut params).await;*/
mailbox::test(&mut params).await;
delivery::test(&mut params).await;*/
delivery::test(&mut params).await;
auth_acl::test(&mut params).await;
auth_limits::test(&mut params).await;
auth_oauth::test(&mut params).await;
Expand Down
4 changes: 2 additions & 2 deletions tests/src/smtp/queue/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ relay = true
[session.data.limits]
messages = 2000
[queue.outbound]
concurrency = 4
[queue.threads]
remote = 4
[queue.schedule]
retry = "1s"
Expand Down

0 comments on commit bfba299

Please sign in to comment.