Skip to content

Commit

Permalink
feat(reflink): Separate reflink behavior into their own functions (#58)
Browse files Browse the repository at this point in the history
BREAKING CHANGE: some signatures for copy have changed, and copy no longer automatically reflinks
  • Loading branch information
zkat authored Oct 7, 2023
1 parent 8134c4d commit 270c3ee
Show file tree
Hide file tree
Showing 6 changed files with 305 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures = { version = "0.3.17", optional = true }
hex = "0.4.3"
memmap2 = { version = "0.5.8", optional = true }
miette = "5.7.0"
reflink-copy = "0.1.5"
reflink-copy = "0.1.9"
serde = "1.0.130"
serde_derive = "1.0.130"
serde_json = "1.0.68"
Expand Down
2 changes: 1 addition & 1 deletion benches/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ fn write_hash_async_xxh3(c: &mut Criterion) {
fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf {
let dir = tmp.path().to_owned();
let target = dir.join("target-file");
std::fs::create_dir_all(target.parent().unwrap().clone()).unwrap();
std::fs::create_dir_all(&target.parent().unwrap()).unwrap();
let mut file = File::create(target.clone()).unwrap();
file.write_all(buf).unwrap();
file.flush().unwrap();
Expand Down
10 changes: 5 additions & 5 deletions src/content/linkto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ fn create_symlink(sri: Integrity, cache: &PathBuf, target: &PathBuf) -> Result<I
cpath.parent().unwrap().display()
)
})?;
if let Err(e) = symlink_file(target, cpath.clone()) {
if let Err(e) = symlink_file(target, &cpath) {
// If symlinking fails because there's *already* a file at the desired
// destination, that is ok -- all the cache should care about is that
// there is **some** valid file associated with the computed integrity.
Expand Down Expand Up @@ -187,8 +187,8 @@ mod tests {
fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf {
let dir = tmp.path().to_owned();
let target = dir.join("target-file");
std::fs::create_dir_all(target.parent().unwrap().clone()).unwrap();
let mut file = File::create(target.clone()).unwrap();
std::fs::create_dir_all(&target.parent().unwrap()).unwrap();
let mut file = File::create(&target).unwrap();
file.write_all(buf).unwrap();
file.flush().unwrap();
target
Expand Down Expand Up @@ -216,7 +216,7 @@ mod tests {

let cpath = path::content_path(&dir, &sri);
assert!(cpath.exists());
let metadata = std::fs::symlink_metadata(cpath.clone()).unwrap();
let metadata = std::fs::symlink_metadata(&cpath).unwrap();
let file_type = metadata.file_type();
assert!(file_type.is_symlink());
assert_eq!(std::fs::read(cpath).unwrap(), b"hello world");
Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {

let cpath = path::content_path(&dir, &sri);
assert!(cpath.exists());
let metadata = std::fs::symlink_metadata(cpath.clone()).unwrap();
let metadata = std::fs::symlink_metadata(&cpath).unwrap();
let file_type = metadata.file_type();
assert!(file_type.is_symlink());
assert_eq!(std::fs::read(cpath).unwrap(), b"hello world");
Expand Down
81 changes: 63 additions & 18 deletions src/content/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use futures::io::AsyncReadExt;
#[cfg(feature = "tokio")]
use tokio::io::AsyncReadExt;

use reflink_copy as reflink;
use ssri::{Algorithm, Integrity, IntegrityChecker};

#[cfg(any(feature = "async-std", feature = "tokio"))]
Expand Down Expand Up @@ -133,20 +132,68 @@ pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u
Ok(ret)
}

pub fn copy_unchecked(cache: &Path, sri: &Integrity, to: &Path) -> Result<()> {
pub fn reflink_unchecked(cache: &Path, sri: &Integrity, to: &Path) -> Result<()> {
let cpath = path::content_path(cache, sri);
reflink::reflink_or_copy(cpath, to).with_context(|| {
reflink_copy::reflink(cpath, to).with_context(|| {
format!(
"Failed to copy cache contents from {} to {}",
"Failed to reflink cache contents from {} to {}",
path::content_path(cache, sri).display(),
to.display()
)
})?;
Ok(())
}

pub fn reflink(cache: &Path, sri: &Integrity, to: &Path) -> Result<()> {
let mut reader = open(cache, sri.clone())?;
let mut buf: [u8; 1024] = [0; 1024];
loop {
let read = reader.read(&mut buf).with_context(|| {
format!(
"Failed to read cache contents while verifying integrity for {}",
path::content_path(cache, sri).display()
)
})?;
if read == 0 {
break;
}
}
reader.check()?;
reflink_unchecked(cache, sri, to)
}

pub async fn reflink_async(cache: &Path, sri: &Integrity, to: &Path) -> Result<()> {
let mut reader = open_async(cache, sri.clone()).await?;
let mut buf = [0u8; 1024 * 8];
loop {
let read = AsyncReadExt::read(&mut reader, &mut buf)
.await
.with_context(|| {
format!(
"Failed to read cache contents while verifying integrity for {}",
path::content_path(cache, sri).display()
)
})?;
if read == 0 {
break;
}
}
reader.check()?;
reflink_unchecked(cache, sri, to)
}

pub fn copy_unchecked(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
let cpath = path::content_path(cache, sri);
std::fs::copy(cpath, to).with_context(|| {
format!(
"Failed to copy cache contents from {} to {}",
path::content_path(cache, sri).display(),
to.display()
)
})
}

pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
copy_unchecked(cache, sri, to)?;
let mut reader = open(cache, sri.clone())?;
let mut buf: [u8; 1024] = [0; 1024];
let mut size = 0;
Expand All @@ -163,6 +210,7 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
}
}
reader.check()?;
copy_unchecked(cache, sri, to)?;

Ok(size as u64)
}
Expand All @@ -172,23 +220,19 @@ pub async fn copy_unchecked_async<'a>(
cache: &'a Path,
sri: &'a Integrity,
to: &'a Path,
) -> Result<()> {
) -> Result<u64> {
let cpath = path::content_path(cache, sri);
if reflink::reflink(&cpath, to).is_err() {
crate::async_lib::copy(&cpath, to).await.with_context(|| {
format!(
"Failed to copy cache contents from {} to {}",
path::content_path(cache, sri).display(),
to.display()
)
})?;
}
Ok(())
crate::async_lib::copy(&cpath, to).await.with_context(|| {
format!(
"Failed to copy cache contents from {} to {}",
path::content_path(cache, sri).display(),
to.display()
)
})
}

#[cfg(any(feature = "async-std", feature = "tokio"))]
pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> {
copy_unchecked_async(cache, sri, to).await?;
let mut reader = open_async(cache, sri.clone()).await?;
let mut buf: [u8; 1024] = [0; 1024];
let mut size = 0;
Expand All @@ -207,6 +251,7 @@ pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -
}
}
reader.check()?;
copy_unchecked_async(cache, sri, to).await?;
Ok(size as u64)
}

Expand Down Expand Up @@ -243,7 +288,6 @@ pub fn hard_link(cache: &Path, sri: &Integrity, to: &Path) -> Result<()> {

#[cfg(any(feature = "async-std", feature = "tokio"))]
pub async fn hard_link_async(cache: &Path, sri: &Integrity, to: &Path) -> Result<()> {
hard_link_unchecked(cache, sri, to)?;
let mut reader = open_async(cache, sri.clone()).await?;
let mut buf = [0u8; 1024 * 8];
loop {
Expand All @@ -260,6 +304,7 @@ pub async fn hard_link_async(cache: &Path, sri: &Integrity, to: &Path) -> Result
}
}
reader.check()?;
hard_link_unchecked(cache, sri, to)?;
Ok(())
}

Expand Down
Loading

0 comments on commit 270c3ee

Please sign in to comment.