Skip to content

Commit

Permalink
Merge pull request #371 from shikhar/dsw-flush
Browse files Browse the repository at this point in the history
support partial flushes from DmaStreamWriter
  • Loading branch information
Glauber Costa authored Jul 5, 2021
2 parents bbeb3fd + 683b81d commit e79192b
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 43 deletions.
256 changes: 219 additions & 37 deletions glommio/src/io/dma_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
cell::RefCell,
collections::VecDeque,
io,
os::unix::prelude::AsRawFd,
pin::Pin,
rc::Rc,
task::{Context, Poll},
Expand Down Expand Up @@ -808,6 +809,12 @@ impl DmaStreamWriterFlushState {
}
}

/// Position of the most recently registered flush, or the already-flushed
/// position when no flush is currently tracked.
fn last_flush_pos(&self) -> u64 {
    match self.flushes.back() {
        Some((pos, _)) => *pos,
        None => self.flushed_pos,
    }
}

fn take_pending_handles(&mut self) -> Vec<task::JoinHandle<()>> {
let mut handles = Vec::with_capacity(self.pending_flush_count);
for (_, status) in &mut self.flushes {
Expand All @@ -822,9 +829,7 @@ impl DmaStreamWriterFlushState {

/// Registers a newly started flush ending at `flush_pos`.
///
/// Flushes must be registered in strictly increasing position order; the
/// assert below covers both the "previous flush exists" and "first flush"
/// cases via `last_flush_pos()`, so no separate back-of-queue check is
/// needed. (The scraped text carried a leftover pre-change `if let` that
/// duplicated this assertion; it is removed here.)
fn on_start(&mut self, flush_pos: u64, handle: task::JoinHandle<()>) {
    self.pending_flush_count += 1;
    assert!(self.last_flush_pos() < flush_pos);
    self.flushes
        .push_back((flush_pos, FlushStatus::Pending(Some(handle))));
}
Expand Down Expand Up @@ -871,6 +876,7 @@ struct DmaStreamWriterState {
flush_state: DmaStreamWriterFlushState,
current_buffer: Option<DmaBuffer>,
aligned_pos: u64,
synced_pos: u64,
buffer_pos: usize,
write_behind: usize,
sync_on_close: bool,
Expand Down Expand Up @@ -920,13 +926,10 @@ impl DmaStreamWriterState {
file: Rc<DmaFile>,
do_close: bool,
) {
self.file_status = FileStatus::Closing;
let final_pos = self.current_pos();
if self.buffer_pos > 0 {
let buffer = self.current_buffer.take();
self.flush_one_buffer(buffer.unwrap(), state.clone(), file.clone());
}
self.flush_padded(state.clone(), file.clone());
let mut pending = self.take_pending_handles();
self.file_status = FileStatus::Closing;
Local::local(async move {
defer! {
waker.wake();
Expand All @@ -945,11 +948,12 @@ impl DmaStreamWriterState {

assert_eq!(state.flushed_pos(), final_pos);

if state.sync_on_close {
if state.sync_on_close && state.synced_pos < final_pos {
let res = file.fdatasync().await;
if collect_error!(state, res) {
return;
}
state.synced_pos = final_pos;
}

if state.must_truncate() {
Expand Down Expand Up @@ -983,6 +987,33 @@ impl DmaStreamWriterState {
self.flush_state.take_pending_handles()
}

/// Flushes the current partially-filled buffer by submitting it padded to
/// the full buffer size. Returns `true` if a flush was actually started.
///
/// No-op (returns `false`) when the buffer is empty or when a flush up to
/// the current position has already been registered.
fn flush_padded(&mut self, state: Rc<RefCell<Self>>, file: Rc<DmaFile>) -> bool {
    if self.buffer_pos == 0 {
        return false;
    }
    assert!(
        self.buffer_pos < self.buffer_size,
        "full buffer should have already been flushed"
    );
    let current_pos = self.current_pos();
    let last_flush_pos = self.flush_state.last_flush_pos();
    if last_flush_pos == current_pos {
        // A flush covering everything written so far is already in flight.
        return false;
    }
    assert!(last_flush_pos < current_pos);
    let buffer = self.current_buffer.take().unwrap();
    if matches!(self.file_status, FileStatus::Open) {
        // The stream stays open: future writes must continue from a copy of
        // the partial contents, while the padded original goes to the device.
        let mut replacement = file.alloc_dma_buffer(self.buffer_size);
        replacement.as_bytes_mut()[..self.buffer_pos]
            .copy_from_slice(&buffer.as_bytes()[..self.buffer_pos]);
        self.current_buffer = Some(replacement);
    }
    self.flush_one_buffer(buffer, state, file);
    true
}

fn flush_one_buffer(&mut self, buffer: DmaBuffer, state: Rc<RefCell<Self>>, file: Rc<DmaFile>) {
let aligned_pos = self.aligned_pos;
let flush_pos = self.current_pos();
Expand All @@ -1004,10 +1035,26 @@ impl DmaStreamWriterState {

impl Drop for DmaStreamWriterState {
    fn drop(&mut self) {
        // By the time the shared state is dropped, `DmaStreamWriter::drop()`
        // must already have cancelled every in-flight flush. The scraped text
        // carried a leftover `let mut pending = self.take_pending_handles();`
        // line from before the change, which drained the handles *before*
        // this assertion and made it vacuously true — removed here so the
        // invariant is actually checked. (Also fixes the "DmaStreamerWriter"
        // typo in the panic message.)
        assert!(
            self.take_pending_handles().is_empty(),
            "DmaStreamWriter::drop() should have cancelled pending flushes"
        );
    }
}

impl Drop for DmaStreamWriter {
    fn drop(&mut self) {
        let mut state = self.state.borrow_mut();
        // Dropping the writer abandons any outstanding flushes.
        for handle in state.take_pending_handles() {
            handle.cancel();
        }
        // If a padded flush ran (e.g. via sync()), trim the file back to the
        // flushed position so the padding bytes are not left on disk.
        if state.must_truncate() {
            let file = self.file.take().unwrap();
            Local::get_reactor()
                .sys
                .async_truncate(file.as_raw_fd(), state.flushed_pos());
        }
    }
}

Expand Down Expand Up @@ -1046,6 +1093,7 @@ impl DmaStreamWriter {
file_status: FileStatus::Open,
aligned_pos: 0,
buffer_pos: 0,
synced_pos: 0,
};

let state = Rc::new(RefCell::new(state));
Expand Down Expand Up @@ -1124,13 +1172,125 @@ impl DmaStreamWriter {
self.state.borrow().flushed_pos()
}

/// Waits until every flush submitted so far has completed.
///
/// When `partial` is true, the current partially-filled buffer is padded
/// and flushed too, so the target position is `current_pos()`; otherwise
/// only fully-written (aligned) buffers are awaited and the target is
/// `aligned_pos`. Returns the flushed position; note no fdatasync is
/// issued here.
async fn flush_inner(&self, partial: bool) -> Result<u64> {
    let (target_pos, pending) = {
        let mut state = self.state.borrow_mut();
        let target = if partial && state.buffer_pos > 0 {
            state.flush_padded(self.state.clone(), self.file.clone().unwrap());
            state.current_pos()
        } else {
            state.aligned_pos
        };
        (target, state.take_pending_handles())
    };
    // The RefCell borrow is released above, before any await point.
    for handle in pending {
        handle.await;
    }
    let mut state = self.state.borrow_mut();
    if let Some(err) = current_error!(state) {
        return err;
    }
    assert!(state.flushed_pos() >= target_pos);
    Ok(state.flushed_pos())
}

/// Issues an fdatasync if anything has been flushed since the last sync,
/// then records the new synced position.
///
/// Returns the flushed position as of the moment the sync began.
async fn sync_inner(&self) -> Result<u64> {
    let (flushed_pos, synced_pos) = {
        let state = self.state.borrow();
        (state.flushed_pos(), state.synced_pos)
    };
    if synced_pos >= flushed_pos {
        // Nothing new was flushed; syncing again would be a no-op. The
        // synced position can never run ahead of the flushed position.
        assert_eq!(synced_pos, flushed_pos);
    } else {
        self.file.clone().unwrap().fdatasync().await?;
        self.state.borrow_mut().synced_pos = flushed_pos;
    }
    Ok(flushed_pos)
}

/// Waits for all currently in-flight buffers to be written to the
/// underlying storage.
///
/// This does not include the current buffer if it is not full. If all data
/// must be flushed, use [`flush`]. No sync operation is issued; for
/// durability, see `sync_aligned` or `sync`.
///
/// Returns the flushed position of the file.
///
/// # Examples
/// ```no_run
/// use futures::io::AsyncWriteExt;
/// use glommio::{
///     io::{DmaFile, DmaStreamWriterBuilder},
///     LocalExecutor,
/// };
///
/// let ex = LocalExecutor::default();
/// ex.run(async {
///     let file = DmaFile::create("myfile.txt").await.unwrap();
///     let mut writer = DmaStreamWriterBuilder::new(file)
///         .with_buffer_size(4096)
///         .with_write_behind(2)
///         .build();
///     let buffer = [0u8; 5000];
///     writer.write_all(&buffer).await.unwrap();
///     // with 5000 bytes written into a 4096-byte buffer a flush
///     // has certainly started, but it very likely didn't finish right
///     // away.
///     assert_eq!(writer.current_flushed_pos(), 0);
///     assert_eq!(writer.flush_aligned().await.unwrap(), 4096);
///     writer.close().await.unwrap();
/// });
/// ```
///
/// [`flush`]: https://docs.rs/futures/0.3.15/futures/io/trait.AsyncWriteExt.html#method.flush
pub async fn flush_aligned(&self) -> Result<u64> {
    self.flush_inner(false).await
}

/// Waits for all currently in-flight buffers to be written to the
/// underlying storage, and ensures they are safely persisted.
///
/// This does not include the current buffer if it is not full. If all data
/// must be synced, use [`Self::sync`].
///
/// Returns the flushed position of the file at the time the sync started.
///
/// # Examples
/// ```no_run
/// use futures::io::AsyncWriteExt;
/// use glommio::{
///     io::{DmaFile, DmaStreamWriterBuilder},
///     LocalExecutor,
/// };
///
/// let ex = LocalExecutor::default();
/// ex.run(async {
///     let file = DmaFile::create("myfile.txt").await.unwrap();
///     let mut writer = DmaStreamWriterBuilder::new(file)
///         .with_buffer_size(4096)
///         .with_write_behind(2)
///         .build();
///     let buffer = [0u8; 5000];
///     writer.write_all(&buffer).await.unwrap();
///     // with 5000 bytes written into a 4096-byte buffer a flush
///     // has certainly started, but it very likely didn't finish right
///     // away.
///     assert_eq!(writer.current_flushed_pos(), 0);
///     assert_eq!(writer.sync_aligned().await.unwrap(), 4096);
///     writer.close().await.unwrap();
/// });
/// ```
pub async fn sync_aligned(&self) -> Result<u64> {
    // Await aligned in-flight flushes (no padding), then fdatasync.
    self.flush_inner(false).await?;
    self.sync_inner().await
}

/// Waits for all buffers to be written to the underlying storage, and
/// ensures they are safely persisted.
///
/// This includes the current buffer even if it is not full, by padding it.
/// The padding will get over-written by future writes, or truncated upon
/// [`close`].
///
/// Returns the flushed position of the file at the time the sync started.
///
Expand All @@ -1153,28 +1313,17 @@ impl DmaStreamWriter {
/// writer.write_all(&buffer).await.unwrap();
/// // with 5000 bytes written into a 4096-byte buffer a flush
/// // has certainly started. But if very likely didn't finish right
/// // away. It will not be reflected on current_flushed_pos(), but a
/// // sync() will wait on it.
/// // away.
/// assert_eq!(writer.current_flushed_pos(), 0);
/// assert_eq!(writer.sync().await.unwrap(), 4096);
/// assert_eq!(writer.sync().await.unwrap(), 5000);
/// writer.close().await.unwrap();
/// });
/// ```
///
/// [`close`]: https://docs.rs/futures/0.3.15/futures/io/trait.AsyncWriteExt.html#method.close
pub async fn sync(&self) -> Result<u64> {
let mut pending = self.state.borrow_mut().take_pending_handles();
for v in pending.drain(..) {
v.await;
}
let presync_pos = {
let mut state = self.state.borrow_mut();
if let Some(err) = current_error!(state) {
return err;
}
state.flushed_pos()
};
let file = self.file.clone().unwrap();
file.fdatasync().await?;
Ok(presync_pos)
self.flush_inner(true).await?;
self.sync_inner().await
}

// internal function that does everything that close does (flushes buffers, etc,
Expand Down Expand Up @@ -1265,8 +1414,17 @@ impl AsyncWrite for DmaStreamWriter {
}
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let mut state = self.state.borrow_mut();
if let Some(err) = current_error!(state) {
return Poll::Ready(err);
}
if state.flushed_pos() == state.current_pos() {
return Poll::Ready(Ok(()));
}
state.flush_padded(self.state.clone(), self.file.clone().unwrap());
state.add_waker(cx.waker().clone());
Poll::Pending
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Expand Down Expand Up @@ -1299,7 +1457,7 @@ mod test {
timer::Timer,
};
use futures::{AsyncReadExt, AsyncWriteExt};
use std::{io::ErrorKind, time::Duration};
use std::{io::ErrorKind, path::Path, time::Duration};

macro_rules! file_stream_read_test {
( $name:ident, $dir:ident, $kind:ident, $file:ident, $file_size:ident: $size:tt, $code:block) => {
Expand Down Expand Up @@ -1359,6 +1517,10 @@ mod test {
};
}

/// Test helper: size in bytes of the file at `path`; panics if it cannot
/// be stat'ed.
fn file_size<P: AsRef<Path>>(path: P) -> u64 {
    let metadata = std::fs::metadata(path).unwrap();
    metadata.len()
}

file_stream_read_test!(read_exact_empty_file, path, _k, file, _file_size: 0, {
let mut reader = DmaStreamReaderBuilder::new(file)
.build();
Expand Down Expand Up @@ -1846,6 +2008,24 @@ mod test {
assert_eq!(writer.current_flushed_pos(), 5000);
});

// flush_aligned() persists only full buffers; flush() pads the partial one;
// dropping the writer truncates the padding away.
file_stream_write_test!(flush_and_drop, path, _k, filename, file, {
    let mut writer = DmaStreamWriterBuilder::new(file)
        .with_buffer_size(4096)
        .with_write_behind(2)
        .build();

    assert_eq!(writer.current_pos(), 0);
    let buffer = [0u8; 5000];
    writer.write_all(&buffer).await.unwrap();
    // Only the full first 4096-byte buffer is covered by flush_aligned().
    assert_eq!(writer.flush_aligned().await.unwrap(), 4096);
    assert_eq!(file_size(&filename), 4096);
    // flush() pads the partial second buffer, so the file grows to two
    // full buffers on disk even though only 5000 bytes were written.
    writer.flush().await.unwrap();
    assert_eq!(writer.current_flushed_pos(), 5000);
    assert_eq!(file_size(&filename), 8192);
    // Dropping the writer truncates the file back to the flushed 5000 bytes.
    drop(writer);
    assert_eq!(file_size(&filename), 5000);
});

file_stream_write_test!(sync_and_close, path, _k, filename, file, {
let mut writer = DmaStreamWriterBuilder::new(file)
.with_buffer_size(4096)
Expand All @@ -1855,12 +2035,14 @@ mod test {
assert_eq!(writer.current_pos(), 0);
let buffer = [0u8; 5000];
writer.write_all(&buffer).await.unwrap();
assert_eq!(writer.sync().await.unwrap(), 4096);
assert_eq!(writer.sync().await.unwrap(), 5000);
assert_eq!(file_size(&filename), 8192);
// write more
writer.write_all(&buffer).await.unwrap();
writer.close().await.unwrap();

assert_eq!(writer.current_flushed_pos(), 10000);
assert_eq!(file_size(&filename), 10000);
});

#[test]
Expand Down
Loading

0 comments on commit e79192b

Please sign in to comment.