Skip to content

Commit

Permalink
feat: State compression
Browse files Browse the repository at this point in the history
  • Loading branch information
Nukesor committed Feb 18, 2025
1 parent e2096dc commit 5846ab0
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ Upon updating Pueue and restarting the daemon, the previous state will be wiped,
- Print most of Pueue's info/log messages to `stderr`. Only keep useful stuff like json and task log output on `stdout`.
- **Breaking**: Ported from `anyhow` to `color_eyre` for prettier log output.
- **Breaking**: Switch `cbor` handling library, potentially breaking backwards-compatible communication on a data format level.
- Option to save the state in compressed form. This can be toggled with the `daemon.compress_status_file` config file.
Preliminary testing shows significant compression ratios (up to x15), which helps with large states in embedded and I/O bound environments.

### Add

Expand Down
37 changes: 36 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ color-eyre.workspace = true
comfy-table = "7"
command-group.workspace = true
ctrlc = { version = "3", features = ["termination"] }
flate2 = "1"
handlebars.workspace = true
interim = { version = "0.1.2", features = ["chrono"] }
pest = "2.7"
Expand Down
53 changes: 41 additions & 12 deletions pueue/src/daemon/internal_state/state.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::{
collections::BTreeMap,
fs::read_to_string,
path::Path,
fs::{read_to_string, File},
io::{Read, Write},
process::Child,
sync::{Arc, Mutex, MutexGuard},
};

use chrono::Local;
use flate2::Compression;
use pueue_lib::{
error::Error,
network::message::request::Shutdown,
Expand Down Expand Up @@ -246,16 +247,30 @@ impl InternalState {
/// In comparison to the daemon -> client communication, the state is saved
/// as JSON for readability and debugging purposes.
pub fn save(&self, settings: &Settings) -> Result<()> {
let serialized = serde_json::to_string(&self.inner).context("Failed to serialize state:");
let serialized =
serde_json::to_string(&self.inner).context("Failed to serialize state:")?;

let serialized = serialized.unwrap();
let path = settings.shared.pueue_directory();
let temp = path.join("state.json.partial");
let real = path.join("state.json");
let mut temp = path.join("state.json.partial");
let mut real = path.join("state.json");

// Write to temporary log file first, to prevent loss due to crashes.
std::fs::write(&temp, serialized)
.context("Failed to write temp file while saving state.")?;
if settings.daemon.compress_status_file {
temp = path.join("state.json.gz.partial");
real = path.join("state.json.gz");

let file = if temp.exists() {
File::open(&temp)?

Check warning on line 262 in pueue/src/daemon/internal_state/state.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/internal_state/state.rs#L262

Added line #L262 was not covered by tests
} else {
File::create(&temp)?
};

let mut encoder = flate2::write::GzEncoder::new(file, Compression::default());
encoder.write_all(serialized.as_bytes())?;
} else {
// Write to temporary log file first, to prevent loss due to crashes.
std::fs::write(&temp, serialized)
.context("Failed to write temp file while saving state.")?;
}

// Overwrite the original with the temp file, if everything went fine.
std::fs::rename(&temp, &real)
Expand All @@ -271,8 +286,13 @@ impl InternalState {
///
/// If the state cannot be deserialized, an empty default state will be used instead. \
/// All groups with queued tasks will be automatically paused to prevent unwanted execution.
pub fn restore_state(pueue_directory: &Path) -> Result<Option<InternalState>> {
let path = pueue_directory.join("state.json");
pub fn restore_state(settings: &Settings) -> Result<Option<InternalState>> {
let pueue_directory = settings.shared.pueue_directory();
let mut path = pueue_directory.join("state.json");

if settings.daemon.compress_status_file {
path = pueue_directory.join("state.json.gz");
}

// Ignore if the file doesn't exist. It doesn't have to.
if !path.exists() {
Expand All @@ -282,7 +302,16 @@ impl InternalState {
info!("Restoring state");

// Try to load the file.
let data = read_to_string(&path).context("State restore: Failed to read file:\n\n{}")?;
let data = if settings.daemon.compress_status_file {
let file = File::open(path)?;
let mut decoder = flate2::read::GzDecoder::new(file);
let mut data = String::new();
decoder.read_to_string(&mut data)?;

Check warning on line 309 in pueue/src/daemon/internal_state/state.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/internal_state/state.rs#L306-L309

Added lines #L306 - L309 were not covered by tests

data

Check warning on line 311 in pueue/src/daemon/internal_state/state.rs

View check run for this annotation

Codecov / codecov/patch

pueue/src/daemon/internal_state/state.rs#L311

Added line #L311 was not covered by tests
} else {
read_to_string(&path).context("State restore: Failed to read file:\n\n{}")?
};

// Try to deserialize the state file.
let state: State = serde_json::from_str(&data).context("Failed to deserialize state.")?;
Expand Down
2 changes: 1 addition & 1 deletion pueue/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub async fn run(config_path: Option<PathBuf>, profile: Option<String>, test: bo
// Restore the previous state and save any changes that might have happened during this
// process. If no previous state exists, just create a new one.
// Create a new empty state if any errors occur, but print the error message.
let state = match InternalState::restore_state(&settings.shared.pueue_directory()) {
let state = match InternalState::restore_state(&settings) {
Ok(Some(state)) => state,
Ok(None) => InternalState::new(),
Err(error) => {
Expand Down
25 changes: 21 additions & 4 deletions pueue/tests/daemon/integration/restore.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
use pueue_lib::{network::message::TaskSelection, state::GroupStatus};
use rstest::rstest;

use crate::{helper::*, internal_prelude::*};

/// The daemon should start in the same state as before shutdown, if no tasks are queued.
/// This function tests for the running state.
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_start_running() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
async fn test_start_running(#[case] compress: bool) -> Result<()> {
let (mut settings, tempdir) = daemon_base_setup()?;
settings.daemon.compress_status_file = compress;
settings
.save(&Some(tempdir.path().join("pueue.yml")))
.context("Couldn't write pueue config to temporary directory")?;

let mut child = standalone_daemon(&settings.shared).await?;
let shared = &settings.shared;

Expand All @@ -31,9 +40,17 @@ async fn test_start_running() -> Result<()> {

/// The daemon should start in the same state as before shutdown, if no tasks are queued.
/// This function tests for the paused state.
#[rstest]
#[case(true)]
#[case(false)]
#[tokio::test]
async fn test_start_paused() -> Result<()> {
let (settings, _tempdir) = daemon_base_setup()?;
async fn test_start_paused(#[case] compress: bool) -> Result<()> {
let (mut settings, tempdir) = daemon_base_setup()?;
settings.daemon.compress_status_file = compress;
settings
.save(&Some(tempdir.path().join("pueue.yml")))
.context("Couldn't write pueue config to temporary directory")?;

let mut child = standalone_daemon(&settings.shared).await?;
let shared = &settings.shared;

Expand Down
14 changes: 8 additions & 6 deletions pueue/tests/daemon/state_backward_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use pueue::daemon::internal_state::state::InternalState;
use pueue_lib::settings::Settings;
use tempfile::TempDir;

use crate::internal_prelude::*;
use crate::{helper::enable_logger, internal_prelude::*};

/// 4.0.0 introduced numerous breaking changes.
/// From here on, we now aim to once again have full backward compatibility.
Expand All @@ -17,22 +17,24 @@ use crate::internal_prelude::*;
/// This should be handled as well.
#[test]
fn test_restore_from_old_state() -> Result<()> {
enable_logger();
better_panic::install();
let old_state = include_str!("data/v4.0.0_state.json");

let temp_dir = TempDir::new()?;
let temp_path = temp_dir.path();
let temp_path = temp_dir.path().to_path_buf();

// Open new file and write old state to it.
let temp_state_path = temp_dir.path().join("state.json");
let temp_state_path = temp_path.join("state.json");
let mut file = File::create(temp_state_path)?;
file.write_all(old_state.as_bytes())?;

let mut settings = Settings::default();
settings.shared.pueue_directory = Some(temp_path.to_path_buf());
settings.shared.pueue_directory = Some(temp_path);
debug!("{settings:#?}");

let state = InternalState::restore_state(&settings.shared.pueue_directory())
.context("Failed to restore state in test")?;
let state =
InternalState::restore_state(&settings).context("Failed to restore state in test")?;

assert!(state.is_some());

Expand Down
1 change: 1 addition & 0 deletions pueue/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod internal_prelude {
eyre::{bail, eyre, WrapErr},
Result,
};
pub use tracing::debug;
}

#[cfg(unix)]
Expand Down
9 changes: 9 additions & 0 deletions pueue_lib/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ pub struct Daemon {
/// Whether the daemon (and all groups) should be paused as soon as a single task fails
#[serde(default = "Default::default")]
pub pause_all_on_failure: bool,
/// If this is set to `true`, the status file will be compressed, which usually results in a
/// significantly smaller file. This is particularily useful for I/O starved or embedded
/// environments, as it trades a bit of CPU power for I/O ops.
///
/// The state tends to be quite large for many tasks, as the whole environment is copied every
/// time. You can expect a ~10 compression ratio.
#[serde(default = "Default::default")]
pub compress_status_file: bool,
/// The callback that's called whenever a task finishes.
pub callback: Option<String>,
/// Environment variables that can be will be injected into all executed processes.
Expand Down Expand Up @@ -210,6 +218,7 @@ impl Default for Daemon {
pause_all_on_failure: false,
callback: None,
callback_log_lines: default_callback_log_lines(),
compress_status_file: false,
shell_command: None,
env_vars: HashMap::new(),
}
Expand Down
2 changes: 1 addition & 1 deletion pueue_lib/tests/settings_backward_compatibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use pueue_lib::settings::Settings;
/// On top of simply having old settings, I also removed a few default fields.
/// This should be handled as well.
#[test]
fn test_restore_from_old_state() -> Result<()> {
fn test_restore_from_old_settings() -> Result<()> {
better_panic::install();
let old_settings_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
Expand Down

0 comments on commit 5846ab0

Please sign in to comment.