Skip to content

Commit

Permalink
refactor copy command
Browse files Browse the repository at this point in the history
  • Loading branch information
aawsome committed Jul 14, 2023
1 parent d0890d7 commit 0953e18
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 175 deletions.
1 change: 1 addition & 0 deletions crates/rustic_core/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod backup;
pub mod cat;
pub mod check;
pub mod config;
pub mod copy;
pub mod dump;
pub mod forget;
pub mod init;
Expand Down
134 changes: 134 additions & 0 deletions crates/rustic_core/src/commands/copy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::collections::BTreeSet;

use log::trace;
use rayon::prelude::{IntoParallelRefIterator, ParallelBridge, ParallelIterator};

use crate::{
repository::{IndexedFull, IndexedIds, IndexedTree},
BlobType, DecryptWriteBackend, IndexedBackend, Indexer, NodeType, Open, Packer, ProgressBars,
ReadIndex, Repository, RusticResult, SnapshotFile, TreeStreamerOnce,
};

#[derive(Debug)]
pub struct CopySnapshot {
pub relevant: bool,
pub sn: SnapshotFile,
}

pub(crate) fn copy<'a, Q, R: IndexedFull, P: ProgressBars, S: IndexedIds>(
repo: &Repository<Q, R>,
repo_dest: &Repository<P, S>,
snapshots: impl IntoIterator<Item = &'a SnapshotFile>,
) -> RusticResult<()> {
let be_dest = repo_dest.dbe();
let pb = &repo_dest.pb;

let (snap_trees, snaps): (Vec<_>, Vec<_>) = snapshots
.into_iter()
.cloned()
.map(|sn| (sn.tree, SnapshotFile::clear_ids(sn)))
.unzip();

let index = repo.index();
let index_dest = repo_dest.index();
let indexer = Indexer::new(be_dest.clone()).into_shared();

let data_packer = Packer::new(
be_dest.clone(),
BlobType::Data,
indexer.clone(),
repo_dest.config(),
index.total_size(BlobType::Data),
)?;
let tree_packer = Packer::new(
be_dest.clone(),
BlobType::Tree,
indexer.clone(),
repo_dest.config(),
index.total_size(BlobType::Tree),
)?;

let p = pb.progress_counter("copying blobs in snapshots...");

snap_trees
.par_iter()
.try_for_each(|id| -> RusticResult<_> {
trace!("copy tree blob {id}");
if !index_dest.has_tree(id) {
let data = index.get_tree(id).unwrap().read_data(index.be())?;
tree_packer.add(data, *id)?;
}
Ok(())
})?;

let tree_streamer = TreeStreamerOnce::new(index.clone(), snap_trees, p)?;
tree_streamer
.par_bridge()
.try_for_each(|item| -> RusticResult<_> {
let (_, tree) = item?;
tree.nodes.par_iter().try_for_each(|node| {
match node.node_type {
NodeType::File => {
node.content.par_iter().flatten().try_for_each(
|id| -> RusticResult<_> {
trace!("copy data blob {id}");
if !index_dest.has_data(id) {
let data = index.get_data(id).unwrap().read_data(index.be())?;
data_packer.add(data, *id)?;
}
Ok(())
},
)?;
}

NodeType::Dir => {
let id = node.subtree.unwrap();
trace!("copy tree blob {id}");
if !index_dest.has_tree(&id) {
let data = index.get_tree(&id).unwrap().read_data(index.be())?;
tree_packer.add(data, id)?;
}
}

_ => {} // nothing to copy
}
Ok(())
})
})?;

_ = data_packer.finalize()?;
_ = tree_packer.finalize()?;
indexer.write().unwrap().finalize()?;

let p = pb.progress_counter("saving snapshots...");
be_dest.save_list(snaps.iter(), p)?;
Ok(())
}

pub(crate) fn relevant_snapshots<F, P: ProgressBars, S: Open>(
snaps: &[SnapshotFile],
dest_repo: &Repository<P, S>,
filter: F,
) -> RusticResult<Vec<CopySnapshot>>
where
F: FnMut(&SnapshotFile) -> bool,
{
let p = dest_repo
.pb
.progress_counter("finding relevant snapshots...");
// save snapshots in destination in BTreeSet, as we want to efficiently search within to filter out already existing snapshots before copying.
let snapshots_dest: BTreeSet<_> = SnapshotFile::all_from_backend(dest_repo.dbe(), filter, &p)?
.into_iter()
.collect();

let relevant = snaps
.iter()
.cloned()
.map(|sn| CopySnapshot {
relevant: !snapshots_dest.contains(&sn),
sn,
})
.collect();

Ok(relevant)
}
1 change: 1 addition & 0 deletions crates/rustic_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub use crate::{
backup::{BackupOpts, ParentOpts},
check::CheckOpts,
config::ConfigOpts,
copy::CopySnapshot,
forget::{ForgetGroup, ForgetGroups, ForgetSnapshot, KeepOptions},
key::KeyOpts,
prune::{PruneOpts, PrunePlan, PruneStats},
Expand Down
19 changes: 18 additions & 1 deletion crates/rustic_core/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{
backup::BackupOpts,
check::CheckOpts,
config::ConfigOpts,
copy::CopySnapshot,
forget::{ForgetGroups, KeepOptions},
key::KeyOpts,
repoinfo::{IndexInfos, RepoFileInfos},
Expand Down Expand Up @@ -178,7 +179,7 @@ pub fn read_password_from_reader(file: &mut impl BufRead) -> RusticResult<String

#[derive(Debug, Clone)]
pub struct Repository<P, S> {
name: String,
pub name: String,
pub be: HotColdBackend<ChooseBackend>,
pub be_hot: Option<ChooseBackend>,
opts: RepositoryOptions,
Expand Down Expand Up @@ -514,6 +515,14 @@ impl<P: ProgressBars, S: Open> Repository<P, S> {
commands::forget::get_forget_snapshots(self, keep, group_by, filter)
}

pub fn relevant_copy_snapshots(
&self,
filter: impl FnMut(&SnapshotFile) -> bool,
snaps: &[SnapshotFile],
) -> RusticResult<Vec<CopySnapshot>> {
commands::copy::relevant_snapshots(snaps, self, filter)
}

pub fn delete_snapshots(&self, ids: &[Id]) -> RusticResult<()> {
let p = self.pb.progress_counter("removing snapshots...");
self.dbe()
Expand Down Expand Up @@ -718,4 +727,12 @@ impl<P: ProgressBars, S: IndexedFull> Repository<P, S> {
) -> RusticResult<RestoreInfos> {
opts.collect_and_prepare(self, node_streamer, dest, dry_run)
}

pub fn copy<'a, Q: ProgressBars, R: IndexedIds>(
&self,
repo_dest: &Repository<Q, R>,
snapshots: impl IntoIterator<Item = &'a SnapshotFile>,
) -> RusticResult<()> {
commands::copy::copy(self, repo_dest, snapshots)
}
}
70 changes: 55 additions & 15 deletions src/commands/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

/// App-local prelude includes `app_reader()`/`app_writer()`/`app_config()`
/// accessors along with logging macros. Customize as you see fit.
use crate::{commands::open_repository, helpers::copy, status_err, Application, RUSTIC_APP};
use crate::{
commands::open_repository, helpers::table_with_titles, status_err, Application, RUSTIC_APP,
};
use abscissa_core::{Command, Runnable, Shutdown};
use anyhow::{bail, Result};
use log::info;
use log::{error, info};

use merge::Merge;
use serde::Deserialize;

use rustic_core::{
Id, IndexBackend, KeyOpts, Open, ProgressBars, Repository, RepositoryOptions, SnapshotFile,
};
use rustic_core::{CopySnapshot, Id, KeyOpts, Open, Repository, RepositoryOptions};

/// `copy` subcommand
#[derive(clap::Parser, Command, Debug)]
Expand Down Expand Up @@ -48,30 +48,32 @@ impl CopyCmd {
fn inner_run(&self) -> Result<()> {
let config = RUSTIC_APP.config();

let repo = open_repository(&config)?;

if config.copy.targets.is_empty() {
status_err!("no [[copy.targets]] section in config file found!");
RUSTIC_APP.shutdown(Shutdown::Crash);
}

let be = repo.dbe();
let p = config.global.progress_options.progress_hidden();
let repo = open_repository(&config)?.to_indexed()?;
let mut snapshots = if self.ids.is_empty() {
SnapshotFile::all_from_backend(be, |sn| config.snapshot_filter.matches(sn), &p)?
repo.get_matching_snapshots(|sn| config.snapshot_filter.matches(sn))?
} else {
SnapshotFile::from_ids(be, &self.ids, &p)?
repo.get_snapshots(&self.ids)?
};
// sort for nicer output
snapshots.sort_unstable();

let index = IndexBackend::new(be, &config.global.progress_options.progress_counter(""))?;

let poly = repo.config().poly()?;
for target_opt in &config.copy.targets {
let repo_dest = Repository::new(target_opt)?;

let repo_dest = if self.init && repo_dest.config_id()?.is_none() {
if config.global.dry_run {
error!(
"cannot initialize target {} in dry-run mode!",
repo_dest.name
);
continue;
}
let mut config_dest = repo.config().clone();
config_dest.id = Id::random();
let pass = repo_dest.password()?.unwrap();
Expand All @@ -80,11 +82,49 @@ impl CopyCmd {
repo_dest.open()?
};

info!("copying to target {:?}...", repo_dest); // TODO: repo_dest.name
info!("copying to target {}...", repo_dest.name);
if poly != repo_dest.config().poly()? {
bail!("cannot copy to repository with different chunker parameter (re-chunking not implemented)!");
}
copy(&snapshots, &index, &repo_dest)?;

let snaps = repo_dest.relevant_copy_snapshots(
|sn| !self.ids.is_empty() || config.snapshot_filter.matches(sn),
&snapshots,
)?;

let mut table =
table_with_titles(["ID", "Time", "Host", "Label", "Tags", "Paths", "Status"]);
for CopySnapshot { relevant, sn } in snaps.iter() {
let tags = sn.tags.formatln();
let paths = sn.paths.formatln();
let time = sn.time.format("%Y-%m-%d %H:%M:%S").to_string();
_ = table.add_row([
&sn.id.to_string(),
&time,
&sn.hostname,
&sn.label,
&tags,
&paths,
&(if *relevant { "to copy" } else { "existing" }).to_string(),
]);
}
println!("{table}");

let count = snaps.iter().filter(|sn| sn.relevant).count();
if count > 0 {
if config.global.dry_run {
info!("would have copied {count} snapshots.");
} else {
repo.copy(
&repo_dest.to_indexed_ids()?,
snaps
.iter()
.filter_map(|CopySnapshot { relevant, sn }| relevant.then_some(sn)),
)?;
}
} else {
info!("nothing to copy.");
}
}
Ok(())
}
Expand Down
Loading

0 comments on commit 0953e18

Please sign in to comment.