Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move warm_up into rustic_core #709

Merged
merged 1 commit into from
Jun 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/rustic_core/src/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ impl PrunePlan {
repo: &OpenRepository<P>,
opts: &PruneOpts,
) -> RusticResult<()> {
repo.warm_up_wait(self.repack_packs().into_iter())?;

let be = &repo.dbe;
let pb = &repo.pb;

Expand Down
2 changes: 2 additions & 0 deletions crates/rustic_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ pub enum RepositoryErrorKind {
AccessToConfigFileFailed,
/// {0:?}
FromNomError(nom::Err<()>),
/// {0:?}
FromThreadPoolbilderError(rayon::ThreadPoolBuildError),
/// reading Password failed: `{0:?}`
ReadingPasswordFromReaderFailed(std::io::Error),
/// reading Password from prompt failed: `{0:?}`
Expand Down
2 changes: 1 addition & 1 deletion crates/rustic_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,5 @@ pub use crate::{
},
RepoFile,
},
repository::{parse_command, OpenRepository, Repository, RepositoryOptions},
repository::{OpenRepository, Repository, RepositoryOptions},
};
13 changes: 12 additions & 1 deletion crates/rustic_core/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ use crate::{
crypto::aespoly1305::Key,
error::RepositoryErrorKind,
repofile::{configfile::ConfigFile, keyfile::find_key_in_backend},
BlobType, IndexBackend, NoProgressBars, ProgressBars, PruneOpts, PrunePlan, RusticResult,
BlobType, Id, IndexBackend, NoProgressBars, ProgressBars, PruneOpts, PrunePlan, RusticResult,
SnapshotFile, SnapshotGroup, SnapshotGroupCriterion,
};

pub(super) mod constants {
pub(super) const MAX_PASSWORD_RETRIES: usize = 5;
}

mod warm_up;
use warm_up::{warm_up, warm_up_wait};

#[serde_as]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(feature = "merge", derive(merge::Merge))]
Expand Down Expand Up @@ -402,6 +405,14 @@ impl<P: ProgressBars> OpenRepository<P> {
pub fn infos_index(&self) -> RusticResult<IndexInfos> {
commands::repoinfo::collect_index_infos(self)
}

pub fn warm_up(&self, packs: impl ExactSizeIterator<Item = Id>) -> RusticResult<()> {
warm_up(self, packs)
}

pub fn warm_up_wait(&self, packs: impl ExactSizeIterator<Item = Id>) -> RusticResult<()> {
warm_up_wait(self, packs)
}
}

#[derive(Debug)]
Expand Down
93 changes: 93 additions & 0 deletions crates/rustic_core/src/repository/warm_up.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::process::Command;
use std::thread::sleep;

use log::{debug, warn};
use rayon::ThreadPoolBuilder;

use super::parse_command;
use crate::{
error::RepositoryErrorKind, FileType, Id, OpenRepository, Progress, ProgressBars, ReadBackend,
RusticResult,
};

pub(super) mod constants {
pub(super) const MAX_READER_THREADS_NUM: usize = 20;
}

pub(crate) fn warm_up_wait<P: ProgressBars>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
) -> RusticResult<()> {
warm_up(repo, packs)?;
if let Some(wait) = repo.opts.warm_up_wait {
let p = repo.pb.progress_spinner(format!("waiting {wait}..."));
sleep(*wait);
p.finish();
}
Ok(())
}

pub(crate) fn warm_up<P: ProgressBars>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
) -> RusticResult<()> {
if let Some(command) = &repo.opts.warm_up_command {
warm_up_command(packs, command, &repo.pb)?;
} else if repo.opts.warm_up {
warm_up_access(repo, packs)?;
}
Ok(())
}

fn warm_up_command<P: ProgressBars>(
packs: impl ExactSizeIterator<Item = Id>,
command: &str,
pb: &P,
) -> RusticResult<()> {
let p = pb.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);
for pack in packs {
let actual_command = command.replace("%id", &pack.to_hex());
debug!("calling {actual_command}...");
let commands = parse_command::<()>(&actual_command)
.map_err(RepositoryErrorKind::FromNomError)?
.1;
let status = Command::new(commands[0]).args(&commands[1..]).status()?;
if !status.success() {
warn!("warm-up command was not successful for pack {pack:?}. {status}");
}
}
p.finish();
Ok(())
}

fn warm_up_access<P: ProgressBars>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
) -> RusticResult<()> {
let mut be = repo.be.clone();
be.set_option("retry", "false")?;

let p = repo.pb.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);

let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_READER_THREADS_NUM)
.build()
.map_err(RepositoryErrorKind::FromThreadPoolbilderError)?;
let p = &p;
let be = &be;
pool.in_place_scope(|s| {
for pack in packs {
s.spawn(move |_| {
// ignore errors as they are expected from the warm-up
_ = be.read_partial(FileType::Pack, &pack, false, 0, 1);
p.inc(1);
});
}
});

p.finish();

Ok(())
}
16 changes: 3 additions & 13 deletions src/commands/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ use log::debug;

use anyhow::Result;

use crate::helpers::warm_up_wait;

use rustic_core::{PruneOpts, PruneStats, Sum};

/// `prune` subcommand
Expand All @@ -37,23 +35,15 @@ impl Runnable for PruneCmd {
impl PruneCmd {
fn inner_run(&self) -> Result<()> {
let config = RUSTIC_APP.config();
let progress_options = &config.global.progress_options;

let repo = open_repository(get_repository(&config));

let pruner = repo.prune_plan(&self.opts)?;

print_stats(&pruner.stats);

let dry_run = config.global.dry_run;
warm_up_wait(
&repo,
pruner.repack_packs().into_iter(),
!dry_run,
progress_options,
)?;

if !dry_run {
if config.global.dry_run {
repo.warm_up(pruner.repack_packs().into_iter())?;
} else {
pruner.do_prune(&repo, &self.opts)?;
}

Expand Down
9 changes: 1 addition & 8 deletions src/commands/repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use rustic_core::{
ProgressBars, ReadBackend, ReadIndex, SnapshotFile, StringList, Tree, WriteBackend,
};

use crate::helpers::warm_up_wait;

/// `repair` subcommand
#[derive(clap::Parser, Command, Debug)]
pub(crate) struct RepairCmd {
Expand Down Expand Up @@ -185,12 +183,7 @@ impl IndexSubCmd {
// process packs which are listed but not contained in the index
pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, false, None, size)));

warm_up_wait(
&repo,
pack_read_header.iter().map(|(id, _, _, _)| *id),
true,
progress_options,
)?;
repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _, _)| *id))?;

let indexer = Indexer::new(be.clone()).into_shared();
let p = progress_options.progress_counter("reading pack headers");
Expand Down
15 changes: 5 additions & 10 deletions src/commands/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use rustic_core::{
TreeStreamerOptions,
};

use crate::{filtering::SnapshotFilter, helpers::warm_up_wait};
use crate::filtering::SnapshotFilter;

pub(crate) mod constants {
pub(crate) const MAX_READER_THREADS_NUM: usize = 20;
Expand Down Expand Up @@ -134,16 +134,11 @@ impl RestoreCmd {

if file_infos.restore_size == 0 {
info!("all file contents are fine.");
} else if config.global.dry_run {
repo.warm_up(file_infos.to_packs().into_iter())?;
} else {
warm_up_wait(
&repo,
file_infos.to_packs().into_iter(),
!config.global.dry_run,
progress_options,
)?;
if !config.global.dry_run {
restore_contents(be, &dest, file_infos)?;
}
repo.warm_up_wait(file_infos.to_packs().into_iter())?;
restore_contents(be, &dest, file_infos)?;
}

if !config.global.dry_run {
Expand Down
92 changes: 6 additions & 86 deletions src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::BTreeSet, process::Command};
use std::collections::BTreeSet;

use abscissa_core::Application;
use anyhow::Result;
Expand All @@ -7,95 +7,15 @@ use comfy_table::{
presets::ASCII_MARKDOWN, Attribute, Cell, CellAlignment, ContentArrangement, Table,
};

use log::{debug, info, trace, warn};
use rayon::{
prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator},
ThreadPoolBuilder,
};
use log::{info, trace};
use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};

use rustic_core::{
parse_command, BlobType, DecryptWriteBackend, FileType, Id, IndexBackend, IndexedBackend,
Indexer, NodeType, OpenRepository, Packer, Progress, ProgressBars, ReadBackend, ReadIndex,
SnapshotFile, TreeStreamerOnce,
BlobType, DecryptWriteBackend, IndexBackend, IndexedBackend, Indexer, NodeType, OpenRepository,
Packer, Progress, ProgressBars, ReadIndex, SnapshotFile, TreeStreamerOnce,
};

use crate::{application::RUSTIC_APP, config::progress_options::ProgressOptions};

pub(super) mod constants {
pub(super) const MAX_READER_THREADS_NUM: usize = 20;
}

pub(crate) fn warm_up_wait<P>(
repo: &OpenRepository<P>,
packs: impl ExactSizeIterator<Item = Id>,
wait: bool,
progress_options: &ProgressOptions,
) -> Result<()> {
if let Some(command) = &repo.opts.warm_up_command {
warm_up_command(packs, command, progress_options)?;
} else if repo.opts.warm_up {
warm_up(&repo.be, packs, progress_options)?;
}
if wait {
if let Some(wait) = repo.opts.warm_up_wait {
let p = progress_options.progress_spinner(format!("waiting {wait}..."));
std::thread::sleep(*wait);
p.finish();
}
}
Ok(())
}

pub(crate) fn warm_up_command(
packs: impl ExactSizeIterator<Item = Id>,
command: &str,
progress_options: &ProgressOptions,
) -> Result<()> {
let p = progress_options.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);
for pack in packs {
let actual_command = command.replace("%id", &pack.to_hex());
debug!("calling {actual_command}...");
let commands = parse_command::<()>(&actual_command)?.1;
let status = Command::new(commands[0]).args(&commands[1..]).status()?;
if !status.success() {
warn!("warm-up command was not successful for pack {pack:?}. {status}");
}
}
p.finish();
Ok(())
}

pub(crate) fn warm_up(
be: &impl ReadBackend,
packs: impl ExactSizeIterator<Item = Id>,
progress_options: &ProgressOptions,
) -> Result<()> {
let mut be = be.clone();
be.set_option("retry", "false")?;

let p = progress_options.progress_counter("warming up packs...");
p.set_length(packs.len() as u64);

let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_READER_THREADS_NUM)
.build()?;
let p = &p;
let be = &be;
pool.in_place_scope(|s| {
for pack in packs {
s.spawn(move |_| {
// ignore errors as they are expected from the warm-up
_ = be.read_partial(FileType::Pack, &pack, false, 0, 1);
p.inc(1);
});
}
});

p.finish();

Ok(())
}
use crate::application::RUSTIC_APP;

pub(crate) fn copy<P>(
snapshots: &[SnapshotFile],
Expand Down
Loading