Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: persist contents of read buffer if required #366

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 99 additions & 44 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use seahash::hash;
use std::collections::VecDeque;
use std::fs::{self, OpenOptions};
use std::io::{self, copy, Write};
use std::mem;
use std::path::{Path, PathBuf};

#[derive(thiserror::Error, Debug)]
Expand All @@ -23,7 +22,7 @@ pub enum Error {
}

pub struct Storage {
name: String,
pub name: String,
/// maximum allowed file size
max_file_size: usize,
/// current open file
Expand Down Expand Up @@ -99,34 +98,61 @@ impl Storage {
self.flush()
}

/// Force flush the contents of write buffer onto disk
pub fn flush(&mut self) -> Result<Option<u64>, Error> {
if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}

let Some(persistence) = &mut self.persistence else {
fn write_to_persistence(
persistence: &mut Option<Persistence>,
name: &String,
buf: &mut BytesMut,
) -> Result<Option<u64>, Error> {
let Some(persistence) = persistence else {
// TODO(RT): Make sure that disk files starts with id 1 to represent in memory file
// with id 0
self.current_write_file.clear();
buf.clear();
warn!(
"Persistence disabled for storage: {}. Deleted in-memory buffer on overflow",
self.name
name
);
return Ok(Some(0));
};

let NextFile { mut file, deleted } = persistence.open_next_write_file()?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path());
file.write(&mut self.current_write_file)?;
info!("Flushing data to disk for stoarge: {}; path = {:?}", name, file.path());
file.write(buf)?;

// 8 is the number of bytes the hash(u64) occupies
persistence.bytes_occupied += 8 + self.current_write_file.len();
self.current_write_file.clear();
persistence.bytes_occupied += 8 + buf.len();
buf.clear();

Ok(deleted)
}

/// Force flush the contents of write buffer onto disk
pub fn flush(&mut self) -> Result<Option<u64>, Error> {
if self.current_write_file.is_empty() {
return Err(Error::NoWrites);
}

Self::write_to_persistence(&mut self.persistence, &self.name, &mut self.current_write_file)
}

/// Force flush the contents of read buffer onto disk.
///
/// Returns `Error::NoWrites` without touching anything when either
/// persistence is configured and already tracks a current read file id,
/// or the in-memory read buffer is empty. Otherwise the read buffer is
/// passed to `write_to_persistence`; if that call reports a value, it is
/// logged as an error.
pub fn save_read_buffer(&mut self) -> Result<(), Error> {
    // A tracked read file id presumably means the buffer's contents came
    // from a file on disk — NOTE(review): confirm against `Persistence`.
    let read_file_tracked =
        self.persistence.as_ref().is_some_and(|p| p.current_read_file_id.is_some());
    if read_file_tracked || self.current_read_file.is_empty() {
        return Err(Error::NoWrites);
    }

    let outcome = Self::write_to_persistence(
        &mut self.persistence,
        &self.name,
        &mut self.current_read_file,
    )?;
    if let Some(deleted) = outcome {
        error!("Persistence deleted during flushing of read buffer: {deleted}");
    }

    Ok(())
}

/// Loads head file to current inmemory read buffer. Deletes
/// the file after loading. If all the disk data is caught up,
/// swaps current write buffer to current read buffer if there
Expand All @@ -138,41 +164,34 @@ impl Storage {
return Ok(());
}

let Some(persistence) = &mut self.persistence else {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
if let Some(persistence) = &mut self.persistence {
// Remove read file on completion in destructive-read mode
if let Some(id) =
persistence.current_read_file_id.take_if(|_| !persistence.non_destructive_read)
{
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

return Ok(());
};

// Remove read file on completion in destructive-read mode
let read_is_destructive = !persistence.non_destructive_read;
let read_file_id = persistence.current_read_file_id.take();
if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() {
let deleted_file = persistence.remove(id)?;
debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name);
}

// Swap read buffer with write buffer to read data in inmemory write
// buffer when all the backlog disk files are done
if persistence.backlog_files.is_empty() {
mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// If read buffer is 0 after swapping, all the data is caught up
if self.current_read_file.is_empty() {
return Err(Error::Done);
match persistence.load_next_read_file(&mut self.current_read_file) {
Err(Error::Done) => {}
Err(e) => {
error!("Couldn't read persisted file: {e}");
return Err(e);
}
_ => return Ok(()),
}
};

return Ok(());
if self.current_write_file.is_empty() {
return Err(Error::Done);
}

if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) {
self.current_read_file.clear();
persistence.current_read_file_id.take();
return Err(e);
}
// Swap read buffer with write buffer to read data from inmemory write
// buffer when all the backlog disk files are done.
std::mem::swap(&mut self.current_read_file, &mut self.current_write_file);
// Write buffer is emptied to ensure fresh start.
self.current_write_file.clear();

Ok(())
}
Expand Down Expand Up @@ -584,4 +603,40 @@ mod test {
let files = get_file_ids(&backup.path(), 10).unwrap();
assert_eq!(files, vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
}

/// Verifies that data sitting only in the in-memory read buffer survives a
/// restart: it is saved to disk via `save_read_buffer`, then read back by a
/// freshly constructed `Storage` pointed at the same backup folder.
#[test]
fn ensure_current_read_file_is_not_lost() {
    let backup = init_backup_folders();
    let mut storage = Storage::new("test", 10 * 1036);
    storage.set_persistence(backup.path(), 10).unwrap();
    // partially fill write buffer
    write_n_publishes(&mut storage, 1);

    // Nothing written to disk
    assert!(storage.persistence.as_ref().unwrap().backlog_files.is_empty());

    // Trigger swap of read and write buffers, ensure packets in read buffer
    storage.reload_on_eof().unwrap();
    assert!(!storage.current_read_file.is_empty());
    assert!(storage.persistence.as_ref().unwrap().current_read_file_id.is_none());

    // Write buffer is empty after the swap, so flushing it must report
    // `NoWrites`. `matches!` only yields a bool; it has to be asserted,
    // otherwise the error variant is never actually checked.
    assert!(matches!(storage.flush().unwrap_err(), super::Error::NoWrites));
    // Persist the read buffer onto disk, and drop storage
    storage.save_read_buffer().unwrap();
    drop(storage);

    // reload storage
    let mut storage = Storage::new("test", 10 * 1036);
    storage.set_persistence(backup.path(), 10).unwrap();
    assert_eq!(storage.file_count(), 1);

    // verify read buffer was persisted by reading a single packet
    read_n_publishes(&mut storage, 1);
    assert_eq!(storage.file_count(), 0);
    let file_id = storage.persistence.as_ref().unwrap().current_read_file_id.unwrap();
    assert_eq!(file_id, 0);

    // verify no more data left to be read
    assert!(matches!(storage.reload_on_eof().unwrap_err(), super::Error::Done));
}
}
34 changes: 21 additions & 13 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,32 @@ impl Storage {
}

// Ensures all data is written into persistence, when configured.
pub fn flush(&mut self) -> Result<Option<u64>, storage::Error> {
pub fn flush(&mut self) {
// Write live cache to disk when flushing
if let Some(mut publish) = self.latest_data.take() {
publish.pkid = 1;
if let Err(e) = publish.write(self.inner.writer()) {
error!("Failed to fill disk buffer. Error = {e}");
return Ok(None);
error!("Failed to serialize into write buffer. Error = {e}");
}
}

self.inner.flush()
// Save contents of read buffer to disk if required
match self.inner.save_read_buffer() {
Ok(()) => trace!("Force flushed read buffer onto disk; stream = {}", self.inner.name),
Err(storage::Error::NoWrites) => {}
Err(e) => {
error!("Error = {e}; storage = {}", self.inner.name)
}
}

// Flush contents of write buffer to disk if required
match self.inner.flush() {
Ok(_) => trace!("Force flushed write buffer onto disk; stream = {}", self.inner.name),
Err(storage::Error::NoWrites) => {}
Err(e) => {
error!("Error = {e}; storage = {}", self.inner.name)
}
}
}
}

Expand Down Expand Up @@ -322,15 +337,8 @@ impl StorageHandler {
// Force flushes all in-memory buffers to ensure zero packet loss during uplink restart.
// TODO: Ensure packets in read-buffer but not on disk are not lost.
fn flush_all(&mut self) {
for (stream_config, storage) in self.map.iter_mut() {
match storage.flush() {
Ok(_) => trace!("Force flushed stream = {} onto disk", stream_config.topic),
Err(storage::Error::NoWrites) => {}
Err(e) => error!(
"Error when force flushing storage = {}; error = {e}",
stream_config.topic
),
}
for storage in self.map.values_mut() {
storage.flush();
}
}

Expand Down
6 changes: 3 additions & 3 deletions uplink/tests/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ async fn preferential_send_on_network() {
one.set_persistence(persistence_path(&config.persistence_path, "one"), 1).unwrap();
one.write(publish("topic/one".to_string(), 4)).unwrap();
one.write(publish("topic/one".to_string(), 5)).unwrap();
one.flush().unwrap();
one.flush();

let mut two = Storage::new("topic/two", 1024 * 1024, false);
two.set_persistence(persistence_path(&config.persistence_path, "two"), 1).unwrap();
two.write(publish("topic/two".to_string(), 3)).unwrap();
two.flush().unwrap();
two.flush();

let mut top = Storage::new("topic/top", 1024 * 1024, false);
top.set_persistence(persistence_path(&config.persistence_path, "top"), 1).unwrap();
top.write(publish("topic/top".to_string(), 1)).unwrap();
top.write(publish("topic/top".to_string(), 2)).unwrap();
top.flush().unwrap();
top.flush();

let config = Arc::new(config);
let (data_tx, data_rx) = bounded(1);
Expand Down
Loading