From 9525086366d0c61da8f93de4be8d6275f510aa96 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Mon, 13 Dec 2021 20:43:13 -0500 Subject: [PATCH] Initial "chunking" code This analyzes an ostree commit and splits it into chunks suitable for output to separate layers in an OCI image. --- lib/Cargo.toml | 4 +- lib/src/chunking.rs | 404 +++++++++++++++++++++++++++++ lib/src/cli.rs | 39 ++- lib/src/container/encapsulate.rs | 61 ++++- lib/src/container/mod.rs | 2 + lib/src/container/store.rs | 24 +- lib/src/container/unencapsulate.rs | 176 +++++++++---- lib/src/lib.rs | 2 + lib/src/tar/export.rs | 83 +++++- lib/src/tar/import.rs | 68 +++-- lib/tests/it/main.rs | 36 +-- 11 files changed, 796 insertions(+), 103 deletions(-) create mode 100644 lib/src/chunking.rs diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 368b8b9b..40aa6b09 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -10,7 +10,7 @@ version = "0.6.0" [dependencies] anyhow = "1.0" -containers-image-proxy = "0.4.0" +containers-image-proxy = { features = ["proxy_v0_2_3"], version = "0.4.0" } async-compression = { version = "0.3", features = ["gzip", "tokio"] } bitflags = "1" @@ -55,4 +55,4 @@ features = ["dox"] [features] dox = ["ostree/dox"] internal-testing-api = [] -proxy_v0_2_3 = ["containers-image-proxy/proxy_v0_2_3"] +proxy_v0_2_3 = [] diff --git a/lib/src/chunking.rs b/lib/src/chunking.rs new file mode 100644 index 00000000..7c18b97f --- /dev/null +++ b/lib/src/chunking.rs @@ -0,0 +1,404 @@ +//! Split an OSTree commit into separate chunks + +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use std::borrow::Borrow; +use std::collections::{BTreeMap, BTreeSet}; +use std::rc::Rc; + +use crate::objgv::*; +use anyhow::Result; +use camino::Utf8PathBuf; +use gvariant::aligned_bytes::TryAsAligned; +use gvariant::{Marker, Structure}; +use ostree; +use ostree::prelude::*; +use ostree::{gio, glib}; + +const FIRMWARE: &str = "/usr/lib/firmware"; +const MODULES: &str = "/usr/lib/modules"; + +const QUERYATTRS: &str = "standard::name,standard::type"; + +/// Size in bytes of the smallest chunk we will emit. +// pub(crate) const MIN_CHUNK_SIZE: u32 = 10 * 1024; +/// Maximum number of layers (chunks) we will use. +// We take half the limit of 128. 
+// https://github.com/ostreedev/ostree-rs-ext/issues/69
+pub(crate) const MAX_CHUNKS: u32 = 64;
+
+/// Minimum size in bytes for a chunk.
+#[allow(dead_code)]
+pub(crate) const DEFAULT_MIN_CHUNK: usize = 10 * 1024;
+
+#[derive(Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Clone)]
+pub(crate) struct RcStr(Rc<str>);
+
+impl Borrow<str> for RcStr {
+    fn borrow(&self) -> &str {
+        &*self.0
+    }
+}
+
+impl From<&str> for RcStr {
+    fn from(s: &str) -> Self {
+        Self(Rc::from(s))
+    }
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct Chunk {
+    pub(crate) name: String,
+    pub(crate) content: BTreeMap<RcStr, (u64, Vec<Utf8PathBuf>)>,
+    pub(crate) size: u64,
+}
+
+#[derive(Debug)]
+pub(crate) enum Meta {
+    DirTree(RcStr),
+    DirMeta(RcStr),
+}
+
+impl Meta {
+    pub(crate) fn objtype(&self) -> ostree::ObjectType {
+        match self {
+            Meta::DirTree(_) => ostree::ObjectType::DirTree,
+            Meta::DirMeta(_) => ostree::ObjectType::DirMeta,
+        }
+    }
+
+    pub(crate) fn checksum(&self) -> &str {
+        match self {
+            Meta::DirTree(v) => &*v.0,
+            Meta::DirMeta(v) => &*v.0,
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct Chunking {
+    pub(crate) metadata_size: u64,
+    pub(crate) commit: Box<str>,
+    pub(crate) meta: Vec<Meta>,
+    pub(crate) remainder: Chunk,
+    pub(crate) chunks: Vec<Chunk>,
+}
+
+// pub(crate) struct ChunkConfig {
+//     pub(crate) min_size: u32,
+//     pub(crate) max_chunks: u32,
+// }
+//
+// impl Default for ChunkConfig {
+//     fn default() -> Self {
+//         Self {
+//             min_size: MIN_CHUNK_SIZE,
+//             max_chunks: MAX_CHUNKS,
+//         }
+//     }
+// }
+
+#[derive(Default)]
+struct Generation {
+    path: Utf8PathBuf,
+    metadata_size: u64,
+    meta: Vec<Meta>,
+    dirtree_found: BTreeSet<RcStr>,
+    dirmeta_found: BTreeSet<RcStr>,
+}
+
+fn generate_chunking_recurse(
+    repo: &ostree::Repo,
+    gen: &mut Generation,
+    chunk: &mut Chunk,
+    dt: &glib::Variant,
+) -> Result<()> {
+    let dt = dt.data_as_bytes();
+    let dt = dt.try_as_aligned()?;
+    let dt = gv_dirtree!().cast(dt);
+    let (files, dirs) = dt.to_tuple();
+    // A reusable buffer to avoid heap allocating these
+    let mut hexbuf = [0u8; 64];
+    for file in files {
+        let (name, csum) = file.to_tuple();
+        let fpath = gen.path.join(name.to_str());
+        hex::encode_to_slice(csum, &mut hexbuf)?;
+        let checksum = std::str::from_utf8(&hexbuf)?;
+        let (_, meta, _) = repo.load_file(checksum, gio::NONE_CANCELLABLE)?;
+        // SAFETY: We know this API returns this value; it only has a nullable return
+        // because the caller can pass NULL to skip it.
+        let meta = meta.unwrap();
+        let size = meta.size() as u64;
+        let entry = chunk.content.entry(RcStr::from(checksum)).or_default();
+        entry.0 = size;
+        let first = entry.1.is_empty();
+        if first {
+            chunk.size += size;
+        }
+        entry.1.push(fpath);
+    }
+    for item in dirs {
+        let (name, contents_csum, meta_csum) = item.to_tuple();
+        let name = name.to_str();
+        // Extend our current path
+        gen.path.push(name);
+        hex::encode_to_slice(contents_csum, &mut hexbuf)?;
+        let checksum_s = std::str::from_utf8(&hexbuf)?;
+        if !gen.dirtree_found.contains(checksum_s) {
+            let checksum = RcStr::from(checksum_s);
+            gen.dirtree_found.insert(RcStr::clone(&checksum));
+            gen.meta.push(Meta::DirTree(checksum));
+            let child_v = repo.load_variant(ostree::ObjectType::DirTree, checksum_s)?;
+            gen.metadata_size += child_v.data_as_bytes().as_ref().len() as u64;
+            generate_chunking_recurse(repo, gen, chunk, &child_v)?;
+        }
+        hex::encode_to_slice(meta_csum, &mut hexbuf)?;
+        let checksum_s = std::str::from_utf8(&hexbuf)?;
+        if !gen.dirmeta_found.contains(checksum_s) {
+            let checksum = RcStr::from(checksum_s);
+            gen.dirmeta_found.insert(RcStr::clone(&checksum));
+            let child_v = repo.load_variant(ostree::ObjectType::DirMeta, checksum_s)?;
+            gen.metadata_size += child_v.data_as_bytes().as_ref().len() as u64;
+            gen.meta.push(Meta::DirMeta(checksum));
+        }
+        // We did a push above, so pop must succeed.
+        assert!(gen.path.pop());
+    }
+    Ok(())
+}
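
A note for reviewers on the `RcStr` newtype above: the `Borrow<str>` impl is what lets the `BTreeSet<RcStr>`/`BTreeMap<RcStr, ...>` indexes be probed with a plain `&str` checksum (as in `gen.dirtree_found.contains(checksum_s)`) without first allocating a new `Rc<str>`. A standalone sketch of the pattern:

    use std::borrow::Borrow;
    use std::collections::BTreeSet;
    use std::rc::Rc;

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
    struct RcStr(Rc<str>);

    impl Borrow<str> for RcStr {
        fn borrow(&self) -> &str {
            &self.0
        }
    }

    fn main() {
        let mut seen = BTreeSet::new();
        seen.insert(RcStr(Rc::from("abc123")));
        // BTreeSet::contains accepts any Q: Ord where RcStr: Borrow<Q>,
        // so a &str probe works with no allocation.
        assert!(seen.contains("abc123"));
    }

The same trick makes the dirtree/dirmeta dedup sets cheap to query while each checksum string is stored only once.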
+impl Chunk {
+    fn new(name: &str) -> Self {
+        Chunk {
+            name: name.to_string(),
+            ..Default::default()
+        }
+    }
+
+    fn move_obj(&mut self, dest: &mut Self, checksum: &str) -> bool {
+        // In most cases, we expect the object to exist in the source. However, it's
+        // convenient here to simply ignore objects which were already moved into
+        // a chunk.
+        if let Some((name, (size, paths))) = self.content.remove_entry(checksum) {
+            let v = dest.content.insert(name, (size, paths));
+            debug_assert!(v.is_none());
+            self.size -= size;
+            dest.size += size;
+            true
+        } else {
+            false
+        }
+    }
+
+    // fn split(self) -> (Self, Self) {
+    //     todo!()
+    // }
+}
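
To make the `move_obj` bookkeeping above concrete: the size counter follows the object from one chunk to the other, and a second move of the same checksum is a harmless no-op that returns `false`. A sketch, using hypothetical checksum and path values:

    // Assumes the Chunk/RcStr types above; "abc123" stands in for a
    // full SHA-256 content checksum.
    let mut remainder = Chunk::new("remainder");
    remainder
        .content
        .insert(RcStr::from("abc123"), (4096, vec!["/usr/bin/foo".into()]));
    remainder.size = 4096;

    let mut firmware = Chunk::new("firmware");
    // Moved: the object and its 4096 bytes now belong to `firmware`.
    assert!(remainder.move_obj(&mut firmware, "abc123"));
    assert_eq!((remainder.size, firmware.size), (0, 4096));
    // Already moved: ignored.
    assert!(!remainder.move_obj(&mut firmware, "abc123"));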
+fn find_kernel_dir(
+    root: &gio::File,
+    cancellable: Option<&gio::Cancellable>,
+) -> Result<Option<gio::File>> {
+    let moddir = root.resolve_relative_path(MODULES);
+    let e = moddir.enumerate_children(
+        "standard::name",
+        gio::FileQueryInfoFlags::NOFOLLOW_SYMLINKS,
+        cancellable,
+    )?;
+    let mut r = None;
+    for child in e.clone() {
+        let child = &child?;
+        let childpath = e.child(child);
+        if child.file_type() == gio::FileType::Directory {
+            if r.replace(childpath).is_some() {
+                anyhow::bail!("Found multiple subdirectories in {}", MODULES);
+            }
+        }
+    }
+    Ok(r)
+}
+
+impl Chunking {
+    /// Generate an initial single chunk.
+    pub(crate) fn new(repo: &ostree::Repo, rev: &str) -> Result<Self> {
+        // Find the target commit
+        let rev = repo.resolve_rev(rev, false)?.unwrap();
+
+        // Load and parse the commit object
+        let (commit_v, _) = repo.load_commit(&rev)?;
+        let commit_v = commit_v.data_as_bytes();
+        let commit_v = commit_v.try_as_aligned()?;
+        let commit = gv_commit!().cast(commit_v);
+        let commit = commit.to_tuple();
+
+        // Find the root directory tree
+        let contents_checksum = &hex::encode(commit.6);
+        let contents_v = repo.load_variant(ostree::ObjectType::DirTree, contents_checksum)?;
+
+        // Load it all into a single chunk
+        let mut gen: Generation = Default::default();
+        gen.path = Utf8PathBuf::from("/");
+        let mut chunk: Chunk = Default::default();
+        generate_chunking_recurse(repo, &mut gen, &mut chunk, &contents_v)?;
+
+        let chunking = Chunking {
+            commit: Box::from(rev.as_str()),
+            metadata_size: gen.metadata_size,
+            meta: gen.meta,
+            remainder: chunk,
+            ..Default::default()
+        };
+        Ok(chunking)
+    }
+
+    fn remaining(&self) -> u32 {
+        MAX_CHUNKS.saturating_sub(self.chunks.len() as u32)
+    }
+
+    /// Find the object named by `path` in `src`, and move it to `dest`.
+    fn extend_chunk(
+        repo: &ostree::Repo,
+        src: &mut Chunk,
+        dest: &mut Chunk,
+        path: &ostree::RepoFile,
+    ) -> Result<()> {
+        let cancellable = gio::NONE_CANCELLABLE;
+        let ft = path.query_file_type(gio::FileQueryInfoFlags::NOFOLLOW_SYMLINKS, cancellable);
+        if ft == gio::FileType::Directory {
+            let e = path.enumerate_children(
+                QUERYATTRS,
+                gio::FileQueryInfoFlags::NOFOLLOW_SYMLINKS,
+                cancellable,
+            )?;
+            for child in e {
+                let childi = child?;
+                let child = path.child(childi.name());
+                let child = child.downcast::<ostree::RepoFile>().unwrap();
+                Self::extend_chunk(repo, src, dest, &child)?;
+            }
+        } else {
+            let checksum = path.checksum().unwrap();
+            src.move_obj(dest, checksum.as_str());
+        }
+        Ok(())
+    }
+
+    /// Create a new chunk from the provided filesystem paths.
+    pub(crate) fn chunk_paths<'a>(
+        &mut self,
+        repo: &ostree::Repo,
+        paths: impl IntoIterator<Item = &'a gio::File>,
+        name: &str,
+        cancellable: Option<&gio::Cancellable>,
+    ) -> Result<()> {
+        // Do nothing if we've hit our max.
+        if self.remaining() == 0 {
+            return Ok(());
+        }
+
+        let mut chunk = Chunk::new(name);
+        for path in paths {
+            if !path.query_exists(cancellable) {
+                continue;
+            }
+            let child = path.downcast_ref::<ostree::RepoFile>().unwrap();
+            Self::extend_chunk(repo, &mut self.remainder, &mut chunk, child)?;
+        }
+        if !chunk.content.is_empty() {
+            self.chunks.push(chunk);
+        }
+        Ok(())
+    }
+
+    fn chunk_kernel_initramfs(
+        &mut self,
+        repo: &ostree::Repo,
+        root: &gio::File,
+        cancellable: Option<&gio::Cancellable>,
+    ) -> Result<()> {
+        let moddir = if let Some(m) = find_kernel_dir(root, cancellable)? {
+            m
+        } else {
+            return Ok(());
+        };
+        // The initramfs has a dependency on userspace *and* kernel, so we
+        // should chunk the kernel separately.
+        let initramfs = &moddir.resolve_relative_path("initramfs.img");
+        self.chunk_paths(repo, [initramfs], "initramfs", cancellable)?;
+        // Gather all of the rest of the kernel as a single chunk
+        self.chunk_paths(repo, [&moddir], "kernel", cancellable)
+    }
+
+    /// Apply built-in heuristics to automatically create chunks.
+    pub(crate) fn auto_chunk(&mut self, repo: &ostree::Repo) -> Result<()> {
+        let cancellable = gio::NONE_CANCELLABLE;
+        let root = &repo.read_commit(&self.commit, cancellable)?.0;
+
+        // Grab all of linux-firmware; it's the largest thing in FCOS.
+        let firmware = root.resolve_relative_path(FIRMWARE);
+        self.chunk_paths(repo, [&firmware], "firmware", cancellable)?;
+
+        // Kernel and initramfs
+        self.chunk_kernel_initramfs(repo, root, cancellable)?;
+
+        self.large_files(20, 1)?;
+
+        Ok(())
+    }
+
+    /// Gather large files (up to `max` chunks) whose size exceeds `percentage` (1-99) of the total.
+    pub(crate) fn large_files(&mut self, max: u32, percentage: u32) -> Result<()> {
+        let max = max.min(self.remaining());
+        if max == 0 {
+            return Ok(());
+        }
+
+        let mut large_objects = Vec::new();
+        let total_size = self.remainder.size;
+        let largefile_limit = (total_size * percentage as u64) / 100;
+        for (objid, (size, _names)) in &self.remainder.content {
+            if *size > largefile_limit {
+                large_objects.push((*size, objid.clone()));
+            }
+        }
+        large_objects.sort_by(|a, b| a.0.cmp(&b.0));
+        for (_size, objid) in large_objects.iter().rev().take(max as usize) {
+            let mut chunk = {
+                let (_size, names) = self.remainder.content.get(objid).unwrap();
+                let name = &names[0];
+                Chunk::new(name.as_str())
+            };
+            let moved = self.remainder.move_obj(&mut chunk, objid.borrow());
+            // The object only exists once, so we must have moved it.
+            assert!(moved);
+            self.chunks.push(chunk);
+        }
+        Ok(())
+    }
+
+    pub(crate) fn take_chunks(&mut self) -> Vec<Chunk> {
+        let mut r = Vec::new();
+        std::mem::swap(&mut self.chunks, &mut r);
+        r
+    }
+}
+
+pub(crate) fn print(src: &Chunking) {
+    println!("Metadata: {}", glib::format_size(src.metadata_size));
+    for (n, chunk) in src.chunks.iter().enumerate() {
+        let sz = glib::format_size(chunk.size);
+        println!(
+            "Chunk {}: \"{}\": objects:{} size:{}",
+            n,
+            chunk.name,
+            chunk.content.len(),
+            sz
+        );
+    }
+    let sz = glib::format_size(src.remainder.size);
+    println!(
+        "Remainder: objects:{} size:{}",
+        src.remainder.content.len(),
+        sz
+    );
+}
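
Taken together, the intended driver sequence for this module (which the CLI and encapsulation changes below follow) is: build the single-chunk view of a commit, run the heuristics, then drain the named chunks, leaving `remainder` as the catch-all final layer. A hedged sketch, assuming an open repo and a valid ref:

    fn chunk_commit(repo: &ostree::Repo, rev: &str) -> anyhow::Result<Vec<Chunk>> {
        // Everything starts out in `remainder`...
        let mut chunking = Chunking::new(repo, rev)?;
        // ...then firmware, kernel+initramfs and large files are split out,
        // bounded by the MAX_CHUNKS layer budget.
        chunking.auto_chunk(repo)?;
        // The named chunks become separate layers; whatever is still in
        // `chunking.remainder` is exported as the final "everything else" layer.
        Ok(chunking.take_chunks())
    }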
diff --git a/lib/src/cli.rs b/lib/src/cli.rs
index 6466779f..99a5c416 100644
--- a/lib/src/cli.rs
+++ b/lib/src/cli.rs
@@ -112,6 +112,10 @@ enum ContainerOpts {
         /// Corresponds to the Dockerfile `CMD` instruction.
         #[structopt(long)]
         cmd: Option<Vec<String>>,
+
+        #[structopt(long, hidden = true)]
+        /// Output in multiple blobs
+        ex_chunked: bool,
     },
 
     /// Commands for working with (possibly layered, non-encapsulated) container images.
@@ -230,6 +234,19 @@ enum TestingOpts {
     DetectEnv,
 }
 
+/// Experimental options
+#[derive(Debug, StructOpt)]
+enum ExperimentalOpts {
+    /// Print chunking
+    PrintChunks {
+        /// Path to the repository
+        #[structopt(long)]
+        repo: String,
+        /// The ostree ref or commit
+        rev: String,
+    },
+}
+
 /// Toplevel options for extended ostree functionality.
#[derive(Debug, StructOpt)] #[structopt(name = "ostree-ext")] @@ -245,6 +262,9 @@ enum Opt { #[structopt(setting(structopt::clap::AppSettings::Hidden))] #[cfg(feature = "internal-testing-api")] InternalOnlyForTesting(TestingOpts), + /// Experimental/debug CLI + #[structopt(setting(structopt::clap::AppSettings::Hidden))] + Experimental(ExperimentalOpts), } #[allow(clippy::from_over_into)] @@ -355,13 +375,17 @@ async fn container_export( imgref: &ImageReference, labels: BTreeMap, cmd: Option>, + chunked: bool, ) -> Result<()> { let repo = &ostree::Repo::open_at(libc::AT_FDCWD, repo, gio::NONE_CANCELLABLE)?; let config = Config { labels: Some(labels), cmd, }; - let opts = Some(Default::default()); + let opts = Some(crate::container::ExportOpts { + chunked, + ..Default::default() + }); let pushed = crate::container::encapsulate(repo, rev, &config, opts, imgref).await?; println!("{}", pushed); Ok(()) @@ -480,6 +504,7 @@ where imgref, labels, cmd, + ex_chunked, } => { let labels: Result> = labels .into_iter() @@ -490,7 +515,8 @@ where Ok((k.to_string(), v.to_string())) }) .collect(); - container_export(&repo, &rev, &imgref, labels?, cmd).await + + container_export(&repo, &rev, &imgref, labels?, cmd, ex_chunked).await } ContainerOpts::Image(opts) => match opts { ContainerImageOpts::List { repo } => { @@ -545,5 +571,14 @@ where Opt::ImaSign(ref opts) => ima_sign(opts), #[cfg(feature = "internal-testing-api")] Opt::InternalOnlyForTesting(ref opts) => testing(opts), + Opt::Experimental(ref opts) => match opts { + ExperimentalOpts::PrintChunks { repo, rev } => { + let repo = &ostree::Repo::open_at(libc::AT_FDCWD, &repo, gio::NONE_CANCELLABLE)?; + let mut chunks = crate::chunking::Chunking::new(repo, rev)?; + chunks.auto_chunk(repo)?; + crate::chunking::print(&chunks); + Ok(()) + } + }, } } diff --git a/lib/src/container/encapsulate.rs b/lib/src/container/encapsulate.rs index a28325ec..3312b47e 100644 --- a/lib/src/container/encapsulate.rs +++ b/lib/src/container/encapsulate.rs @@ -2,6 +2,7 @@ use super::ociwriter::OciWriter; use super::*; +use crate::chunking::Chunking; use crate::tar as ostree_tar; use anyhow::Context; use fn_error_context::context; @@ -40,6 +41,42 @@ fn export_ostree_ref( w.complete() } +/// Write an ostree commit to an OCI blob +#[context("Writing ostree root to blob")] +fn export_chunked( + repo: &ostree::Repo, + ociw: &mut OciWriter, + mut chunking: Chunking, + compression: Option, + description: &str, +) -> Result<()> { + let layers: Result> = chunking + .take_chunks() + .into_iter() + .enumerate() + .map(|(i, chunk)| -> Result<_> { + let mut w = ociw.create_layer(compression)?; + ostree_tar::export_chunk(repo, &chunk, &mut w) + .with_context(|| format!("Exporting chunk {}", i))?; + let w = w.into_inner()?; + Ok((w.complete()?, chunk.name)) + }) + .collect(); + for (layer, name) in layers? 
{ + ociw.push_layer(layer, &name); + } + let mut w = ociw.create_layer(compression)?; + ostree_tar::export_final_chunk(repo, &chunking, &mut w)?; + let w = w.into_inner()?; + let final_layer = w.complete()?; + ociw.add_config_annotation( + crate::container::OSTREE_LAYER_LABEL, + &final_layer.blob.sha256, + ); + ociw.push_layer(final_layer, description); + Ok(()) +} + /// Generate an OCI image from a given ostree root #[context("Building oci")] fn build_oci( @@ -67,6 +104,15 @@ fn build_oci( let commit_meta = &commit_v.child_value(0); let commit_meta = glib::VariantDict::new(Some(commit_meta)); + let chunking = if opts.chunked { + // compression = Some(flate2::Compression::none()); + let mut c = crate::chunking::Chunking::new(repo, commit)?; + c.auto_chunk(repo)?; + Some(c) + } else { + None + }; + if let Some(version) = commit_meta.lookup_value("version", Some(glib::VariantTy::new("s").unwrap())) { @@ -91,15 +137,20 @@ fn build_oci( flate2::Compression::none() }; - let rootfs_blob = export_ostree_ref(repo, commit, &mut writer, Some(compression))?; + let mut annos = HashMap::new(); + annos.insert(BLOB_OSTREE_ANNOTATION.to_string(), "true".to_string()); let description = if commit_subject.is_empty() { Cow::Owned(format!("ostree export of commit {}", commit)) } else { Cow::Borrowed(commit_subject) }; - let mut annos = HashMap::new(); - annos.insert(BLOB_OSTREE_ANNOTATION.to_string(), "true".to_string()); - writer.push_layer_annotated(rootfs_blob, Some(annos), &description); + + if let Some(chunking) = chunking { + export_chunked(repo, &mut writer, chunking, Some(compression), &description)?; + } else { + let rootfs_blob = export_ostree_ref(repo, commit, &mut writer, Some(compression))?; + writer.push_layer_annotated(rootfs_blob, Some(annos), &description); + } writer.complete()?; Ok(ImageReference { @@ -179,6 +230,8 @@ async fn build_impl( pub struct ExportOpts { /// If true, perform gzip compression of the tar layers. pub compress: bool, + /// Whether or not to generate multiple layers + pub chunked: bool, } /// Given an OSTree repository and ref, generate a container image. diff --git a/lib/src/container/mod.rs b/lib/src/container/mod.rs index a1c38825..47749a71 100644 --- a/lib/src/container/mod.rs +++ b/lib/src/container/mod.rs @@ -32,6 +32,8 @@ use std::ops::Deref; /// The label injected into a container image that contains the ostree commit SHA-256. pub const OSTREE_COMMIT_LABEL: &str = "ostree.commit"; +/// The label/annotation which contains the sha256 of the final commit. +const OSTREE_LAYER_LABEL: &str = "ostree.layer"; /// Our generic catchall fatal error, expected to be converted /// to a string to output to a terminal or logs. diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs index 3b633121..29f0e100 100644 --- a/lib/src/container/store.rs +++ b/lib/src/container/store.rs @@ -131,7 +131,7 @@ pub struct PreparedImport { /// The deserialized manifest. pub manifest: oci_image::ImageManifest, /// The deserialized configuration. - pub config: Option, + pub config: oci_image::ImageConfiguration, /// The previously stored manifest digest. pub previous_manifest_digest: Option, /// The previously stored image ID. 
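
One consequence of making the configuration a required part of `PreparedImport` (rather than gated on the proxy_v0_2_3 feature) is that the import path can always recover the commit layer from the `ostree.layer` label that `export_chunked` stamps into the image config. A sketch of that lookup, mirroring `OstreeContainerLayers::new` in the unencapsulate changes below (hypothetical helper name):

    use oci_spec::image::ImageConfiguration;

    fn commit_layer_digest(config: &ImageConfiguration) -> Option<&String> {
        // OSTREE_LAYER_LABEL ("ostree.layer") holds the blob digest of the
        // layer carrying the commit plus ostree metadata objects, which
        // distinguishes it from component chunks and derived layers.
        config
            .config()
            .as_ref()
            .and_then(|c| c.labels().as_ref())
            .and_then(|labels| labels.get("ostree.layer"))
    }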
@@ -143,7 +143,10 @@ pub struct PreparedImport { } // Given a manifest, compute its ostree ref name and cached ostree commit -fn query_layer(repo: &ostree::Repo, layer: oci_image::Descriptor) -> Result { +pub(crate) fn query_layer( + repo: &ostree::Repo, + layer: oci_image::Descriptor, +) -> Result { let ostree_ref = ref_for_layer(&layer)?; let commit = repo.resolve_rev(&ostree_ref, true)?.map(|s| s.to_string()); Ok(ManifestLayerState { @@ -244,15 +247,9 @@ impl LayeredImageImporter { (None, None) }; - #[cfg(feature = "proxy_v0_2_3")] - let config = { - let config_bytes = self.proxy.fetch_config(&self.proxy_img).await?; - let config: oci_image::ImageConfiguration = - serde_json::from_slice(&config_bytes).context("Parsing image configuration")?; - Some(config) - }; - #[cfg(not(feature = "proxy_v0_2_3"))] - let config = None; + let config_bytes = self.proxy.fetch_config(&self.proxy_img).await?; + let config: oci_image::ImageConfiguration = + serde_json::from_slice(&config_bytes).context("Parsing image configuration")?; let mut layers = manifest.layers().iter().cloned(); // We require a base layer. @@ -293,6 +290,7 @@ impl LayeredImageImporter { target_imgref, &self.proxy_img, &import.manifest, + &import.config, None, true, ) @@ -329,9 +327,9 @@ impl LayeredImageImporter { base: Some(base_commit.clone()), selinux: true, }; - let w = + let r = crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts)); - let r = super::unencapsulate::join_fetch(w, driver) + let r = super::unencapsulate::join_fetch(r, driver) .await .with_context(|| format!("Parsing layer blob {}", layer.digest()))?; layer_commits.push(r.commit); diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index 0f728b7a..8ec03c08 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -32,11 +32,12 @@ // which is exactly what is exported by the [`crate::tar::export`] process. use super::*; -use anyhow::{anyhow, Context}; +use anyhow::anyhow; use containers_image_proxy::{ImageProxy, OpenedImage}; use fn_error_context::context; use futures_util::Future; use oci_spec::image as oci_image; +use std::sync::{Arc, Mutex}; use tokio::io::{AsyncBufRead, AsyncRead}; use tracing::{event, instrument, Level}; @@ -56,7 +57,7 @@ struct ProgressReader { #[pin] reader: T, #[pin] - progress: Option, + progress: Option>>, } impl AsyncRead for ProgressReader { @@ -70,6 +71,7 @@ impl AsyncRead for ProgressReader { match this.reader.poll_read(cx, buf) { v @ std::task::Poll::Ready(Ok(_)) => { if let Some(progress) = this.progress.as_ref().get_ref() { + let progress = progress.lock().unwrap(); let state = { let mut state = *progress.borrow(); let newlen = buf.filled().len(); @@ -116,20 +118,6 @@ pub struct Import { pub image_digest: String, } -fn require_one_layer_blob(manifest: &oci_image::ImageManifest) -> Result<&oci_image::Descriptor> { - let n = manifest.layers().len(); - if let Some(layer) = manifest.layers().get(0) { - if n > 1 { - Err(anyhow!("Expected 1 layer, found {}", n)) - } else { - Ok(layer) - } - } else { - // Validated by find_layer_blobids() - unreachable!() - } -} - /// Use this to process potential errors from a worker and a driver. /// This is really a brutal hack around the fact that an error can occur /// on either our side or in the proxy. 
But if an error occurs on our @@ -184,9 +172,12 @@ pub async fn unencapsulate( let oi = &proxy.open_image(&imgref.imgref.to_string()).await?; let (image_digest, raw_manifest) = proxy.fetch_manifest(oi).await?; let manifest = serde_json::from_slice(&raw_manifest)?; - let ostree_commit = - unencapsulate_from_manifest_impl(repo, &mut proxy, imgref, oi, &manifest, options, false) - .await?; + let config = proxy.fetch_config(oi).await?; + let config = serde_json::from_slice(&config)?; + let ostree_commit = unencapsulate_from_manifest_impl( + repo, &mut proxy, imgref, oi, &manifest, &config, options, false, + ) + .await?; proxy.close_image(oi).await?; Ok(Import { ostree_commit, @@ -226,12 +217,62 @@ pub(crate) async fn fetch_layer_decompress<'a>( Ok((blob, driver)) } +struct OstreeContainerLayers<'a> { + component_layers: Vec<&'a oci_image::Descriptor>, + commit_layer: &'a oci_image::Descriptor, + remaining_layers: Vec<&'a oci_image::Descriptor>, +} + +impl<'a> OstreeContainerLayers<'a> { + fn new( + manifest: &'a oci_image::ImageManifest, + config: &oci_image::ImageConfiguration, + ) -> Result { + let label = crate::container::OSTREE_LAYER_LABEL; + let config_labels = config.config().as_ref().and_then(|c| c.labels().as_ref()); + let commit_layer_digest = config_labels + .and_then(|labels| labels.get(label)) + .ok_or_else(|| { + anyhow!( + "Missing annotation {} (not an ostree-exported container?)", + label + ) + })?; + let mut component_layers = Vec::new(); + let mut commit_layer = None; + let mut remaining_layers = Vec::new(); + for layer in manifest.layers() { + if layer.digest() != commit_layer_digest { + if commit_layer.is_none() { + component_layers.push(layer); + } else { + remaining_layers.push(layer); + } + } else { + commit_layer = Some(layer); + } + } + let commit_layer = commit_layer.ok_or_else(|| { + anyhow!( + "Image does not contain ostree-exported layer {}", + commit_layer_digest + ) + })?; + Ok(Self { + component_layers, + commit_layer, + remaining_layers, + }) + } +} + pub(crate) async fn unencapsulate_from_manifest_impl( repo: &ostree::Repo, proxy: &mut ImageProxy, imgref: &OstreeImageReference, oi: &containers_image_proxy::OpenedImage, manifest: &oci_spec::image::ImageManifest, + config: &oci_spec::image::ImageConfiguration, options: Option, ignore_layered: bool, ) -> Result { @@ -241,35 +282,77 @@ pub(crate) async fn unencapsulate_from_manifest_impl( return Err(anyhow!("containers-policy.json specifies a default of `insecureAcceptAnything`; refusing usage")); } let options = options.unwrap_or_default(); - let layer = if ignore_layered { - manifest - .layers() - .get(0) - .ok_or_else(|| anyhow!("No layers in image"))? - } else { - require_one_layer_blob(manifest)? 
+ let remote = match &imgref.sigverify { + SignatureSource::OstreeRemote(remote) => Some(remote.clone()), + SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => None, }; + + let parsed_layers = OstreeContainerLayers::new(manifest, config)?; + + let component_layer_size = parsed_layers + .component_layers + .iter() + .fold(0, |acc, s| acc + s.size()); event!( Level::DEBUG, - "target blob digest:{} size: {}", - layer.digest().as_str(), - layer.size() + "commit blob digest:{} size: {} components: {} size: {}", + parsed_layers.commit_layer.digest().as_str(), + parsed_layers.commit_layer.size(), + parsed_layers.component_layers.len(), + component_layer_size, ); - let (blob, driver) = fetch_layer_decompress(proxy, oi, layer).await?; - let blob = ProgressReader { - reader: blob, - progress: options.progress, - }; - let mut taropts: crate::tar::TarImportOptions = Default::default(); - match &imgref.sigverify { - SignatureSource::OstreeRemote(remote) => taropts.remote = Some(remote.clone()), - SignatureSource::ContainerPolicy | SignatureSource::ContainerPolicyAllowInsecure => {} + let (tx, rx) = tokio::sync::mpsc::channel(1); + let repo = repo.clone(); + let import = crate::tokio_util::spawn_blocking_cancellable(move |cancellable| { + let mut rx = rx; + let mut importer = crate::tar::Importer::new(&repo, remote); + let txn = repo.auto_transaction(Some(cancellable))?; + + // First, import the commit + { + let commit_blob = rx.blocking_recv().unwrap(); + let commit_blob = tokio_util::io::SyncIoBridge::new(commit_blob); + let mut archive = tar::Archive::new(commit_blob); + importer.import_commit(&mut archive, Some(cancellable))?; + } + + // Then, all component/split blobs + while let Some(blob) = rx.blocking_recv() { + let blob = tokio_util::io::SyncIoBridge::new(blob); + let mut archive = tar::Archive::new(blob); + importer.import_objects(&mut archive, Some(cancellable))?; + } + + let checksum: String = importer.finish_import(); + txn.commit(Some(cancellable))?; + repo.mark_commit_partial(&checksum, false)?; + Ok::<_, anyhow::Error>(checksum) + }); + let progress = options.progress.map(|v| Arc::new(Mutex::new(v))); + for &layer in + std::iter::once(&parsed_layers.commit_layer).chain(parsed_layers.component_layers.iter()) + { + let (blob, driver) = fetch_layer_decompress(proxy, &oi, layer).await?; + let blob = ProgressReader { + reader: blob, + progress: progress.as_ref().map(|v| Arc::clone(v)), + }; + let (txsend, driver) = tokio::join!(tx.send(blob), driver); + let err = txsend + .err() + .map(|_| anyhow::anyhow!("Failed to send")) + .or(driver.err()); + if let Some(err) = err { + drop(tx); + return match import.await? 
{ + Ok(_) => return Err(err), + Err(e) => Err(e), + }; + } } - let import = crate::tar::import_tar(repo, blob, Some(taropts)); - let ostree_commit = join_fetch(import, driver) - .await - .with_context(|| format!("Parsing blob {}", layer.digest()))?; + drop(tx); + let ostree_commit = import.await??; event!(Level::DEBUG, "created commit {}", ostree_commit); Ok(ostree_commit) } @@ -285,9 +368,12 @@ pub async fn unencapsulate_from_manifest( ) -> Result { let mut proxy = ImageProxy::new().await?; let oi = &proxy.open_image(&imgref.imgref.to_string()).await?; - let r = - unencapsulate_from_manifest_impl(repo, &mut proxy, imgref, oi, manifest, options, false) - .await?; + let config = proxy.fetch_config(&oi).await?; + let config = serde_json::from_slice(&config)?; + let r = unencapsulate_from_manifest_impl( + repo, &mut proxy, imgref, oi, manifest, &config, options, false, + ) + .await?; proxy.close_image(oi).await?; // FIXME write ostree commit after proxy finalization proxy.finalize().await?; diff --git a/lib/src/lib.rs b/lib/src/lib.rs index d1695dc6..8cf249e5 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -33,8 +33,10 @@ pub mod refescape; pub mod tar; pub mod tokio_util; +mod chunking; mod cmdext; pub(crate) mod objgv; + /// Prelude, intended for glob import. pub mod prelude { #[doc(hidden)] diff --git a/lib/src/tar/export.rs b/lib/src/tar/export.rs index 0bf37c5b..202b4406 100644 --- a/lib/src/tar/export.rs +++ b/lib/src/tar/export.rs @@ -1,5 +1,7 @@ //! APIs for creating container images from OSTree commits +use crate::chunking; +use crate::chunking::Chunking; use crate::objgv::*; use anyhow::Context; use anyhow::Result; @@ -10,6 +12,7 @@ use gio::prelude::*; use gvariant::aligned_bytes::TryAsAligned; use gvariant::{Marker, Structure}; use ostree::gio; +use std::borrow::Borrow; use std::borrow::Cow; use std::collections::HashSet; use std::io::BufReader; @@ -260,7 +263,7 @@ impl<'a, W: std::io::Write> OstreeTarWriter<'a, W> { /// Write a content object, returning the path/header that should be used /// as a hard link to it in the target path. This matches how ostree checkouts work. - fn append_content(&mut self, checksum: &str) -> Result<(Utf8PathBuf, tar::Header)> { + fn append_content_obj(&mut self, checksum: &str) -> Result<(Utf8PathBuf, tar::Header)> { let path = object_path(ostree::ObjectType::File, checksum); let (instream, meta, xattrs) = self.repo.load_file(checksum, gio::NONE_CANCELLABLE)?; @@ -331,6 +334,18 @@ impl<'a, W: std::io::Write> OstreeTarWriter<'a, W> { Ok(()) } + fn append_content_hardlink( + &mut self, + srcpath: &Utf8Path, + mut h: tar::Header, + dest: &Utf8Path, + ) -> Result<()> { + h.set_entry_type(tar::EntryType::Link); + h.set_link_name(srcpath)?; + self.out.append_data(&mut h, dest, &mut std::io::empty())?; + Ok(()) + } + /// Write a dirtree object. fn append_dirtree>( &mut self, @@ -357,13 +372,12 @@ impl<'a, W: std::io::Write> OstreeTarWriter<'a, W> { let (name, csum) = file.to_tuple(); let name = name.to_str(); let checksum = &hex::encode(csum); - let (objpath, mut h) = self.append_content(checksum)?; + let (objpath, mut h) = self.append_content_obj(checksum)?; h.set_entry_type(tar::EntryType::Link); h.set_link_name(&objpath)?; let subpath = &dirpath.join(name); let subpath = map_path(subpath); - self.out - .append_data(&mut h, &*subpath, &mut std::io::empty())?; + self.append_content_hardlink(&objpath, h, &*subpath)?; } for item in dirs { @@ -431,6 +445,67 @@ pub fn export_commit( Ok(()) } +/// Output a chunk. 
+pub(crate) fn export_chunk<W: std::io::Write>(
+    repo: &ostree::Repo,
+    chunk: &chunking::Chunk,
+    out: &mut tar::Builder<W>,
+) -> Result<()> {
+    let writer = &mut OstreeTarWriter::new(repo, out, ExportOptions::default());
+    writer.write_repo_structure()?;
+    for (checksum, (_size, paths)) in chunk.content.iter() {
+        let (objpath, h) = writer.append_content_obj(checksum.borrow())?;
+        for path in paths.iter() {
+            let path = path.strip_prefix("/").unwrap_or(path);
+            let h = h.clone();
+            writer.append_content_hardlink(&objpath, h, path)?;
+        }
+    }
+    Ok(())
+}
+
+/// Output the last chunk in a chunking.
+#[context("Exporting final chunk")]
+pub(crate) fn export_final_chunk<W: std::io::Write>(
+    repo: &ostree::Repo,
+    chunking: &Chunking,
+    out: &mut tar::Builder<W>,
+) -> Result<()> {
+    let cancellable = gio::NONE_CANCELLABLE;
+    let writer = &mut OstreeTarWriter::new(repo, out, ExportOptions::default());
+    writer.write_repo_structure()?;
+
+    let (commit_v, _) = repo.load_commit(&chunking.commit)?;
+    let commit_v = &commit_v;
+    writer.append(ostree::ObjectType::Commit, &chunking.commit, commit_v)?;
+    if let Some(commitmeta) = repo.read_commit_detached_metadata(&chunking.commit, cancellable)? {
+        writer.append(
+            ostree::ObjectType::CommitMeta,
+            &chunking.commit,
+            &commitmeta,
+        )?;
+    }
+
+    // In the chunked case, the final layer has all ostree metadata objects.
+    for meta in &chunking.meta {
+        let objtype = meta.objtype();
+        let checksum = meta.checksum();
+        let v = repo.load_variant(objtype, checksum)?;
+        writer.append(objtype, checksum, &v)?;
+    }
+
+    for (checksum, (_size, paths)) in chunking.remainder.content.iter() {
+        let (objpath, h) = writer.append_content_obj(checksum.borrow())?;
+        for path in paths.iter() {
+            let path = path.strip_prefix("/").unwrap_or(path);
+            let h = h.clone();
+            writer.append_content_hardlink(&objpath, h, path)?;
+        }
+    }
+
+    Ok(())
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
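
The importer split below is the tar-side counterpart of the multi-layer fetch: one `Importer` instance now accumulates state across several archives inside a single repo transaction — the commit layer goes through `import_commit`, each component layer through `import_objects`, and `finish_import` yields the commit checksum. A hedged sketch of the calling convention (synchronous, error handling elided):

    fn import_layers(
        repo: &ostree::Repo,
        commit_layer: impl std::io::Read + Send + Unpin,
        component_layers: Vec<Box<dyn std::io::Read + Send + Unpin>>,
    ) -> anyhow::Result<String> {
        let cancellable = ostree::gio::NONE_CANCELLABLE;
        let txn = repo.auto_transaction(cancellable)?;
        let mut importer = Importer::new(repo, None);
        // The commit object and detached metadata must come from the
        // designated commit layer...
        importer.import_commit(&mut tar::Archive::new(commit_layer), cancellable)?;
        // ...while component chunks contribute only content objects.
        for layer in component_layers {
            importer.import_objects(&mut tar::Archive::new(layer), cancellable)?;
        }
        let checksum = importer.finish_import();
        txn.commit(cancellable)?;
        Ok(checksum)
    }

The async plumbing in container/unencapsulate.rs feeds fetched blobs into exactly this sequence over an mpsc channel.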
diff --git a/lib/src/tar/import.rs b/lib/src/tar/import.rs
index aa768d07..e45b44d6 100644
--- a/lib/src/tar/import.rs
+++ b/lib/src/tar/import.rs
@@ -39,12 +39,15 @@ struct ImportStats {
 }
 
 /// Importer machine.
-struct Importer {
+pub(crate) struct Importer {
     repo: ostree::Repo,
     remote: Option<String>,
     xattrs: HashMap<String, glib::Variant>,
     next_xattrs: Option<(String, String)>,
 
+    /// Set when we import a commit object
+    commit_checksum: Option<String>,
+
     // Reusable buffer for reads. See also https://github.com/rust-lang/rust/issues/78485
     buf: Vec<u8>,
 
@@ -141,7 +144,7 @@ fn parse_checksum(parent: &str, name: &Utf8Path) -> Result<String> {
 }
 
 impl Importer {
-    fn new(repo: &ostree::Repo, remote: Option<String>) -> Self {
+    pub(crate) fn new(repo: &ostree::Repo, remote: Option<String>) -> Self {
         Self {
             repo: repo.clone(),
             remote,
@@ -149,6 +152,7 @@ impl Importer {
             xattrs: Default::default(),
             next_xattrs: None,
             stats: Default::default(),
+            commit_checksum: None,
         }
     }
 
@@ -454,17 +458,47 @@ impl Importer {
         Ok(())
     }
 
-    fn import(
-        mut self,
+    fn import_objects_impl<'a>(
+        &mut self,
+        ents: impl Iterator<Item = Result<(tar::Entry<'a, impl Read + Send + Unpin + 'a>, Utf8PathBuf)>>,
+        cancellable: Option<&gio::Cancellable>,
+    ) -> Result<()> {
+        for entry in ents {
+            let (entry, path) = entry?;
+
+            if let Ok(p) = path.strip_prefix("objects/") {
+                self.import_object(entry, p, cancellable)?;
+            } else if path.strip_prefix("xattrs/").is_ok() {
+                self.import_xattrs(entry)?;
+            }
+        }
+        Ok(())
+    }
+
+    pub(crate) fn import_objects(
+        &mut self,
+        archive: &mut tar::Archive<impl Read + Send + Unpin>,
+        cancellable: Option<&gio::Cancellable>,
+    ) -> Result<()> {
+        let ents = archive.entries()?.filter_map(|e| match e {
+            Ok(e) => Self::filter_entry(e).transpose(),
+            Err(e) => Some(Err(anyhow::Error::msg(e))),
+        });
+        self.import_objects_impl(ents, cancellable)
+    }
+
+    pub(crate) fn import_commit(
+        &mut self,
         archive: &mut tar::Archive<impl Read + Send + Unpin>,
         cancellable: Option<&gio::Cancellable>,
-    ) -> Result<String> {
+    ) -> Result<()> {
+        // This can only be invoked once
+        assert!(self.commit_checksum.is_none());
         // Create an iterator that skips over directories; we just care about the file names.
         let mut ents = archive.entries()?.filter_map(|e| match e {
             Ok(e) => Self::filter_entry(e).transpose(),
             Err(e) => Some(Err(anyhow::Error::msg(e))),
         });
-
         // Read the commit object.
         let (commit_ent, commit_path) = ents
             .next()
@@ -557,18 +591,15 @@ impl Importer {
             }
         }
+        self.commit_checksum = Some(checksum);
 
-        for entry in ents {
-            let (entry, path) = entry?;
+        self.import_objects_impl(ents, cancellable)?;
 
-            if let Ok(p) = path.strip_prefix("objects/") {
-                self.import_object(entry, p, cancellable)?;
-            } else if path.strip_prefix("xattrs/").is_ok() {
-                self.import_xattrs(entry)?;
-            }
-        }
+        Ok(())
+    }
 
-        Ok(checksum)
+    pub(crate) fn finish_import(self) -> String {
+        self.commit_checksum.unwrap()
     }
 }
 
@@ -589,7 +620,7 @@ pub struct TarImportOptions {
     pub remote: Option<String>,
 }
 
-/// Read the contents of a tarball and import the ostree commit inside. The sha56 of the imported commit will be returned.
+/// Read the contents of a tarball and import the ostree commit inside. The sha256 of the imported commit will be returned.
#[instrument(skip(repo, src))] pub async fn import_tar( repo: &ostree::Repo, @@ -602,8 +633,9 @@ pub async fn import_tar( let import = crate::tokio_util::spawn_blocking_cancellable(move |cancellable| { let mut archive = tar::Archive::new(src); let txn = repo.auto_transaction(Some(cancellable))?; - let importer = Importer::new(&repo, options.remote); - let checksum = importer.import(&mut archive, Some(cancellable))?; + let mut importer = Importer::new(&repo, options.remote); + importer.import_commit(&mut archive, Some(cancellable))?; + let checksum = importer.finish_import(); txn.commit(Some(cancellable))?; repo.mark_commit_partial(&checksum, false)?; Ok::<_, anyhow::Error>(checksum) diff --git a/lib/tests/it/main.rs b/lib/tests/it/main.rs index 7046b04b..f3142ff6 100644 --- a/lib/tests/it/main.rs +++ b/lib/tests/it/main.rs @@ -5,7 +5,7 @@ use indoc::indoc; use once_cell::sync::Lazy; use ostree_ext::container::store::PrepareResult; use ostree_ext::container::{ - Config, ImageReference, OstreeImageReference, SignatureSource, Transport, + Config, ExportOpts, ImageReference, OstreeImageReference, SignatureSource, Transport, }; use ostree_ext::tar::TarImportOptions; use ostree_ext::{gio, glib}; @@ -385,8 +385,7 @@ fn skopeo_inspect(imgref: &str) -> Result { Ok(String::from_utf8(out.stdout)?) } -#[tokio::test] -async fn test_container_import_export() -> Result<()> { +async fn impl_test_container_import_export(chunked: bool) -> Result<()> { let fixture = Fixture::new()?; let testrev = fixture .srcrepo @@ -408,11 +407,15 @@ async fn test_container_import_export() -> Result<()> { ), cmd: Some(vec!["/bin/bash".to_string()]), }; + let opts = ExportOpts { + chunked, + ..Default::default() + }; let digest = ostree_ext::container::encapsulate( &fixture.srcrepo, TESTREF, &config, - None, + Some(opts), &srcoci_imgref, ) .await @@ -496,6 +499,13 @@ async fn oci_clone(src: impl AsRef, dest: impl AsRef) -> Res Ok(()) } +#[tokio::test] +async fn test_container_import_export() -> Result<()> { + impl_test_container_import_export(false).await?; + impl_test_container_import_export(true).await?; + Ok(()) +} + /// But layers work via the container::write module. #[tokio::test] async fn test_container_write_derive() -> Result<()> { @@ -587,17 +597,13 @@ async fn test_container_write_derive() -> Result<()> { assert!(digest.starts_with("sha256:")); assert_eq!(digest, expected_digest); - #[cfg(feature = "proxy_v0_2_3")] - { - let commit_meta = &imported_commit.child_value(0); - let proxy = containers_image_proxy::ImageProxy::new().await?; - let commit_meta = glib::VariantDict::new(Some(commit_meta)); - let config = commit_meta - .lookup::("ostree.container.image-config")? - .unwrap(); - let config: oci_spec::image::ImageConfiguration = serde_json::from_str(&config)?; - assert_eq!(config.os(), &oci_spec::image::Os::Linux); - } + let commit_meta = &imported_commit.child_value(0); + let commit_meta = glib::VariantDict::new(Some(commit_meta)); + let config = commit_meta + .lookup::("ostree.container.image-config")? + .unwrap(); + let config: oci_spec::image::ImageConfiguration = serde_json::from_str(&config)?; + assert_eq!(config.os(), &oci_spec::image::Os::Linux); // Parse the commit and verify we pulled the derived content. bash!(