diff --git a/Cargo.lock b/Cargo.lock index c3161e9f7..29a193013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,9 +456,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.5" +version = "4.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2686c4115cb0810d9a984776e197823d08ec94f176549a89a9efded477c456dc" +checksum = "d9394150f5b4273a1763355bd1c2ec54cc5a2593f790587bcd6b2c947cfa9211" dependencies = [ "clap_builder", "clap_derive", @@ -467,9 +467,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.5" +version = "4.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e53afce1efce6ed1f633cf0e57612fe51db54a1ee4fd8f8503d078fe02d69ae" +checksum = "9a78fbdd3cc2914ddf37ba444114bc7765bbdcb55ec9cbe6fa054f0137400717" dependencies = [ "anstream", "anstyle", diff --git a/changelog/new.txt b/changelog/new.txt index 4ae7fa3fb..e50807a37 100644 --- a/changelog/new.txt +++ b/changelog/new.txt @@ -11,4 +11,5 @@ New features: - restore: Files are now allocated just before being first processed. This allows easier resumed restores. - New option: `no-require-git` for backup - if enabled, a git repository is not required to apply `git-ignore` rule. - fix: wait for password-command to successfully exit, allowing to input something into the command, and read password from stdout. -- Creation of new keys now enforces confirmation of entered key. This helps to prevent mistype of passwords during the initial entry \ No newline at end of file +- repoinfo: Added new options --json, --only-files, --only-index +- Creation of new keys now enforces confirmation of entered key. This helps to prevent mistype of passwords during the initial entry diff --git a/crates/rustic_core/examples/prune.rs b/crates/rustic_core/examples/prune.rs new file mode 100644 index 000000000..7100e2725 --- /dev/null +++ b/crates/rustic_core/examples/prune.rs @@ -0,0 +1,21 @@ +//! `prune` example +use rustic_core::{PruneOpts, Repository, RepositoryOptions}; +use simplelog::{Config, LevelFilter, SimpleLogger}; + +fn main() { + // Display info logs + let _ = SimpleLogger::init(LevelFilter::Info, Config::default()); + + // Open repository + let mut repo_opts = RepositoryOptions::default(); + repo_opts.repository = Some("/tmp/repo".to_string()); + repo_opts.password = Some("test".to_string()); + let repo = Repository::new(&repo_opts).unwrap().open().unwrap(); + + let prune_opts = PruneOpts::default(); + let prune_plan = repo.prune_plan(&prune_opts).unwrap(); + println!("{:?}", prune_plan.stats); + println!("to repack: {:?}", prune_plan.repack_packs()); + // to run the plan uncomment this line: + // prune_plan.do_prune(&repo, &prune_opts).unwrap(); +} diff --git a/crates/rustic_core/src/backend.rs b/crates/rustic_core/src/backend.rs index 03a674ab8..3a3a5d9da 100644 --- a/crates/rustic_core/src/backend.rs +++ b/crates/rustic_core/src/backend.rs @@ -15,6 +15,7 @@ use std::{io::Read, path::PathBuf}; use bytes::Bytes; use displaydoc::Display; use log::trace; +use serde::{Deserialize, Serialize}; use crate::{backend::node::Node, error::BackendErrorKind, id::Id, RusticResult}; @@ -27,17 +28,22 @@ pub const ALL_FILE_TYPES: [FileType; 4] = [ ]; /// Type for describing the kind of a file that can occur. 
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Display)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Display, Serialize, Deserialize)] pub enum FileType { /// config + #[serde(rename = "config")] Config, /// index + #[serde(rename = "index")] Index, /// keys + #[serde(rename = "key")] Key, /// snapshots + #[serde(rename = "snapshot")] Snapshot, /// data + #[serde(rename = "pack")] Pack, } diff --git a/crates/rustic_core/src/commands.rs b/crates/rustic_core/src/commands.rs index a91af1f86..100d85284 100644 --- a/crates/rustic_core/src/commands.rs +++ b/crates/rustic_core/src/commands.rs @@ -1,2 +1,5 @@ pub mod cat; pub mod check; +pub mod prune; +pub mod repoinfo; +pub mod snapshots; diff --git a/crates/rustic_core/src/commands/prune.rs b/crates/rustic_core/src/commands/prune.rs new file mode 100644 index 000000000..3e22f2391 --- /dev/null +++ b/crates/rustic_core/src/commands/prune.rs @@ -0,0 +1,1088 @@ +//! `prune` subcommand + +/// App-local prelude includes `app_reader()`/`app_writer()`/`app_config()` +/// accessors along with logging macros. Customize as you see fit. +use log::info; + +use std::{ + cmp::Ordering, + collections::{HashMap, HashSet}, + str::FromStr, + sync::{Arc, Mutex}, +}; + +use bytesize::ByteSize; +use chrono::{DateTime, Duration, Local}; + +use derive_more::Add; +use itertools::Itertools; +use rayon::prelude::{IntoParallelIterator, ParallelIterator}; + +use crate::{ + error::CommandErrorKind, BlobType, BlobTypeMap, DecryptReadBackend, DecryptWriteBackend, + FileType, HeaderEntry, Id, IndexBackend, IndexBlob, IndexCollector, IndexFile, IndexPack, + IndexType, IndexedBackend, Indexer, Initialize, NodeType, OpenRepository, PackSizer, Progress, + ProgressBars, ReadBackend, ReadIndex, Repacker, RusticResult, SnapshotFile, Sum, + TreeStreamerOnce, +}; + +pub(super) mod constants { + pub(super) const MIN_INDEX_LEN: usize = 10_000; +} + +/// `prune` subcommand +#[allow(clippy::struct_excessive_bools)] +#[cfg_attr(feature = "clap", derive(clap::Parser))] +#[derive(Debug, Clone)] +#[cfg_attr(feature = "clap", group(id = "prune_opts"))] +pub struct PruneOpts { + /// Define maximum data to repack in % of reposize or as size (e.g. '5b', '2 kB', '3M', '4TiB') or 'unlimited' + #[cfg_attr( + feature = "clap", + clap(long, value_name = "LIMIT", default_value = "unlimited") + )] + pub max_repack: LimitOption, + + /// Tolerate limit of unused data in % of reposize after pruning or as size (e.g. '5b', '2 kB', '3M', '4TiB') or 'unlimited' + #[cfg_attr( + feature = "clap", + clap(long, value_name = "LIMIT", default_value = "5%") + )] + pub max_unused: LimitOption, + + /// Minimum duration (e.g. 90d) to keep packs before repacking or removing. More recently created + /// packs won't be repacked or marked for deletion within this prune run. + #[cfg_attr( + feature = "clap", + clap(long, value_name = "DURATION", default_value = "0d") + )] + pub keep_pack: humantime::Duration, + + /// Minimum duration (e.g. 10m) to keep packs marked for deletion. More recently marked packs won't be + /// deleted within this prune run. + #[cfg_attr( + feature = "clap", + clap(long, value_name = "DURATION", default_value = "23h") + )] + pub keep_delete: humantime::Duration, + + /// Delete files immediately instead of marking them. This also removes all files already marked for deletion. + /// WARNING: Only use if you are sure the repository is not accessed by parallel processes! 
+ #[cfg_attr(feature = "clap", clap(long))] + pub instant_delete: bool, + + /// Simply copy blobs when repacking instead of decrypting; possibly compressing; encrypting + #[cfg_attr(feature = "clap", clap(long))] + pub fast_repack: bool, + + /// Repack packs containing uncompressed blobs. This cannot be used with --fast-repack. + /// Implies --max-unused=0. + #[cfg_attr(feature = "clap", clap(long, conflicts_with = "fast_repack"))] + pub repack_uncompressed: bool, + + /// Repack all packs. Implies --max-unused=0. + #[cfg_attr(feature = "clap", clap(long))] + pub repack_all: bool, + + /// Only repack packs which are cacheable [default: true for a hot/cold repository, else false] + #[cfg_attr(feature = "clap", clap(long, value_name = "TRUE/FALSE"))] + pub repack_cacheable_only: Option, + + /// Do not repack packs which only needs to be resized + #[cfg_attr(feature = "clap", clap(long))] + pub no_resize: bool, + + #[cfg_attr(feature = "clap", clap(skip))] + pub ignore_snaps: Vec, +} + +impl Default for PruneOpts { + fn default() -> Self { + Self { + max_repack: LimitOption::Unlimited, + max_unused: LimitOption::Percentage(5), + keep_pack: std::time::Duration::from_secs(0).into(), + keep_delete: std::time::Duration::from_secs(82800).into(), // = 23h + instant_delete: false, + fast_repack: false, + repack_uncompressed: false, + repack_all: false, + repack_cacheable_only: None, + no_resize: false, + ignore_snaps: Vec::new(), + } + } +} + +impl PruneOpts { + pub fn get_plan(&self, repo: &OpenRepository
<P>
) -> RusticResult { + let pb = &repo.pb; + let be = &repo.dbe; + + if repo.config.version < 2 && self.repack_uncompressed { + return Err(CommandErrorKind::RepackUncompressedRepoV1.into()); + } + + let mut index_files = Vec::new(); + + let p = pb.progress_counter("reading index..."); + let mut index_collector = IndexCollector::new(IndexType::OnlyTrees); + + for index in be.stream_all::(&p)? { + let (id, index) = index?; + index_collector.extend(index.packs.clone()); + // we add the trees from packs_to_delete to the index such that searching for + // used blobs doesn't abort if they are already marked for deletion + index_collector.extend(index.packs_to_delete.clone()); + + index_files.push((id, index)); + } + p.finish(); + + let (used_ids, total_size) = { + let index = index_collector.into_index(); + let total_size = BlobTypeMap::init(|blob_type| index.total_size(blob_type)); + let indexed_be = IndexBackend::new_from_index(&be.clone(), index); + let used_ids = find_used_blobs(&indexed_be, &self.ignore_snaps, pb)?; + (used_ids, total_size) + }; + + // list existing pack files + let p = pb.progress_spinner("getting packs from repository..."); + let existing_packs: HashMap<_, _> = + be.list_with_size(FileType::Pack)?.into_iter().collect(); + p.finish(); + + let mut pruner = PrunePlan::new(used_ids, existing_packs, index_files); + pruner.count_used_blobs(); + pruner.check()?; + let repack_cacheable_only = self + .repack_cacheable_only + .unwrap_or_else(|| repo.config.is_hot == Some(true)); + let pack_sizer = + total_size.map(|tpe, size| PackSizer::from_config(&repo.config, tpe, size)); + pruner.decide_packs( + Duration::from_std(*self.keep_pack).map_err(CommandErrorKind::FromOutOfRangeError)?, + Duration::from_std(*self.keep_delete).map_err(CommandErrorKind::FromOutOfRangeError)?, + repack_cacheable_only, + self.repack_uncompressed, + self.repack_all, + &pack_sizer, + )?; + pruner.decide_repack( + &self.max_repack, + &self.max_unused, + self.repack_uncompressed || self.repack_all, + self.no_resize, + &pack_sizer, + ); + pruner.check_existing_packs()?; + pruner.filter_index_files(self.instant_delete); + + Ok(pruner) + } +} + +#[derive(Clone, Copy, Debug)] +pub enum LimitOption { + Size(ByteSize), + Percentage(u64), + Unlimited, +} + +impl FromStr for LimitOption { + type Err = CommandErrorKind; + fn from_str(s: &str) -> Result { + Ok(match s.chars().last().unwrap_or('0') { + '%' => Self::Percentage({ + let mut copy = s.to_string(); + _ = copy.pop(); + copy.parse()? 
+ }), + 'd' if s == "unlimited" => Self::Unlimited, + _ => Self::Size(ByteSize::from_str(s).map_err(CommandErrorKind::FromByteSizeParser)?), + }) + } +} + +#[derive(Default, Debug, Clone, Copy)] +pub struct DeleteStats { + pub remove: u64, + pub recover: u64, + pub keep: u64, +} + +impl DeleteStats { + pub const fn total(&self) -> u64 { + self.remove + self.recover + self.keep + } +} +#[derive(Debug, Default, Clone, Copy)] +pub struct PackStats { + pub used: u64, + pub partly_used: u64, + pub unused: u64, // this equals to packs-to-remove + pub repack: u64, + pub keep: u64, +} +#[derive(Debug, Default, Clone, Copy, Add)] +pub struct SizeStats { + pub used: u64, + pub unused: u64, + pub remove: u64, + pub repack: u64, + pub repackrm: u64, +} + +impl SizeStats { + pub const fn total(&self) -> u64 { + self.used + self.unused + } + pub const fn total_after_prune(&self) -> u64 { + self.used + self.unused_after_prune() + } + pub const fn unused_after_prune(&self) -> u64 { + self.unused - self.remove - self.repackrm + } +} + +#[derive(Default, Debug)] +pub struct PruneStats { + pub packs_to_delete: DeleteStats, + pub size_to_delete: DeleteStats, + pub packs: PackStats, + pub blobs: BlobTypeMap, + pub size: BlobTypeMap, + pub packs_unref: u64, + pub size_unref: u64, + pub index_files: u64, + pub index_files_rebuild: u64, +} + +#[derive(Debug)] +struct PruneIndex { + id: Id, + modified: bool, + packs: Vec, +} + +impl PruneIndex { + fn len(&self) -> usize { + self.packs.iter().map(|p| p.blobs.len()).sum() + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PackToDo { + Undecided, + Keep, + Repack, + MarkDelete, + KeepMarked, + Recover, + Delete, +} + +#[derive(Debug)] +struct PrunePack { + id: Id, + blob_type: BlobType, + size: u32, + delete_mark: bool, + to_do: PackToDo, + time: Option>, + blobs: Vec, +} + +impl PrunePack { + fn from_index_pack(p: IndexPack, delete_mark: bool) -> Self { + Self { + id: p.id, + blob_type: p.blob_type(), + size: p.pack_size(), + delete_mark, + to_do: PackToDo::Undecided, + time: p.time, + blobs: p.blobs, + } + } + + fn from_index_pack_unmarked(p: IndexPack) -> Self { + Self::from_index_pack(p, false) + } + + fn from_index_pack_marked(p: IndexPack) -> Self { + Self::from_index_pack(p, true) + } + + fn into_index_pack(self) -> IndexPack { + IndexPack { + id: self.id, + time: self.time, + size: None, + blobs: self.blobs, + } + } + + fn into_index_pack_with_time(self, time: DateTime) -> IndexPack { + IndexPack { + id: self.id, + time: Some(time), + size: None, + blobs: self.blobs, + } + } + + fn set_todo(&mut self, todo: PackToDo, pi: &PackInfo, stats: &mut PruneStats) { + let tpe = self.blob_type; + match todo { + PackToDo::Undecided => panic!("not possible"), + PackToDo::Keep => { + stats.blobs[tpe].used += u64::from(pi.used_blobs); + stats.blobs[tpe].unused += u64::from(pi.unused_blobs); + stats.size[tpe].used += u64::from(pi.used_size); + stats.size[tpe].unused += u64::from(pi.unused_size); + stats.packs.keep += 1; + } + PackToDo::Repack => { + stats.blobs[tpe].used += u64::from(pi.used_blobs); + stats.blobs[tpe].unused += u64::from(pi.unused_blobs); + stats.size[tpe].used += u64::from(pi.used_size); + stats.size[tpe].unused += u64::from(pi.unused_size); + stats.packs.repack += 1; + stats.blobs[tpe].repack += u64::from(pi.unused_blobs + pi.used_blobs); + stats.blobs[tpe].repackrm += u64::from(pi.unused_blobs); + stats.size[tpe].repack += u64::from(pi.unused_size + pi.used_size); + stats.size[tpe].repackrm += u64::from(pi.unused_size); + } + + 
PackToDo::MarkDelete => { + stats.blobs[tpe].unused += u64::from(pi.unused_blobs); + stats.size[tpe].unused += u64::from(pi.unused_size); + stats.blobs[tpe].remove += u64::from(pi.unused_blobs); + stats.size[tpe].remove += u64::from(pi.unused_size); + } + PackToDo::Recover => { + stats.packs_to_delete.recover += 1; + stats.size_to_delete.recover += u64::from(self.size); + } + PackToDo::Delete => { + stats.packs_to_delete.remove += 1; + stats.size_to_delete.remove += u64::from(self.size); + } + PackToDo::KeepMarked => { + stats.packs_to_delete.keep += 1; + stats.size_to_delete.keep += u64::from(self.size); + } + } + self.to_do = todo; + } + + fn is_compressed(&self) -> bool { + self.blobs + .iter() + .all(|blob| blob.uncompressed_length.is_some()) + } +} + +#[derive(PartialEq, Eq, Debug)] +enum RepackReason { + PartlyUsed, + ToCompress, + SizeMismatch, +} +use RepackReason::{PartlyUsed, SizeMismatch, ToCompress}; + +#[derive(Debug)] +pub struct PrunePlan { + time: DateTime, + used_ids: HashMap, + existing_packs: HashMap, + repack_candidates: Vec<(PackInfo, RepackReason, usize, usize)>, + index_files: Vec, + pub stats: PruneStats, +} + +impl PrunePlan { + fn new( + used_ids: HashMap, + existing_packs: HashMap, + index_files: Vec<(Id, IndexFile)>, + ) -> Self { + let mut processed_packs = HashSet::new(); + let mut processed_packs_delete = HashSet::new(); + let mut index_files: Vec<_> = index_files + .into_iter() + .map(|(id, index)| { + let mut modified = false; + let mut packs: Vec<_> = index + .packs + .into_iter() + // filter out duplicate packs + .filter(|p| { + let no_duplicate = processed_packs.insert(p.id); + modified |= !no_duplicate; + no_duplicate + }) + .map(PrunePack::from_index_pack_unmarked) + .collect(); + packs.extend( + index + .packs_to_delete + .into_iter() + // filter out duplicate packs + .filter(|p| { + let no_duplicate = processed_packs_delete.insert(p.id); + modified |= !no_duplicate; + no_duplicate + }) + .map(PrunePack::from_index_pack_marked), + ); + + PruneIndex { + id, + modified, + packs, + } + }) + .collect(); + + // filter out "normally" indexed packs from packs_to_delete + for index in &mut index_files { + let mut modified = false; + index.packs.retain(|p| { + !p.delete_mark || { + let duplicate = processed_packs.contains(&p.id); + modified |= duplicate; + !duplicate + } + }); + + index.modified |= modified; + } + + Self { + time: Local::now(), + used_ids, + existing_packs, + repack_candidates: Vec::new(), + index_files, + stats: PruneStats::default(), + } + } + + fn count_used_blobs(&mut self) { + for blob in self + .index_files + .iter() + .flat_map(|index| &index.packs) + .flat_map(|pack| &pack.blobs) + { + if let Some(count) = self.used_ids.get_mut(&blob.id) { + // note that duplicates are only counted up to 255. If there are more + // duplicates, the number is set to 255. This may imply that later on + // not the "best" pack is chosen to have that blob marked as used. 
+ *count = count.saturating_add(1); + } + } + } + + fn check(&self) -> RusticResult<()> { + // check that all used blobs are present in index + for (id, count) in &self.used_ids { + if *count == 0 { + return Err(CommandErrorKind::BlobsMissing(*id).into()); + } + } + Ok(()) + } + + fn decide_packs( + &mut self, + keep_pack: Duration, + keep_delete: Duration, + repack_cacheable_only: bool, + repack_uncompressed: bool, + repack_all: bool, + pack_sizer: &BlobTypeMap, + ) -> RusticResult<()> { + // first process all marked packs then the unmarked ones: + // - first processed packs are more likely to have all blobs seen as unused + // - if marked packs have used blob but these blobs are all present in + // unmarked packs, we want to perform the deletion! + for mark_case in [true, false] { + for (index_num, index) in self.index_files.iter_mut().enumerate() { + for (pack_num, pack) in index + .packs + .iter_mut() + .enumerate() + .filter(|(_, p)| p.delete_mark == mark_case) + { + let pi = PackInfo::from_pack(pack, &mut self.used_ids); + + // Various checks to determine if packs need to be kept + let too_young = pack.time > Some(self.time - keep_pack); + let keep_uncacheable = repack_cacheable_only && !pack.blob_type.is_cacheable(); + + let to_compress = repack_uncompressed && !pack.is_compressed(); + let size_mismatch = !pack_sizer[pack.blob_type].size_ok(pack.size); + + match (pack.delete_mark, pi.used_blobs, pi.unused_blobs) { + (false, 0, _) => { + // unused pack + self.stats.packs.unused += 1; + if too_young { + // keep packs which are too young + pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); + } else { + pack.set_todo(PackToDo::MarkDelete, &pi, &mut self.stats); + } + } + (false, 1.., 0) => { + // used pack + self.stats.packs.used += 1; + if too_young || keep_uncacheable { + pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); + } else if to_compress || repack_all { + self.repack_candidates + .push((pi, ToCompress, index_num, pack_num)); + } else if size_mismatch { + self.repack_candidates.push(( + pi, + SizeMismatch, + index_num, + pack_num, + )); + } else { + pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); + } + } + + (false, 1.., 1..) 
=> { + // partly used pack + self.stats.packs.partly_used += 1; + + if too_young || keep_uncacheable { + // keep packs which are too young and non-cacheable packs if requested + pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); + } else { + // other partly used pack => candidate for repacking + self.repack_candidates + .push((pi, PartlyUsed, index_num, pack_num)); + } + } + (true, 0, _) => { + let local_date_time = + pack.time.ok_or(CommandErrorKind::NoTimeInPacksToDelete)?; + if self.time - local_date_time >= keep_delete { + pack.set_todo(PackToDo::Delete, &pi, &mut self.stats); + } else { + pack.set_todo(PackToDo::KeepMarked, &pi, &mut self.stats); + } + } + (true, 1.., _) => { + // needed blobs; mark this pack for recovery + pack.set_todo(PackToDo::Recover, &pi, &mut self.stats); + } + } + } + } + } + Ok(()) + } + + fn decide_repack( + &mut self, + max_repack: &LimitOption, + max_unused: &LimitOption, + repack_uncompressed: bool, + no_resize: bool, + pack_sizer: &BlobTypeMap, + ) { + let max_unused = match (repack_uncompressed, max_unused) { + (true, _) => 0, + (false, LimitOption::Unlimited) => u64::MAX, + (false, LimitOption::Size(size)) => size.as_u64(), + // if percentag is given, we want to have + // unused <= p/100 * size_after = p/100 * (size_used + unused) + // which equals (1 - p/100) * unused <= p/100 * size_used + (false, LimitOption::Percentage(p)) => (p * self.stats.size.sum().used) / (100 - p), + }; + + let max_repack = match max_repack { + LimitOption::Unlimited => u64::MAX, + LimitOption::Size(size) => size.as_u64(), + LimitOption::Percentage(p) => (p * self.stats.size.sum().total()) / 100, + }; + + self.repack_candidates.sort_unstable_by_key(|rc| rc.0); + let mut resize_packs = BlobTypeMap::>::default(); + let mut do_repack = BlobTypeMap::default(); + let mut repack_size = BlobTypeMap::::default(); + + for (pi, repack_reason, index_num, pack_num) in std::mem::take(&mut self.repack_candidates) + { + let pack = &mut self.index_files[index_num].packs[pack_num]; + let blob_type = pi.blob_type; + + let total_repack_size: u64 = repack_size.into_values().sum(); + if total_repack_size + u64::from(pi.used_size) >= max_repack + || (self.stats.size.sum().unused_after_prune() < max_unused + && repack_reason == PartlyUsed + && blob_type == BlobType::Data) + || (repack_reason == SizeMismatch && no_resize) + { + pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); + } else if repack_reason == SizeMismatch { + resize_packs[blob_type].push((pi, index_num, pack_num)); + repack_size[blob_type] += u64::from(pi.used_size); + } else { + pack.set_todo(PackToDo::Repack, &pi, &mut self.stats); + repack_size[blob_type] += u64::from(pi.used_size); + do_repack[blob_type] = true; + } + } + for (blob_type, resize_packs) in resize_packs { + // packs in resize_packs are only repacked if we anyway repack this blob type or + // if the target pack size is reached for the blob type. + let todo = if do_repack[blob_type] + || repack_size[blob_type] > u64::from(pack_sizer[blob_type].pack_size()) + { + PackToDo::Repack + } else { + PackToDo::Keep + }; + for (pi, index_num, pack_num) in resize_packs { + let pack = &mut self.index_files[index_num].packs[pack_num]; + pack.set_todo(todo, &pi, &mut self.stats); + } + } + } + + fn check_existing_packs(&mut self) -> RusticResult<()> { + for pack in self.index_files.iter().flat_map(|index| &index.packs) { + let existing_size = self.existing_packs.remove(&pack.id); + + // TODO: Unused Packs which don't exist (i.e. 
only existing in index) + let check_size = || { + match existing_size { + Some(size) if size == pack.size => Ok(()), // size is ok => continue + Some(size) => Err(CommandErrorKind::PackSizeNotMatching( + pack.id, pack.size, size, + )), + None => Err(CommandErrorKind::PackNotExisting(pack.id)), + } + }; + + match pack.to_do { + PackToDo::Undecided => return Err(CommandErrorKind::NoDecicion(pack.id).into()), + PackToDo::Keep | PackToDo::Recover => { + for blob in &pack.blobs { + _ = self.used_ids.remove(&blob.id); + } + check_size()?; + } + PackToDo::Repack => { + check_size()?; + } + PackToDo::MarkDelete | PackToDo::Delete | PackToDo::KeepMarked => {} + } + } + + self.used_ids.shrink_to_fit(); + self.existing_packs.shrink_to_fit(); + + // all remaining packs in existing_packs are unreferenced packs + for size in self.existing_packs.values() { + self.stats.size_unref += u64::from(*size); + } + self.stats.packs_unref = self.existing_packs.len() as u64; + + Ok(()) + } + + fn filter_index_files(&mut self, instant_delete: bool) { + let mut any_must_modify = false; + self.stats.index_files = self.index_files.len() as u64; + // filter out only the index files which need processing + self.index_files.retain(|index| { + // index must be processed if it has been modified + // or if any pack is not kept + let must_modify = index.modified + || index.packs.iter().any(|p| { + p.to_do != PackToDo::Keep && (instant_delete || p.to_do != PackToDo::KeepMarked) + }); + + any_must_modify |= must_modify; + + // also process index files which are too small (i.e. rebuild them) + must_modify || index.len() < constants::MIN_INDEX_LEN + }); + + if !any_must_modify && self.index_files.len() == 1 { + // only one index file to process but only because it is too small + self.index_files.clear(); + } + + self.stats.index_files_rebuild = self.index_files.len() as u64; + + // TODO: Sort index files such that files with deletes come first and files with + // repacks come at end + } + + pub fn repack_packs(&self) -> Vec { + self.index_files + .iter() + .flat_map(|index| &index.packs) + .filter(|pack| pack.to_do == PackToDo::Repack) + .map(|pack| pack.id) + .collect() + } + + #[allow(clippy::significant_drop_tightening)] + pub fn do_prune( + self, + repo: &OpenRepository
<P>
, + opts: &PruneOpts, + ) -> RusticResult<()> { + repo.warm_up_wait(self.repack_packs().into_iter())?; + + let be = &repo.dbe; + let pb = &repo.pb; + + let indexer = Indexer::new_unindexed(be.clone()).into_shared(); + + // Calculate an approximation of sizes after pruning. + // The size actually is: + // total_size_of_all_blobs + total_size_of_pack_headers + #packs * pack_overhead + // This is hard/impossible to compute because: + // - the size of blobs can change during repacking if compression is changed + // - the size of pack headers depends on whether blobs are compressed or not + // - we don't know the number of packs generated by repacking + // So, we simply use the current size of the blobs and an estimation of the pack + // header size. + + let size_after_prune = BlobTypeMap::init(|blob_type| { + self.stats.size[blob_type].total_after_prune() + + self.stats.blobs[blob_type].total_after_prune() + * u64::from(HeaderEntry::ENTRY_LEN_COMPRESSED) + }); + + let tree_repacker = Repacker::new( + be.clone(), + BlobType::Tree, + indexer.clone(), + &repo.config, + size_after_prune[BlobType::Tree], + )?; + + let data_repacker = Repacker::new( + be.clone(), + BlobType::Data, + indexer.clone(), + &repo.config, + size_after_prune[BlobType::Data], + )?; + + // mark unreferenced packs for deletion + if !self.existing_packs.is_empty() { + if opts.instant_delete { + let p = pb.progress_counter("removing unindexed packs..."); + let existing_packs: Vec<_> = self.existing_packs.into_keys().collect(); + be.delete_list(FileType::Pack, true, existing_packs.iter(), p)?; + } else { + let p = + pb.progress_counter("marking unneeded unindexed pack files for deletion..."); + p.set_length(self.existing_packs.len().try_into().unwrap()); + for (id, size) in self.existing_packs { + let pack = IndexPack { + id, + size: Some(size), + time: Some(Local::now()), + blobs: Vec::new(), + }; + indexer.write().unwrap().add_remove(pack)?; + p.inc(1); + } + p.finish(); + } + } + + // process packs by index_file + let p = match (self.index_files.is_empty(), self.stats.packs.repack > 0) { + (true, _) => { + info!("nothing to do!"); + pb.progress_hidden() + } + // TODO: Use a MultiProgressBar here + (false, true) => pb.progress_bytes("repacking // rebuilding index..."), + (false, false) => pb.progress_spinner("rebuilding index..."), + }; + + p.set_length(self.stats.size.sum().repack - self.stats.size.sum().repackrm); + + let mut indexes_remove = Vec::new(); + let tree_packs_remove = Arc::new(Mutex::new(Vec::new())); + let data_packs_remove = Arc::new(Mutex::new(Vec::new())); + + let delete_pack = |pack: PrunePack| { + // delete pack + match pack.blob_type { + BlobType::Data => data_packs_remove.lock().unwrap().push(pack.id), + BlobType::Tree => tree_packs_remove.lock().unwrap().push(pack.id), + } + }; + + let used_ids = Arc::new(Mutex::new(self.used_ids)); + + let packs: Vec<_> = self + .index_files + .into_iter() + .map(|index| { + indexes_remove.push(index.id); + index + }) + .flat_map(|index| index.packs) + .collect(); + + packs + .into_par_iter() + .try_for_each(|pack| -> RusticResult<_> { + match pack.to_do { + PackToDo::Undecided => return Err(CommandErrorKind::NoDecicion(pack.id).into()), + PackToDo::Keep => { + // keep pack: add to new index + let pack = pack.into_index_pack(); + indexer.write().unwrap().add(pack)?; + } + PackToDo::Repack => { + // TODO: repack in parallel + for blob in &pack.blobs { + if used_ids.lock().unwrap().remove(&blob.id).is_none() { + // don't save duplicate blobs + continue; + } + + let 
repacker = match blob.tpe { + BlobType::Data => &data_repacker, + BlobType::Tree => &tree_repacker, + }; + if opts.fast_repack { + repacker.add_fast(&pack.id, blob)?; + } else { + repacker.add(&pack.id, blob)?; + } + p.inc(u64::from(blob.length)); + } + if opts.instant_delete { + delete_pack(pack); + } else { + // mark pack for removal + let pack = pack.into_index_pack_with_time(self.time); + indexer.write().unwrap().add_remove(pack)?; + } + } + PackToDo::MarkDelete => { + if opts.instant_delete { + delete_pack(pack); + } else { + // mark pack for removal + let pack = pack.into_index_pack_with_time(self.time); + indexer.write().unwrap().add_remove(pack)?; + } + } + PackToDo::KeepMarked => { + if opts.instant_delete { + delete_pack(pack); + } else { + // keep pack: add to new index + let pack = pack.into_index_pack(); + indexer.write().unwrap().add_remove(pack)?; + } + } + PackToDo::Recover => { + // recover pack: add to new index in section packs + let pack = pack.into_index_pack_with_time(self.time); + indexer.write().unwrap().add(pack)?; + } + PackToDo::Delete => delete_pack(pack), + } + Ok(()) + })?; + _ = tree_repacker.finalize()?; + _ = data_repacker.finalize()?; + indexer.write().unwrap().finalize()?; + p.finish(); + + // remove old index files first as they may reference pack files which are removed soon. + if !indexes_remove.is_empty() { + let p = pb.progress_counter("removing old index files..."); + be.delete_list(FileType::Index, true, indexes_remove.iter(), p)?; + } + + // get variable out of Arc> + let data_packs_remove = data_packs_remove.lock().unwrap(); + if !data_packs_remove.is_empty() { + let p = pb.progress_counter("removing old data packs..."); + be.delete_list(FileType::Pack, false, data_packs_remove.iter(), p)?; + } + + // get variable out of Arc> + let tree_packs_remove = tree_packs_remove.lock().unwrap(); + if !tree_packs_remove.is_empty() { + let p = pb.progress_counter("removing old tree packs..."); + be.delete_list(FileType::Pack, true, tree_packs_remove.iter(), p)?; + } + + Ok(()) + } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +struct PackInfo { + blob_type: BlobType, + used_blobs: u16, + unused_blobs: u16, + used_size: u32, + unused_size: u32, +} + +impl PartialOrd for PackInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for PackInfo { + fn cmp(&self, other: &Self) -> Ordering { + // first order by blob type such that tree packs are picked first + self.blob_type.cmp(&other.blob_type).then( + // then order such that packs with highest + // ratio unused/used space are picked first. + // This is equivalent to ordering by unused / total space. + (u64::from(other.unused_size) * u64::from(self.used_size)) + .cmp(&(u64::from(self.unused_size) * u64::from(other.used_size))), + ) + } +} + +impl PackInfo { + fn from_pack(pack: &PrunePack, used_ids: &mut HashMap) -> Self { + let mut pi = Self { + blob_type: pack.blob_type, + used_blobs: 0, + unused_blobs: 0, + used_size: 0, + unused_size: 0, + }; + + // We search all blobs in the pack for needed ones. We do this by already marking + // and decreasing the used blob counter for the processed blobs. If the counter + // was decreased to 0, the blob and therefore the pack is actually used. + // Note that by this processing, we are also able to handle duplicate blobs within a pack + // correctly. + // If we found a needed blob, we stop and process the information that the pack is actually needed. 
+ let first_needed = pack.blobs.iter().position(|blob| { + match used_ids.get_mut(&blob.id) { + None | Some(0) => { + pi.unused_size += blob.length; + pi.unused_blobs += 1; + } + Some(count) => { + // decrease counter + *count -= 1; + if *count == 0 { + // blob is actually needed + pi.used_size += blob.length; + pi.used_blobs += 1; + return true; // break the search + } + // blob is not needed + pi.unused_size += blob.length; + pi.unused_blobs += 1; + } + } + false // continue with next blob + }); + + if let Some(first_needed) = first_needed { + // The pack is actually needed. + // We reprocess the blobs up to the first needed one and mark all blobs which are genarally needed as used. + for blob in &pack.blobs[..first_needed] { + match used_ids.get_mut(&blob.id) { + None | Some(0) => {} // already correctly marked + Some(count) => { + // remark blob as used + pi.unused_size -= blob.length; + pi.unused_blobs -= 1; + pi.used_size += blob.length; + pi.used_blobs += 1; + *count = 0; // count = 0 indicates to other packs that the blob is not needed anymore. + } + } + } + // Then we process the remaining blobs and mark all blobs which are generally needed as used in this blob + for blob in &pack.blobs[first_needed + 1..] { + match used_ids.get_mut(&blob.id) { + None | Some(0) => { + pi.unused_size += blob.length; + pi.unused_blobs += 1; + } + Some(count) => { + // blob is used in this pack + pi.used_size += blob.length; + pi.used_blobs += 1; + *count = 0; // count = 0 indicates to other packs that the blob is not needed anymore. + } + } + } + } + + pi + } +} + +// find used blobs in repo +fn find_used_blobs( + index: &(impl IndexedBackend + Unpin), + ignore_snaps: &[Id], + pb: &impl ProgressBars, +) -> RusticResult> { + let ignore_snaps: HashSet<_> = ignore_snaps.iter().collect(); + + let p = pb.progress_counter("reading snapshots..."); + let list = index + .be() + .list(FileType::Snapshot)? + .into_iter() + .filter(|id| !ignore_snaps.contains(id)) + .collect(); + let snap_trees: Vec<_> = index + .be() + .stream_list::(list, &p)? + .into_iter() + .map_ok(|(_, snap)| snap.tree) + .try_collect()?; + p.finish(); + + let mut ids: HashMap<_, _> = snap_trees.iter().map(|id| (*id, 0)).collect(); + let p = pb.progress_counter("finding used blobs..."); + + let mut tree_streamer = TreeStreamerOnce::new(index.clone(), snap_trees, p)?; + while let Some(item) = tree_streamer.next().transpose()? 
{ + let (_, tree) = item; + for node in tree.nodes { + match node.node_type { + NodeType::File => { + ids.extend(node.content.iter().flatten().map(|id| (*id, 0))); + } + NodeType::Dir => { + _ = ids.insert(node.subtree.unwrap(), 0); + } + _ => {} // nothing to do + } + } + } + + Ok(ids) +} diff --git a/crates/rustic_core/src/commands/repoinfo.rs b/crates/rustic_core/src/commands/repoinfo.rs new file mode 100644 index 000000000..3d4197e02 --- /dev/null +++ b/crates/rustic_core/src/commands/repoinfo.rs @@ -0,0 +1,143 @@ +use serde::{Deserialize, Serialize}; + +use crate::{ + index::IndexEntry, + repofile::indexfile::{IndexFile, IndexPack}, + BlobType, BlobTypeMap, DecryptReadBackend, FileType, OpenRepository, Progress, ProgressBars, + ReadBackend, Repository, RusticResult, ALL_FILE_TYPES, +}; + +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +pub struct IndexInfos { + pub blobs: Vec, + pub blobs_delete: Vec, + pub packs: Vec, + pub packs_delete: Vec, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct BlobInfo { + pub blob_type: BlobType, + pub count: u64, + pub size: u64, + pub data_size: u64, +} + +impl BlobInfo { + pub fn add(&mut self, ie: IndexEntry) { + self.count += 1; + self.size += u64::from(ie.length); + self.data_size += u64::from(ie.data_length()); + } +} + +#[serde_with::apply(Option => #[serde(default, skip_serializing_if = "Option::is_none")])] +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct PackInfo { + pub blob_type: BlobType, + pub count: u64, + pub min_size: Option, + pub max_size: Option, +} + +impl PackInfo { + pub fn add(&mut self, ip: &IndexPack) { + self.count += 1; + let size = u64::from(ip.pack_size()); + self.min_size = self + .min_size + .map_or(Some(size), |min_size| Some(min_size.min(size))); + self.max_size = self + .max_size + .map_or(Some(size), |max_size| Some(max_size.max(size))); + } +} + +pub(crate) fn collect_index_infos( + repo: &OpenRepository
<P>
, +) -> RusticResult { + let mut blob_info = BlobTypeMap::<()>::default().map(|blob_type, _| BlobInfo { + blob_type, + count: 0, + size: 0, + data_size: 0, + }); + let mut blob_info_delete = blob_info; + let mut pack_info = BlobTypeMap::<()>::default().map(|blob_type, _| PackInfo { + blob_type, + count: 0, + min_size: None, + max_size: None, + }); + let mut pack_info_delete = pack_info; + + let p = repo.pb.progress_counter("scanning index..."); + for index in repo.dbe.stream_all::(&p)? { + let index = index?.1; + for pack in &index.packs { + let tpe = pack.blob_type(); + pack_info[tpe].add(pack); + + for blob in &pack.blobs { + let ie = IndexEntry::from_index_blob(blob, pack.id); + blob_info[tpe].add(ie); + } + } + + for pack in &index.packs_to_delete { + let tpe = pack.blob_type(); + pack_info_delete[tpe].add(pack); + for blob in &pack.blobs { + let ie = IndexEntry::from_index_blob(blob, pack.id); + blob_info_delete[tpe].add(ie); + } + } + } + p.finish(); + + let info = IndexInfos { + blobs: blob_info.into_values().collect(), + blobs_delete: blob_info_delete.into_values().collect(), + packs: pack_info.into_values().collect(), + packs_delete: pack_info_delete.into_values().collect(), + }; + + Ok(info) +} + +#[serde_with::apply(Option => #[serde(default, skip_serializing_if = "Option::is_none")])] +#[derive(Default, Clone, Debug, Serialize, Deserialize)] +pub struct RepoFileInfos { + pub repo: Vec, + pub repo_hot: Option>, +} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct RepoFileInfo { + pub tpe: FileType, + pub count: u64, + pub size: u64, +} + +pub(crate) fn collect_file_info(be: &impl ReadBackend) -> RusticResult> { + let mut files = Vec::with_capacity(ALL_FILE_TYPES.len()); + for tpe in ALL_FILE_TYPES { + let list = be.list_with_size(tpe)?; + let count = list.len() as u64; + let size = list.iter().map(|f| u64::from(f.1)).sum(); + files.push(RepoFileInfo { tpe, count, size }); + } + Ok(files) +} + +pub fn collect_file_infos(repo: &Repository
<P>
) -> RusticResult { + let p = repo.pb.progress_spinner("scanning files..."); + let files = collect_file_info(&repo.be)?; + let files_hot = repo.be_hot.as_ref().map(collect_file_info).transpose()?; + p.finish(); + + Ok(RepoFileInfos { + repo: files, + repo_hot: files_hot, + }) +} diff --git a/crates/rustic_core/src/commands/snapshots.rs b/crates/rustic_core/src/commands/snapshots.rs new file mode 100644 index 000000000..3d9211be1 --- /dev/null +++ b/crates/rustic_core/src/commands/snapshots.rs @@ -0,0 +1,39 @@ +//! `smapshot` subcommand + +use crate::{ + OpenRepository, ProgressBars, RusticResult, SnapshotFile, SnapshotGroup, SnapshotGroupCriterion, +}; + +pub(crate) fn get_snapshot_group( + repo: &OpenRepository
<P>
, + ids: &[String], + group_by: SnapshotGroupCriterion, + filter: impl FnMut(&SnapshotFile) -> bool, +) -> RusticResult)>> { + let pb = &repo.pb; + let p = pb.progress_counter("getting snapshots..."); + let groups = match ids { + [] => SnapshotFile::group_from_backend(&repo.dbe, filter, group_by, &p)?, + [id] if id == "latest" => { + SnapshotFile::group_from_backend(&repo.dbe, filter, group_by, &p)? + .into_iter() + .map(|(group, mut snaps)| { + snaps.sort_unstable(); + let last_idx = snaps.len() - 1; + snaps.swap(0, last_idx); + snaps.truncate(1); + (group, snaps) + }) + .collect::>() + } + _ => { + let item = ( + SnapshotGroup::default(), + SnapshotFile::from_ids(&repo.dbe, ids, &p)?, + ); + vec![item] + } + }; + + Ok(groups) +} diff --git a/crates/rustic_core/src/error.rs b/crates/rustic_core/src/error.rs index 60b5c10ab..38dea4a97 100644 --- a/crates/rustic_core/src/error.rs +++ b/crates/rustic_core/src/error.rs @@ -144,6 +144,24 @@ pub enum RusticErrorKind { pub enum CommandErrorKind { /// path is no dir: `{0:?}` PathIsNoDir(String), + /// used blobs are missing: blob {0} doesn't existing + BlobsMissing(Id), + /// packs_to_delete doesn't contain `time`. + NoTimeInPacksToDelete, + /// used pack {0}: size does not match! Expected size: {1}, real size: {2} + PackSizeNotMatching(Id, u32, u32), + /// "used pack {0} does not exist! + PackNotExisting(Id), + /// pack {0} got no decicion what to do + NoDecicion(Id), + /// {0:?} + FromParseIntError(#[from] ParseIntError), + /// {0} + FromByteSizeParser(String), + /// --repack-uncompressed makes no sense for v1 repo! + RepackUncompressedRepoV1, + /// datetime out of range: `{0:?}` + FromOutOfRangeError(#[from] OutOfRangeError), } /// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions @@ -217,6 +235,8 @@ pub enum RepositoryErrorKind { AccessToConfigFileFailed, /// {0:?} FromNomError(nom::Err<()>), + /// {0:?} + FromThreadPoolbilderError(rayon::ThreadPoolBuildError), /// reading Password failed: `{0:?}` ReadingPasswordFromReaderFailed(std::io::Error), /// reading Password from prompt failed: `{0:?}` diff --git a/crates/rustic_core/src/lib.rs b/crates/rustic_core/src/lib.rs index 1ec76f8fe..bfa226262 100644 --- a/crates/rustic_core/src/lib.rs +++ b/crates/rustic_core/src/lib.rs @@ -120,7 +120,11 @@ pub use crate::{ BlobLocation, BlobType, BlobTypeMap, Initialize, Sum, }, chunker::random_poly, - commands::check::CheckOpts, + commands::{ + check::CheckOpts, + prune::{PruneOpts, PrunePlan, PruneStats}, + repoinfo::{BlobInfo, IndexInfos, PackInfo, RepoFileInfo, RepoFileInfos}, + }, crypto::{aespoly1305::Key, hasher::hash}, error::{RusticError, RusticResult}, file::{AddFileResult, FileInfos, RestoreStats}, @@ -142,5 +146,5 @@ pub use crate::{ }, RepoFile, }, - repository::{parse_command, read_password_from_reader, OpenRepository, RepoInfo, Repository, RepositoryOptions}, + repository::{read_password_from_reader, OpenRepository, RepoInfo, Repository, RepositoryOptions }; diff --git a/crates/rustic_core/src/repofile/snapshotfile.rs b/crates/rustic_core/src/repofile/snapshotfile.rs index f6f9fe469..5a5fc905f 100644 --- a/crates/rustic_core/src/repofile/snapshotfile.rs +++ b/crates/rustic_core/src/repofile/snapshotfile.rs @@ -341,7 +341,7 @@ impl SnapshotFile { pub fn group_from_backend( be: &B, filter: F, - crit: &SnapshotGroupCriterion, + crit: SnapshotGroupCriterion, p: &impl Progress, ) -> RusticResult)>> where @@ -349,7 +349,7 @@ impl SnapshotFile { F: FnMut(&Self) -> bool, { let mut snaps = 
Self::all_from_backend(be, filter, p)?; - snaps.sort_unstable_by(|sn1, sn2| sn1.cmp_group(*crit, sn2)); + snaps.sort_unstable_by(|sn1, sn2| sn1.cmp_group(crit, sn2)); let mut result = Vec::new(); for (group, snaps) in &snaps @@ -528,7 +528,7 @@ impl Display for SnapshotGroup { impl SnapshotGroup { #[must_use] - pub fn from_sn(sn: &SnapshotFile, crit: &SnapshotGroupCriterion) -> Self { + pub fn from_sn(sn: &SnapshotFile, crit: SnapshotGroupCriterion) -> Self { Self { hostname: crit.hostname.then(|| sn.hostname.clone()), label: crit.label.then(|| sn.label.clone()), diff --git a/crates/rustic_core/src/repository.rs b/crates/rustic_core/src/repository.rs index 89041880a..1eb5b4367 100644 --- a/crates/rustic_core/src/repository.rs +++ b/crates/rustic_core/src/repository.rs @@ -7,7 +7,6 @@ use std::{ }; use bytes::Bytes; -use derive_more::Add; use log::{debug, error, info}; use nom::{ @@ -29,44 +28,24 @@ use crate::{ decrypt::DecryptReadBackend, decrypt::DecryptWriteBackend, hotcold::HotColdBackend, FileType, ReadBackend, }, - commands::{self, check::CheckOpts}, + commands::{ + self, + check::CheckOpts, + repoinfo::{IndexInfos, RepoFileInfos}, + }, crypto::aespoly1305::Key, error::RepositoryErrorKind, - index::IndexEntry, - repofile::{configfile::ConfigFile, indexfile::IndexPack, keyfile::find_key_in_backend}, - BlobType, IndexBackend, NoProgressBars, ProgressBars, RusticResult, SnapshotFile, + repofile::{configfile::ConfigFile, keyfile::find_key_in_backend}, + BlobType, Id, IndexBackend, NoProgressBars, ProgressBars, PruneOpts, PrunePlan, RusticResult, + SnapshotFile, SnapshotGroup, SnapshotGroupCriterion, }; pub(super) mod constants { pub(super) const MAX_PASSWORD_RETRIES: usize = 5; } -#[derive(Default, Clone, Copy, Add, Debug)] -pub struct RepoInfo { - pub count: u64, - pub size: u64, - pub data_size: u64, - pub pack_count: u64, - pub total_pack_size: u64, - pub min_pack_size: u64, - pub max_pack_size: u64, -} - -impl RepoInfo { - pub fn add(&mut self, ie: IndexEntry) { - self.count += 1; - self.size += u64::from(ie.length); - self.data_size += u64::from(ie.data_length()); - } - - pub fn add_pack(&mut self, ip: &IndexPack) { - self.pack_count += 1; - let size = u64::from(ip.pack_size()); - self.total_pack_size += size; - self.min_pack_size = self.min_pack_size.min(size); - self.max_pack_size = self.max_pack_size.max(size); - } -} +mod warm_up; +use warm_up::{warm_up, warm_up_wait}; #[serde_as] #[cfg_attr(feature = "clap", derive(clap::Parser))] @@ -201,7 +180,7 @@ pub struct Repository
<P>
{ pub be: HotColdBackend<ChooseBackend>, pub be_hot: Option<ChooseBackend>, opts: RepositoryOptions, - pb: P, + pub(crate) pb: P, } impl Repository<NoProgressBars> { @@ -354,6 +333,12 @@ impl
<P: ProgressBars>
Repository
<P>
{ } } +impl<P: ProgressBars> Repository<P>
{ + pub fn infos_files(&self) -> RusticResult { + commands::repoinfo::collect_file_infos(self) + } +} + pub(crate) fn get_key(be: &impl ReadBackend, password: Option) -> RusticResult { for _ in 0..constants::MAX_PASSWORD_RETRIES { match password { @@ -394,6 +379,15 @@ pub struct OpenRepository
<P>
{ } impl<P: ProgressBars> OpenRepository<P>
{ + pub fn get_snapshot_group( + &self, + ids: &[String], + group_by: SnapshotGroupCriterion, + filter: impl FnMut(&SnapshotFile) -> bool, + ) -> RusticResult)>> { + commands::snapshots::get_snapshot_group(self, ids, group_by, filter) + } + pub fn cat_file(&self, tpe: FileType, id: &str) -> RusticResult { commands::cat::cat_file(self, tpe, id) } @@ -402,10 +396,26 @@ impl OpenRepository
<P>
{ opts.run(self) } + pub fn prune_plan(&self, opts: &PruneOpts) -> RusticResult { + opts.get_plan(self) + } + pub fn to_indexed(self) -> RusticResult> { let index = IndexBackend::new(&self.dbe, &self.pb.progress_counter(""))?; Ok(IndexedRepository { repo: self, index }) } + + pub fn infos_index(&self) -> RusticResult { + commands::repoinfo::collect_index_infos(self) + } + + pub fn warm_up(&self, packs: impl ExactSizeIterator) -> RusticResult<()> { + warm_up(self, packs) + } + + pub fn warm_up_wait(&self, packs: impl ExactSizeIterator) -> RusticResult<()> { + warm_up_wait(self, packs) + } } #[derive(Debug)] diff --git a/crates/rustic_core/src/repository/warm_up.rs b/crates/rustic_core/src/repository/warm_up.rs new file mode 100644 index 000000000..433005cd0 --- /dev/null +++ b/crates/rustic_core/src/repository/warm_up.rs @@ -0,0 +1,93 @@ +use std::process::Command; +use std::thread::sleep; + +use log::{debug, warn}; +use rayon::ThreadPoolBuilder; + +use super::parse_command; +use crate::{ + error::RepositoryErrorKind, FileType, Id, OpenRepository, Progress, ProgressBars, ReadBackend, + RusticResult, +}; + +pub(super) mod constants { + pub(super) const MAX_READER_THREADS_NUM: usize = 20; +} + +pub(crate) fn warm_up_wait( + repo: &OpenRepository
<P>
, + packs: impl ExactSizeIterator, +) -> RusticResult<()> { + warm_up(repo, packs)?; + if let Some(wait) = repo.opts.warm_up_wait { + let p = repo.pb.progress_spinner(format!("waiting {wait}...")); + sleep(*wait); + p.finish(); + } + Ok(()) +} + +pub(crate) fn warm_up( + repo: &OpenRepository
<P>
, + packs: impl ExactSizeIterator, +) -> RusticResult<()> { + if let Some(command) = &repo.opts.warm_up_command { + warm_up_command(packs, command, &repo.pb)?; + } else if repo.opts.warm_up { + warm_up_access(repo, packs)?; + } + Ok(()) +} + +fn warm_up_command( + packs: impl ExactSizeIterator, + command: &str, + pb: &P, +) -> RusticResult<()> { + let p = pb.progress_counter("warming up packs..."); + p.set_length(packs.len() as u64); + for pack in packs { + let actual_command = command.replace("%id", &pack.to_hex()); + debug!("calling {actual_command}..."); + let commands = parse_command::<()>(&actual_command) + .map_err(RepositoryErrorKind::FromNomError)? + .1; + let status = Command::new(commands[0]).args(&commands[1..]).status()?; + if !status.success() { + warn!("warm-up command was not successful for pack {pack:?}. {status}"); + } + } + p.finish(); + Ok(()) +} + +fn warm_up_access( + repo: &OpenRepository
<P>
, + packs: impl ExactSizeIterator, +) -> RusticResult<()> { + let mut be = repo.be.clone(); + be.set_option("retry", "false")?; + + let p = repo.pb.progress_counter("warming up packs..."); + p.set_length(packs.len() as u64); + + let pool = ThreadPoolBuilder::new() + .num_threads(constants::MAX_READER_THREADS_NUM) + .build() + .map_err(RepositoryErrorKind::FromThreadPoolbilderError)?; + let p = &p; + let be = &be; + pool.in_place_scope(|s| { + for pack in packs { + s.spawn(move |_| { + // ignore errors as they are expected from the warm-up + _ = be.read_partial(FileType::Pack, &pack, false, 0, 1); + p.inc(1); + }); + } + }); + + p.finish(); + + Ok(()) +} diff --git a/src/commands/backup.rs b/src/commands/backup.rs index d9170cdb0..26293087b 100644 --- a/src/commands/backup.rs +++ b/src/commands/backup.rs @@ -252,7 +252,7 @@ impl BackupCmd { // get suitable snapshot group from snapshot and opts.group_by. This is used to filter snapshots for the parent detection let group = SnapshotGroup::from_sn( &snap, - &opts.group_by.unwrap_or_else(|| { + opts.group_by.unwrap_or_else(|| { SnapshotGroupCriterion::from_str("host,label,paths").unwrap() }), ); diff --git a/src/commands/forget.rs b/src/commands/forget.rs index 8e3099664..94e8640aa 100644 --- a/src/commands/forget.rs +++ b/src/commands/forget.rs @@ -113,7 +113,7 @@ impl ForgetCmd { SnapshotFile::group_from_backend( be, |sn| config.forget.filter.matches(sn), - &group_by, + group_by, &p, )? } else { @@ -201,7 +201,7 @@ impl ForgetCmd { if self.config.prune { let mut prune_opts = self.prune_opts.clone(); - prune_opts.ignore_snaps = forget_snaps; + prune_opts.opts.ignore_snaps = forget_snaps; prune_opts.run(); } diff --git a/src/commands/prune.rs b/src/commands/prune.rs index f8feae42c..c58a6592c 100644 --- a/src/commands/prune.rs +++ b/src/commands/prune.rs @@ -4,92 +4,23 @@ /// accessors along with logging macros. Customize as you see fit. use crate::{ commands::{get_repository, open_repository}, - config::progress_options::ProgressOptions, helpers::bytes_size_to_string, status_err, Application, RUSTIC_APP, }; use abscissa_core::{Command, Runnable, Shutdown}; -use log::{debug, error, info}; +use log::debug; -use std::{ - cmp::Ordering, - collections::{HashMap, HashSet}, - str::FromStr, - sync::{Arc, Mutex}, -}; - -use anyhow::{anyhow, bail, Result}; -use bytesize::ByteSize; -use chrono::{DateTime, Duration, Local}; - -use derive_more::Add; -use itertools::Itertools; -use rayon::prelude::{IntoParallelIterator, ParallelIterator}; - -use crate::helpers::warm_up_wait; +use anyhow::Result; -use rustic_core::{ - BlobType, BlobTypeMap, DecryptReadBackend, DecryptWriteBackend, FileType, HeaderEntry, Id, - IndexBackend, IndexBlob, IndexCollector, IndexFile, IndexPack, IndexType, IndexedBackend, - Indexer, Initialize, NodeType, OpenRepository, PackSizer, Progress, ProgressBars, ReadBackend, - ReadIndex, Repacker, SnapshotFile, Sum, TreeStreamerOnce, -}; - -pub(super) mod constants { - pub(super) const MIN_INDEX_LEN: usize = 10_000; -} +use rustic_core::{PruneOpts, PruneStats, Sum}; /// `prune` subcommand #[allow(clippy::struct_excessive_bools)] #[derive(clap::Parser, Command, Debug, Clone)] #[group(id = "prune_opts")] pub(crate) struct PruneCmd { - /// Define maximum data to repack in % of reposize or as size (e.g. '5b', '2 kB', '3M', '4TiB') or 'unlimited' - #[clap(long, value_name = "LIMIT", default_value = "unlimited")] - max_repack: LimitOption, - - /// Tolerate limit of unused data in % of reposize after pruning or as size (e.g. 
'5b', '2 kB', '3M', '4TiB') or 'unlimited' - #[clap(long, value_name = "LIMIT", default_value = "5%")] - max_unused: LimitOption, - - /// Minimum duration (e.g. 90d) to keep packs before repacking or removing. More recently created - /// packs won't be repacked or marked for deletion within this prune run. - #[clap(long, value_name = "DURATION", default_value = "0d")] - keep_pack: humantime::Duration, - - /// Minimum duration (e.g. 10m) to keep packs marked for deletion. More recently marked packs won't be - /// deleted within this prune run. - #[clap(long, value_name = "DURATION", default_value = "23h")] - keep_delete: humantime::Duration, - - /// Delete files immediately instead of marking them. This also removes all files already marked for deletion. - /// WARNING: Only use if you are sure the repository is not accessed by parallel processes! - #[clap(long)] - instant_delete: bool, - - /// Simply copy blobs when repacking instead of decrypting; possibly compressing; encrypting - #[clap(long)] - fast_repack: bool, - - /// Repack packs containing uncompressed blobs. This cannot be used with --fast-repack. - /// Implies --max-unused=0. - #[clap(long, conflicts_with = "fast_repack")] - repack_uncompressed: bool, - - /// Repack all packs. Implies --max-unused=0. - #[clap(long)] - repack_all: bool, - - /// Only repack packs which are cacheable [default: true for a hot/cold repository, else false] - #[clap(long, value_name = "TRUE/FALSE")] - repack_cacheable_only: Option, - - /// Do not repack packs which only needs to be resized - #[clap(long)] - no_resize: bool, - - #[clap(skip)] - pub(crate) ignore_snaps: Vec, + #[clap(flatten)] + pub(crate) opts: PruneOpts, } impl Runnable for PruneCmd { @@ -104,1086 +35,110 @@ impl Runnable for PruneCmd { impl PruneCmd { fn inner_run(&self) -> Result<()> { let config = RUSTIC_APP.config(); - let progress_options = &config.global.progress_options; - let repo = open_repository(get_repository(&config)); - let be = &repo.dbe; - if repo.config.version < 2 && self.repack_uncompressed { - bail!("--repack-uncompressed makes no sense for v1 repo!"); - } - - let mut index_files = Vec::new(); - - let p = progress_options.progress_counter("reading index..."); - let mut index_collector = IndexCollector::new(IndexType::OnlyTrees); + let pruner = repo.prune_plan(&self.opts)?; - be.stream_all::(&p)? 
- .into_iter() - .for_each(|index| { - let (id, index) = match index { - Ok(it) => it, - Err(err) => { - status_err!("{}", err); - RUSTIC_APP.shutdown(Shutdown::Crash); - } - }; - index_collector.extend(index.packs.clone()); - // we add the trees from packs_to_delete to the index such that searching for - // used blobs doesn't abort if they are already marked for deletion - index_collector.extend(index.packs_to_delete.clone()); - - index_files.push((id, index)); - }); - p.finish(); - - let (used_ids, total_size) = { - let index = index_collector.into_index(); - let total_size = BlobTypeMap::init(|blob_type| index.total_size(blob_type)); - let indexed_be = IndexBackend::new_from_index(&be.clone(), index); - let used_ids = find_used_blobs(&indexed_be, &self.ignore_snaps, progress_options)?; - (used_ids, total_size) - }; - - // list existing pack files - let p = progress_options.progress_spinner("getting packs from repository..."); - let existing_packs: HashMap<_, _> = - be.list_with_size(FileType::Pack)?.into_iter().collect(); - p.finish(); - - let mut pruner = Pruner::new(used_ids, existing_packs, index_files); - pruner.count_used_blobs(); - pruner.check()?; - let repack_cacheable_only = self - .repack_cacheable_only - .unwrap_or_else(|| repo.config.is_hot == Some(true)); - let pack_sizer = - total_size.map(|tpe, size| PackSizer::from_config(&repo.config, tpe, size)); - pruner.decide_packs( - Duration::from_std(*self.keep_pack)?, - Duration::from_std(*self.keep_delete)?, - repack_cacheable_only, - self.repack_uncompressed, - self.repack_all, - &pack_sizer, - ); - pruner.decide_repack( - &self.max_repack, - &self.max_unused, - self.repack_uncompressed || self.repack_all, - self.no_resize, - &pack_sizer, - ); - pruner.check_existing_packs()?; - pruner.filter_index_files(self.instant_delete); - pruner.print_stats(); + print_stats(&pruner.stats); - let dry_run = config.global.dry_run; - warm_up_wait( - &repo, - pruner.repack_packs().into_iter(), - !dry_run, - progress_options, - )?; - - if !dry_run { - pruner.do_prune(repo, self, progress_options)?; + if config.global.dry_run { + repo.warm_up(pruner.repack_packs().into_iter())?; + } else { + pruner.do_prune(&repo, &self.opts)?; } Ok(()) } } - -#[derive(Clone, Debug)] -enum LimitOption { - Size(ByteSize), - Percentage(u64), - Unlimited, -} - -impl FromStr for LimitOption { - type Err = anyhow::Error; - fn from_str(s: &str) -> Result { - Ok(match s.chars().last().unwrap_or('0') { - '%' => Self::Percentage({ - let mut copy = s.to_string(); - _ = copy.pop(); - copy.parse()? 
- }), - 'd' if s == "unlimited" => Self::Unlimited, - _ => Self::Size(ByteSize::from_str(s).map_err(|err| anyhow!(err))?), - }) - } -} - -#[derive(Default)] -struct DeleteStats { - remove: u64, - recover: u64, - keep: u64, -} - -impl DeleteStats { - const fn total(&self) -> u64 { - self.remove + self.recover + self.keep - } -} -#[derive(Default)] -struct PackStats { - used: u64, - partly_used: u64, - unused: u64, // this equals to packs-to-remove - repack: u64, - keep: u64, -} -#[derive(Default, Clone, Copy, Add)] -struct SizeStats { - used: u64, - unused: u64, - remove: u64, - repack: u64, - repackrm: u64, -} - -impl SizeStats { - const fn total(&self) -> u64 { - self.used + self.unused - } - const fn total_after_prune(&self) -> u64 { - self.used + self.unused_after_prune() - } - const fn unused_after_prune(&self) -> u64 { - self.unused - self.remove - self.repackrm - } -} - -#[derive(Default)] -struct PruneStats { - packs_to_delete: DeleteStats, - size_to_delete: DeleteStats, - packs: PackStats, - blobs: BlobTypeMap, - size: BlobTypeMap, - size_unref: u64, - index_files: u64, -} - -#[derive(Debug)] -struct PruneIndex { - id: Id, - modified: bool, - packs: Vec, -} - -impl PruneIndex { - fn len(&self) -> usize { - self.packs.iter().map(|p| p.blobs.len()).sum() - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum PackToDo { - Undecided, - Keep, - Repack, - MarkDelete, - KeepMarked, - Recover, - Delete, -} - -#[derive(Debug)] -struct PrunePack { - id: Id, - blob_type: BlobType, - size: u32, - delete_mark: bool, - to_do: PackToDo, - time: Option>, - blobs: Vec, -} - -impl PrunePack { - fn from_index_pack(p: IndexPack, delete_mark: bool) -> Self { - Self { - id: p.id, - blob_type: p.blob_type(), - size: p.pack_size(), - delete_mark, - to_do: PackToDo::Undecided, - time: p.time, - blobs: p.blobs, - } - } - - fn from_index_pack_unmarked(p: IndexPack) -> Self { - Self::from_index_pack(p, false) - } - - fn from_index_pack_marked(p: IndexPack) -> Self { - Self::from_index_pack(p, true) - } - - fn into_index_pack(self) -> IndexPack { - IndexPack { - id: self.id, - time: self.time, - size: None, - blobs: self.blobs, - } - } - - fn into_index_pack_with_time(self, time: DateTime) -> IndexPack { - IndexPack { - id: self.id, - time: Some(time), - size: None, - blobs: self.blobs, - } - } - - fn set_todo(&mut self, todo: PackToDo, pi: &PackInfo, stats: &mut PruneStats) { - let tpe = self.blob_type; - match todo { - PackToDo::Undecided => panic!("not possible"), - PackToDo::Keep => { - stats.blobs[tpe].used += u64::from(pi.used_blobs); - stats.blobs[tpe].unused += u64::from(pi.unused_blobs); - stats.size[tpe].used += u64::from(pi.used_size); - stats.size[tpe].unused += u64::from(pi.unused_size); - stats.packs.keep += 1; - } - PackToDo::Repack => { - stats.blobs[tpe].used += u64::from(pi.used_blobs); - stats.blobs[tpe].unused += u64::from(pi.unused_blobs); - stats.size[tpe].used += u64::from(pi.used_size); - stats.size[tpe].unused += u64::from(pi.unused_size); - stats.packs.repack += 1; - stats.blobs[tpe].repack += u64::from(pi.unused_blobs + pi.used_blobs); - stats.blobs[tpe].repackrm += u64::from(pi.unused_blobs); - stats.size[tpe].repack += u64::from(pi.unused_size + pi.used_size); - stats.size[tpe].repackrm += u64::from(pi.unused_size); - } - - PackToDo::MarkDelete => { - stats.blobs[tpe].unused += u64::from(pi.unused_blobs); - stats.size[tpe].unused += u64::from(pi.unused_size); - stats.blobs[tpe].remove += u64::from(pi.unused_blobs); - stats.size[tpe].remove += u64::from(pi.unused_size); - } - 
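The `SizeStats` bookkeeping above feeds every later prune decision, so the derived quantities are worth spelling out. Below is a minimal stand-alone sketch of that arithmetic: the field meanings are taken from the struct definition above, the numbers are invented, and the `repack` field is omitted since it does not enter these formulas.

```rust
// Reduced model of the `SizeStats` accounting defined above.
struct SizeStats {
    used: u64,     // bytes in blobs that are still referenced
    unused: u64,   // bytes in blobs no snapshot references
    remove: u64,   // unused bytes in packs slated for deletion
    repackrm: u64, // unused bytes that repacking will drop
}

impl SizeStats {
    const fn total(&self) -> u64 {
        self.used + self.unused
    }
    const fn unused_after_prune(&self) -> u64 {
        self.unused - self.remove - self.repackrm
    }
    const fn total_after_prune(&self) -> u64 {
        self.used + self.unused_after_prune()
    }
}

fn main() {
    // 100 MiB used, 30 MiB unused; pruning deletes 10 MiB outright and
    // repacking drops another 15 MiB:
    let s = SizeStats { used: 100 << 20, unused: 30 << 20, remove: 10 << 20, repackrm: 15 << 20 };
    assert_eq!(s.total(), 130 << 20);
    assert_eq!(s.unused_after_prune(), 5 << 20); // 5 MiB of unused data remains
    assert_eq!(s.total_after_prune(), 105 << 20);
}
```

The subtraction in `unused_after_prune` cannot underflow because `remove` and `repackrm` count unused bytes from disjoint sets of packs (deleted versus repacked), so their sum never exceeds `unused`.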
PackToDo::Recover => { - stats.packs_to_delete.recover += 1; - stats.size_to_delete.recover += u64::from(self.size); - } - PackToDo::Delete => { - stats.packs_to_delete.remove += 1; - stats.size_to_delete.remove += u64::from(self.size); - } - PackToDo::KeepMarked => { - stats.packs_to_delete.keep += 1; - stats.size_to_delete.keep += u64::from(self.size); - } - } - self.to_do = todo; - } - - fn is_compressed(&self) -> bool { - self.blobs - .iter() - .all(|blob| blob.uncompressed_length.is_some()) - } -} - -#[derive(PartialEq, Eq)] -enum RepackReason { - PartlyUsed, - ToCompress, - SizeMismatch, -} -use RepackReason::{PartlyUsed, SizeMismatch, ToCompress}; - -struct Pruner { - time: DateTime, - used_ids: HashMap, - existing_packs: HashMap, - repack_candidates: Vec<(PackInfo, RepackReason, usize, usize)>, - index_files: Vec, - stats: PruneStats, -} - -impl Pruner { - fn new( - used_ids: HashMap, - existing_packs: HashMap, - index_files: Vec<(Id, IndexFile)>, - ) -> Self { - let mut processed_packs = HashSet::new(); - let mut processed_packs_delete = HashSet::new(); - let mut index_files: Vec<_> = index_files - .into_iter() - .map(|(id, index)| { - let mut modified = false; - let mut packs: Vec<_> = index - .packs - .into_iter() - // filter out duplicate packs - .filter(|p| { - let no_duplicate = processed_packs.insert(p.id); - modified |= !no_duplicate; - no_duplicate - }) - .map(PrunePack::from_index_pack_unmarked) - .collect(); - packs.extend( - index - .packs_to_delete - .into_iter() - // filter out duplicate packs - .filter(|p| { - let no_duplicate = processed_packs_delete.insert(p.id); - modified |= !no_duplicate; - no_duplicate - }) - .map(PrunePack::from_index_pack_marked), - ); - - PruneIndex { - id, - modified, - packs, - } - }) - .collect(); - - // filter out "normally" indexed packs from packs_to_delete - for index in &mut index_files { - let mut modified = false; - index.packs.retain(|p| { - !p.delete_mark || { - let duplicate = processed_packs.contains(&p.id); - modified |= duplicate; - !duplicate - } - }); - - index.modified |= modified; - } - - Self { - time: Local::now(), - used_ids, - existing_packs, - repack_candidates: Vec::new(), - index_files, - stats: PruneStats::default(), - } - } - - fn count_used_blobs(&mut self) { - for blob in self - .index_files - .iter() - .flat_map(|index| &index.packs) - .flat_map(|pack| &pack.blobs) - { - if let Some(count) = self.used_ids.get_mut(&blob.id) { - // note that duplicates are only counted up to 255. If there are more - // duplicates, the number is set to 255. This may imply that later on - // not the "best" pack is chosen to have that blob marked as used. - *count = count.saturating_add(1); - } - } - } - - fn check(&self) -> Result<()> { - // check that all used blobs are present in index - for (id, count) in &self.used_ids { - if *count == 0 { - error!("used blob {} is missing", id); - bail!("missing blobs"); - } - } - Ok(()) - } - - fn decide_packs( - &mut self, - keep_pack: Duration, - keep_delete: Duration, - repack_cacheable_only: bool, - repack_uncompressed: bool, - repack_all: bool, - pack_sizer: &BlobTypeMap, - ) { - // first process all marked packs then the unmarked ones: - // - first processed packs are more likely to have all blobs seen as unused - // - if marked packs have used blob but these blobs are all present in - // unmarked packs, we want to perform the deletion! 
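The comment above explains why marked packs are decided before unmarked ones. Together with the `match` on `(delete_mark, used_blobs, unused_blobs)` that follows, this amounts to a small decision table. Here is a condensed sketch of that table; it deliberately leaves out the cacheability, compression and resize checks of the real `decide_packs`, so treat it as an approximation rather than the actual rule set.

```rust
/// Condensed decision table for a single pack, mirroring the match below.
enum Decision {
    Keep,
    MarkDelete,
    RepackCandidate,
    KeepMarked,
    Recover,
    Delete,
}

fn decide(
    delete_mark: bool,
    used_blobs: u16,
    unused_blobs: u16,
    too_young: bool,           // pack is newer than --keep-pack
    keep_delete_elapsed: bool, // pack was marked longer ago than --keep-delete
) -> Decision {
    match (delete_mark, used_blobs, unused_blobs) {
        (false, 0, _) if too_young => Decision::Keep,
        (false, 0, _) => Decision::MarkDelete,
        (false, 1.., 0) => Decision::Keep,
        (false, 1.., 1..) if too_young => Decision::Keep,
        (false, 1.., 1..) => Decision::RepackCandidate,
        (true, 0, _) if keep_delete_elapsed => Decision::Delete,
        (true, 0, _) => Decision::KeepMarked,
        (true, 1.., _) => Decision::Recover,
    }
}

fn main() {
    // A fully unused pack past its keep-pack window is marked for deletion:
    assert!(matches!(decide(false, 0, 12, false, false), Decision::MarkDelete));
    // A marked pack that still holds needed blobs is recovered:
    assert!(matches!(decide(true, 3, 0, false, true), Decision::Recover));
}
```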
- for mark_case in [true, false] { - for (index_num, index) in self.index_files.iter_mut().enumerate() { - for (pack_num, pack) in index - .packs - .iter_mut() - .enumerate() - .filter(|(_, p)| p.delete_mark == mark_case) - { - let pi = PackInfo::from_pack(pack, &mut self.used_ids); - - // Various checks to determine if packs need to be kept - let too_young = pack.time > Some(self.time - keep_pack); - let keep_uncacheable = repack_cacheable_only && !pack.blob_type.is_cacheable(); - - let to_compress = repack_uncompressed && !pack.is_compressed(); - let size_mismatch = !pack_sizer[pack.blob_type].size_ok(pack.size); - - match (pack.delete_mark, pi.used_blobs, pi.unused_blobs) { - (false, 0, _) => { - // unused pack - self.stats.packs.unused += 1; - if too_young { - // keep packs which are too young - pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); - } else { - pack.set_todo(PackToDo::MarkDelete, &pi, &mut self.stats); - } - } - (false, 1.., 0) => { - // used pack - self.stats.packs.used += 1; - if too_young || keep_uncacheable { - pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); - } else if to_compress || repack_all { - self.repack_candidates - .push((pi, ToCompress, index_num, pack_num)); - } else if size_mismatch { - self.repack_candidates.push(( - pi, - SizeMismatch, - index_num, - pack_num, - )); - } else { - pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); - } - } - - (false, 1.., 1..) => { - // partly used pack - self.stats.packs.partly_used += 1; - - if too_young || keep_uncacheable { - // keep packs which are too young and non-cacheable packs if requested - pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); - } else { - // other partly used pack => candidate for repacking - self.repack_candidates - .push((pi, PartlyUsed, index_num, pack_num)); - } - } - (true, 0, _) => { - let local_date_time = pack.time.map_or_else( - || { - status_err!("Packs_to_delete doesn't contain `time`."); - RUSTIC_APP.shutdown(Shutdown::Crash); - }, - |it| it, - ); - if self.time - local_date_time >= keep_delete { - pack.set_todo(PackToDo::Delete, &pi, &mut self.stats); - } else { - pack.set_todo(PackToDo::KeepMarked, &pi, &mut self.stats); - } - } - (true, 1.., _) => { - // needed blobs; mark this pack for recovery - pack.set_todo(PackToDo::Recover, &pi, &mut self.stats); - } - } - } - } - } - } - - fn decide_repack( - &mut self, - max_repack: &LimitOption, - max_unused: &LimitOption, - repack_uncompressed: bool, - no_resize: bool, - pack_sizer: &BlobTypeMap, - ) { - let max_unused = match (repack_uncompressed, max_unused) { - (true, _) => 0, - (false, LimitOption::Unlimited) => u64::MAX, - (false, LimitOption::Size(size)) => size.as_u64(), - // if percentag is given, we want to have - // unused <= p/100 * size_after = p/100 * (size_used + unused) - // which equals (1 - p/100) * unused <= p/100 * size_used - (false, LimitOption::Percentage(p)) => (p * self.stats.size.sum().used) / (100 - p), - }; - - let max_repack = match max_repack { - LimitOption::Unlimited => u64::MAX, - LimitOption::Size(size) => size.as_u64(), - LimitOption::Percentage(p) => (p * self.stats.size.sum().total()) / 100, - }; - - self.repack_candidates.sort_unstable_by_key(|rc| rc.0); - let mut resize_packs = BlobTypeMap::>::default(); - let mut do_repack = BlobTypeMap::default(); - let mut repack_size = BlobTypeMap::::default(); - - for (pi, repack_reason, index_num, pack_num) in std::mem::take(&mut self.repack_candidates) - { - let pack = &mut self.index_files[index_num].packs[pack_num]; - let blob_type = pi.blob_type; - 
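The `Percentage` branch above turns the goal "unused data is at most p% of the repository size after pruning" into pure integer arithmetic: from unused <= p/100 * (used + unused) it follows that unused <= p * used / (100 - p). A quick numeric check of that bound, with invented values:

```rust
/// `max-unused` percentage bound: keep
///     unused <= p/100 * (used + unused)
/// which rearranges to
///     unused <= p * used / (100 - p).
fn max_unused_bytes(p: u64, used: u64) -> u64 {
    (p * used) / (100 - p)
}

fn main() {
    let used = 100u64 * 1024 * 1024 * 1024; // 100 GiB of used data
    let bound = max_unused_bytes(5, used); // default --max-unused of 5%

    // 5 * 100 GiB / 95 is roughly 5.26 GiB:
    println!("up to {bound} bytes may stay unused after pruning");

    // At the bound, unused data is still at most 5% of the final repo size:
    assert!(bound * 100 <= 5 * (used + bound));
}
```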
- let total_repack_size: u64 = repack_size.into_values().sum(); - if total_repack_size + u64::from(pi.used_size) >= max_repack - || (self.stats.size.sum().unused_after_prune() < max_unused - && repack_reason == PartlyUsed - && blob_type == BlobType::Data) - || (repack_reason == SizeMismatch && no_resize) - { - pack.set_todo(PackToDo::Keep, &pi, &mut self.stats); - } else if repack_reason == SizeMismatch { - resize_packs[blob_type].push((pi, index_num, pack_num)); - repack_size[blob_type] += u64::from(pi.used_size); - } else { - pack.set_todo(PackToDo::Repack, &pi, &mut self.stats); - repack_size[blob_type] += u64::from(pi.used_size); - do_repack[blob_type] = true; - } - } - for (blob_type, resize_packs) in resize_packs { - // packs in resize_packs are only repacked if we anyway repack this blob type or - // if the target pack size is reached for the blob type. - let todo = if do_repack[blob_type] - || repack_size[blob_type] > u64::from(pack_sizer[blob_type].pack_size()) - { - PackToDo::Repack - } else { - PackToDo::Keep - }; - for (pi, index_num, pack_num) in resize_packs { - let pack = &mut self.index_files[index_num].packs[pack_num]; - pack.set_todo(todo, &pi, &mut self.stats); - } - } - } - - fn check_existing_packs(&mut self) -> Result<()> { - for pack in self.index_files.iter().flat_map(|index| &index.packs) { - let existing_size = self.existing_packs.remove(&pack.id); - - // TODO: Unused Packs which don't exist (i.e. only existing in index) - let check_size = || { - match existing_size { - Some(size) if size == pack.size => Ok(()), // size is ok => continue - Some(size) => bail!( - "used pack {}: size does not match! Expected size: {}, real size: {}", - pack.id, - pack.size, - size - ), - None => bail!("used pack {} does not exist!", pack.id), - } - }; - - match pack.to_do { - PackToDo::Undecided => { - bail!("should not happen!") - } - PackToDo::Keep | PackToDo::Recover => { - for blob in &pack.blobs { - _ = self.used_ids.remove(&blob.id); - } - check_size()?; - } - PackToDo::Repack => { - check_size()?; - } - PackToDo::MarkDelete | PackToDo::Delete | PackToDo::KeepMarked => {} - } - } - - self.used_ids.shrink_to_fit(); - self.existing_packs.shrink_to_fit(); - - // all remaining packs in existing_packs are unreferenced packs - for size in self.existing_packs.values() { - self.stats.size_unref += u64::from(*size); - } - - Ok(()) - } - - fn filter_index_files(&mut self, instant_delete: bool) { - let mut any_must_modify = false; - self.stats.index_files = self.index_files.len() as u64; - // filter out only the index files which need processing - self.index_files.retain(|index| { - // index must be processed if it has been modified - // or if any pack is not kept - let must_modify = index.modified - || index.packs.iter().any(|p| { - p.to_do != PackToDo::Keep && (instant_delete || p.to_do != PackToDo::KeepMarked) - }); - - any_must_modify |= must_modify; - - // also process index files which are too small (i.e. 
rebuild them) - must_modify || index.len() < constants::MIN_INDEX_LEN - }); - - if !any_must_modify && self.index_files.len() == 1 { - // only one index file to process but only because it is too small - self.index_files.clear(); - } - - // TODO: Sort index files such that files with deletes come first and files with - // repacks come at end - } - - #[allow(clippy::cast_precision_loss)] - fn print_stats(&self) { - let pack_stat = &self.stats.packs; - let blob_stat = self.stats.blobs.sum(); - let size_stat = self.stats.size.sum(); - - debug!( - "used: {:>10} blobs, {:>10}", - blob_stat.used, - bytes_size_to_string(size_stat.used) - ); - - debug!( - "unused: {:>10} blobs, {:>10}", - blob_stat.unused, - bytes_size_to_string(size_stat.unused) - ); - debug!( - "total: {:>10} blobs, {:>10}", - blob_stat.total(), - bytes_size_to_string(size_stat.total()) - ); - - println!( - "to repack: {:>10} packs, {:>10} blobs, {:>10}", - pack_stat.repack, - blob_stat.repack, - bytes_size_to_string(size_stat.repack) - ); - println!( - "this removes: {:>10} blobs, {:>10}", - blob_stat.repackrm, - bytes_size_to_string(size_stat.repackrm) - ); - println!( - "to delete: {:>10} packs, {:>10} blobs, {:>10}", - pack_stat.unused, - blob_stat.remove, - bytes_size_to_string(size_stat.remove) - ); - if !self.existing_packs.is_empty() { - println!( - "unindexed: {:>10} packs, ?? blobs, {:>10}", - self.existing_packs.len(), - bytes_size_to_string(self.stats.size_unref) - ); - } - - println!( - "total prune: {:>10} blobs, {:>10}", - blob_stat.repackrm + blob_stat.remove, - bytes_size_to_string(size_stat.repackrm + size_stat.remove + self.stats.size_unref) - ); - println!( - "remaining: {:>10} blobs, {:>10}", - blob_stat.total_after_prune(), - bytes_size_to_string(size_stat.total_after_prune()) - ); +#[allow(clippy::cast_precision_loss)] +fn print_stats(stats: &PruneStats) { + let pack_stat = &stats.packs; + let blob_stat = stats.blobs.sum(); + let size_stat = stats.size.sum(); + + debug!( + "used: {:>10} blobs, {:>10}", + blob_stat.used, + bytes_size_to_string(size_stat.used) + ); + + debug!( + "unused: {:>10} blobs, {:>10}", + blob_stat.unused, + bytes_size_to_string(size_stat.unused) + ); + debug!( + "total: {:>10} blobs, {:>10}", + blob_stat.total(), + bytes_size_to_string(size_stat.total()) + ); + + println!( + "to repack: {:>10} packs, {:>10} blobs, {:>10}", + pack_stat.repack, + blob_stat.repack, + bytes_size_to_string(size_stat.repack) + ); + println!( + "this removes: {:>10} blobs, {:>10}", + blob_stat.repackrm, + bytes_size_to_string(size_stat.repackrm) + ); + println!( + "to delete: {:>10} packs, {:>10} blobs, {:>10}", + pack_stat.unused, + blob_stat.remove, + bytes_size_to_string(size_stat.remove) + ); + if stats.packs_unref > 0 { println!( - "unused size after prune: {:>10} ({:.2}% of remaining size)", - bytes_size_to_string(size_stat.unused_after_prune()), - size_stat.unused_after_prune() as f64 / size_stat.total_after_prune() as f64 * 100.0 - ); - - println!(); - - println!( - "packs marked for deletion: {:>10}, {:>10}", - self.stats.packs_to_delete.total(), - bytes_size_to_string(self.stats.size_to_delete.total()), - ); - println!( - " - complete deletion: {:>10}, {:>10}", - self.stats.packs_to_delete.remove, - bytes_size_to_string(self.stats.size_to_delete.remove), - ); - println!( - " - keep marked: {:>10}, {:>10}", - self.stats.packs_to_delete.keep, - bytes_size_to_string(self.stats.size_to_delete.keep), - ); - println!( - " - recover: {:>10}, {:>10}", - self.stats.packs_to_delete.recover, -
bytes_size_to_string(self.stats.size_to_delete.recover), - ); - - debug!( - "index files to rebuild: {} / {}", - self.index_files.len(), - self.stats.index_files - ); - } - - fn repack_packs(&self) -> Vec<Id> { - self.index_files - .iter() - .flat_map(|index| &index.packs) - .filter(|pack| pack.to_do == PackToDo::Repack) - .map(|pack| pack.id) - .collect() - } - - #[allow(clippy::significant_drop_tightening)] - fn do_prune( - self, - repo: OpenRepository
, - opts: &PruneCmd, - progress_options: &ProgressOptions, - ) -> Result<()> { - let be = repo.dbe; - - let indexer = Indexer::new_unindexed(be.clone()).into_shared(); - - // Calculate an approximation of sizes after pruning. - // The size actually is: - // total_size_of_all_blobs + total_size_of_pack_headers + #packs * pack_overhead - // This is hard/impossible to compute because: - // - the size of blobs can change during repacking if compression is changed - // - the size of pack headers depends on whether blobs are compressed or not - // - we don't know the number of packs generated by repacking - // So, we simply use the current size of the blobs and an estimation of the pack - // header size. - - let size_after_prune = BlobTypeMap::init(|blob_type| { - self.stats.size[blob_type].total_after_prune() - + self.stats.blobs[blob_type].total_after_prune() - * u64::from(HeaderEntry::ENTRY_LEN_COMPRESSED) - }); - - let tree_repacker = Repacker::new( - be.clone(), - BlobType::Tree, - indexer.clone(), - &repo.config, - size_after_prune[BlobType::Tree], - )?; - - let data_repacker = Repacker::new( - be.clone(), - BlobType::Data, - indexer.clone(), - &repo.config, - size_after_prune[BlobType::Data], - )?; - - // mark unreferenced packs for deletion - if !self.existing_packs.is_empty() { - if opts.instant_delete { - let p = progress_options.progress_counter("removing unindexed packs..."); - let existing_packs: Vec<_> = self.existing_packs.into_keys().collect(); - be.delete_list(FileType::Pack, true, existing_packs.iter(), p)?; - } else { - info!("marking not needed unindexed pack files for deletion..."); - for (id, size) in self.existing_packs { - let pack = IndexPack { - id, - size: Some(size), - time: Some(Local::now()), - blobs: Vec::new(), - }; - indexer.write().unwrap().add_remove(pack)?; - } - } - } - - // process packs by index_file - let p = match (self.index_files.is_empty(), self.stats.packs.repack > 0) { - (true, _) => { - info!("nothing to do!"); - ProgressOptions::no_progress() - } - // TODO: Use a MultiProgressBar here - (false, true) => progress_options.progress_bytes("repacking // rebuilding index..."), - (false, false) => progress_options.progress_spinner("rebuilding index..."), - }; - - p.set_length(self.stats.size.sum().repack - self.stats.size.sum().repackrm); - - let mut indexes_remove = Vec::new(); - let tree_packs_remove = Arc::new(Mutex::new(Vec::new())); - let data_packs_remove = Arc::new(Mutex::new(Vec::new())); - - let delete_pack = |pack: PrunePack| { - // delete pack - match pack.blob_type { - BlobType::Data => data_packs_remove.lock().unwrap().push(pack.id), - BlobType::Tree => tree_packs_remove.lock().unwrap().push(pack.id), - } - }; - - let used_ids = Arc::new(Mutex::new(self.used_ids)); - - let packs: Vec<_> = self - .index_files - .into_iter() - .map(|index| { - indexes_remove.push(index.id); - index - }) - .flat_map(|index| index.packs) - .collect(); - - packs.into_par_iter().try_for_each(|pack| { - match pack.to_do { - PackToDo::Undecided => bail!("pack {} got no decicion what to do", pack.id), - PackToDo::Keep => { - // keep pack: add to new index - let pack = pack.into_index_pack(); - indexer.write().unwrap().add(pack)?; - } - PackToDo::Repack => { - // TODO: repack in parallel - for blob in &pack.blobs { - if used_ids.lock().unwrap().remove(&blob.id).is_none() { - // don't save duplicate blobs - continue; - } - - let repacker = match blob.tpe { - BlobType::Data => &data_repacker, - BlobType::Tree => &tree_repacker, - }; - if opts.fast_repack { - 
repacker.add_fast(&pack.id, blob)?; - } else { - repacker.add(&pack.id, blob)?; - } - p.inc(u64::from(blob.length)); - } - if opts.instant_delete { - delete_pack(pack); - } else { - // mark pack for removal - let pack = pack.into_index_pack_with_time(self.time); - indexer.write().unwrap().add_remove(pack)?; - } - } - PackToDo::MarkDelete => { - if opts.instant_delete { - delete_pack(pack); - } else { - // mark pack for removal - let pack = pack.into_index_pack_with_time(self.time); - indexer.write().unwrap().add_remove(pack)?; - } - } - PackToDo::KeepMarked => { - if opts.instant_delete { - delete_pack(pack); - } else { - // keep pack: add to new index - let pack = pack.into_index_pack(); - indexer.write().unwrap().add_remove(pack)?; - } - } - PackToDo::Recover => { - // recover pack: add to new index in section packs - let pack = pack.into_index_pack_with_time(self.time); - indexer.write().unwrap().add(pack)?; - } - PackToDo::Delete => delete_pack(pack), - } - Ok(()) - })?; - _ = tree_repacker.finalize()?; - _ = data_repacker.finalize()?; - indexer.write().unwrap().finalize()?; - p.finish(); - - // remove old index files first as they may reference pack files which are removed soon. - if !indexes_remove.is_empty() { - let p = progress_options.progress_counter("removing old index files..."); - be.delete_list(FileType::Index, true, indexes_remove.iter(), p)?; - } - - // get variable out of Arc> - let data_packs_remove = data_packs_remove.lock().unwrap(); - if !data_packs_remove.is_empty() { - let p = progress_options.progress_counter("removing old data packs..."); - be.delete_list(FileType::Pack, false, data_packs_remove.iter(), p)?; - } - - // get variable out of Arc> - let tree_packs_remove = tree_packs_remove.lock().unwrap(); - if !tree_packs_remove.is_empty() { - let p = progress_options.progress_counter("removing old tree packs..."); - be.delete_list(FileType::Pack, true, tree_packs_remove.iter(), p)?; - } - - Ok(()) - } -} - -#[derive(PartialEq, Eq, Clone, Copy)] -struct PackInfo { - blob_type: BlobType, - used_blobs: u16, - unused_blobs: u16, - used_size: u32, - unused_size: u32, -} - -impl PartialOrd for PackInfo { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for PackInfo { - fn cmp(&self, other: &Self) -> Ordering { - // first order by blob type such that tree packs are picked first - self.blob_type.cmp(&other.blob_type).then( - // then order such that packs with highest - // ratio unused/used space are picked first. - // This is equivalent to ordering by unused / total space. - (u64::from(other.unused_size) * u64::from(self.used_size)) - .cmp(&(u64::from(self.unused_size) * u64::from(other.used_size))), - ) - } -} - -impl PackInfo { - fn from_pack(pack: &PrunePack, used_ids: &mut HashMap) -> Self { - let mut pi = Self { - blob_type: pack.blob_type, - used_blobs: 0, - unused_blobs: 0, - used_size: 0, - unused_size: 0, - }; - - // We search all blobs in the pack for needed ones. We do this by already marking - // and decreasing the used blob counter for the processed blobs. If the counter - // was decreased to 0, the blob and therefore the pack is actually used. - // Note that by this processing, we are also able to handle duplicate blobs within a pack - // correctly. - // If we found a needed blob, we stop and process the information that the pack is actually needed. 
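The counting scheme described in the comment above is subtle. What follows is a deliberately reduced model of it: a single pass that treats a counter of 0 as "some other pack already provides this blob", without the real code's two-pass correction around `first_needed`. Plain `u32` ids and lengths stand in for the real 256-bit `Id`s and blob entries.

```rust
use std::collections::HashMap;

// Tiny model of the used-blob bookkeeping: `used_ids` maps a blob id to how
// many more times it must still be found (0 = already covered elsewhere).
fn classify(pack_blobs: &[(u32, u32)], used_ids: &mut HashMap<u32, u8>) -> (u32, u32) {
    let (mut used_size, mut unused_size) = (0, 0);
    for (id, length) in pack_blobs {
        match used_ids.get_mut(id) {
            None | Some(0) => unused_size += length, // not needed, or another pack covers it
            Some(count) => {
                *count = 0; // mark as covered so later packs treat it as unused
                used_size += length;
            }
        }
    }
    (used_size, unused_size)
}

fn main() {
    let mut used_ids = HashMap::from([(1, 1), (2, 1)]);
    // pack A holds blobs 1 and 3; blob 3 is referenced by no snapshot:
    assert_eq!(classify(&[(1, 10), (3, 5)], &mut used_ids), (10, 5));
    // pack B holds blobs 1 and 2; blob 1 is already covered by pack A:
    assert_eq!(classify(&[(1, 10), (2, 7)], &mut used_ids), (7, 10));
}
```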
- let first_needed = pack.blobs.iter().position(|blob| { - match used_ids.get_mut(&blob.id) { - None | Some(0) => { - pi.unused_size += blob.length; - pi.unused_blobs += 1; - } - Some(count) => { - // decrease counter - *count -= 1; - if *count == 0 { - // blob is actually needed - pi.used_size += blob.length; - pi.used_blobs += 1; - return true; // break the search - } - // blob is not needed - pi.unused_size += blob.length; - pi.unused_blobs += 1; - } - } - false // continue with next blob - }); - - if let Some(first_needed) = first_needed { - // The pack is actually needed. - // We reprocess the blobs up to the first needed one and mark all blobs which are genarally needed as used. - for blob in &pack.blobs[..first_needed] { - match used_ids.get_mut(&blob.id) { - None | Some(0) => {} // already correctly marked - Some(count) => { - // remark blob as used - pi.unused_size -= blob.length; - pi.unused_blobs -= 1; - pi.used_size += blob.length; - pi.used_blobs += 1; - *count = 0; // count = 0 indicates to other packs that the blob is not needed anymore. - } - } - } - // Then we process the remaining blobs and mark all blobs which are generally needed as used in this blob - for blob in &pack.blobs[first_needed + 1..] { - match used_ids.get_mut(&blob.id) { - None | Some(0) => { - pi.unused_size += blob.length; - pi.unused_blobs += 1; - } - Some(count) => { - // blob is used in this pack - pi.used_size += blob.length; - pi.used_blobs += 1; - *count = 0; // count = 0 indicates to other packs that the blob is not needed anymore. - } - } - } - } - - pi - } -} - -// find used blobs in repo -fn find_used_blobs( - index: &(impl IndexedBackend + Unpin), - ignore_snaps: &[Id], - progress_options: &ProgressOptions, -) -> Result> { - let ignore_snaps: HashSet<_> = ignore_snaps.iter().collect(); - - let p = progress_options.progress_counter("reading snapshots..."); - let list = index - .be() - .list(FileType::Snapshot)? - .into_iter() - .filter(|id| !ignore_snaps.contains(id)) - .collect(); - let snap_trees: Vec<_> = index - .be() - .stream_list::(list, &p)? - .into_iter() - .map_ok(|(_, snap)| snap.tree) - .try_collect()?; - p.finish(); - - let mut ids: HashMap<_, _> = snap_trees.iter().map(|id| (*id, 0)).collect(); - let p = progress_options.progress_counter("finding used blobs..."); - - let mut tree_streamer = TreeStreamerOnce::new(index.clone(), snap_trees, p)?; - while let Some(item) = tree_streamer.next().transpose()? { - let (_, tree) = item; - for node in tree.nodes { - match node.node_type { - NodeType::File => { - ids.extend(node.content.iter().flatten().map(|id| (*id, 0))); - } - NodeType::Dir => { - _ = ids.insert(node.subtree.unwrap(), 0); - } - _ => {} // nothing to do - } - } - } - - Ok(ids) + "unindexed: {:>10} packs, ?? 
blobs, {:>10}", + stats.packs_unref, + bytes_size_to_string(stats.size_unref) + ); + } + + println!( + "total prune: {:>10} blobs, {:>10}", + blob_stat.repackrm + blob_stat.remove, + bytes_size_to_string(size_stat.repackrm + size_stat.remove + stats.size_unref) + ); + println!( + "remaining: {:>10} blobs, {:>10}", + blob_stat.total_after_prune(), + bytes_size_to_string(size_stat.total_after_prune()) + ); + println!( + "unused size after prune: {:>10} ({:.2}% of remaining size)", + bytes_size_to_string(size_stat.unused_after_prune()), + size_stat.unused_after_prune() as f64 / size_stat.total_after_prune() as f64 * 100.0 + ); + + println!(); + + println!( + "packs marked for deletion: {:>10}, {:>10}", + stats.packs_to_delete.total(), + bytes_size_to_string(stats.size_to_delete.total()), + ); + println!( + " - complete deletion: {:>10}, {:>10}", + stats.packs_to_delete.remove, + bytes_size_to_string(stats.size_to_delete.remove), + ); + println!( + " - keep marked: {:>10}, {:>10}", + stats.packs_to_delete.keep, + bytes_size_to_string(stats.size_to_delete.keep), + ); + println!( + " - recover: {:>10}, {:>10}", + stats.packs_to_delete.recover, + bytes_size_to_string(stats.size_to_delete.recover), + ); + + debug!( + "index files to rebuild: {} / {}", + stats.index_files_rebuild, stats.index_files + ); } diff --git a/src/commands/repair.rs b/src/commands/repair.rs index 30604ac4f..5ebfa85e5 100644 --- a/src/commands/repair.rs +++ b/src/commands/repair.rs @@ -19,8 +19,6 @@ use rustic_core::{ ProgressBars, ReadBackend, ReadIndex, SnapshotFile, StringList, Tree, WriteBackend, }; -use crate::helpers::warm_up_wait; - /// `repair` subcommand #[derive(clap::Parser, Command, Debug)] pub(crate) struct RepairCmd { @@ -185,12 +183,7 @@ impl IndexSubCmd { // process packs which are listed but not contained in the index pack_read_header.extend(packs.into_iter().map(|(id, size)| (id, false, None, size))); - warm_up_wait( - &repo, - pack_read_header.iter().map(|(id, _, _, _)| *id), - true, - progress_options, - )?; + repo.warm_up_wait(pack_read_header.iter().map(|(id, _, _, _)| *id))?; let indexer = Indexer::new(be.clone()).into_shared(); let p = progress_options.progress_counter("reading pack headers"); diff --git a/src/commands/repoinfo.rs b/src/commands/repoinfo.rs index a204d5c74..18d715ec5 100644 --- a/src/commands/repoinfo.rs +++ b/src/commands/repoinfo.rs @@ -3,23 +3,31 @@ /// App-local prelude includes `app_reader()`/`app_writer()`/`app_config()` /// accessors along with logging macros. Customize as you see fit. 
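As in `repair.rs` just above, warming up is now a method on the opened repository rather than the free helpers removed from `src/helpers.rs` further below. A hedged usage sketch of that surface follows; the repository path, password and the empty pack list are placeholders, and only the `warm_up`/`warm_up_wait` method names are taken from this diff.

```rust
use rustic_core::{Id, Repository, RepositoryOptions};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder repository location and password:
    let mut repo_opts = RepositoryOptions::default();
    repo_opts.repository = Some("/tmp/repo".to_string());
    repo_opts.password = Some("test".to_string());
    let repo = Repository::new(&repo_opts)?.open()?;

    // Pack ids would normally come from a restore/repair/prune plan:
    let packs: Vec<Id> = Vec::new();

    // Fire-and-forget warm-up, as used on dry runs in this diff ...
    repo.warm_up(packs.clone().into_iter())?;
    // ... or warm up and then honor the configured warm-up wait time:
    repo.warm_up_wait(packs.into_iter())?;
    Ok(())
}
```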
use crate::{ - commands::{get_repository, open_repository}, - helpers::bytes_size_to_string, - status_err, Application, RUSTIC_APP, + commands::get_repository, helpers::bytes_size_to_string, status_err, Application, RUSTIC_APP, }; use abscissa_core::{Command, Runnable, Shutdown}; +use serde::Serialize; -use crate::helpers::{print_file_info, table_right_from}; +use crate::helpers::table_right_from; use anyhow::Result; -use rustic_core::{ - BlobType, BlobTypeMap, DecryptReadBackend, IndexEntry, IndexFile, Progress, ProgressBars, - RepoInfo, Sum, -}; +use rustic_core::{IndexInfos, RepoFileInfo, RepoFileInfos}; /// `repoinfo` subcommand #[derive(clap::Parser, Command, Debug)] -pub(crate) struct RepoInfoCmd; +pub(crate) struct RepoInfoCmd { + /// Only scan repository files (doesn't need repository password) + #[clap(long)] + only_files: bool, + + /// Only scan index + #[clap(long)] + only_index: bool, + + /// Show infos in json format + #[clap(long)] + json: bool, +} impl Runnable for RepoInfoCmd { fn run(&self) { @@ -30,107 +38,142 @@ impl Runnable for RepoInfoCmd { } } +#[serde_with::apply(Option => #[serde(default, skip_serializing_if = "Option::is_none")])] +#[derive(Serialize)] +struct Infos { + files: Option, + index: Option, +} + impl RepoInfoCmd { fn inner_run(&self) -> Result<()> { let config = RUSTIC_APP.config(); - let repo = open_repository(get_repository(&config)); - - print_file_info("repository files", &repo.be)?; + let repo = get_repository(&config); + + let infos = Infos { + files: (!self.only_index).then(|| repo.infos_files()).transpose()?, + index: (!self.only_files) + .then(|| -> Result<_> { + let repo = repo.open()?; + let info_index = repo.infos_index()?; + Ok(info_index) + }) + .transpose()?, + }; - if let Some(hot_be) = &repo.be_hot { - print_file_info("hot repository files", hot_be)?; + if self.json { + let mut stdout = std::io::stdout(); + serde_json::to_writer_pretty(&mut stdout, &infos)?; + return Ok(()); } - let mut info = BlobTypeMap::::default(); - info[BlobType::Tree].min_pack_size = u64::MAX; - info[BlobType::Data].min_pack_size = u64::MAX; - let mut info_delete = BlobTypeMap::::default(); - - let p = config - .global - .progress_options - .progress_counter("scanning index..."); - repo.dbe - .stream_all::(&p)? 
- .into_iter() - .for_each(|index| { - let index = match index { - Ok(it) => it, - Err(err) => { - status_err!("{}", err); - RUSTIC_APP.shutdown(Shutdown::Crash); - } - } - .1; - for pack in &index.packs { - info[pack.blob_type()].add_pack(pack); - - for blob in &pack.blobs { - let ie = IndexEntry::from_index_blob(blob, pack.id); - info[pack.blob_type()].add(ie); - } - } - - for pack in &index.packs_to_delete { - for blob in &pack.blobs { - let ie = IndexEntry::from_index_blob(blob, pack.id); - info_delete[pack.blob_type()].add(ie); - } - } - }); - p.finish(); - - let mut table = table_right_from( - 1, - ["Blob type", "Count", "Total Size", "Total Size in Packs"], - ); - - for (blob_type, info) in &info { - _ = table.add_row([ - format!("{blob_type:?}"), - info.count.to_string(), - bytes_size_to_string(info.data_size), - bytes_size_to_string(info.size), - ]); + if let Some(file_info) = infos.files { + print_file_info("repository files", file_info.repo); + if let Some(info) = file_info.repo_hot { + print_file_info("hot repository files", info); + } } - for (blob_type, info_delete) in &info_delete { - if info_delete.count > 0 { - _ = table.add_row([ - format!("{blob_type:?} to delete"), - info_delete.count.to_string(), - bytes_size_to_string(info_delete.data_size), - bytes_size_to_string(info_delete.size), - ]); - } + if let Some(index_info) = infos.index { + print_index_info(index_info); } - let total = info.sum() + info_delete.sum(); + Ok(()) + } +} + +pub fn print_file_info(text: &str, info: Vec) { + let mut table = table_right_from(1, ["File type", "Count", "Total Size"]); + let mut total_count = 0; + let mut total_size = 0; + for row in info { _ = table.add_row([ - "Total".to_string(), - total.count.to_string(), - bytes_size_to_string(total.data_size), - bytes_size_to_string(total.size), + format!("{:?}", row.tpe), + row.count.to_string(), + bytes_size_to_string(row.size), ]); + total_count += row.count; + total_size += row.size; + } + println!("{text}"); + _ = table.add_row([ + "Total".to_string(), + total_count.to_string(), + bytes_size_to_string(total_size), + ]); + + println!(); + println!("{table}"); + println!(); +} - println!(); - println!("{table}"); +pub fn print_index_info(index_info: IndexInfos) { + let mut table = table_right_from( + 1, + ["Blob type", "Count", "Total Size", "Total Size in Packs"], + ); - let mut table = table_right_from( - 1, - ["Blob type", "Pack Count", "Minimum Size", "Maximum Size"], - ); + let mut total_count = 0; + let mut total_data_size = 0; + let mut total_size = 0; - for (blob_type, info) in info { + for blobs in &index_info.blobs { + _ = table.add_row([ + format!("{:?}", blobs.blob_type), + blobs.count.to_string(), + bytes_size_to_string(blobs.data_size), + bytes_size_to_string(blobs.size), + ]); + total_count += blobs.count; + total_data_size += blobs.data_size; + total_size += blobs.size; + } + for blobs in &index_info.blobs_delete { + if blobs.count > 0 { _ = table.add_row([ - format!("{blob_type:?} packs"), - info.pack_count.to_string(), - bytes_size_to_string(info.min_pack_size), - bytes_size_to_string(info.max_pack_size), + format!("{:?} to delete", blobs.blob_type), + blobs.count.to_string(), + bytes_size_to_string(blobs.data_size), + bytes_size_to_string(blobs.size), ]); + total_count += blobs.count; + total_data_size += blobs.data_size; + total_size += blobs.size; } - println!(); - println!("{table}"); + } - Ok(()) + _ = table.add_row([ + "Total".to_string(), + total_count.to_string(), + bytes_size_to_string(total_data_size), + 
bytes_size_to_string(total_size), + ]); + + println!(); + println!("{table}"); + + let mut table = table_right_from( + 1, + ["Blob type", "Pack Count", "Minimum Size", "Maximum Size"], + ); + + for packs in index_info.packs { + _ = table.add_row([ + format!("{:?} packs", packs.blob_type), + packs.count.to_string(), + packs.min_size.map_or("-".to_string(), bytes_size_to_string), + packs.max_size.map_or("-".to_string(), bytes_size_to_string), + ]); + } + for packs in index_info.packs_delete { + if packs.count > 0 { + _ = table.add_row([ + format!("{:?} packs to delete", packs.blob_type), + packs.count.to_string(), + packs.min_size.map_or("-".to_string(), bytes_size_to_string), + packs.max_size.map_or("-".to_string(), bytes_size_to_string), + ]); + } } + println!(); + println!("{table}"); } diff --git a/src/commands/restore.rs b/src/commands/restore.rs index e93ed4364..568771b86 100644 --- a/src/commands/restore.rs +++ b/src/commands/restore.rs @@ -33,7 +33,7 @@ use rustic_core::{ TreeStreamerOptions, }; -use crate::{filtering::SnapshotFilter, helpers::warm_up_wait}; +use crate::filtering::SnapshotFilter; pub(crate) mod constants { pub(crate) const MAX_READER_THREADS_NUM: usize = 20; @@ -134,16 +134,11 @@ impl RestoreCmd { if file_infos.restore_size == 0 { info!("all file contents are fine."); + } else if config.global.dry_run { + repo.warm_up(file_infos.to_packs().into_iter())?; } else { - warm_up_wait( - &repo, - file_infos.to_packs().into_iter(), - !config.global.dry_run, - progress_options, - )?; - if !config.global.dry_run { - restore_contents(be, &dest, file_infos)?; - } + repo.warm_up_wait(file_infos.to_packs().into_iter())?; + restore_contents(be, &dest, file_infos)?; } if !config.global.dry_run { diff --git a/src/commands/snapshots.rs b/src/commands/snapshots.rs index 485d5b0af..f1c3aed83 100644 --- a/src/commands/snapshots.rs +++ b/src/commands/snapshots.rs @@ -3,20 +3,18 @@ /// App-local prelude includes `app_reader()`/`app_writer()`/`app_config()` /// accessors along with logging macros. Customize as you see fit. use crate::{ - commands::{get_repository, open_repository}, + commands::get_repository, helpers::{bold_cell, bytes_size_to_string, table, table_right_from}, status_err, Application, RUSTIC_APP, }; +use abscissa_core::{Command, Runnable, Shutdown}; +use anyhow::Result; use comfy_table::Cell; use humantime::format_duration; - -use abscissa_core::{Command, Runnable, Shutdown}; - use itertools::Itertools; -use rustic_core::DeleteOption; -use rustic_core::{ProgressBars, SnapshotFile, SnapshotGroup, SnapshotGroupCriterion}; +use rustic_core::{DeleteOption, SnapshotFile, SnapshotGroupCriterion}; /// `snapshot` subcommand #[derive(clap::Parser, Command, Debug)] @@ -56,42 +54,13 @@ impl Runnable for SnapshotCmd { } impl SnapshotCmd { - fn inner_run(&self) -> anyhow::Result<()> { + fn inner_run(&self) -> Result<()> { let config = RUSTIC_APP.config(); + let repo = get_repository(&config).open()?; - let repo = open_repository(get_repository(&config)); - - let p = config.global.progress_options.progress_hidden(); - let groups = match &self.ids[..] { - [] => SnapshotFile::group_from_backend( - &repo.dbe, - |sn| config.snapshot_filter.matches(sn), - &self.group_by, - &p, - )?, - [id] if id == "latest" => SnapshotFile::group_from_backend( - &repo.dbe, - |sn| config.snapshot_filter.matches(sn), - &self.group_by, - &p, - )? 
- .into_iter() - .map(|(group, mut snaps)| { - snaps.sort_unstable(); - let last_idx = snaps.len() - 1; - snaps.swap(0, last_idx); - snaps.truncate(1); - (group, snaps) - }) - .collect::>(), - _ => { - let item = ( - SnapshotGroup::default(), - SnapshotFile::from_ids(&repo.dbe, &self.ids, &p)?, - ); - vec![item] - } - }; + let groups = repo.get_snapshot_group(&self.ids, self.group_by, |sn| { + config.snapshot_filter.matches(sn) + })?; if self.json { let mut stdout = std::io::stdout(); @@ -99,6 +68,7 @@ impl SnapshotCmd { return Ok(()); } + let mut total_count = 0; for (group, mut snapshots) in groups { if !group.is_empty() { println!("\nsnapshots for {group}"); @@ -160,7 +130,10 @@ impl SnapshotCmd { println!("{table}"); } println!("{count} snapshot(s)"); + total_count += count; } + println!(); + println!("total: {total_count} snapshot(s)"); Ok(()) } diff --git a/src/helpers.rs b/src/helpers.rs index 8101a2ce4..8375cb6e1 100644 --- a/src/helpers.rs +++ b/src/helpers.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeSet, process::Command}; +use std::collections::BTreeSet; use abscissa_core::Application; use anyhow::Result; @@ -7,95 +7,15 @@ use comfy_table::{ presets::ASCII_MARKDOWN, Attribute, Cell, CellAlignment, ContentArrangement, Table, }; -use log::{debug, info, trace, warn}; -use rayon::{ - prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator}, - ThreadPoolBuilder, -}; +use log::{info, trace}; +use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator}; use rustic_core::{ - parse_command, BlobType, DecryptWriteBackend, FileType, Id, IndexBackend, IndexedBackend, - Indexer, NodeType, OpenRepository, Packer, Progress, ProgressBars, ReadBackend, ReadIndex, - RusticResult, SnapshotFile, TreeStreamerOnce, ALL_FILE_TYPES, + BlobType, DecryptWriteBackend, IndexBackend, IndexedBackend, Indexer, NodeType, OpenRepository, + Packer, Progress, ProgressBars, ReadIndex, SnapshotFile, TreeStreamerOnce, }; -use crate::{application::RUSTIC_APP, config::progress_options::ProgressOptions}; - -pub(super) mod constants { - pub(super) const MAX_READER_THREADS_NUM: usize = 20; -} - -pub(crate) fn warm_up_wait
( - repo: &OpenRepository
, - packs: impl ExactSizeIterator, - wait: bool, - progress_options: &ProgressOptions, -) -> Result<()> { - if let Some(command) = &repo.opts.warm_up_command { - warm_up_command(packs, command, progress_options)?; - } else if repo.opts.warm_up { - warm_up(&repo.be, packs, progress_options)?; - } - if wait { - if let Some(wait) = repo.opts.warm_up_wait { - let p = progress_options.progress_spinner(format!("waiting {wait}...")); - std::thread::sleep(*wait); - p.finish(); - } - } - Ok(()) -} - -pub(crate) fn warm_up_command( - packs: impl ExactSizeIterator, - command: &str, - progress_options: &ProgressOptions, -) -> Result<()> { - let p = progress_options.progress_counter("warming up packs..."); - p.set_length(packs.len() as u64); - for pack in packs { - let actual_command = command.replace("%id", &pack.to_hex()); - debug!("calling {actual_command}..."); - let commands = parse_command::<()>(&actual_command)?.1; - let status = Command::new(commands[0]).args(&commands[1..]).status()?; - if !status.success() { - warn!("warm-up command was not successful for pack {pack:?}. {status}"); - } - } - p.finish(); - Ok(()) -} - -pub(crate) fn warm_up( - be: &impl ReadBackend, - packs: impl ExactSizeIterator, - progress_options: &ProgressOptions, -) -> Result<()> { - let mut be = be.clone(); - be.set_option("retry", "false")?; - - let p = progress_options.progress_counter("warming up packs..."); - p.set_length(packs.len() as u64); - - let pool = ThreadPoolBuilder::new() - .num_threads(constants::MAX_READER_THREADS_NUM) - .build()?; - let p = &p; - let be = &be; - pool.in_place_scope(|s| { - for pack in packs { - s.spawn(move |_| { - // ignore errors as they are expected from the warm-up - _ = be.read_partial(FileType::Pack, &pack, false, 0, 1); - p.inc(1); - }); - } - }); - - p.finish(); - - Ok(()) -} +use crate::application::RUSTIC_APP; pub(crate) fn copy
( snapshots: &[SnapshotFile], @@ -274,37 +194,6 @@ pub fn table_right_from, T: ToString>(start: usize, ti table } -pub fn print_file_info(text: &str, be: &impl ReadBackend) -> RusticResult<()> { - info!("scanning files..."); - - let mut table = table_right_from(1, ["File type", "Count", "Total Size"]); - let mut total_count = 0; - let mut total_size = 0; - for tpe in ALL_FILE_TYPES { - let list = be.list_with_size(tpe)?; - let count = list.len(); - let size = list.iter().map(|f| u64::from(f.1)).sum(); - _ = table.add_row([ - format!("{tpe:?}"), - count.to_string(), - bytes_size_to_string(size), - ]); - total_count += count; - total_size += size; - } - println!("{text}"); - _ = table.add_row([ - "Total".to_string(), - total_count.to_string(), - bytes_size_to_string(total_size), - ]); - - println!(); - println!("{table}"); - println!(); - Ok(()) -} - #[must_use] pub fn bytes_size_to_string(b: u64) -> String { ByteSize(b).to_string_as(true) diff --git a/tests/backup_restore.rs b/tests/backup_restore.rs index 6ecd1d1df..fe07cb954 100644 --- a/tests/backup_restore.rs +++ b/tests/backup_restore.rs @@ -77,10 +77,10 @@ fn test_backup_and_check_passes() -> TestResult<()> { let mut output = String::new(); cmd.stdout().read_to_string(&mut output)?; - let patterns = &["1 snapshot(s)"]; + let patterns = &["total: 1 snapshot(s)"]; let matches = get_matches(patterns, output)?; - assert_eq!(matches, vec![(PatternID::must(0), 13)]); + assert_eq!(matches, vec![(PatternID::must(0), 20)]); cmd.wait()?.expect_success(); } @@ -111,10 +111,10 @@ fn test_backup_and_check_passes() -> TestResult<()> { let mut output = String::new(); cmd.stdout().read_to_string(&mut output)?; - let patterns = &["2 snapshot(s)"]; + let patterns = &["total: 2 snapshot(s)"]; let matches = get_matches(patterns, output)?; - assert_eq!(matches, vec![(PatternID::must(0), 13)]); + assert_eq!(matches, vec![(PatternID::must(0), 20)]); cmd.wait()?.expect_success(); }
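One closing note on the `Infos` struct introduced in `src/commands/repoinfo.rs`: the `serde_with::apply` attribute expands to a per-field `skip_serializing_if`, which is why `--only-files` or `--only-index` output omits the other section entirely instead of printing `null`. A hand-expanded sketch of the same pattern, with `u32` standing in for `RepoFileInfos`/`IndexInfos`:

```rust
use serde::Serialize;

#[derive(Serialize)]
struct Infos {
    // Hand-expanded form of `#[serde_with::apply(Option => ...)]`:
    #[serde(skip_serializing_if = "Option::is_none")]
    files: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    index: Option<u32>,
}

fn main() -> serde_json::Result<()> {
    let only_files = Infos { files: Some(3), index: None };
    // Prints {"files":3} with no "index" key at all:
    println!("{}", serde_json::to_string(&only_files)?);
    Ok(())
}
```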