From 8e86f07ce65de6dd654a8f52fb7d0b528fe8c13a Mon Sep 17 00:00:00 2001
From: Alexander Weiss
Date: Mon, 10 Jul 2023 17:49:24 +0200
Subject: [PATCH] refactor restore command

---
 crates/rustic_core/examples/restore.rs     |  45 ++
 crates/rustic_core/src/backend/local.rs    |   2 +-
 crates/rustic_core/src/blob/tree.rs        |   2 +-
 crates/rustic_core/src/commands.rs         |   1 +
 crates/rustic_core/src/commands/restore.rs | 595 ++++++++++++++++++++
 crates/rustic_core/src/error.rs            |  10 +
 crates/rustic_core/src/file.rs             | 170 ------
 crates/rustic_core/src/lib.rs              |   3 +-
 crates/rustic_core/src/repository.rs       |  33 +-
 src/commands/restore.rs                    | 622 +--------------------
 10 files changed, 705 insertions(+), 778 deletions(-)
 create mode 100644 crates/rustic_core/examples/restore.rs
 create mode 100644 crates/rustic_core/src/commands/restore.rs
 delete mode 100644 crates/rustic_core/src/file.rs

diff --git a/crates/rustic_core/examples/restore.rs b/crates/rustic_core/examples/restore.rs
new file mode 100644
index 000000000..794331359
--- /dev/null
+++ b/crates/rustic_core/examples/restore.rs
@@ -0,0 +1,45 @@
+//! `restore` example
+use rustic_core::{
+    LocalDestination, Repository, RepositoryOptions, RestoreOpts, TreeStreamerOptions,
+};
+use simplelog::{Config, LevelFilter, SimpleLogger};
+
+fn main() {
+    // Display info logs
+    let _ = SimpleLogger::init(LevelFilter::Info, Config::default());
+
+    // Open repository
+    let repo_opts = RepositoryOptions {
+        repository: Some("/tmp/repo".to_string()),
+        password: Some("test".to_string()),
+        ..Default::default()
+    };
+
+    let repo = Repository::new(&repo_opts)
+        .unwrap()
+        .open()
+        .unwrap()
+        .to_indexed()
+        .unwrap();
+
+    // use the latest snapshot without filtering snapshots
+    let node = repo.node_from_snapshot_path("latest", |_| true).unwrap();
+
+    // list the snapshot contents without any additional filtering
+    let recursive = true;
+    let streamer_opts = TreeStreamerOptions::default();
+    let ls = repo.ls(&node, &streamer_opts, recursive).unwrap();
+
+    let destination = "./restore/"; // restore to this destination dir
+    let create = true; // create destination dir, if it doesn't exist
+    let dest = LocalDestination::new(destination, create, !node.is_dir()).unwrap();
+
+    let opts = RestoreOpts::default();
+    let dry_run = false;
+    // create restore infos. Note: this also already creates needed dirs in the destination
+    let restore_infos = repo
+        .prepare_restore(&opts, ls.clone(), &dest, dry_run)
+        .unwrap();
+
+    repo.restore(restore_infos, &opts, ls, &dest).unwrap();
+}
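
The example performs a real restore. A dry run has the same shape up to `prepare_restore`; the sketch below is illustrative only — it mirrors the CLI code at the end of this patch and reuses the `repo`, `opts`, `ls`, and `dest` bindings from the example above:

    // Sketch (assumes the bindings from examples/restore.rs above).
    // With dry_run = true, prepare_restore only collects information --
    // it neither creates dirs nor removes anything in the destination.
    let infos = repo.prepare_restore(&opts, ls.clone(), &dest, true).unwrap();
    println!("would restore {} bytes", infos.restore_size);
    // optionally start warming up the packs that would be needed
    repo.warm_up(infos.to_packs().into_iter()).unwrap();
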
diff --git a/crates/rustic_core/src/backend/local.rs b/crates/rustic_core/src/backend/local.rs
index 876e4a194..913288fe8 100644
--- a/crates/rustic_core/src/backend/local.rs
+++ b/crates/rustic_core/src/backend/local.rs
@@ -282,7 +282,7 @@ impl LocalDestination {
         Ok(Self { path, is_file })
     }
 
-    fn path(&self, item: impl AsRef<Path>) -> PathBuf {
+    pub(crate) fn path(&self, item: impl AsRef<Path>) -> PathBuf {
         if self.is_file {
             self.path.clone()
         } else {
diff --git a/crates/rustic_core/src/blob/tree.rs b/crates/rustic_core/src/blob/tree.rs
index 47ddab465..f327fec9c 100644
--- a/crates/rustic_core/src/blob/tree.rs
+++ b/crates/rustic_core/src/blob/tree.rs
@@ -148,7 +148,7 @@ pub struct TreeStreamerOptions {
 }
 
 /// [`NodeStreamer`] recursively streams all nodes of a given tree including all subtrees in-order
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct NodeStreamer<BE>
 where
     BE: IndexedBackend,
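
The new `Clone` derive is what lets a single listing feed both restore phases: `ls()` (see the repository.rs hunk below) now returns an `Iterator + Clone`, so callers clone the streamer for `prepare_restore` and consume the original in `restore`, as examples/restore.rs above does:

    // Sketch (bindings as in the example above):
    let ls = repo.ls(&node, &streamer_opts, recursive).unwrap();
    let infos = repo.prepare_restore(&opts, ls.clone(), &dest, dry_run).unwrap(); // first pass
    repo.restore(infos, &opts, ls, &dest).unwrap(); // second pass reuses the listing
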
diff --git a/crates/rustic_core/src/commands.rs b/crates/rustic_core/src/commands.rs
index 465212faa..96442aa8c 100644
--- a/crates/rustic_core/src/commands.rs
+++ b/crates/rustic_core/src/commands.rs
@@ -7,4 +7,5 @@ pub mod init;
 pub mod key;
 pub mod prune;
 pub mod repoinfo;
+pub mod restore;
 pub mod snapshots;
diff --git a/crates/rustic_core/src/commands/restore.rs b/crates/rustic_core/src/commands/restore.rs
new file mode 100644
index 000000000..a3c838f70
--- /dev/null
+++ b/crates/rustic_core/src/commands/restore.rs
@@ -0,0 +1,595 @@
+//! `restore` subcommand
+
+use log::{debug, error, info, trace, warn};
+
+use std::{
+    cmp::Ordering,
+    collections::BTreeMap,
+    io::Read,
+    num::NonZeroU32,
+    path::{Path, PathBuf},
+    sync::Mutex,
+};
+
+use chrono::{DateTime, Local, Utc};
+use ignore::{DirEntry, WalkBuilder};
+use itertools::Itertools;
+use rayon::ThreadPoolBuilder;
+
+use crate::{
+    error::CommandErrorKind, hash, repository::Indexed, DecryptReadBackend, FileType, Id,
+    IndexedBackend, LocalDestination, Node, NodeType, Open, Progress, ProgressBars, ReadBackend,
+    Repository, RusticResult,
+};
+
+pub(crate) mod constants {
+    pub(crate) const MAX_READER_THREADS_NUM: usize = 20;
+}
+
+/// `restore` subcommand
+#[allow(clippy::struct_excessive_bools)]
+#[cfg_attr(feature = "clap", derive(clap::Parser))]
+#[derive(Debug, Copy, Clone, Default)]
+pub struct RestoreOpts {
+    /// Remove all files/dirs in destination which are not contained in snapshot.
+    /// WARNING: Use with care, maybe first try this with --dry-run?
+    #[cfg_attr(feature = "clap", clap(long))]
+    pub delete: bool,
+
+    /// Use numeric ids instead of user/group when restoring uid/gid
+    #[cfg_attr(feature = "clap", clap(long))]
+    pub numeric_id: bool,
+
+    /// Don't restore ownership (user/group)
+    #[cfg_attr(feature = "clap", clap(long, conflicts_with = "numeric_id"))]
+    pub no_ownership: bool,
+
+    /// Always read and verify existing files (don't trust correct modification time and file size)
+    #[cfg_attr(feature = "clap", clap(long))]
+    pub verify_existing: bool,
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub struct FileDirStats {
+    pub restore: u64,
+    pub unchanged: u64,
+    pub verified: u64,
+    pub modify: u64,
+    pub additional: u64,
+}
+
+#[derive(Default, Debug, Clone, Copy)]
+pub struct RestoreStats {
+    pub files: FileDirStats,
+    pub dirs: FileDirStats,
+}
+
+impl RestoreOpts {
+    pub(crate) fn restore<P: ProgressBars, S: Open>(
+        self,
+        file_infos: RestoreInfos,
+        repo: &Repository<P, S>,
+        node_streamer: impl Iterator<Item = RusticResult<(PathBuf, Node)>>,
+        dest: &LocalDestination,
+    ) -> RusticResult<()> {
+        repo.warm_up_wait(file_infos.to_packs().into_iter())?;
+        restore_contents(repo, dest, file_infos)?;
+
+        let p = repo.pb.progress_spinner("setting metadata...");
+        self.restore_metadata(node_streamer, dest)?;
+        p.finish();
+
+        Ok(())
+    }
+
+    /// collect restore information, scan existing files, create needed dirs and remove superfluous files
+    pub(crate) fn collect_and_prepare<P: ProgressBars, S: Indexed>(
+        self,
+        repo: &Repository<P, S>,
+        mut node_streamer: impl Iterator<Item = RusticResult<(PathBuf, Node)>>,
+        dest: &LocalDestination,
+        dry_run: bool,
+    ) -> RusticResult<RestoreInfos> {
+        let p = repo.pb.progress_spinner("collecting file information...");
+        let dest_path = dest.path("");
+
+        let mut stats = RestoreStats::default();
+        let mut restore_infos = RestoreInfos::default();
+        let mut additional_existing = false;
+        let mut removed_dir = None;
+
+        let mut process_existing = |entry: &DirEntry| -> RusticResult<_> {
+            if entry.depth() == 0 {
+                // don't process the root dir which should be existing
+                return Ok(());
+            }
+
+            debug!("additional {:?}", entry.path());
+            if entry.file_type().unwrap().is_dir() {
+                stats.dirs.additional += 1;
+            } else {
+                stats.files.additional += 1;
+            }
+            match (self.delete, dry_run, entry.file_type().unwrap().is_dir()) {
+                (true, true, true) => {
+                    info!("would have removed the additional dir: {:?}", entry.path());
+                }
+                (true, true, false) => {
+                    info!("would have removed the additional file: {:?}", entry.path());
+                }
+                (true, false, true) => {
+                    let path = entry.path();
+                    match &removed_dir {
+                        Some(dir) if path.starts_with(dir) => {}
+                        _ => match dest.remove_dir(path) {
+                            Ok(()) => {
+                                removed_dir = Some(path.to_path_buf());
+                            }
+                            Err(err) => {
+                                error!("error removing {path:?}: {err}");
+                            }
+                        },
+                    }
+                }
+                (true, false, false) => {
+                    if let Err(err) = dest.remove_file(entry.path()) {
+                        error!("error removing {:?}: {err}", entry.path());
+                    }
+                }
+                (false, _, _) => {
+                    additional_existing = true;
+                }
+            }
+
+            Ok(())
+        };
+
+        let mut process_node = |path: &PathBuf, node: &Node, exists: bool| -> RusticResult<_> {
+            match node.node_type {
+                NodeType::Dir => {
+                    if exists {
+                        stats.dirs.modify += 1;
+                        trace!("existing dir {path:?}");
+                    } else {
+                        stats.dirs.restore += 1;
+                        debug!("to restore: {path:?}");
+                        if !dry_run {
+                            dest.create_dir(path).map_err(|err| {
+                                CommandErrorKind::ErrorCreating(path.to_path_buf(), Box::new(err))
+                            })?;
+                        }
+                    }
+                }
+                NodeType::File => {
+                    // collect blobs needed for restoring
+                    match (
+                        exists,
+                        restore_infos
+                            .add_file(dest, node, path.clone(), repo.index(), self.verify_existing)
+                            .map_err(|err| {
+                                CommandErrorKind::ErrorCollecting(path.to_path_buf(), Box::new(err))
+                            })?,
+                    ) {
+                        // Note that exists = false and Existing or Verified can happen if the file is changed between scanning the dir
+                        // and calling add_file. So we don't care about exists but trust add_file here.
+                        (_, AddFileResult::Existing) => {
+                            stats.files.unchanged += 1;
+                            trace!("identical file: {path:?}");
+                        }
+                        (_, AddFileResult::Verified) => {
+                            stats.files.verified += 1;
+                            trace!("verified identical file: {path:?}");
+                        }
+                        // TODO: The differentiation between files to modify and files to create could be done only by add_file
+                        // Currently, add_file never returns Modify, but always New, so we differentiate based on exists
+                        (true, AddFileResult::Modify) => {
+                            stats.files.modify += 1;
+                            debug!("to modify: {path:?}");
+                        }
+                        (false, AddFileResult::Modify) => {
+                            stats.files.restore += 1;
+                            debug!("to restore: {path:?}");
+                        }
+                    }
+                }
+                _ => {} // nothing to do for symlinks, devices, etc.
+            }
+            Ok(())
+        };
+
+        let mut dst_iter = WalkBuilder::new(dest_path)
+            .follow_links(false)
+            .hidden(false)
+            .ignore(false)
+            .sort_by_file_path(Path::cmp)
+            .build()
+            .filter_map(Result::ok); // TODO: print out the ignored error
+        let mut next_dst = dst_iter.next();
+
+        let mut next_node = node_streamer.next().transpose()?;
+
+        loop {
+            match (&next_dst, &next_node) {
+                (None, None) => break,
+
+                (Some(dst), None) => {
+                    process_existing(dst)?;
+                    next_dst = dst_iter.next();
+                }
+                (Some(dst), Some((path, node))) => match dst.path().cmp(&dest.path(path)) {
+                    Ordering::Less => {
+                        process_existing(dst)?;
+                        next_dst = dst_iter.next();
+                    }
+                    Ordering::Equal => {
+                        // process existing node
+                        if (node.is_dir() && !dst.file_type().unwrap().is_dir())
+                            || (node.is_file() && !dst.metadata().unwrap().is_file())
+                            || {
+                                let this = &node;
+                                matches!(
+                                    this.node_type,
+                                    NodeType::Symlink { linktarget: _ }
+                                        | NodeType::Dev { device: _ }
+                                        | NodeType::Chardev { device: _ }
+                                        | NodeType::Fifo
+                                        | NodeType::Socket
+                                )
+                            }
+                        {
+                            // if types do not match, first remove the existing file
+                            process_existing(dst)?;
+                        }
+                        process_node(path, node, true)?;
+                        next_dst = dst_iter.next();
+                        next_node = node_streamer.next().transpose()?;
+                    }
+                    Ordering::Greater => {
+                        process_node(path, node, false)?;
+                        next_node = node_streamer.next().transpose()?;
+                    }
+                },
+                (None, Some((path, node))) => {
+                    process_node(path, node, false)?;
+                    next_node = node_streamer.next().transpose()?;
+                }
+            }
+        }
+
+        if additional_existing {
+            warn!("Note: additional entries exist in destination");
+        }
+
+        restore_infos.stats = stats;
+        p.finish();
+
+        Ok(restore_infos)
+    }
+
+    fn restore_metadata(
+        self,
+        mut node_streamer: impl Iterator<Item = RusticResult<(PathBuf, Node)>>,
+        dest: &LocalDestination,
+    ) -> RusticResult<()> {
+        let mut dir_stack = Vec::new();
+        while let Some((path, node)) = node_streamer.next().transpose()? {
+            match node.node_type {
+                NodeType::Dir => {
+                    // set metadata for all non-parent paths in stack
+                    while let Some((stackpath, _)) = dir_stack.last() {
+                        if path.starts_with(stackpath) {
+                            break;
+                        }
+                        let (path, node) = dir_stack.pop().unwrap();
+                        self.set_metadata(dest, &path, &node);
+                    }
+                    // push current path to the stack
+                    dir_stack.push((path, node));
+                }
+                _ => self.set_metadata(dest, &path, &node),
+            }
+        }
+
+        // empty dir stack and set metadata
+        for (path, node) in dir_stack.into_iter().rev() {
+            self.set_metadata(dest, &path, &node);
+        }
+
+        Ok(())
+    }
+
+    fn set_metadata(&self, dest: &LocalDestination, path: &PathBuf, node: &Node) {
+        debug!("setting metadata for {:?}", path);
+        dest.create_special(path, node)
+            .unwrap_or_else(|_| warn!("restore {:?}: creating special file failed.", path));
+        match (self.no_ownership, self.numeric_id) {
+            (true, _) => {}
+            (false, true) => dest
+                .set_uid_gid(path, &node.meta)
+                .unwrap_or_else(|_| warn!("restore {:?}: setting UID/GID failed.", path)),
+            (false, false) => dest
+                .set_user_group(path, &node.meta)
+                .unwrap_or_else(|_| warn!("restore {:?}: setting User/Group failed.", path)),
+        }
+        dest.set_permission(path, node)
+            .unwrap_or_else(|_| warn!("restore {:?}: chmod failed.", path));
+        dest.set_extended_attributes(path, &node.meta.extended_attributes)
+            .unwrap_or_else(|_| warn!("restore {:?}: setting extended attributes failed.", path));
+        dest.set_times(path, &node.meta)
+            .unwrap_or_else(|_| warn!("restore {:?}: setting file times failed.", path));
+    }
+}
+
+/// [`restore_contents`] restores all file contents as described by `file_infos`
+/// using the repository's [`DecryptReadBackend`] and writing them into the [`LocalDestination`] `dest`.
+fn restore_contents<P: ProgressBars, S: Open>(
+    repo: &Repository<P, S>,
+    dest: &LocalDestination,
+    file_infos: RestoreInfos,
+) -> RusticResult<()> {
+    let RestoreInfos {
+        names: filenames,
+        file_lengths,
+        r: restore_info,
+        restore_size: total_size,
+        ..
+    } = file_infos;
+    let filenames = &filenames;
+    let be = repo.dbe();
+
+    // first create needed empty files, as they are not created later.
+    for (i, size) in file_lengths.iter().enumerate() {
+        if *size == 0 {
+            let path = &filenames[i];
+            dest.set_length(path, *size).map_err(|err| {
+                CommandErrorKind::ErrorSettingLength(path.to_path_buf(), Box::new(err))
+            })?;
+        }
+    }
+
+    let sizes = &Mutex::new(file_lengths);
+
+    let p = repo.pb.progress_bytes("restoring file contents...");
+    p.set_length(total_size);
+
+    let blobs: Vec<_> = restore_info
+        .into_iter()
+        .map(|((pack, bl), fls)| {
+            let from_file = fls
+                .iter()
+                .find(|fl| fl.matches)
+                .map(|fl| (fl.file_idx, fl.file_start, bl.data_length()));
+
+            let name_dests: Vec<_> = fls
+                .iter()
+                .filter(|fl| !fl.matches)
+                .map(|fl| (bl.clone(), fl.file_idx, fl.file_start))
+                .collect();
+            (pack, bl.offset, bl.length, from_file, name_dests)
+        })
+        .coalesce(|mut x, mut y| {
+            if x.0 == y.0 && x.3.is_none() && y.1 == x.1 + x.2 {
+                x.2 += y.2;
+                x.4.append(&mut y.4);
+                Ok(x)
+            } else {
+                Err((x, y))
+            }
+        })
+        .collect();
+
+    let pool = ThreadPoolBuilder::new()
+        .num_threads(constants::MAX_READER_THREADS_NUM)
+        .build()
+        .map_err(CommandErrorKind::FromRayonError)?;
+    pool.in_place_scope(|s| {
+        for (pack, offset, length, from_file, name_dests) in blobs {
+            let p = &p;
+
+            if !name_dests.is_empty() {
+                // TODO: error handling!
+                s.spawn(move |s1| {
+                    let read_data = match &from_file {
+                        Some((file_idx, offset_file, length_file)) => {
+                            // read from existing file
+                            dest.read_at(&filenames[*file_idx], *offset_file, *length_file)
+                                .unwrap()
+                        }
+                        None => {
+                            // read needed part of the pack
+                            be.read_partial(FileType::Pack, &pack, false, offset, length)
+                                .unwrap()
+                        }
+                    };
+
+                    // save into needed files in parallel
+                    for (bl, group) in &name_dests.into_iter().group_by(|item| item.0.clone()) {
+                        let size = bl.data_length();
+                        let data = if from_file.is_some() {
+                            read_data.clone()
+                        } else {
+                            let start = usize::try_from(bl.offset - offset).unwrap();
+                            let end = usize::try_from(bl.offset + bl.length - offset).unwrap();
+                            be.read_encrypted_from_partial(
+                                &read_data[start..end],
+                                bl.uncompressed_length,
+                            )
+                            .unwrap()
+                        };
+                        for (_, file_idx, start) in group {
+                            let data = data.clone();
+                            s1.spawn(move |_| {
+                                let path = &filenames[file_idx];
+                                // Allocate file if it is not yet allocated
+                                let mut sizes_guard = sizes.lock().unwrap();
+                                let filesize = sizes_guard[file_idx];
+                                if filesize > 0 {
+                                    dest.set_length(path, filesize)
+                                        .map_err(|err| {
+                                            CommandErrorKind::ErrorSettingLength(
+                                                path.to_path_buf(),
+                                                Box::new(err),
+                                            )
+                                        })
+                                        .unwrap();
+                                    sizes_guard[file_idx] = 0;
+                                }
+                                drop(sizes_guard);
+                                dest.write_at(path, start, &data).unwrap();
+                                p.inc(size);
+                            });
+                        }
+                    }
+                });
+            }
+        }
+    });
+
+    p.finish();
+
+    Ok(())
+}
+
+/// Struct that contains information about file contents, grouped by
+/// 1) pack ID,
+/// 2) blob within this pack,
+/// 3) the actual files and the position of this blob within those.
+#[derive(Debug, Default)]
+pub struct RestoreInfos {
+    names: Filenames,
+    file_lengths: Vec<u64>,
+    r: RestoreInfo,
+    pub restore_size: u64,
+    pub matched_size: u64,
+    pub stats: RestoreStats,
+}
+
+type RestoreInfo = BTreeMap<(Id, BlobLocation), Vec<FileLocation>>;
+type Filenames = Vec<PathBuf>;
+
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
+struct BlobLocation {
+    offset: u32,
+    length: u32,
+    uncompressed_length: Option<NonZeroU32>,
+}
+
+impl BlobLocation {
+    fn data_length(&self) -> u64 {
+        self.uncompressed_length
+            .map_or(
+                self.length - 32, // crypto overhead
+                |length| length.get(),
+            )
+            .into()
+    }
+}
+
+#[derive(Debug)]
+struct FileLocation {
+    file_idx: usize,
+    file_start: u64,
+    matches: bool, // indicates that the file exists and these contents are already correct
+}
+
+enum AddFileResult {
+    Existing,
+    Verified,
+    Modify,
+}
+
+impl RestoreInfos {
+    /// Add the file to [`RestoreInfos`] using `index` to get blob information.
+    fn add_file(
+        &mut self,
+        dest: &LocalDestination,
+        file: &Node,
+        name: PathBuf,
+        index: &impl IndexedBackend,
+        ignore_mtime: bool,
+    ) -> RusticResult<AddFileResult> {
+        let mut open_file = dest.get_matching_file(&name, file.meta.size);
+
+        // Empty files which exist with the correct size should always return Ok(Existing)!
+        if file.meta.size == 0 {
+            if let Some(meta) = open_file.as_ref().map(|f| f.metadata()).transpose()? {
+                if meta.len() == 0 {
+                    // Empty file exists
+                    return Ok(AddFileResult::Existing);
+                }
+            }
+        }
+
+        if !ignore_mtime {
+            if let Some(meta) = open_file.as_ref().map(|f| f.metadata()).transpose()? {
+                // TODO: This is the same logic as in backend/ignore.rs => consolidate!
+                let mtime = meta
+                    .modified()
+                    .ok()
+                    .map(|t| DateTime::<Utc>::from(t).with_timezone(&Local));
+                if meta.len() == file.meta.size && mtime == file.meta.mtime {
+                    // File exists with fitting mtime => we suspect this file is ok!
+                    debug!("file {name:?} exists with suitable size and mtime, accepting it!");
+                    self.matched_size += file.meta.size;
+                    return Ok(AddFileResult::Existing);
+                }
+            }
+        }
+
+        let file_idx = self.names.len();
+        self.names.push(name);
+        let mut file_pos = 0;
+        let mut has_unmatched = false;
+        for id in file.content.iter().flatten() {
+            let ie = index
+                .get_data(id)
+                .ok_or_else(|| CommandErrorKind::IdNotFound(*id))?;
+            let bl = BlobLocation {
+                offset: ie.offset,
+                length: ie.length,
+                uncompressed_length: ie.uncompressed_length,
+            };
+            let length = bl.data_length();
+
+            let matches = open_file.as_mut().map_or(false, |file| {
+                // Existing file content; check if SHA256 matches
+                let mut vec = vec![0; length as usize];
+                file.read_exact(&mut vec).is_ok() && id == &hash(&vec)
+            });
+
+            let blob_location = self.r.entry((ie.pack, bl)).or_insert_with(Vec::new);
+            blob_location.push(FileLocation {
+                file_idx,
+                file_start: file_pos,
+                matches,
+            });
+
+            if matches {
+                self.matched_size += length;
+            } else {
+                self.restore_size += length;
+                has_unmatched = true;
+            }
+
+            file_pos += length;
+        }
+
+        self.file_lengths.push(file_pos);
+
+        if !has_unmatched && open_file.is_some() {
+            Ok(AddFileResult::Verified)
+        } else {
+            Ok(AddFileResult::Modify)
+        }
+    }
+
+    pub fn to_packs(&self) -> Vec<Id> {
+        self.r
+            .iter()
+            // only keep packs which we actually need
+            .filter(|(_, fls)| fls.iter().all(|fl| !fl.matches))
+            .map(|((pack, _), _)| *pack)
+            .dedup()
+            .collect()
+    }
+}
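
`restore_contents` above first maps each `(pack, blob)` entry to a read request and then uses `Itertools::coalesce` to merge reads of adjacent blobs in the same pack into one backend request. A minimal, self-contained illustration of the same pattern on plain `(pack, offset, length)` tuples (toy data, not rustic types):

    use itertools::Itertools;

    // Contiguous ranges within the same pack are merged into a single read.
    let reads = vec![(1, 0, 10), (1, 10, 5), (1, 20, 5), (2, 0, 7)];
    let merged: Vec<_> = reads
        .into_iter()
        .coalesce(|x, y| {
            if x.0 == y.0 && y.1 == x.1 + x.2 {
                Ok((x.0, x.1, x.2 + y.2)) // extend the pending read
            } else {
                Err((x, y)) // emit x as-is, continue with y
            }
        })
        .collect();
    assert_eq!(merged, vec![(1, 0, 15), (1, 20, 5), (2, 0, 7)]);
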
diff --git a/crates/rustic_core/src/error.rs b/crates/rustic_core/src/error.rs
index c8a51ba5c..4b055c660 100644
--- a/crates/rustic_core/src/error.rs
+++ b/crates/rustic_core/src/error.rs
@@ -179,6 +179,16 @@ pub enum CommandErrorKind {
     MinPackSizeTolerateWrong,
     /// max_packsize_tolerate_percent must be >= 100 or 0
     MaxPackSizeTolerateWrong,
+    /// error creating {0:?}: {1:?}
+    ErrorCreating(PathBuf, Box<RusticError>),
+    /// error collecting information for {0:?}: {1:?}
+    ErrorCollecting(PathBuf, Box<RusticError>),
+    /// error setting length for {0:?}: {1:?}
+    ErrorSettingLength(PathBuf, Box<RusticError>),
+    /// did not find id {0} in index
+    IdNotFound(Id),
+    /// {0:?}
+    FromRayonError(#[from] rayon::ThreadPoolBuildError),
 }
 
 /// [`CryptoErrorKind`] describes the errors that can happen while dealing with Cryptographic functions
diff --git a/crates/rustic_core/src/file.rs b/crates/rustic_core/src/file.rs
deleted file mode 100644
index e4f609b9a..000000000
--- a/crates/rustic_core/src/file.rs
+++ /dev/null
@@ -1,170 +0,0 @@
-use chrono::{DateTime, Local, Utc};
-use log::debug;
-use std::{
-    collections::HashMap,
-    io::Read,
-    path::{Path, PathBuf},
-};
-
-use crate::{
-    backend::{local::LocalDestination, node::Node},
-    blob::BlobLocation,
-    crypto::hasher::hash,
-    error::{FileErrorKind, RusticResult},
-    id::Id,
-    index::IndexedBackend,
-};
-
-type RestoreInfo = HashMap<Id, HashMap<BlobLocation, Vec<FileLocation>>>;
-type Filenames = Vec<PathBuf>;
-
-#[derive(Debug, Clone, Copy)]
-pub enum AddFileResult {
-    Existing,
-    Verified,
-    New(u64),
-    Modify(u64),
-}
-
-#[derive(Default, Debug, Clone, Copy)]
-pub struct FileStats {
-    pub restore: u64,
-    pub unchanged: u64,
-    pub verified: u64,
-    pub modify: u64,
-    pub additional: u64,
-}
-
-#[derive(Default, Debug, Clone, Copy)]
-pub struct RestoreStats {
-    pub file: FileStats,
-    pub dir: FileStats,
-}
-
-/// struct that contains information of file contents grouped by
-/// 1) pack ID,
-/// 2) blob within this pack
-/// 3) the actual files and position of this blob within those
-#[derive(Debug, Default)]
-pub struct FileInfos {
-    pub names: Filenames,
-    pub r: RestoreInfo,
-    pub restore_size: u64,
-    pub matched_size: u64,
-}
-
-#[derive(Debug, Default, Clone, Copy)]
-pub struct FileLocation {
-    pub file_idx: usize,
-    pub file_start: u64,
-    pub matches: bool, //indicates that the file exists and these contents are already correct
-}
-
-impl FileInfos {
-    #[must_use]
-    pub fn new() -> Self {
-        Self {
-            names: Vec::new(),
-            r: HashMap::new(),
-            restore_size: 0,
-            matched_size: 0,
-        }
-    }
-
-    /// Add the file to [`FileInfos`] using `index` to get blob information.
-    /// Returns the computed length of the file
-    pub fn add_file<P>(
-        &mut self,
-        dest: &LocalDestination,
-        file: &Node,
-        name: P,
-        index: &impl IndexedBackend,
-        ignore_mtime: bool,
-    ) -> RusticResult<AddFileResult>
-    where
-        P: Into<PathBuf> + AsRef<Path> + std::fmt::Debug,
-    {
-        let mut open_file = dest.get_matching_file(&name, file.meta.size);
-
-        if !ignore_mtime {
-            if let Some(meta) = open_file
-                .as_ref()
-                .map(std::fs::File::metadata)
-                .transpose()
-                .map_err(FileErrorKind::TransposingOptionResultFailed)?
-            {
-                // TODO: This is the same logic as in backend/ignore.rs => consollidate!
-                let mtime = meta
-                    .modified()
-                    .ok()
-                    .map(|t| DateTime::<Utc>::from(t).with_timezone(&Local));
-                if meta.len() == file.meta.size && mtime == file.meta.mtime {
-                    // File exists with fitting mtime => we suspect this file is ok!
-                    debug!("file {name:?} exists with suitable size and mtime, accepting it!");
-                    self.matched_size += file.meta.size;
-                    return Ok(AddFileResult::Existing);
-                }
-            }
-        }
-
-        let file_idx = self.names.len();
-        self.names.push(name.into());
-        let mut file_pos = 0;
-        let mut has_unmatched = false;
-        for id in file.content.iter().flatten() {
-            let ie = index
-                .get_data(id)
-                .ok_or_else(|| FileErrorKind::CouldNotFindIdInIndex(*id))?;
-            let bl = BlobLocation {
-                offset: ie.offset,
-                length: ie.length,
-                uncompressed_length: ie.uncompressed_length,
-            };
-            let length = bl.data_length();
-
-            let matches = match &mut open_file {
-                Some(file) => {
-                    // Existing file content; check if SHA256 matches
-                    let try_length = usize::try_from(length)
-                        .map_err(FileErrorKind::ConversionFromU64ToUsizeFailed)?;
-                    let mut vec = vec![0; try_length];
-                    file.read_exact(&mut vec).is_ok() && id == &hash(&vec)
-                }
-                None => false,
-            };
-
-            let pack = self.r.entry(ie.pack).or_insert_with(HashMap::new);
-            let blob_location = pack.entry(bl).or_insert_with(Vec::new);
-            blob_location.push(FileLocation {
-                file_idx,
-                file_start: file_pos,
-                matches,
-            });
-
-            if matches {
-                self.matched_size += length;
-            } else {
-                self.restore_size += length;
-                has_unmatched = true;
-            }
-
-            file_pos += length;
-        }
-
-        match (has_unmatched, open_file.is_some()) {
-            (true, true) => Ok(AddFileResult::Modify(file_pos)),
-            (false, true) => Ok(AddFileResult::Verified),
-            (_, false) => Ok(AddFileResult::New(file_pos)),
-        }
-    }
-
-    #[must_use]
-    pub fn to_packs(&self) -> Vec<Id> {
-        self.r
-            .iter()
-            // filter out packs which we need
-            .filter(|(_, blob)| blob.iter().any(|(_, fls)| fls.iter().all(|fl| !fl.matches)))
-            .map(|(pack, _)| *pack)
-            .collect()
-    }
-}
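
The move out of `file.rs` also changes the container: the old code kept blob locations in a nested map, the new `RestoreInfos` in a flat, ordered one:

    // old (crates/rustic_core/src/file.rs):
    type RestoreInfo = HashMap<Id, HashMap<BlobLocation, Vec<FileLocation>>>;
    // new (crates/rustic_core/src/commands/restore.rs):
    type RestoreInfo = BTreeMap<(Id, BlobLocation), Vec<FileLocation>>;

The ordering is the point: iterating the `BTreeMap` yields blobs sorted by pack id and offset, which is what lets `restore_contents` coalesce adjacent reads (see the sketch after the commands/restore.rs diff above); with `HashMap` iteration order, neighbouring blobs would almost never appear next to each other.
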
diff --git a/crates/rustic_core/src/lib.rs b/crates/rustic_core/src/lib.rs
index 2724acb57..2b232b288 100644
--- a/crates/rustic_core/src/lib.rs
+++ b/crates/rustic_core/src/lib.rs
@@ -93,7 +93,6 @@ pub(crate) mod chunker;
 pub(crate) mod commands;
 pub(crate) mod crypto;
 pub(crate) mod error;
-pub(crate) mod file;
 pub(crate) mod id;
 pub(crate) mod index;
 pub(crate) mod progress;
@@ -127,10 +126,10 @@ pub use crate::{
         key::KeyOpts,
         prune::{PruneOpts, PrunePlan, PruneStats},
         repoinfo::{BlobInfo, IndexInfos, PackInfo, RepoFileInfo, RepoFileInfos},
+        restore::{FileDirStats, RestoreOpts},
     },
     crypto::{aespoly1305::Key, hasher::hash},
     error::{RusticError, RusticResult},
-    file::{AddFileResult, FileInfos, RestoreStats},
     id::Id,
     index::{
         binarysorted::{IndexCollector, IndexType},
diff --git a/crates/rustic_core/src/repository.rs b/crates/rustic_core/src/repository.rs
index 8a6102a5f..374cf77e8 100644
--- a/crates/rustic_core/src/repository.rs
+++ b/crates/rustic_core/src/repository.rs
@@ -34,14 +34,15 @@ use crate::{
         forget::{ForgetGroups, KeepOptions},
         key::KeyOpts,
         repoinfo::{IndexInfos, RepoFileInfos},
+        restore::{RestoreInfos, RestoreOpts},
     },
     crypto::aespoly1305::Key,
     error::{KeyFileErrorKind, RepositoryErrorKind, RusticErrorKind},
     repofile::RepoFile,
     repofile::{configfile::ConfigFile, keyfile::find_key_in_backend},
-    BlobType, DecryptFullBackend, Id, IndexBackend, IndexedBackend, NoProgressBars, Node,
-    NodeStreamer, ProgressBars, PruneOpts, PrunePlan, RusticResult, SnapshotFile, SnapshotGroup,
-    SnapshotGroupCriterion, Tree, TreeStreamerOptions,
+    BlobType, DecryptFullBackend, Id, IndexBackend, IndexedBackend, LocalDestination,
+    NoProgressBars, Node, NodeStreamer, ProgressBars, PruneOpts, PrunePlan, RusticResult,
+    SnapshotFile, SnapshotGroup, SnapshotGroupCriterion, Tree, TreeStreamerOptions,
 };
 
 mod warm_up;
@@ -644,7 +645,31 @@ impl<P: ProgressBars, S: Indexed> Repository<P, S> {
         node: &Node,
         streamer_opts: &TreeStreamerOptions,
         recursive: bool,
-    ) -> RusticResult<impl Iterator<Item = RusticResult<(PathBuf, Node)>>> {
+    ) -> RusticResult<impl Iterator<Item = RusticResult<(PathBuf, Node)>> + Clone> {
         NodeStreamer::new_with_glob(self.index().clone(), node, streamer_opts, recursive)
     }
+
+    /// Prepare the restore.
+    /// If `dry_run` is set to false, it will also:
+    /// - remove existing files from the destination, if `opts.delete` is set to true
+    /// - create all dirs for the restore
+    pub fn prepare_restore(
+        &self,
+        opts: &RestoreOpts,
+        node_streamer: impl Iterator<Item = RusticResult<(PathBuf, Node)>>,
+        dest: &LocalDestination,
+        dry_run: bool,
+    ) -> RusticResult<RestoreInfos> {
+        opts.collect_and_prepare(self, node_streamer, dest, dry_run)
+    }
+
+    pub fn restore(
+        &self,
+        restore_infos: RestoreInfos,
+        opts: &RestoreOpts,
+        node_streamer: impl Iterator<Item = RusticResult<(PathBuf, Node)>>,
+        dest: &LocalDestination,
+    ) -> RusticResult<()> {
+        opts.restore(restore_infos, self, node_streamer, dest)
+    }
 }
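
With `ls`, `prepare_restore`, and `restore` on `Repository`, the CLI below shrinks to plumbing around the library. The open/index flow it uses (error handling elided; cf. examples/restore.rs above):

    // `new` reads the options, `open` unlocks the repository, and `to_indexed`
    // loads the index that `ls`, `prepare_restore` and `restore` need.
    let repo = Repository::new(&repo_opts)?.open()?.to_indexed()?;
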
diff --git a/src/commands/restore.rs b/src/commands/restore.rs
index 067d67263..1dd4c3c0f 100644
--- a/src/commands/restore.rs
+++ b/src/commands/restore.rs
@@ -6,37 +6,14 @@ use crate::{
     commands::open_repository, helpers::bytes_size_to_string, status_err, Application, RUSTIC_APP,
 };
 
-use log::{debug, error, info, trace, warn};
-
 use abscissa_core::{Command, Runnable, Shutdown};
+use anyhow::Result;
+use log::info;
 
-use std::{
-    cmp::Ordering,
-    collections::BTreeMap,
-    io::Read,
-    num::NonZeroU32,
-    path::{Path, PathBuf},
-    sync::Mutex,
-};
-
-use anyhow::{anyhow, Context, Result};
-use chrono::{DateTime, Local, Utc};
-use ignore::{DirEntry, WalkBuilder};
-use itertools::Itertools;
-use rayon::ThreadPoolBuilder;
-
-use rustic_core::{
-    hash, DecryptReadBackend, FileType, Id, IndexBackend, IndexedBackend, LocalDestination, Node,
-    NodeStreamer, NodeType, Open, Progress, ProgressBars, RestoreStats, SnapshotFile, Tree,
-    TreeStreamerOptions,
-};
+use rustic_core::{LocalDestination, RestoreOpts, TreeStreamerOptions};
 
 use crate::filtering::SnapshotFilter;
 
-pub(crate) mod constants {
-    pub(crate) const MAX_READER_THREADS_NUM: usize = 20;
-}
-
 /// `restore` subcommand
 #[allow(clippy::struct_excessive_bools)]
 #[derive(clap::Parser, Command, Debug)]
@@ -49,22 +26,8 @@ pub(crate) struct RestoreCmd {
     #[clap(value_name = "DESTINATION")]
     dest: String,
 
-    /// Remove all files/dirs in destination which are not contained in snapshot.
-    /// WARNING: Use with care, maybe first try this with --dry-run?
-    #[clap(long)]
-    delete: bool,
-
-    /// Use numeric ids instead of user/group when restoring uid/gui
-    #[clap(long)]
-    numeric_id: bool,
-
-    /// Don't restore ownership (user/group)
-    #[clap(long, conflicts_with = "numeric_id")]
-    no_ownership: bool,
-
-    /// Always read and verify existing files (don't trust correct modification time and file size)
-    #[clap(long)]
-    verify_existing: bool,
+    #[clap(flatten)]
+    opts: RestoreOpts,
 
     #[clap(flatten)]
     streamer_opts: TreeStreamerOptions,
@@ -87,590 +50,49 @@ impl Runnable for RestoreCmd {
 impl RestoreCmd {
     fn inner_run(&self) -> Result<()> {
         let config = RUSTIC_APP.config();
-        let progress_options = &config.global.progress_options;
-
-        let repo = open_repository(&config)?;
-        let be = repo.dbe();
-
-        let (id, path) = self.snap.split_once(':').unwrap_or((&self.snap, ""));
-        let snap = SnapshotFile::from_str(
-            be,
-            id,
-            |sn| config.snapshot_filter.matches(sn),
-            &progress_options.progress_counter(""),
-        )?;
+        let dry_run = config.global.dry_run;
+        let repo = open_repository(&config)?.to_indexed()?;
 
-        let index = IndexBackend::new(be, &progress_options.progress_counter(""))?;
-        let node = Tree::node_from_path(&index, snap.tree, Path::new(path))?;
+        let node =
+            repo.node_from_snapshot_path(&self.snap, |sn| config.snapshot_filter.matches(sn))?;
+        let ls = repo.ls(&node, &self.streamer_opts, true)?;
 
         let dest = LocalDestination::new(&self.dest, true, !node.is_dir())?;
 
-        let p = progress_options.progress_spinner("collecting file information...");
-        let (file_infos, stats) = self.allocate_and_collect(&dest, &index, &node)?;
-        p.finish();
+        let restore_infos = repo.prepare_restore(&self.opts, ls.clone(), &dest, dry_run)?;
 
-        let fs = stats.file;
+        let fs = restore_infos.stats.files;
         println!(
             "Files:  {} to restore, {} unchanged, {} verified, {} to modify, {} additional",
             fs.restore, fs.unchanged, fs.verified, fs.modify, fs.additional
         );
-        let ds = stats.dir;
+        let ds = restore_infos.stats.dirs;
         println!(
             "Dirs:   {} to restore, {} to modify, {} additional",
-            ds.restore, fs.modify, ds.additional
+            ds.restore, ds.modify, ds.additional
         );
 
         info!(
             "total restore size: {}",
-            bytes_size_to_string(file_infos.restore_size)
+            bytes_size_to_string(restore_infos.restore_size)
         );
-        if file_infos.matched_size > 0 {
+        if restore_infos.matched_size > 0 {
             info!(
                 "using {} of existing file contents.",
-                bytes_size_to_string(file_infos.matched_size)
+                bytes_size_to_string(restore_infos.matched_size)
             );
         }
-
-        if file_infos.restore_size == 0 {
+        if restore_infos.restore_size == 0 {
             info!("all file contents are fine.");
-        } else if config.global.dry_run {
-            repo.warm_up(file_infos.to_packs().into_iter())?;
-        } else {
-            repo.warm_up_wait(file_infos.to_packs().into_iter())?;
-            restore_contents(be, &dest, file_infos)?;
         }
 
-        if !config.global.dry_run {
-            let p = progress_options.progress_spinner("setting metadata...");
-            self.restore_metadata(&dest, index, &node)?;
-            p.finish();
+        if dry_run {
+            repo.warm_up(restore_infos.to_packs().into_iter())?;
+        } else {
+            repo.restore(restore_infos, &self.opts, ls, &dest)?;
             println!("restore done.");
         }
 
         Ok(())
     }
 }
-
-impl RestoreCmd {
-    /// collect restore information, scan existing files and allocate non-existing files
-    fn allocate_and_collect<I: IndexedBackend>(
-        &self,
-        dest: &LocalDestination,
-        index: &I,
-        node: &Node,
-    ) -> Result<(FileInfos, RestoreStats)> {
-        let config = RUSTIC_APP.config();
-        let dest_path = Path::new(&self.dest);
-        let mut stats = RestoreStats::default();
-
-        let mut file_infos = FileInfos::new();
-        let mut additional_existing = false;
-        let mut removed_dir = None;
-
-        let mut process_existing = |entry: &DirEntry| -> Result<_> {
-            if entry.depth() == 0 {
-                // don't process the root dir which should be existing
-                return Ok(());
-            }
-
-            debug!("additional {:?}", entry.path());
-            if entry.file_type().unwrap().is_dir() {
-                stats.dir.additional += 1;
-            } else {
-                stats.file.additional += 1;
-            }
-            match (
-                self.delete,
-                config.global.dry_run,
-                entry.file_type().unwrap().is_dir(),
-            ) {
-                (true, true, true) => {
-                    info!("would have removed the additional dir: {:?}", entry.path());
-                }
-                (true, true, false) => {
-                    info!("would have removed the additional file: {:?}", entry.path());
-                }
-                (true, false, true) => {
-                    let path = entry.path();
-                    match &removed_dir {
-                        Some(dir) if path.starts_with(dir) => {}
-                        _ => match dest.remove_dir(path) {
-                            Ok(()) => {
-                                removed_dir = Some(path.to_path_buf());
-                            }
-                            Err(err) => {
-                                error!("error removing {path:?}: {err}");
-                            }
-                        },
-                    }
-                }
-                (true, false, false) => {
-                    if let Err(err) = dest.remove_file(entry.path()) {
-                        error!("error removing {:?}: {err}", entry.path());
-                    }
-                }
-                (false, _, _) => {
-                    additional_existing = true;
-                }
-            }
-
-            Ok(())
-        };
-
-        let mut process_node = |path: &PathBuf, node: &Node, exists: bool| -> Result<_> {
-            match node.node_type {
-                NodeType::Dir => {
-                    if exists {
-                        stats.dir.modify += 1;
-                        trace!("existing dir {path:?}");
-                    } else {
-                        stats.dir.restore += 1;
-                        debug!("to restore: {path:?}");
-                        if !config.global.dry_run {
-                            dest.create_dir(path)
-                                .with_context(|| format!("error creating {path:?}"))?;
-                        }
-                    }
-                }
-                NodeType::File => {
-                    // collect blobs needed for restoring
-                    match (
-                        exists,
-                        file_infos
-                            .add_file(dest, node, path.clone(), index, self.verify_existing)
-                            .with_context(|| {
-                                format!("error collecting information for {path:?}")
-                            })?,
-                    ) {
-                        // Note that exists = false and Existing or Verified can happen if the file is changed between scanning the dir
-                        // and calling add_file. So we don't care about exists but trust add_file here.
-                        (_, AddFileResult::Existing) => {
-                            stats.file.unchanged += 1;
-                            trace!("identical file: {path:?}");
-                        }
-                        (_, AddFileResult::Verified) => {
-                            stats.file.verified += 1;
-                            trace!("verified identical file: {path:?}");
-                        }
-                        // TODO: The differentiation between files to modify and files to create could be done only by add_file
-                        // Currently, add_file never returns Modify, but always New, so we differentiate based on exists
-                        (true, AddFileResult::Modify) => {
-                            stats.file.modify += 1;
-                            debug!("to modify: {path:?}");
-                        }
-                        (false, AddFileResult::Modify) => {
-                            stats.file.restore += 1;
-                            debug!("to restore: {path:?}");
-                        }
-                    }
-                }
-                _ => {} // nothing to do for symlink, device, etc.
-            }
-            Ok(())
-        };
-
-        let mut dst_iter = WalkBuilder::new(dest_path)
-            .follow_links(false)
-            .hidden(false)
-            .ignore(false)
-            .sort_by_file_path(Path::cmp)
-            .build()
-            .filter_map(Result::ok); // TODO: print out the ignored error
-        let mut next_dst = dst_iter.next();
-
-        let mut node_streamer =
-            NodeStreamer::new_with_glob(index.clone(), node, &self.streamer_opts.clone(), true)?;
-        let mut next_node = node_streamer.next().transpose()?;
-
-        loop {
-            match (&next_dst, &next_node) {
-                (None, None) => break,
-
-                (Some(dst), None) => {
-                    process_existing(dst)?;
-                    next_dst = dst_iter.next();
-                }
-                (Some(dst), Some((path, node))) => match dst.path().cmp(&dest_path.join(path)) {
-                    Ordering::Less => {
-                        process_existing(dst)?;
-                        next_dst = dst_iter.next();
-                    }
-                    Ordering::Equal => {
-                        // process existing node
-                        if (node.is_dir() && !dst.file_type().unwrap().is_dir())
-                            || (node.is_file() && !dst.metadata().unwrap().is_file())
-                            || {
-                                let this = &node;
-                                matches!(
-                                    this.node_type,
-                                    NodeType::Symlink { linktarget: _ }
-                                        | NodeType::Dev { device: _ }
-                                        | NodeType::Chardev { device: _ }
-                                        | NodeType::Fifo
-                                        | NodeType::Socket
-                                )
-                            }
-                        {
-                            // if types do not match, first remove the existing file
-                            process_existing(dst)?;
-                        }
-                        process_node(path, node, true)?;
-                        next_dst = dst_iter.next();
-                        next_node = node_streamer.next().transpose()?;
-                    }
-                    Ordering::Greater => {
-                        process_node(path, node, false)?;
-                        next_node = node_streamer.next().transpose()?;
-                    }
-                },
-                (None, Some((path, node))) => {
-                    process_node(path, node, false)?;
-                    next_node = node_streamer.next().transpose()?;
-                }
-            }
-        }
-
-        if additional_existing {
-            warn!("Note: additional entries exist in destination");
-        }
-
-        Ok((file_infos, stats))
-    }
-
-    fn restore_metadata(
-        &self,
-        dest: &LocalDestination,
-        index: impl IndexedBackend + Unpin,
-        node: &Node,
-    ) -> Result<()> {
-        // walk over tree in repository and compare with tree in dest
-        let mut node_streamer =
-            NodeStreamer::new_with_glob(index, node, &self.streamer_opts.clone(), true)?;
-        let mut dir_stack = Vec::new();
-        while let Some((path, node)) = node_streamer.next().transpose()? {
-            match node.node_type {
-                NodeType::Dir => {
-                    // set metadata for all non-parent paths in stack
-                    while let Some((stackpath, _)) = dir_stack.last() {
-                        if path.starts_with(stackpath) {
-                            break;
-                        }
-                        let (path, node) = dir_stack.pop().unwrap();
-                        self.set_metadata(dest, &path, &node);
-                    }
-                    // push current path to the stack
-                    dir_stack.push((path, node));
-                }
-                _ => self.set_metadata(dest, &path, &node),
-            }
-        }
-
-        // empty dir stack and set metadata
-        for (path, node) in dir_stack.into_iter().rev() {
-            self.set_metadata(dest, &path, &node);
-        }
-
-        Ok(())
-    }
-
-    fn set_metadata(&self, dest: &LocalDestination, path: &PathBuf, node: &Node) {
-        debug!("setting metadata for {:?}", path);
-        dest.create_special(path, node)
-            .unwrap_or_else(|_| warn!("restore {:?}: creating special file failed.", path));
-        match (self.no_ownership, self.numeric_id) {
-            (true, _) => {}
-            (false, true) => dest
-                .set_uid_gid(path, &node.meta)
-                .unwrap_or_else(|_| warn!("restore {:?}: setting UID/GID failed.", path)),
-            (false, false) => dest
-                .set_user_group(path, &node.meta)
-                .unwrap_or_else(|_| warn!("restore {:?}: setting User/Group failed.", path)),
-        }
-        dest.set_permission(path, node)
-            .unwrap_or_else(|_| warn!("restore {:?}: chmod failed.", path));
-        dest.set_extended_attributes(path, &node.meta.extended_attributes)
-            .unwrap_or_else(|_| warn!("restore {:?}: setting extended attributes failed.", path));
-        dest.set_times(path, &node.meta)
-            .unwrap_or_else(|_| warn!("restore {:?}: setting file times failed.", path));
-    }
-}
-
-/// [`restore_contents`] restores all files contents as described by `file_infos`
-/// using the [`DecryptReadBackend`] `be` and writing them into the [`LocalBackend`] `dest`.
-fn restore_contents(
-    be: &impl DecryptReadBackend,
-    dest: &LocalDestination,
-    file_infos: FileInfos,
-) -> Result<()> {
-    let FileInfos {
-        names: filenames,
-        file_lengths,
-        r: restore_info,
-        restore_size: total_size,
-        ..
-    } = file_infos;
-    let filenames = &filenames;
-
-    // first create needed empty files, as they are not created later.
-    for (i, size) in file_lengths.iter().enumerate() {
-        if *size == 0 {
-            let path = &filenames[i];
-            dest.set_length(path, *size)
-                .with_context(|| format!("error setting length for {path:?}"))?;
-        }
-    }
-
-    let sizes = &Mutex::new(file_lengths);
-
-    let p = RUSTIC_APP
-        .config()
-        .global
-        .progress_options
-        .progress_bytes("restoring file contents...");
-    p.set_length(total_size);
-
-    let blobs: Vec<_> = restore_info
-        .into_iter()
-        .map(|((pack, bl), fls)| {
-            let from_file = fls
-                .iter()
-                .find(|fl| fl.matches)
-                .map(|fl| (fl.file_idx, fl.file_start, bl.data_length()));
-
-            let name_dests: Vec<_> = fls
-                .iter()
-                .filter(|fl| !fl.matches)
-                .map(|fl| (bl.clone(), fl.file_idx, fl.file_start))
-                .collect();
-            (pack, bl.offset, bl.length, from_file, name_dests)
-        })
-        .coalesce(|mut x, mut y| {
-            if x.0 == y.0 && x.3.is_none() && y.1 == x.1 + x.2 {
-                x.2 += y.2;
-                x.4.append(&mut y.4);
-                Ok(x)
-            } else {
-                Err((x, y))
-            }
-        })
-        .collect();
-
-    let pool = ThreadPoolBuilder::new()
-        .num_threads(constants::MAX_READER_THREADS_NUM)
-        .build()?;
-    pool.in_place_scope(|s| {
-        for (pack, offset, length, from_file, name_dests) in blobs {
-            let p = &p;
-
-            if !name_dests.is_empty() {
-                // TODO: error handling!
-                s.spawn(move |s1| {
-                    let read_data = match &from_file {
-                        Some((file_idx, offset_file, length_file)) => {
-                            // read from existing file
-                            dest.read_at(&filenames[*file_idx], *offset_file, *length_file)
-                                .unwrap()
-                        }
-                        None => {
-                            // read needed part of the pack
-                            be.read_partial(FileType::Pack, &pack, false, offset, length)
-                                .unwrap()
-                        }
-                    };
-
-                    // save into needed files in parallel
-                    for (bl, group) in &name_dests.into_iter().group_by(|item| item.0.clone()) {
-                        let size = bl.data_length();
-                        let data = if from_file.is_some() {
-                            read_data.clone()
-                        } else {
-                            let start = usize::try_from(bl.offset - offset).unwrap();
-                            let end = usize::try_from(bl.offset + bl.length - offset).unwrap();
-                            be.read_encrypted_from_partial(
-                                &read_data[start..end],
-                                bl.uncompressed_length,
-                            )
-                            .unwrap()
-                        };
-                        for (_, file_idx, start) in group {
-                            let data = data.clone();
-                            s1.spawn(move |_| {
-                                let path = &filenames[file_idx];
-                                // Allocate file if it is not yet allocated
-                                let mut sizes_guard = sizes.lock().unwrap();
-                                let filesize = sizes_guard[file_idx];
-                                if filesize > 0 {
-                                    dest.set_length(path, filesize)
-                                        .with_context(|| {
-                                            format!("error setting length for {path:?}")
-                                        })
-                                        .unwrap();
-                                    sizes_guard[file_idx] = 0;
-                                }
-                                drop(sizes_guard);
-                                dest.write_at(path, start, &data).unwrap();
-                                p.inc(size);
-                            });
-                        }
-                    }
-                });
-            }
-        }
-    });
-
-    p.finish();
-
-    Ok(())
-}
-
-/// struct that contains information of file contents grouped by
-/// 1) pack ID,
-/// 2) blob within this pack
-/// 3) the actual files and position of this blob within those
-#[derive(Debug)]
-struct FileInfos {
-    names: Filenames,
-    file_lengths: Vec<u64>,
-    r: RestoreInfo,
-    restore_size: u64,
-    matched_size: u64,
-}
-
-type RestoreInfo = BTreeMap<(Id, BlobLocation), Vec<FileLocation>>;
-type Filenames = Vec<PathBuf>;
-
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
-struct BlobLocation {
-    offset: u32,
-    length: u32,
-    uncompressed_length: Option<NonZeroU32>,
-}
-
-impl BlobLocation {
-    fn data_length(&self) -> u64 {
-        self.uncompressed_length
-            .map_or(
-                self.length - 32, // crypto overhead
-                |length| length.get(),
-            )
-            .into()
-    }
-}
-
-#[derive(Debug)]
-struct FileLocation {
-    file_idx: usize,
-    file_start: u64,
-    matches: bool, //indicates that the file exists and these contents are already correct
-}
-
-enum AddFileResult {
-    Existing,
-    Verified,
-    Modify,
-}
-
-impl FileInfos {
-    fn new() -> Self {
-        Self {
-            names: Vec::new(),
-            file_lengths: Vec::new(),
-            r: BTreeMap::new(),
-            restore_size: 0,
-            matched_size: 0,
-        }
-    }
-
-    /// Add the file to [`FileInfos`] using `index` to get blob information.
-    fn add_file(
-        &mut self,
-        dest: &LocalDestination,
-        file: &Node,
-        name: PathBuf,
-        index: &impl IndexedBackend,
-        ignore_mtime: bool,
-    ) -> Result<AddFileResult> {
-        let mut open_file = dest.get_matching_file(&name, file.meta.size);
-
-        // Empty files which exists with correct size should always return Ok(Existsing)!
-        if file.meta.size == 0 {
-            if let Some(meta) = open_file.as_ref().map(|f| f.metadata()).transpose()? {
-                if meta.len() == 0 {
-                    // Empty file exists
-                    return Ok(AddFileResult::Existing);
-                }
-            }
-        }
-
-        if !ignore_mtime {
-            if let Some(meta) = open_file.as_ref().map(|f| f.metadata()).transpose()? {
-                // TODO: This is the same logic as in backend/ignore.rs => consollidate!
-                let mtime = meta
-                    .modified()
-                    .ok()
-                    .map(|t| DateTime::<Utc>::from(t).with_timezone(&Local));
-                if meta.len() == file.meta.size && mtime == file.meta.mtime {
-                    // File exists with fitting mtime => we suspect this file is ok!
-                    debug!("file {name:?} exists with suitable size and mtime, accepting it!");
-                    self.matched_size += file.meta.size;
-                    return Ok(AddFileResult::Existing);
-                }
-            }
-        }
-
-        let file_idx = self.names.len();
-        self.names.push(name);
-        let mut file_pos = 0;
-        let mut has_unmatched = false;
-        for id in file.content.iter().flatten() {
-            let ie = index
-                .get_data(id)
-                .ok_or_else(|| anyhow!("did not find id {} in index", id))?;
-            let bl = BlobLocation {
-                offset: ie.offset,
-                length: ie.length,
-                uncompressed_length: ie.uncompressed_length,
-            };
-            let length = bl.data_length();
-
-            let matches = open_file.as_mut().map_or(false, |file| {
-                // Existing file content; check if SHA256 matches
-                let mut vec = vec![0; length as usize];
-                file.read_exact(&mut vec).is_ok() && id == &hash(&vec)
-            });
-
-            let blob_location = self.r.entry((ie.pack, bl)).or_insert_with(Vec::new);
-            blob_location.push(FileLocation {
-                file_idx,
-                file_start: file_pos,
-                matches,
-            });
-
-            if matches {
-                self.matched_size += length;
-            } else {
-                self.restore_size += length;
-                has_unmatched = true;
-            }
-
-            file_pos += length;
-        }
-
-        self.file_lengths.push(file_pos);
-
-        if !has_unmatched && open_file.is_some() {
-            Ok(AddFileResult::Verified)
-        } else {
-            Ok(AddFileResult::Modify)
-        }
-    }
-
-    fn to_packs(&self) -> Vec<Id> {
-        self.r
-            .iter()
-            // filter out packs which we need
-            .filter(|(_, fls)| fls.iter().all(|fl| !fl.matches))
-            .map(|((pack, _), _)| *pack)
-            .dedup()
-            .collect()
-    }
-}
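
A closing note on the algorithm shared by the old `allocate_and_collect` and the new `collect_and_prepare`: it is a two-cursor merge join over two sorted streams — the walk of the existing destination and the snapshot listing. Stripped of all restore logic, the skeleton looks like this (illustrative only; plain `String` paths instead of `DirEntry`/`Node`):

    use std::cmp::Ordering;

    // `dst` = sorted paths existing in the destination,
    // `src` = sorted paths contained in the snapshot.
    fn merge_join(dst: Vec<String>, src: Vec<String>) {
        let mut dst_iter = dst.into_iter();
        let mut src_iter = src.into_iter();
        let mut next_dst = dst_iter.next();
        let mut next_src = src_iter.next();
        loop {
            match (&next_dst, &next_src) {
                (None, None) => break,
                (Some(d), None) => {
                    println!("additional in destination: {d}"); // candidate for --delete
                    next_dst = dst_iter.next();
                }
                (None, Some(s)) => {
                    println!("to restore: {s}"); // missing in destination
                    next_src = src_iter.next();
                }
                (Some(d), Some(s)) => match d.cmp(s) {
                    Ordering::Less => {
                        println!("additional in destination: {d}");
                        next_dst = dst_iter.next();
                    }
                    Ordering::Greater => {
                        println!("to restore: {s}");
                        next_src = src_iter.next();
                    }
                    Ordering::Equal => {
                        println!("exists in both: {s}"); // compare/verify contents
                        next_dst = dst_iter.next();
                        next_src = src_iter.next();
                    }
                },
            }
        }
    }

Advancing only the smaller cursor keeps this at a single pass over each side, which is why both inputs must be sorted — the destination walk via `sort_by_file_path(Path::cmp)` and the snapshot side by the in-order `NodeStreamer`.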