restore: Download multiple contiguous blobs in one request
aawsome committed Jun 18, 2023
1 parent 2a7a78b commit 13e3f1b
Showing 3 changed files with 231 additions and 51 deletions.
1 change: 1 addition & 0 deletions changelog/new.txt
@@ -7,3 +7,4 @@ Bugs fixed:
New features:
- New global configuration paths are available, located at /etc/rustic/*.toml or %PROGRAMDATA%/rustic/config/*.toml, depending on your platform.
- REST backend: Now allows using custom TLS root certificates.
- restore: The restore algorithm has been improved and should now be faster for remote repositories.
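The speed-up comes from coalescing downloads: when several blobs that still have to be restored lie next to each other in the same pack file, the new restore code issues a single ranged request covering the whole run instead of one request per blob. A minimal, self-contained sketch of that merging step (BlobRead and coalesce_reads are illustrative stand-ins only; the real code in src/commands/restore.rs coalesces (pack, offset, length, from_file, name_dests) tuples with Itertools::coalesce):

// Illustrative only: a simplified blob-read request.
#[derive(Debug)]
struct BlobRead {
    pack: u32,   // stand-in for the pack Id
    offset: u32, // byte offset of the blob inside the pack
    length: u32, // stored length of the blob
}

// Merge adjacent reads: if the next blob starts exactly where the previous one
// ends within the same pack, grow the pending request instead of adding a new one.
fn coalesce_reads(reads: Vec<BlobRead>) -> Vec<BlobRead> {
    let mut out: Vec<BlobRead> = Vec::new();
    for r in reads {
        let merged = match out.last_mut() {
            Some(last) if last.pack == r.pack && last.offset + last.length == r.offset => {
                last.length += r.length; // contiguous: extend the pending request
                true
            }
            _ => false,
        };
        if !merged {
            out.push(r);
        }
    }
    out
}

fn main() {
    let reads = vec![
        BlobRead { pack: 1, offset: 0, length: 100 },
        BlobRead { pack: 1, offset: 100, length: 50 }, // contiguous with the previous blob
        BlobRead { pack: 1, offset: 400, length: 30 }, // gap, so it stays a separate request
    ];
    assert_eq!(coalesce_reads(reads).len(), 2); // three blobs, two download requests
}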
25 changes: 18 additions & 7 deletions crates/rustic_core/src/backend/decrypt.rs
@@ -24,16 +24,12 @@ pub trait DecryptReadBackend: ReadBackend {

fn read_encrypted_full(&self, tpe: FileType, id: &Id) -> RusticResult<Bytes>;

fn read_encrypted_partial(
fn read_encrypted_from_partial(
&self,
tpe: FileType,
id: &Id,
cacheable: bool,
offset: u32,
length: u32,
data: &[u8],
uncompressed_length: Option<NonZeroU32>,
) -> RusticResult<Bytes> {
let mut data = self.decrypt(&self.read_partial(tpe, id, cacheable, offset, length)?)?;
let mut data = self.decrypt(data)?;
if let Some(length) = uncompressed_length {
data = decode_all(&*data)
.map_err(CryptBackendErrorKind::DecodingZstdCompressedDataFailed)?;
@@ -44,6 +40,21 @@ pub trait DecryptReadBackend: ReadBackend {
Ok(data.into())
}

fn read_encrypted_partial(
&self,
tpe: FileType,
id: &Id,
cacheable: bool,
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
) -> RusticResult<Bytes> {
self.read_encrypted_from_partial(
&self.read_partial(tpe, id, cacheable, offset, length)?,
uncompressed_length,
)
}

fn get_file<F: RepoFile>(&self, id: &Id) -> RusticResult<F> {
let data = self.read_encrypted_full(F::TYPE, id)?;
Ok(serde_json::from_slice(&data)
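With this split, fetching and decrypting are decoupled: a caller can issue one read_partial request that spans several adjacent blobs of a pack and then run read_encrypted_from_partial on one slice per blob, which is what the reworked restore command below does. A rough self-contained sketch, not code from this commit — the blob table and pack_id are placeholders, and the bytes::Bytes import only reflects the Bytes type these trait methods return:

use std::num::NonZeroU32;

use bytes::Bytes;
use rustic_core::{DecryptReadBackend, FileType, Id};

// Download one contiguous range covering two blobs assumed to lie back to back
// in the pack, then decrypt (and, if compressed, decompress) each blob from a
// slice of that single buffer.
fn fetch_two_adjacent_blobs(be: &impl DecryptReadBackend, pack_id: &Id) -> Vec<Bytes> {
    // (offset, length, uncompressed_length) of the two blobs
    let blobs = [(0u32, 120u32, None::<NonZeroU32>), (120, 80, None)];
    let start = blobs[0].0;
    let total: u32 = blobs.iter().map(|b| b.1).sum();

    // one ranged request for the whole span ...
    let raw = be
        .read_partial(FileType::Pack, pack_id, false, start, total)
        .unwrap();

    // ... then per-blob decryption from slices of the downloaded buffer
    blobs
        .iter()
        .map(|(offset, length, uncompressed_length)| {
            let lo = (offset - start) as usize;
            let hi = lo + *length as usize;
            be.read_encrypted_from_partial(&raw[lo..hi], *uncompressed_length)
                .unwrap()
        })
        .collect()
}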
256 changes: 212 additions & 44 deletions src/commands/restore.rs
@@ -14,17 +14,22 @@ use abscissa_core::{Command, Runnable, Shutdown};

use std::{
cmp::Ordering,
collections::BTreeMap,
io::Read,
num::NonZeroU32,
path::{Path, PathBuf},
};

use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Local, Utc};
use ignore::{DirEntry, WalkBuilder};
use itertools::Itertools;
use rayon::ThreadPoolBuilder;

use rustic_core::{
AddFileResult, DecryptReadBackend, FileInfos, FileType, IndexBackend, IndexedBackend,
LocalDestination, Node, NodeStreamer, NodeType, Progress, ProgressBars, RestoreStats,
SnapshotFile, Tree, TreeStreamerOptions,
hash, DecryptReadBackend, FileType, Id, IndexBackend, IndexedBackend, LocalDestination, Node,
NodeStreamer, NodeType, Progress, ProgressBars, RestoreStats, SnapshotFile, Tree,
TreeStreamerOptions,
};

use crate::{filtering::SnapshotFilter, helpers::warm_up_wait};
@@ -419,6 +424,7 @@ fn restore_contents(
restore_size: total_size,
..
} = file_infos;
let filenames = &filenames;

let p = RUSTIC_APP
.config()
@@ -427,57 +433,78 @@
.progress_bytes("restoring file contents...");
p.set_length(total_size);

let blobs: Vec<_> = restore_info
.into_iter()
.map(|((pack, bl), fls)| {
let from_file = fls
.iter()
.find(|fl| fl.matches)
.map(|fl| (fl.file_idx, fl.file_start, bl.data_length()));

let name_dests: Vec<_> = fls
.iter()
.filter(|fl| !fl.matches)
.map(|fl| (bl.clone(), fl.file_idx, fl.file_start))
.collect();
(pack, bl.offset, bl.length, from_file, name_dests)
})
.coalesce(|mut x, mut y| {
if x.0 == y.0 && x.3.is_none() && y.1 == x.1 + x.2 {
x.2 += y.2;
x.4.append(&mut y.4);
Ok(x)
} else {
Err((x, y))
}
})
.collect();

let pool = ThreadPoolBuilder::new()
.num_threads(constants::MAX_READER_THREADS_NUM)
.build()?;
pool.in_place_scope(|s| {
for (pack, blob) in restore_info {
for (bl, fls) in blob {
let from_file = fls
.iter()
.find(|fl| fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start));

let name_dests: Vec<_> = fls
.iter()
.filter(|fl| !fl.matches)
.map(|fl| (filenames[fl.file_idx].clone(), fl.file_start))
.collect();
let p = &p;

if !name_dests.is_empty() {
// TODO: error handling!
s.spawn(move |s1| {
let data = match from_file {
Some((filename, start)) => {
// read from existing file
dest.read_at(filename, start, bl.data_length()).unwrap()
}
None => {
// read pack at blob_offset with length blob_length
be.read_encrypted_partial(
FileType::Pack,
&pack,
false,
bl.offset,
bl.length,
bl.uncompressed_length,
)
for (pack, offset, length, from_file, name_dests) in blobs {
let p = &p;

if !name_dests.is_empty() {
// TODO: error handling!
s.spawn(move |s1| {
let read_data = match &from_file {
Some((file_idx, offset_file, length_file)) => {
// read from existing file
dest.read_at(&filenames[*file_idx], *offset_file, *length_file)
.unwrap()
}
};
let size = bl.data_length();
}
None => {
// read needed part of the pack
be.read_partial(FileType::Pack, &pack, false, offset, length)
.unwrap()
}
};

// save into needed files in parallel
for (name, start) in name_dests {
// save into needed files in parallel
for (bl, group) in &name_dests.into_iter().group_by(|item| item.0.clone()) {
let size = bl.data_length();
let data = if from_file.is_some() {
read_data.clone()
} else {
let start = usize::try_from(bl.offset - offset).unwrap();
let end = usize::try_from(bl.offset + bl.length - offset).unwrap();
be.read_encrypted_from_partial(
&read_data[start..end],
bl.uncompressed_length,
)
.unwrap()
};
for (_, file_idx, start) in group {
let data = data.clone();
s1.spawn(move |_| {
dest.write_at(&name, start, &data).unwrap();
dest.write_at(&filenames[file_idx], start, &data).unwrap();
p.inc(size);
});
}
});
}
}
});
}
}
});
@@ -486,3 +513,144 @@

Ok(())
}

/// struct that contains information of file contents grouped by
/// 1) pack ID,
/// 2) blob within this pack
/// 3) the actual files and position of this blob within those
#[derive(Debug)]
struct FileInfos {
names: Filenames,
r: RestoreInfo,
restore_size: u64,
matched_size: u64,
}

type RestoreInfo = BTreeMap<(Id, BlobLocation), Vec<FileLocation>>;
type Filenames = Vec<PathBuf>;

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct BlobLocation {
offset: u32,
length: u32,
uncompressed_length: Option<NonZeroU32>,
}

impl BlobLocation {
fn data_length(&self) -> u64 {
self.uncompressed_length
.map_or(
self.length - 32, // crypto overhead
|length| length.get(),
)
.into()
}
}

#[derive(Debug)]
struct FileLocation {
file_idx: usize,
file_start: u64,
matches: bool, //indicates that the file exists and these contents are already correct
}

enum AddFileResult {
Existing,
Verified,
New(u64),
Modify(u64),
}

impl FileInfos {
fn new() -> Self {
Self {
names: Vec::new(),
r: BTreeMap::new(),
restore_size: 0,
matched_size: 0,
}
}

/// Add the file to [`FileInfos`] using `index` to get blob information.
/// Returns the computed length of the file
fn add_file(
&mut self,
dest: &LocalDestination,
file: &Node,
name: PathBuf,
index: &impl IndexedBackend,
ignore_mtime: bool,
) -> Result<AddFileResult> {
let mut open_file = dest.get_matching_file(&name, file.meta.size);

if !ignore_mtime {
if let Some(meta) = open_file.as_ref().map(|f| f.metadata()).transpose()? {
// TODO: This is the same logic as in backend/ignore.rs => consolidate!
let mtime = meta
.modified()
.ok()
.map(|t| DateTime::<Utc>::from(t).with_timezone(&Local));
if meta.len() == file.meta.size && mtime == file.meta.mtime {
// File exists with fitting mtime => we suspect this file is ok!
debug!("file {name:?} exists with suitable size and mtime, accepting it!");
self.matched_size += file.meta.size;
return Ok(AddFileResult::Existing);
}
}
}

let file_idx = self.names.len();
self.names.push(name);
let mut file_pos = 0;
let mut has_unmatched = false;
for id in file.content.iter().flatten() {
let ie = index
.get_data(id)
.ok_or_else(|| anyhow!("did not find id {} in index", id))?;
let bl = BlobLocation {
offset: ie.offset,
length: ie.length,
uncompressed_length: ie.uncompressed_length,
};
let length = bl.data_length();

let matches = open_file.as_mut().map_or(false, |file| {
// Existing file content; check if SHA256 matches
let mut vec = vec![0; length as usize];
file.read_exact(&mut vec).is_ok() && id == &hash(&vec)
});

let blob_location = self.r.entry((ie.pack, bl)).or_insert_with(Vec::new);
blob_location.push(FileLocation {
file_idx,
file_start: file_pos,
matches,
});

if matches {
self.matched_size += length;
} else {
self.restore_size += length;
has_unmatched = true;
}

file_pos += length;
}

match (has_unmatched, open_file.is_some()) {
(true, true) => Ok(AddFileResult::Modify(file_pos)),
(false, true) => Ok(AddFileResult::Verified),
(_, false) => Ok(AddFileResult::New(file_pos)),
}
}

fn to_packs(&self) -> Vec<Id> {
self.r
.iter()
// only keep packs which we actually need to read
.filter(|(_, fls)| fls.iter().all(|fl| !fl.matches))
.map(|((pack, _), _)| *pack)
.dedup()
.collect()
}
}
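
For orientation, here is how the collection phase above and the download phase in restore_contents are meant to fit together — a sketch only, leaning on the imports already present in src/commands/restore.rs and simplifying away the filtering, dry-run and delete handling of the real restore command; the public node_type field access and the exact iterator shape are assumptions:

// Walk the snapshot tree, record every blob each file needs, then the caller
// can warm up file_infos.to_packs() and hand file_infos to restore_contents().
fn collect_blobs_to_restore(
    dest: &LocalDestination,
    index: &impl IndexedBackend,
    nodes: impl Iterator<Item = (PathBuf, Node)>,
) -> Result<FileInfos> {
    let mut file_infos = FileInfos::new();
    for (path, node) in nodes {
        if matches!(node.node_type, NodeType::File) {
            // records which blobs (pack, offset, length) this file needs and
            // whether the destination already holds the correct contents
            let _ = file_infos.add_file(dest, &node, path, index, false)?;
        }
    }
    Ok(file_infos)
}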
