From e5f0b67a3ffa301f7e3b86a0c6496a32addc9bd4 Mon Sep 17 00:00:00 2001 From: Daiki Ueno Date: Thu, 29 Jun 2023 14:33:38 +0200 Subject: [PATCH] agent: Ensure all events are written when io_uring is not used This switches to using `tokio::io::AsyncWriteExt::write_all` to guarantee that all data is written to the primary log file. Signed-off-by: Daiki Ueno --- agent/Cargo.toml | 2 +- agent/src/log_writer.rs | 33 +++++++++++++++++++++------------ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/agent/Cargo.toml b/agent/Cargo.toml index faac8ba..bb4519a 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -23,7 +23,7 @@ page_size = "0.5" serde = "1.0" serde_cbor = "0.10" time = { version = "0.3", features = ["formatting", "local-offset", "macros"] } -tokio = { version = "1.23", features = ["io-util", "signal"] } +tokio = { version = "1.23", features = ["fs", "io-util", "signal"] } tokio-uring = { version = "0.4", optional = true } toml = "0.6" tracing = "0.1" diff --git a/agent/src/log_writer.rs b/agent/src/log_writer.rs index d7933f7..3fe21d1 100644 --- a/agent/src/log_writer.rs +++ b/agent/src/log_writer.rs @@ -147,6 +147,26 @@ impl LogWriter { Ok(()) } + #[cfg(feature = "tokio-uring")] + async fn write_all(&mut self, data: Vec) -> Result<()> { + let (res, _) = match self.file { + Some(ref file) => file.write_at(data, self.offset).await, + _ => bail!("log file is not opened"), + }; + let n = res?; + self.offset += n as u64; + Ok(()) + } + + #[cfg(not(feature = "tokio-uring"))] + async fn write_all(&mut self, data: Vec) -> Result<()> { + match self.file { + Some(ref mut file) => file.write_all(&data).await?, + _ => bail!("log file is not opened"), + }; + Ok(()) + } + pub async fn flush(&mut self) -> Result<()> { self.pending_events = 0; for group in self.groups.clone() { @@ -158,18 +178,7 @@ impl LogWriter { config::Format::Packed => serde_cbor::ser::to_vec_packed(&group)?, config::Format::Minimal => to_vec_minimal(&group)?, }; - #[cfg(feature = "tokio-uring")] - let res = match self.file { - Some(ref file) => file.write_at(v, self.offset).await.0, - _ => bail!("log file is not opened"), - }; - #[cfg(not(feature = "tokio-uring"))] - let res = match self.file { - Some(ref mut file) => file.write(&v).await, - _ => bail!("log file is not opened"), - }; - let n = res?; - self.offset += n as u64; + self.write_all(v).await?; self.written_events += group.events().len(); } self.groups.clear();