Skip to content

Commit

Permalink
repo: clean up stream-creation operations
Browse files Browse the repository at this point in the history
Creating a stream is a lot higher-level now: you ask the repo for a
SplitStreamWriter, write to it, then store it back into the repo.

Add a "by-sha256" concept as outlined in #3.  This makes importing
already-imported OCI layers instantaneous, even if they were originally
imported by a different user, by a different piece of software, or under
a different reference name.

Closes #3
  • Loading branch information
allisonkarlitskaya committed Oct 11, 2024
1 parent db83d53 commit 361fd6a
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 44 deletions.
18 changes: 15 additions & 3 deletions src/bin/cfsctl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ use anyhow::Result;
use clap::{Parser, Subcommand};

use composefs_experiments::{
fsverity::{
FsVerityHashValue,
Sha256HashValue,
},
oci,
repository::Repository,
};
Expand All @@ -27,6 +31,8 @@ enum OciCommand {
/// Stores a tar file as a splitstream in the repository.
ImportLayer {
name: String,
#[clap(long)]
sha256: Option<String>,
},
/// Lists the contents of a tar stream
LsLayer {
Expand Down Expand Up @@ -97,9 +103,15 @@ fn main() -> Result<()> {
println!("{}", hex::encode(image_id));
},
Command::Oci{ cmd: oci_cmd } => match oci_cmd {
OciCommand::ImportLayer { name } => {
let stream_id = oci::import_layer(&repo, &name, &mut std::io::stdin())?;
println!("{}", hex::encode(stream_id));
OciCommand::ImportLayer { name, sha256 } => {
if let Some(digest) = sha256 {
let mut value = Sha256HashValue::EMPTY;
hex::decode_to_slice(digest, &mut value)?;
oci::import_layer_by_sha256(&repo, &name, &mut std::io::stdin(), value)?;
} else {
let stream_id = oci::import_layer(&repo, &name, &mut std::io::stdin())?;
println!("{}", hex::encode(stream_id));
}
},
OciCommand::LsLayer { name } => {
oci::ls_layer(&repo, &name)?;
Expand Down
26 changes: 14 additions & 12 deletions src/oci/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,24 @@ use anyhow::Result;

use crate::{
fsverity::Sha256HashValue,
repository::Repository
repository::Repository,
};

pub fn import_layer<R: Read>(repo: &Repository, name: &str, tar_stream: &mut R) -> Result<Sha256HashValue> {
let mut split_stream = zstd::stream::write::Encoder::new(vec![], 0)?;

tar::split(
tar_stream,
&mut split_stream,
|data: &[u8]| -> Result<Sha256HashValue> {
repo.ensure_object(data)
}
)?;
let mut writer = repo.create_stream(None);
tar::split(tar_stream, &mut writer)?;
repo.store_stream(writer, name)
}

let object_id = repo.ensure_object(&split_stream.finish()?)?;
repo.link_ref(name, "streams", object_id)
/// Imports a tar layer whose SHA-256 content digest is already known by the caller.
///
/// All of the work is delegated to `Repository::store_stream_by_sha256()`.  The closure handed
/// to it splits `tar_stream` into the writer it is given via `tar::split()`; if the repository
/// does not invoke the closure, `tar_stream` is left unread.
///
/// `name` is the reference name to create for the stream.
pub fn import_layer_by_sha256<R: Read>(
    repo: &Repository,
    name: &str,
    tar_stream: &mut R,
    sha256: Sha256HashValue,
) -> Result<()> {
    repo.store_stream_by_sha256(name, sha256, |writer| tar::split(tar_stream, writer))
}

pub fn ls_layer(repo: &Repository, name: &str) -> Result<()> {
Expand Down
20 changes: 5 additions & 15 deletions src/oci/tar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use std::{
Path,
PathBuf,
},
io::{
Read,
Write,
},
io::Read,
};

use anyhow::{
Expand All @@ -39,7 +36,6 @@ use tar::{
};

use crate::{
fsverity::Sha256HashValue,
splitstream::{
SplitStreamData,
SplitStreamReader,
Expand All @@ -60,13 +56,10 @@ fn read_header<R: Read>(reader: &mut R) -> Result<Option<Header>> {
/// Splits the tar file from tar_stream into a Split Stream.  Headers are always written inline;
/// the contents of non-empty regular files are handed to the writer as "external data" (via
/// write_external()), and all other entry data is stored inline in the split stream.
pub fn split<R: Read, W: Write, F: FnMut(&[u8]) -> Result<Sha256HashValue>>(
pub fn split<R: Read>(
tar_stream: &mut R,
split_stream: &mut W,
mut store_data: F,
writer: &mut SplitStreamWriter,
) -> Result<()> {
let mut writer = SplitStreamWriter::new(split_stream);

while let Some(header) = read_header(tar_stream)? {
// the header always gets stored as inline data
writer.write_inline(header.as_bytes());
Expand All @@ -84,16 +77,13 @@ pub fn split<R: Read, W: Write, F: FnMut(&[u8]) -> Result<Sha256HashValue>>(
if header.entry_type() == EntryType::Regular && storage_size > 0 {
// non-empty regular file: store the data in the object store
let padding = buffer.split_off(actual_size);
let reference = store_data(&buffer)?;
writer.write_reference(reference, padding)?;
writer.write_external(&buffer, padding)?;
} else {
// else: store the data inline in the split stream
writer.write_inline(&buffer);
}
}

// flush out any remaining inline data
writer.done()
Ok(())
}

fn path_from_tar(pax: Option<Vec<u8>>, gnu: Vec<u8>, short: &[u8]) -> PathBuf {
Expand Down
115 changes: 112 additions & 3 deletions src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::{
splitstream::{
splitstream_merge,
splitstream_objects,
SplitStreamWriter,
},
util::proc_self_fd,
};
Expand Down Expand Up @@ -112,6 +113,14 @@ impl Repository {
}
}

/// Reports whether `name` can be accessed inside the repository.
///
/// The check is an `accessat()` call with `Access::READ_OK`, resolved relative to
/// `self.repository`.  A `NotFound` failure is reported as `Ok(false)`; every other failure is
/// returned to the caller as an error.
pub fn exists(&self, name: &str) -> Result<bool> {
    if let Err(err) = accessat(&self.repository, name, Access::READ_OK, AtFlags::empty()) {
        if err.kind() == ErrorKind::NotFound {
            return Ok(false);
        }
        return Err(err.into());
    }
    Ok(true)
}

pub fn ensure_object(&self, data: &[u8]) -> Result<Sha256HashValue> {
let digest = FsVerityHasher::hash(data);
let dir = PathBuf::from(format!("objects/{:02x}", digest[0]));
Expand Down Expand Up @@ -148,7 +157,7 @@ impl Repository {
}

pub fn open_with_verity(&self, filename: &str, expected_verity: Sha256HashValue) -> Result<OwnedFd> {
let fd = openat(&self.repository, filename, OFlags::RDONLY, Mode::empty())?;
let fd = self.openat(filename, OFlags::RDONLY)?;
let measured_verity: Sha256HashValue = fs_ioc_measure_verity(&fd)?;
if measured_verity != expected_verity {
bail!("bad verity!")
Expand All @@ -157,21 +166,100 @@ impl Repository {
}
}

/// Performs a lookup of a by-sha256 reference in the given category.
///
/// Reads the symlink `<category>/by-sha256/<hex sha256>` (relative to the repository) and
/// returns the object ID encoded in the final path component of the link target.
///
/// The link target must be a relative path starting with `../` whose final component is
/// exactly 64 hex digits.  Intermediate components are permitted: links created through
/// `ensure_symlink()` carry one `..` per parent directory of the link followed by the full
/// target path (for example `../../streams/<hex>`), so insisting on exactly `../<hex>` would
/// reject them.
///
/// Returns `Ok(None)` if no such reference exists.  Returns an error if the link cannot be
/// read or if its target is not in the expected format.
pub fn find_by_sha256(&self, category: &str, sha256: Sha256HashValue) -> Result<Option<Sha256HashValue>> {
    let filename = format!("{}/by-sha256/{}", category, hex::encode(sha256));
    let linkpath = match readlinkat(&self.repository, &filename, []) {
        Ok(linkpath) => linkpath,
        Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(e.into()),
    };

    let linkbytes = linkpath.to_bytes();
    // The object ID is whatever follows the last '/' in the link target.
    let hex_start = linkbytes
        .iter()
        .rposition(|&byte| byte == b'/')
        .map_or(0, |slash| slash + 1);
    let hex_digits = &linkbytes[hex_start..];
    if !linkbytes.starts_with(b"../") || hex_digits.len() != 64 {
        bail!("Incorrectly formatted symlink {}/{}", self.path, filename);
    }

    let mut hash = Sha256HashValue::EMPTY;
    hex::decode_to_slice(hex_digits, &mut hash)
        .with_context(|| format!("Incorrectly formatted symlink {}/{}", self.path, filename))?;
    Ok(Some(hash))
}

/// Creates a SplitStreamWriter for writing a split stream into this repository.
///
/// `sha256` is forwarded unchanged to `SplitStreamWriter::new()`; callers in this file pass
/// `Some(digest)` when the content digest of the stream is known in advance and `None`
/// otherwise.
///
/// You should write the data to the returned object and then pass it to .store_stream() to
/// store the result.
pub fn create_stream(&self, sha256: Option<Sha256HashValue>) -> SplitStreamWriter {
SplitStreamWriter::new(self, sha256)
}

/// Finishes `writer` and publishes the resulting split stream under the reference `name`.
///
/// `writer.done()` consumes the writer and yields the object ID of the stream (NOTE(review):
/// presumably it also stores the stream in the object store if it is not already present --
/// that code is not visible here).  Two links are then created:
///
///  * `streams/<object id>` pointing at the object; an already-existing link is tolerated.
///  * `streams/refs/<name>` pointing at `streams/<object id>`; it is an error if this
///    reference already exists.
///
/// On success the object ID (by fs-verity digest) is returned.
pub fn store_stream(&self, writer: SplitStreamWriter, name: &str) -> Result<Sha256HashValue> {
    let id = writer.done()?;

    let reference_path = format!("streams/refs/{name}");
    let stream_path = format!("streams/{}", hex::encode(&id));
    let object_path = format!("objects/{:02x}/{}", id[0], hex::encode(&id[1..]));

    self.ensure_symlink(&stream_path, &object_path)?;
    self.symlink(&reference_path, &stream_path).map(|()| id)
}

/// Stores a stream identified by the SHA256 digest of its content, skipping the work when a
/// stream with that digest is already known.
///
/// If `streams/by-sha256/<sha256>` is already accessible, nothing is written and `callback`
/// is never invoked.
///
/// Otherwise `callback` is called with a fresh SplitStreamWriter which it should populate with
/// the data for the stream.  After the callback returns, the writer is finished and both
/// `streams/<object id>` and `streams/by-sha256/<sha256>` are linked to the result.
///
/// In both cases the named reference `streams/refs/<name>` is then pointed at the by-sha256
/// entry.  It is an error if the named reference already exists.
///
/// This function does not return the object ID; on success it returns `()`.
pub fn store_stream_by_sha256<F>(
    &self, name: &str, sha256: Sha256HashValue, callback: F,
) -> Result<()>
where
    F: FnOnce(&mut SplitStreamWriter) -> Result<()>,
{
    let by_sha256_path = format!("streams/by-sha256/{}", hex::encode(sha256));
    let already_known = self.exists(&by_sha256_path)?;

    if !already_known {
        // Unknown digest: let the caller produce the stream, then publish it.
        let mut writer = self.create_stream(Some(sha256));
        callback(&mut writer)?;
        let id = writer.done()?;

        let stream_path = format!("streams/{}", hex::encode(id));
        let object_path = format!("objects/{:02x}/{}", id[0], hex::encode(&id[1..]));
        self.ensure_symlink(&stream_path, &object_path)?;
        self.ensure_symlink(&by_sha256_path, &stream_path)?;
    }

    self.symlink(&format!("streams/refs/{name}"), &by_sha256_path)
}

/// category is like "streams" or "images"
/// name is like "refs/1000/user/xyz" (with '/') or a sha256 hex hash value (without '/')
fn open_in_category(&self, category: &str, name: &str) -> Result<OwnedFd> {
let filename = format!("{}/{}", category, name);

if name.contains("/") {
// no fsverity checking on this path
Ok(openat(&self.repository, filename, OFlags::RDONLY, Mode::empty())?)
self.openat(&filename, OFlags::RDONLY)
} else {
// this must surely be a hash value, and we want to verify it
let mut hash = Sha256HashValue::EMPTY;
hex::decode_to_slice(name, &mut hash)?;
self.open_with_verity(&filename, hash)
}
}

pub fn open_stream(&self, name: &str) -> Result<zstd::stream::read::Decoder<BufReader<File>>> {
let file = File::from(self.open_in_category("streams", name)?);
Ok(zstd::stream::read::Decoder::new(file)?)
Expand Down Expand Up @@ -222,7 +310,7 @@ impl Repository {
Ok(object_id)
}

fn symlink<P: AsRef<Path>>(&self, name: P, target: &str) -> Result<()> {
pub fn symlink<P: AsRef<Path>>(&self, name: P, target: &str) -> Result<()> {
let name = name.as_ref();
let parent = name.parent()
.expect("make_link() called for file directly in repo top-level");
Expand All @@ -237,6 +325,27 @@ impl Repository {
Ok(symlinkat(target_path, &self.repository, name)?)
}

/// Creates the symlink `name` (relative to the repository) pointing at `target`, treating an
/// already-existing entry at `name` as success.
///
/// The parent directory of `name` is created first if necessary.  `target` is interpreted
/// relative to the repository top-level, so the link content is `target` prefixed with one
/// `..` for every component of `name`'s parent directory.
///
/// Panics if `name` has no parent.
// TODO: share the path-building logic with `symlink()` instead of duplicating it here.
pub fn ensure_symlink<P: AsRef<Path>>(&self, name: P, target: &str) -> Result<()> {
    let name = name.as_ref();
    let parent = name.parent()
        .expect("make_link() called for file directly in repo top-level");
    self.ensure_dir(parent)?;

    // Climb out of every directory level of the link's parent, then descend to the target.
    let mut relative_target: PathBuf = parent.iter().map(|_| "..").collect();
    relative_target.push(target);

    match symlinkat(relative_target, &self.repository, name) {
        // NB: we assume that the existing link is the same as the one we wanted to create
        Err(ref err) if err.kind() == ErrorKind::AlreadyExists => Ok(()),
        outcome => Ok(outcome?),
    }
}

fn read_symlink_hashvalue(dirfd: &OwnedFd, name: &CStr) -> Result<Sha256HashValue> {
let link_content = readlinkat(dirfd, name, [])?;
let link_bytes = link_content.to_bytes();
Expand Down
Loading

0 comments on commit 361fd6a

Please sign in to comment.