Skip to content

Commit

Permalink
Merge pull request #709 from rustic-rs/refactor-warmup
Browse files Browse the repository at this point in the history
move warm_up into rustic_core
  • Loading branch information
aawsome authored Jun 25, 2023
2 parents a146fd9 + b20f2ee commit 5cfc0d0
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 119 deletions.
2 changes: 2 additions & 0 deletions crates/rustic_core/src/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,8 @@ impl PrunePlan {
repo: &OpenRepository<P>,
opts: &PruneOpts,
) -> RusticResult<()> {
repo.warm_up_wait(self.repack_packs().into_iter())?;

let be = &repo.dbe;
let pb = &repo.pb;

Expand Down
2 changes: 2 additions & 0 deletions crates/rustic_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ pub enum RepositoryErrorKind {
AccessToConfigFileFailed,
/// {0:?}
FromNomError(nom::Err<()>),
/// {0:?}
FromThreadPoolbilderError(rayon::ThreadPoolBuildError),
/// reading Password failed: `{0:?}`
ReadingPasswordFromReaderFailed(std::io::Error),
/// reading Password from prompt failed: `{0:?}`
Expand Down
2 changes: 1 addition & 1 deletion crates/rustic_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,5 @@ pub use crate::{
},
RepoFile,
},
repository::{parse_command, OpenRepository, Repository, RepositoryOptions},
repository::{OpenRepository, Repository, RepositoryOptions},
};
13 changes: 12 additions & 1 deletion crates/rustic_core/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ use crate::{
crypto::aespoly1305::Key,
error::RepositoryErrorKind,
repofile::{configfile::ConfigFile, keyfile::find_key_in_backend},
BlobType, IndexBackend, NoProgressBars, ProgressBars, PruneOpts, PrunePlan, RusticResult,
BlobType, Id, IndexBackend, NoProgressBars, ProgressBars, PruneOpts, PrunePlan, RusticResult,
SnapshotFile, SnapshotGroup, SnapshotGroupCriterion,
};

pub(super) mod constants {
pub(super) const MAX_PASSWORD_RETRIES: usize = 5;
}

mod warm_up;
use warm_up::{warm_up, warm_up_wait};

#[serde_as]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(feature = "merge", derive(merge::Merge))]
Expand Down Expand Up @@ -402,6 +405,14 @@ impl<P: ProgressBars> OpenRepository<P> {
pub fn infos_index(&self) -> RusticResult<IndexInfos> {
commands::repoinfo::collect_index_infos(self)
}

pub fn warm_up(&self, packs: impl ExactSizeIterator<Item = Id>) -> RusticResult<()> {
warm_up(self, packs)
}

pub fn warm_up_wait(&self, packs: impl ExactSizeIterator<Item = Id>) -> RusticResult<()> {
warm_up_wait(self, packs)
}
}

#[derive(Debug)]
Expand Down
93 changes: 93 additions & 0 deletions crates/rustic_core/src/repository/warm_up.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::process::Command;
use std::thread::sleep;

use log::{debug, warn};
use rayon::ThreadPoolBuilder;

use super::parse_command;
use crate::{
error::RepositoryErrorKind, FileType, Id, OpenRepository, Progress, ProgressBars, ReadBackend,
RusticResult,
};

pub(super) mod constants {
pub(super) const MAX_READER_THREADS_NUM: usize = 20;
}

pub(crate) fn warm_up_wait<P: ProgressBars>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
) -> RusticResult<()> {
warm_up(repo, packs)?;
if let Some(wait) = repo.opts.warm_up_wait {
let p = repo.pb.progress_spinner(format!("waiting {wait}..."));
sleep(*wait);
p.finish();
}
Ok(())
}

pub(crate) fn warm_up<P: ProgressBars>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
) -> RusticResult<()> {
if let Some(command) = &repo.opts.warm_up_command {
warm_up_command(packs, command, &repo.pb)?;
} else if repo.opts.warm_up {
warm_up_access(repo, packs)?;
}
Ok(())
}

fn warm_up_command<P: ProgressBars>(
packs: impl ExactSizeIterator<Item = Id>,
command: &str,
pb: &P,
) -> RusticResult<()> {
let p = pb.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);
for pack in packs {
let actual_command = command.replace("%id", &pack.to_hex());
debug!("calling {actual_command}...");
let commands = parse_command::<()>(&actual_command)
.map_err(RepositoryErrorKind::FromNomError)?
.1;
let status = Command::new(commands[0]).args(&commands[1..]).status()?;
if !status.success() {
warn!("warm-up command was not successful for pack {pack:?}. {status}");
}
}
p.finish();
Ok(())
}

fn warm_up_access<P: ProgressBars>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
) -> RusticResult<()> {
let mut be = repo.be.clone();
be.set_option("retry", "false")?;

let p = repo.pb.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);

let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_READER_THREADS_NUM)
.build()
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
let p = &p;
let be = &be;
pool.in_place_scope(|s| {
for pack in packs {
s.spawn(move |_| {
// ignore errors as they are expected from the warm-up
_ = be.read_partial(FileType::Pack, &pack, false, 0, 1);
p.inc(1);
});
}
});

p.finish();

Ok(())
}
16 changes: 3 additions & 13 deletions src/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use log::debug;

use anyhow::Result;

use crate::helpers::warm_up_wait;

use rustic_core::{PruneOpts, PruneStats, Sum};

/// `prune` subcommand
Expand All @@ -37,23 +35,15 @@ impl Runnable for PruneCmd {
impl PruneCmd {
fn inner_run(&self) -> Result<()> {
let config = RUSTIC_APP.config();
let progress_options = &config.global.progress_options;

let repo = open_repository(get_repository(&config));

let pruner = repo.prune_plan(&self.opts)?;

print_stats(&pruner.stats);

let dry_run = config.global.dry_run;
warm_up_wait(
&repo,
pruner.repack_packs().into_iter(),
!dry_run,
progress_options,
)?;

if !dry_run {
if config.global.dry_run {
repo.warm_up(pruner.repack_packs().into_iter())?;
} else {
pruner.do_prune(&repo, &self.opts)?;
}

Expand Down
9 changes: 1 addition & 8 deletions src/commands/repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use rustic_core::{
ProgressBars, ReadBackend, ReadIndex, SnapshotFile, StringList, Tree, WriteBackend,
};

use crate::helpers::warm_up_wait;

/// `repair` subcommand
#[derive(clap::Parser, Command, Debug)]
pub(crate) struct RepairCmd {
Expand Down Expand Up @@ -185,12 +183,7 @@ impl IndexSubCmd {
// process packs which are listed but not contained in the index
pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, false, None, size)));

warm_up_wait(
&repo,
pack_read_header.iter().map(|(id, _, _, _)| *id),
true,
progress_options,
)?;
repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _, _)| *id))?;

let indexer = Indexer::new(be.clone()).into_shared();
let p = progress_options.progress_counter("reading pack headers");
Expand Down
15 changes: 5 additions & 10 deletions src/commands/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use rustic_core::{
TreeStreamerOptions,
};

use crate::{filtering::SnapshotFilter, helpers::warm_up_wait};
use crate::filtering::SnapshotFilter;

pub(crate) mod constants {
pub(crate) const MAX_READER_THREADS_NUM: usize = 20;
Expand Down Expand Up @@ -134,16 +134,11 @@ impl RestoreCmd {

if file_infos.restore_size == 0 {
info!("all file contents are fine.");
} else if config.global.dry_run {
repo.warm_up(file_infos.to_packs().into_iter())?;
} else {
warm_up_wait(
&repo,
file_infos.to_packs().into_iter(),
!config.global.dry_run,
progress_options,
)?;
if !config.global.dry_run {
restore_contents(be, &dest, file_infos)?;
}
repo.warm_up_wait(file_infos.to_packs().into_iter())?;
restore_contents(be, &dest, file_infos)?;
}

if !config.global.dry_run {
Expand Down
92 changes: 6 additions & 86 deletions src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeSet, process::Command};
use std::collections::BTreeSet;

use abscissa_core::Application;
use anyhow::Result;
Expand All @@ -7,95 +7,15 @@ use comfy_table::{
presets::ASCII_MARKDOWN, Attribute, Cell, CellAlignment, ContentArrangement, Table,
};

use log::{debug, info, trace, warn};
use rayon::{
prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator},
ThreadPoolBuilder,
};
use log::{info, trace};
use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};

use rustic_core::{
parse_command, BlobType, DecryptWriteBackend, FileType, Id, IndexBackend, IndexedBackend,
Indexer, NodeType, OpenRepository, Packer, Progress, ProgressBars, ReadBackend, ReadIndex,
SnapshotFile, TreeStreamerOnce,
BlobType, DecryptWriteBackend, IndexBackend, IndexedBackend, Indexer, NodeType, OpenRepository,
Packer, Progress, ProgressBars, ReadIndex, SnapshotFile, TreeStreamerOnce,
};

use crate::{application::RUSTIC_APP, config::progress_options::ProgressOptions};

pub(super) mod constants {
pub(super) const MAX_READER_THREADS_NUM: usize = 20;
}

pub(crate) fn warm_up_wait<P>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
wait: bool,
progress_options: &ProgressOptions,
) -> Result<()> {
if let Some(command) = &repo.opts.warm_up_command {
warm_up_command(packs, command, progress_options)?;
} else if repo.opts.warm_up {
warm_up(&repo.be, packs, progress_options)?;
}
if wait {
if let Some(wait) = repo.opts.warm_up_wait {
let p = progress_options.progress_spinner(format!("waiting {wait}..."));
std::thread::sleep(*wait);
p.finish();
}
}
Ok(())
}

pub(crate) fn warm_up_command(
packs: impl ExactSizeIterator<Item = Id>,
command: &str,
progress_options: &ProgressOptions,
) -> Result<()> {
let p = progress_options.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);
for pack in packs {
let actual_command = command.replace("%id", &pack.to_hex());
debug!("calling {actual_command}...");
let commands = parse_command::<()>(&actual_command)?.1;
let status = Command::new(commands[0]).args(&commands[1..]).status()?;
if !status.success() {
warn!("warm-up command was not successful for pack {pack:?}. {status}");
}
}
p.finish();
Ok(())
}

pub(crate) fn warm_up(
be: &impl ReadBackend,
packs: impl ExactSizeIterator<Item = Id>,
progress_options: &ProgressOptions,
) -> Result<()> {
let mut be = be.clone();
be.set_option("retry", "false")?;

let p = progress_options.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);

let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_READER_THREADS_NUM)
.build()?;
let p = &p;
let be = &be;
pool.in_place_scope(|s| {
for pack in packs {
s.spawn(move |_| {
// ignore errors as they are expected from the warm-up
_ = be.read_partial(FileType::Pack, &pack, false, 0, 1);
p.inc(1);
});
}
});

p.finish();

Ok(())
}
use crate::application::RUSTIC_APP;

pub(crate) fn copy<P>(
snapshots: &[SnapshotFile],
Expand Down

0 comments on commit 5cfc0d0

Please sign in to comment.