Skip to content

Commit

Permalink
Group pipelined IMAP FETCH and STATUS operations
Browse files Browse the repository at this point in the history
  • Loading branch information
mdecimus committed Jan 30, 2025
1 parent 4c7052d commit 54733de
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 62 deletions.
6 changes: 3 additions & 3 deletions crates/imap/src/core/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<T: SessionStream> Session<T> {
.await
.map(|_| SessionResult::Continue),
Command::Status => self
.handle_status(request)
.handle_status(group_requests(&mut requests, vec![request]))
.await
.map(|_| SessionResult::Continue),
Command::Append => self
Expand All @@ -134,8 +134,8 @@ impl<T: SessionStream> Session<T> {
.handle_search(request, false, is_uid)
.await
.map(|_| SessionResult::Continue),
Command::Fetch(is_uid) => self
.handle_fetch(request, is_uid)
Command::Fetch(_) => self
.handle_fetch(group_requests(&mut requests, vec![request]))
.await
.map(|_| SessionResult::Continue),
Command::Store(is_uid) => self
Expand Down
74 changes: 46 additions & 28 deletions crates/imap/src/op/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,61 @@ use trc::AddContext;
use super::{FromModSeq, ImapContext};

impl<T: SessionStream> Session<T> {
pub async fn handle_fetch(
&mut self,
request: Request<Command>,
is_uid: bool,
) -> trc::Result<()> {
pub async fn handle_fetch(&mut self, requests: Vec<Request<Command>>) -> trc::Result<()> {
// Validate access
self.assert_has_permission(Permission::ImapFetch)?;

let op_start = Instant::now();
let arguments = request.parse_fetch()?;

let (data, mailbox) = self.state.select_data();
let is_qresync = self.is_qresync;
let is_rev2 = self.version.is_rev2();

let enabled_condstore = if !self.is_condstore && arguments.changed_since.is_some()
|| arguments.attributes.contains(&Attribute::ModSeq)
{
self.is_condstore = true;
true
} else {
false
};
let mut ops = Vec::with_capacity(requests.len());

for request in requests {
let is_uid = matches!(request.command, Command::Fetch(true));
match request.parse_fetch() {
Ok(arguments) => {
let enabled_condstore = if !self.is_condstore
&& arguments.changed_since.is_some()
|| arguments.attributes.contains(&Attribute::ModSeq)
{
self.is_condstore = true;
true
} else {
false
};

ops.push(Ok((is_uid, enabled_condstore, arguments)));
}
Err(err) => {
ops.push(Err(err));
}
}
}

spawn_op!(data, {
let response = data
.fetch(
arguments,
mailbox,
is_uid,
is_qresync,
is_rev2,
enabled_condstore,
op_start,
)
.await?;
data.write_bytes(response.into_bytes()).await
for op in ops {
match op {
Ok((is_uid, enabled_condstore, arguments)) => {
let response = data
.fetch(
arguments,
mailbox.clone(),
is_uid,
is_qresync,
is_rev2,
enabled_condstore,
Instant::now(),
)
.await?;

data.write_bytes(response.into_bytes()).await?;
}
Err(err) => data.write_error(err).await?,
}
}

Ok(())
})
}
}
Expand Down
75 changes: 44 additions & 31 deletions crates/imap/src/op/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,47 +34,60 @@ use trc::AddContext;
use super::ToModSeq;

impl<T: SessionStream> Session<T> {
pub async fn handle_status(&mut self, request: Request<Command>) -> trc::Result<()> {
pub async fn handle_status(&mut self, requests: Vec<Request<Command>>) -> trc::Result<()> {
// Validate access
self.assert_has_permission(Permission::ImapStatus)?;

let op_start = Instant::now();
let arguments = request.parse_status(self.version)?;
let version = self.version;
let data = self.state.session_data();

spawn_op!(data, {
// Refresh mailboxes
data.synchronize_mailboxes(false)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
let mut did_sync = false;

// Fetch status
let status = data
.status(arguments.mailbox_name, &arguments.items)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
for request in requests.into_iter() {
match request.parse_status(version) {
Ok(arguments) => {
let op_start = Instant::now();
if !did_sync {
// Refresh mailboxes
data.synchronize_mailboxes(false)
.await
.imap_ctx(&arguments.tag, trc::location!())?;
did_sync = true;
}

trc::event!(
Imap(trc::ImapEvent::Status),
SpanId = data.session_id,
MailboxName = status.mailbox_name.clone(),
Details = arguments
.items
.iter()
.map(|c| trc::Value::from(format!("{c:?}")))
.collect::<Vec<_>>(),
Elapsed = op_start.elapsed()
);
// Fetch status
let status = data
.status(arguments.mailbox_name, &arguments.items)
.await
.imap_ctx(&arguments.tag, trc::location!())?;

let mut buf = Vec::with_capacity(32);
status.serialize(&mut buf, version.is_rev2());
data.write_bytes(
StatusResponse::completed(Command::Status)
.with_tag(arguments.tag)
.serialize(buf),
)
.await
trc::event!(
Imap(trc::ImapEvent::Status),
SpanId = data.session_id,
MailboxName = status.mailbox_name.clone(),
Details = arguments
.items
.iter()
.map(|c| trc::Value::from(format!("{c:?}")))
.collect::<Vec<_>>(),
Elapsed = op_start.elapsed()
);

let mut buf = Vec::with_capacity(32);
status.serialize(&mut buf, version.is_rev2());
data.write_bytes(
StatusResponse::completed(Command::Status)
.with_tag(arguments.tag)
.serialize(buf),
)
.await?;
}
Err(err) => data.write_error(err).await?,
}
}

Ok(())
})
}
}
Expand Down

0 comments on commit 54733de

Please sign in to comment.