From 54733de0ff353ca7c77bedce030761283eec9ccc Mon Sep 17 00:00:00 2001 From: mdecimus Date: Thu, 30 Jan 2025 12:39:46 +0100 Subject: [PATCH] Group pipelined IMAP FETCH and STATUS operations --- crates/imap/src/core/client.rs | 6 +-- crates/imap/src/op/fetch.rs | 74 ++++++++++++++++++++------------- crates/imap/src/op/status.rs | 75 ++++++++++++++++++++-------------- 3 files changed, 93 insertions(+), 62 deletions(-) diff --git a/crates/imap/src/core/client.rs b/crates/imap/src/core/client.rs index 195e1fa8..10f4ad86 100644 --- a/crates/imap/src/core/client.rs +++ b/crates/imap/src/core/client.rs @@ -111,7 +111,7 @@ impl Session { .await .map(|_| SessionResult::Continue), Command::Status => self - .handle_status(request) + .handle_status(group_requests(&mut requests, vec![request])) .await .map(|_| SessionResult::Continue), Command::Append => self @@ -134,8 +134,8 @@ impl Session { .handle_search(request, false, is_uid) .await .map(|_| SessionResult::Continue), - Command::Fetch(is_uid) => self - .handle_fetch(request, is_uid) + Command::Fetch(_) => self + .handle_fetch(group_requests(&mut requests, vec![request])) .await .map(|_| SessionResult::Continue), Command::Store(is_uid) => self diff --git a/crates/imap/src/op/fetch.rs b/crates/imap/src/op/fetch.rs index bff71819..6c14aa03 100644 --- a/crates/imap/src/op/fetch.rs +++ b/crates/imap/src/op/fetch.rs @@ -42,43 +42,61 @@ use trc::AddContext; use super::{FromModSeq, ImapContext}; impl Session { - pub async fn handle_fetch( - &mut self, - request: Request, - is_uid: bool, - ) -> trc::Result<()> { + pub async fn handle_fetch(&mut self, requests: Vec>) -> trc::Result<()> { // Validate access self.assert_has_permission(Permission::ImapFetch)?; - let op_start = Instant::now(); - let arguments = request.parse_fetch()?; - let (data, mailbox) = self.state.select_data(); let is_qresync = self.is_qresync; let is_rev2 = self.version.is_rev2(); - let enabled_condstore = if !self.is_condstore && arguments.changed_since.is_some() - || arguments.attributes.contains(&Attribute::ModSeq) - { - self.is_condstore = true; - true - } else { - false - }; + let mut ops = Vec::with_capacity(requests.len()); + + for request in requests { + let is_uid = matches!(request.command, Command::Fetch(true)); + match request.parse_fetch() { + Ok(arguments) => { + let enabled_condstore = if !self.is_condstore + && arguments.changed_since.is_some() + || arguments.attributes.contains(&Attribute::ModSeq) + { + self.is_condstore = true; + true + } else { + false + }; + + ops.push(Ok((is_uid, enabled_condstore, arguments))); + } + Err(err) => { + ops.push(Err(err)); + } + } + } spawn_op!(data, { - let response = data - .fetch( - arguments, - mailbox, - is_uid, - is_qresync, - is_rev2, - enabled_condstore, - op_start, - ) - .await?; - data.write_bytes(response.into_bytes()).await + for op in ops { + match op { + Ok((is_uid, enabled_condstore, arguments)) => { + let response = data + .fetch( + arguments, + mailbox.clone(), + is_uid, + is_qresync, + is_rev2, + enabled_condstore, + Instant::now(), + ) + .await?; + + data.write_bytes(response.into_bytes()).await?; + } + Err(err) => data.write_error(err).await?, + } + } + + Ok(()) }) } } diff --git a/crates/imap/src/op/status.rs b/crates/imap/src/op/status.rs index ddc0aba4..1badde82 100644 --- a/crates/imap/src/op/status.rs +++ b/crates/imap/src/op/status.rs @@ -34,47 +34,60 @@ use trc::AddContext; use super::ToModSeq; impl Session { - pub async fn handle_status(&mut self, request: Request) -> trc::Result<()> { + pub async fn handle_status(&mut self, requests: Vec>) -> trc::Result<()> { // Validate access self.assert_has_permission(Permission::ImapStatus)?; - let op_start = Instant::now(); - let arguments = request.parse_status(self.version)?; let version = self.version; let data = self.state.session_data(); spawn_op!(data, { - // Refresh mailboxes - data.synchronize_mailboxes(false) - .await - .imap_ctx(&arguments.tag, trc::location!())?; + let mut did_sync = false; - // Fetch status - let status = data - .status(arguments.mailbox_name, &arguments.items) - .await - .imap_ctx(&arguments.tag, trc::location!())?; + for request in requests.into_iter() { + match request.parse_status(version) { + Ok(arguments) => { + let op_start = Instant::now(); + if !did_sync { + // Refresh mailboxes + data.synchronize_mailboxes(false) + .await + .imap_ctx(&arguments.tag, trc::location!())?; + did_sync = true; + } - trc::event!( - Imap(trc::ImapEvent::Status), - SpanId = data.session_id, - MailboxName = status.mailbox_name.clone(), - Details = arguments - .items - .iter() - .map(|c| trc::Value::from(format!("{c:?}"))) - .collect::>(), - Elapsed = op_start.elapsed() - ); + // Fetch status + let status = data + .status(arguments.mailbox_name, &arguments.items) + .await + .imap_ctx(&arguments.tag, trc::location!())?; - let mut buf = Vec::with_capacity(32); - status.serialize(&mut buf, version.is_rev2()); - data.write_bytes( - StatusResponse::completed(Command::Status) - .with_tag(arguments.tag) - .serialize(buf), - ) - .await + trc::event!( + Imap(trc::ImapEvent::Status), + SpanId = data.session_id, + MailboxName = status.mailbox_name.clone(), + Details = arguments + .items + .iter() + .map(|c| trc::Value::from(format!("{c:?}"))) + .collect::>(), + Elapsed = op_start.elapsed() + ); + + let mut buf = Vec::with_capacity(32); + status.serialize(&mut buf, version.is_rev2()); + data.write_bytes( + StatusResponse::completed(Command::Status) + .with_tag(arguments.tag) + .serialize(buf), + ) + .await?; + } + Err(err) => data.write_error(err).await?, + } + } + + Ok(()) }) } }