Skip to content

Commit

Permalink
agent: Ensure all events are written when io_uring is not used
Browse files Browse the repository at this point in the history
This switches to using `tokio::io::AsyncWriteExt::write_all` to
guarantee that all data is written to the primary log file.

Signed-off-by: Daiki Ueno <[email protected]>
  • Loading branch information
ueno committed Jun 29, 2023
1 parent bc42c70 commit e5f0b67
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
2 changes: 1 addition & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ page_size = "0.5"
serde = "1.0"
serde_cbor = "0.10"
time = { version = "0.3", features = ["formatting", "local-offset", "macros"] }
tokio = { version = "1.23", features = ["io-util", "signal"] }
tokio = { version = "1.23", features = ["fs", "io-util", "signal"] }
tokio-uring = { version = "0.4", optional = true }
toml = "0.6"
tracing = "0.1"
Expand Down
33 changes: 21 additions & 12 deletions agent/src/log_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,26 @@ impl LogWriter {
Ok(())
}

#[cfg(feature = "tokio-uring")]
async fn write_all(&mut self, data: Vec<u8>) -> Result<()> {
let (res, _) = match self.file {
Some(ref file) => file.write_at(data, self.offset).await,
_ => bail!("log file is not opened"),
};
let n = res?;
self.offset += n as u64;
Ok(())
}

#[cfg(not(feature = "tokio-uring"))]
async fn write_all(&mut self, data: Vec<u8>) -> Result<()> {
match self.file {
Some(ref mut file) => file.write_all(&data).await?,
_ => bail!("log file is not opened"),
};
Ok(())
}

pub async fn flush(&mut self) -> Result<()> {
self.pending_events = 0;
for group in self.groups.clone() {
Expand All @@ -158,18 +178,7 @@ impl LogWriter {
config::Format::Packed => serde_cbor::ser::to_vec_packed(&group)?,
config::Format::Minimal => to_vec_minimal(&group)?,
};
#[cfg(feature = "tokio-uring")]
let res = match self.file {
Some(ref file) => file.write_at(v, self.offset).await.0,
_ => bail!("log file is not opened"),
};
#[cfg(not(feature = "tokio-uring"))]
let res = match self.file {
Some(ref mut file) => file.write(&v).await,
_ => bail!("log file is not opened"),
};
let n = res?;
self.offset += n as u64;
self.write_all(v).await?;
self.written_events += group.events().len();
}
self.groups.clear();
Expand Down

0 comments on commit e5f0b67

Please sign in to comment.