Skip to content

Commit

Permalink
tar: Add an API to import an object set
Browse files Browse the repository at this point in the history
  • Loading branch information
cgwalters committed Mar 19, 2022
1 parent 15403a0 commit 276e253
Showing 1 changed file with 162 additions and 17 deletions.
179 changes: 162 additions & 17 deletions lib/src/tar/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use gio::glib;
use gio::prelude::*;
use glib::Variant;
use ostree::gio;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::prelude::*;
Expand Down Expand Up @@ -38,19 +39,28 @@ struct ImportStats {
symlinks: u32,
}

/// Mode-specific state carried by an `Importer`.
enum ImporterMode {
/// Importing a single commit. Starts as `None`; `import_commit` stores the
/// commit checksum here and `finish_import_commit` hands it back.
Commit(Option<String>),
/// Importing an "object set". Holds the checksums of the content (`file`)
/// objects imported so far; seeing the same checksum twice is reported as
/// a "Duplicate object" error.
ObjectSet(BTreeSet<String>),
}

/// Importer machine.
struct Importer {
pub(crate) struct Importer {
// Repository handle; the constructors store a clone of the caller's `ostree::Repo`.
repo: ostree::Repo,
// Optional remote name passed to `new_for_commit`; always `None` for object sets.
// NOTE(review): how the remote is consumed is not visible in this chunk — confirm.
remote: Option<String>,
// Cache of xattrs, keyed by their content checksum.
xattrs: HashMap<String, glib::Variant>,
// Reusable buffer for xattrs references. It maps a file checksum (.0)
// to an xattrs checksum (.1) in the `xattrs` cache above.
next_xattrs: Option<(String, String)>,

// Reusable buffer for reads. See also https://github.com/rust-lang/rust/issues/78485
// Both constructors allocate it as 16384 zeroed bytes.
buf: Vec<u8>,

// Import counters; `finish_import_commit` logs them at debug level.
stats: ImportStats,

/// Additional state depending on whether we're importing an object set or a commit.
data: ImporterMode,
}

/// Validate size/type of a tar header for OSTree metadata object.
Expand Down Expand Up @@ -151,14 +161,30 @@ fn parse_xattrs_link_target(path: &Utf8Path) -> Result<String> {
}

impl Importer {
fn new(repo: &ostree::Repo, remote: Option<String>) -> Self {
/// Create an importer which will import an OSTree commit object.
pub(crate) fn new_for_commit(repo: &ostree::Repo, remote: Option<String>) -> Self {
Self {
repo: repo.clone(),
remote,
buf: vec![0u8; 16384],
xattrs: Default::default(),
next_xattrs: None,
stats: Default::default(),
data: ImporterMode::Commit(None),
}
}

/// Create an importer to write an "object set"; a chunk of objects which is
/// usually streamed from a separate storage system, such as an OCI container image layer.
pub(crate) fn new_for_object_set(repo: &ostree::Repo) -> Self {
Self {
repo: repo.clone(),
remote: None,
buf: vec![0u8; 16384],
xattrs: Default::default(),
next_xattrs: None,
stats: Default::default(),
data: ImporterMode::ObjectSet(Default::default()),
}
}

Expand Down Expand Up @@ -375,13 +401,35 @@ impl Importer {

match suffix {
"commit" => Err(anyhow!("Found multiple commit objects")),
"file" => self.import_content_object(entry, &checksum, cancellable),
"file" => {
self.import_content_object(entry, &checksum, cancellable)?;
// Track the objects we wrote
match &mut self.data {
ImporterMode::ObjectSet(imported) => {
if let Some(p) = imported.replace(checksum) {
anyhow::bail!("Duplicate object: {}", p);
}
}
ImporterMode::Commit(_) => {}
}
Ok(())
}
"file-xattrs" => self.process_file_xattrs(entry, checksum),
"file-xattrs-link" => self.process_file_xattrs_link(entry, checksum),
"xattrs" => self.process_xattr_ref(entry, checksum),
kind => {
let objtype = objtype_from_string(kind)
.ok_or_else(|| anyhow!("Invalid object type {}", kind))?;
match &mut self.data {
ImporterMode::ObjectSet(_) => {
anyhow::bail!(
"Found metadata object {}.{} in object set mode",
checksum,
objtype
);
}
ImporterMode::Commit(_) => {}
}
self.import_metadata(entry, &checksum, objtype)
}
}
Expand Down Expand Up @@ -539,17 +587,46 @@ impl Importer {
Ok(xattrs_checksum)
}

fn import(
mut self,
/// Drive the import over an already-filtered stream of tar entries.
///
/// Each item is an `(entry, path)` pair or an error; the first error stops
/// the loop and is returned. Entries under `objects/` go to `import_object`
/// (with the prefix stripped), entries under `xattrs/` go to
/// `process_split_xattrs_content`, and anything else is silently ignored.
fn import_objects_impl<'a>(
    &mut self,
    ents: impl Iterator<Item = Result<(tar::Entry<'a, impl Read + Send + Unpin + 'a>, Utf8PathBuf)>>,
    cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
    for item in ents {
        let (tar_entry, full_path) = item?;
        match full_path.strip_prefix("objects/") {
            Ok(object_path) => {
                self.import_object(tar_entry, object_path, cancellable)?;
            }
            Err(_) => {
                // Not an object: only split-xattrs content is of interest.
                if full_path.strip_prefix("xattrs/").is_ok() {
                    self.process_split_xattrs_content(tar_entry)?;
                }
            }
        }
    }
    Ok(())
}

pub(crate) fn import_objects(
&mut self,
archive: &mut tar::Archive<impl Read + Send + Unpin>,
cancellable: Option<&gio::Cancellable>,
) -> Result<String> {
) -> Result<()> {
let ents = archive.entries()?.filter_map(|e| match e {
Ok(e) => Self::filter_entry(e).transpose(),
Err(e) => Some(Err(anyhow::Error::msg(e))),
});
self.import_objects_impl(ents, cancellable)
}

pub(crate) fn import_commit(
&mut self,
archive: &mut tar::Archive<impl Read + Send + Unpin>,
cancellable: Option<&gio::Cancellable>,
) -> Result<()> {
// This can only be invoked once
assert!(matches!(self.data, ImporterMode::Commit(None)));
// Create an iterator that skips over directories; we just care about the file names.
let mut ents = archive.entries()?.filter_map(|e| match e {
Ok(e) => Self::filter_entry(e).transpose(),
Err(e) => Some(Err(anyhow::Error::msg(e))),
});

// Read the commit object.
let (commit_ent, commit_path) = ents
.next()
Expand Down Expand Up @@ -642,18 +719,63 @@ impl Importer {
}
}
}
match &mut self.data {
ImporterMode::Commit(c) => {
c.replace(checksum);
}
ImporterMode::ObjectSet(_) => unreachable!(),
}

for entry in ents {
let (entry, path) = entry?;
self.import_objects_impl(ents, cancellable)?;

if let Ok(p) = path.strip_prefix("objects/") {
self.import_object(entry, p, cancellable)?;
} else if path.strip_prefix("xattrs/").is_ok() {
self.process_split_xattrs_content(entry)?;
}
Ok(())
}

/// Consume the importer and return the checksum of the imported commit.
///
/// The accumulated import statistics are logged at debug level first.
///
/// # Panics
///
/// Panics if no commit checksum has been recorded yet (that is, `import_commit`
/// has not completed successfully on this importer), or if the importer was
/// created for an object set rather than a commit.
pub(crate) fn finish_import_commit(self) -> String {
    tracing::debug!("Import stats: {:?}", self.stats);
    match self.data {
        // The checksum is stored by `import_commit`; its absence is a caller bug.
        ImporterMode::Commit(c) => {
            c.expect("finish_import_commit called before a successful import_commit")
        }
        ImporterMode::ObjectSet(_) => {
            unreachable!("finish_import_commit called on an object-set importer")
        }
    }
}

Ok(checksum)
/// Build a directory metadata variant for a directory owned by uid 0 / gid 0
/// with mode `S_IFDIR | 0o755`.
///
/// # Panics
///
/// Panics if `ostree::create_directory_metadata` yields no value.
pub(crate) fn default_dirmeta() -> glib::Variant {
    let dir_mode = libc::S_IFDIR | 0o755;
    let info = gio::FileInfo::new();
    info.set_attribute_uint32("unix::uid", 0);
    info.set_attribute_uint32("unix::gid", 0);
    info.set_attribute_uint32("unix::mode", dir_mode);
    // TODO: this is not actually a nullable return; fix it in ostree so the
    // `unwrap` below can be dropped.
    ostree::create_directory_metadata(&info, None).unwrap()
}

/// Consume an object-set importer and write a synthetic commit that references
/// every imported content object.
///
/// Each checksum is added to a fresh mutable tree as a file named after the
/// checksum itself. The tree's metadata is the `default_dirmeta` variant, and
/// the commit is written with every optional argument unset and a `0` time
/// value. Returns the string form of the new commit's checksum.
///
/// # Panics
///
/// Panics if the importer was created for a commit rather than an object set.
pub(crate) fn finish_import_object_set(self) -> Result<String> {
    let checksums = match self.data {
        ImporterMode::ObjectSet(set) => set,
        ImporterMode::Commit(_) => unreachable!(),
    };
    tracing::debug!("Imported {} content objects", checksums.len());

    // One file entry per object; the file name is the object's checksum.
    let mtree = ostree::MutableTree::new();
    for csum in &checksums {
        mtree.replace_file(csum, csum)?;
    }

    let dirmeta_variant = Self::default_dirmeta();
    let dirmeta_csum = self.repo.write_metadata(
        ostree::ObjectType::DirMeta,
        None,
        &dirmeta_variant,
        gio::NONE_CANCELLABLE,
    )?;
    mtree.set_metadata_checksum(&dirmeta_csum.to_hex());

    let root = self.repo.write_mtree(&mtree, gio::NONE_CANCELLABLE)?;
    let commit_csum = self.repo.write_commit_with_time(
        None,
        None,
        None,
        None,
        root.downcast_ref().unwrap(),
        0,
        gio::NONE_CANCELLABLE,
    )?;
    Ok(commit_csum.to_string())
}
}

Expand Down Expand Up @@ -689,15 +811,38 @@ pub async fn import_tar(
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
let mut archive = tar::Archive::new(src);
let txn = repo.auto_transaction(Some(cancellable))?;
let importer = Importer::new(&repo, options.remote);
let checksum = importer.import(&mut archive, Some(cancellable))?;
let mut importer = Importer::new_for_commit(&repo, options.remote);
importer.import_commit(&mut archive, Some(cancellable))?;
let checksum = importer.finish_import_commit();
txn.commit(Some(cancellable))?;
repo.mark_commit_partial(&checksum, false)?;
Ok::<_, anyhow::Error>(checksum)
})
.await
}

/// Read the contents of a tarball and import the content objects inside.
/// Generates a synthetic commit object referencing them.
#[instrument(skip(repo, src))]
pub async fn import_tar_objects(
repo: &ostree::Repo,
src: impl tokio::io::AsyncRead + Send + Unpin + 'static,
) -> Result<String> {
let src = tokio_util::io::SyncIoBridge::new(src);
let repo = repo.clone();
// The tar code we use today is blocking, so we spawn a thread.
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
let mut archive = tar::Archive::new(src);
let mut importer = Importer::new_for_object_set(&repo);
let txn = repo.auto_transaction(Some(cancellable))?;
importer.import_objects(&mut archive, Some(cancellable))?;
let r = importer.finish_import_object_set()?;
txn.commit(Some(cancellable))?;
Ok::<_, anyhow::Error>(r)
})
.await
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 276e253

Please sign in to comment.