diff --git a/lib/src/container/store.rs b/lib/src/container/store.rs index 6563d341..c715f8ca 100644 --- a/lib/src/container/store.rs +++ b/lib/src/container/store.rs @@ -716,7 +716,7 @@ impl ImageImporter { p.send(ImportProgress::OstreeChunkStarted(layer.layer.clone())) .await?; } - let (blob, driver) = fetch_layer_decompress( + let (blob, driver, media_type) = fetch_layer( &self.proxy, &self.proxy_img, &import.manifest, @@ -733,6 +733,7 @@ impl ImageImporter { let txn = repo.auto_transaction(Some(cancellable))?; let mut importer = crate::tar::Importer::new_for_object_set(&repo); let blob = tokio_util::io::SyncIoBridge::new(blob); + let blob = super::unencapsulate::decompressor(&media_type, blob)?; let mut archive = tar::Archive::new(blob); importer.import_objects(&mut archive, Some(cancellable))?; let commit = if write_refs { @@ -761,7 +762,7 @@ impl ImageImporter { )) .await?; } - let (blob, driver) = fetch_layer_decompress( + let (blob, driver, media_type) = fetch_layer( &self.proxy, &self.proxy_img, &import.manifest, @@ -778,6 +779,7 @@ impl ImageImporter { let txn = repo.auto_transaction(Some(cancellable))?; let mut importer = crate::tar::Importer::new_for_commit(&repo, remote); let blob = tokio_util::io::SyncIoBridge::new(blob); + let blob = super::unencapsulate::decompressor(&media_type, blob)?; let mut archive = tar::Archive::new(blob); importer.import_commit(&mut archive, Some(cancellable))?; let commit = importer.finish_import_commit(); @@ -873,7 +875,7 @@ impl ImageImporter { p.send(ImportProgress::DerivedLayerStarted(layer.layer.clone())) .await?; } - let (blob, driver) = super::unencapsulate::fetch_layer_decompress( + let (blob, driver, media_type) = super::unencapsulate::fetch_layer( &proxy, &proxy_img, &import.manifest, @@ -891,8 +893,13 @@ impl ImageImporter { allow_nonusr: root_is_transient, retain_var: self.ostree_v2024_3, }; - let r = - crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts)); + let r = crate::tar::write_tar( + &self.repo, + blob, + media_type, + layer.ostree_ref.as_str(), + Some(opts), + ); let r = super::unencapsulate::join_fetch(r, driver) .await .with_context(|| format!("Parsing layer blob {}", layer.layer.digest()))?; diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index 45bfd989..9e0818a8 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -34,11 +34,11 @@ use crate::container::store::LayerProgress; use super::*; -use anyhow::Context as _; use containers_image_proxy::{ImageProxy, OpenedImage}; use fn_error_context::context; -use futures_util::{Future, FutureExt, TryFutureExt as _}; +use futures_util::{Future, FutureExt}; use oci_spec::image::{self as oci_image, Digest}; +use std::io::Read; use std::sync::{Arc, Mutex}; use tokio::{ io::{AsyncBufRead, AsyncRead}, @@ -191,80 +191,30 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) - importer.unencapsulate().await } -/// Take an async AsyncBufRead and handle decompression for it, returning -/// a wrapped AsyncBufRead implementation. -/// This is implemented with a background thread using a pipe-to-self, -/// and so there is an additional Future object returned that is a "driver" -/// task and must also be checked for errors. -pub(crate) fn decompress_bridge( - src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static, - is_zstd: bool, -) -> Result<( - // This one is the input reader - impl tokio::io::AsyncBufRead + Send + Unpin + 'static, - // And this represents the worker thread doing copying - impl Future> + Send + Unpin + 'static, -)> { - // We use a plain unix pipe() because it's just a very convenient - // way to bridge arbitrarily between sync and async with a worker - // thread. Yes, it involves going through the kernel, but - // eventually we'll replace all this logic with podman anyways. - let (tx, rx) = tokio::net::unix::pipe::pipe()?; - let task = tokio::task::spawn_blocking(move || -> Result<()> { - // Convert the write half of the pipe() into a regular blocking file descriptor - let tx = tx.into_blocking_fd()?; - let mut tx = std::fs::File::from(tx); - // Convert the async input back to synchronous. - let src = tokio_util::io::SyncIoBridge::new(src); - let bufr = std::io::BufReader::new(src); - // Wrap the input in a decompressor; I originally tried to make - // this function take a function pointer, but yeah that was painful - // with the type system. - let mut src: Box = if is_zstd { - Box::new(zstd::stream::read::Decoder::new(bufr)?) - } else { - Box::new(flate2::bufread::GzDecoder::new(bufr)) - }; - // We don't care about the number of bytes copied - let _n: u64 = std::io::copy(&mut src, &mut tx).context("Copying for decompression")?; - Ok(()) - }) - // Flatten the nested Result> - .map(crate::tokio_util::flatten_anyhow); - // And return the pair of futures - Ok((tokio::io::BufReader::new(rx), task)) -} - /// Create a decompressor for this MIME type, given a stream of input. -fn new_async_decompressor( +pub(crate) fn decompressor( media_type: &oci_image::MediaType, - src: impl AsyncBufRead + Send + Unpin + 'static, -) -> Result<( - Box, - impl Future> + Send + Unpin + 'static, -)> { - let r: ( - Box, - Box> + Send + Unpin + 'static>, - ) = match media_type { + src: impl Read + Send + 'static, +) -> Result> { + let r: Box = match media_type { m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => { - let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd); - let (r, driver) = decompress_bridge(src, is_zstd)?; - (Box::new(r), Box::new(driver) as _) - } - oci_image::MediaType::ImageLayer => { - (Box::new(src), Box::new(futures_util::future::ready(Ok(())))) - } - oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => { - (Box::new(src), Box::new(futures_util::future::ready(Ok(())))) + if matches!(m, oci_image::MediaType::ImageLayerZstd) { + Box::new(zstd::stream::read::Decoder::new(src)?) + } else { + Box::new(flate2::bufread::GzDecoder::new(std::io::BufReader::new( + src, + ))) + } } + oci_image::MediaType::ImageLayer => Box::new(src), + oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src), o => anyhow::bail!("Unhandled layer type: {}", o), }; Ok(r) } /// A wrapper for [`get_blob`] which fetches a layer and decompresses it. -pub(crate) async fn fetch_layer_decompress<'a>( +pub(crate) async fn fetch_layer<'a>( proxy: &'a ImageProxy, img: &OpenedImage, manifest: &oci_image::ImageManifest, @@ -275,12 +225,13 @@ pub(crate) async fn fetch_layer_decompress<'a>( ) -> Result<( Box, impl Future> + 'a, + oci_image::MediaType, )> { use futures_util::future::Either; tracing::debug!("fetching {}", layer.digest()); let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap(); let (blob, driver, size); - let media_type: &oci_image::MediaType; + let media_type: oci_image::MediaType; match transport_src { Transport::ContainerStorage => { let layer_info = layer_info @@ -290,12 +241,12 @@ pub(crate) async fn fetch_layer_decompress<'a>( anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}") })?; size = layer_blob.size; - media_type = &layer_blob.media_type; + media_type = layer_blob.media_type.clone(); (blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?; } _ => { size = layer.size(); - media_type = layer.media_type(); + media_type = layer.media_type().clone(); (blob, driver) = proxy.get_blob(img, layer.digest(), size).await?; } }; @@ -316,13 +267,10 @@ pub(crate) async fn fetch_layer_decompress<'a>( progress.send_replace(Some(status)); } }; - let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?; - let driver = driver.and_then(|()| compression_driver); + let reader = Box::new(readprogress); let driver = futures_util::future::join(readproxy, driver).map(|r| r.1); - Ok((reader, Either::Left(driver))) + Ok((reader, Either::Left(driver), media_type)) } else { - let (blob, compression_driver) = new_async_decompressor(media_type, blob)?; - let driver = driver.and_then(|()| compression_driver); - Ok((blob, Either::Right(driver))) + Ok((Box::new(blob), Either::Right(driver), media_type)) } } diff --git a/lib/src/tar/write.rs b/lib/src/tar/write.rs index 29221e9b..57c367bf 100644 --- a/lib/src/tar/write.rs +++ b/lib/src/tar/write.rs @@ -15,6 +15,7 @@ use cap_std::io_lifetimes; use cap_std_ext::cap_std::fs::Dir; use cap_std_ext::cmdext::CapStdExtCommandExt; use cap_std_ext::{cap_std, cap_tempfile}; +use containers_image_proxy::oci_spec::image as oci_image; use fn_error_context::context; use ostree::gio; use ostree::prelude::FileExt; @@ -337,6 +338,7 @@ pub(crate) fn filter_tar( #[context("Filtering tar stream")] async fn filter_tar_async( src: impl AsyncRead + Send + 'static, + media_type: oci_image::MediaType, mut dest: impl AsyncWrite + Send + Unpin, config: &TarImportConfig, repo_tmpdir: Dir, @@ -345,12 +347,14 @@ async fn filter_tar_async( // The source must be moved to the heap so we know it is stable for passing to the worker thread let src = Box::pin(src); let config = config.clone(); - let tar_transformer = tokio::task::spawn_blocking(move || { - let mut src = tokio_util::io::SyncIoBridge::new(src); + let tar_transformer = crate::tokio_util::spawn_blocking_flatten(move || { + let src = tokio_util::io::SyncIoBridge::new(src); + let mut src = crate::container::decompressor(&media_type, src)?; let dest = tokio_util::io::SyncIoBridge::new(tx_buf); + let r = filter_tar(&mut src, dest, &config, &repo_tmpdir); // Pass ownership of the input stream back to the caller - see below. - (r, src) + Ok((r, src)) }); let copier = tokio::io::copy(&mut rx_buf, &mut dest); let (r, v) = tokio::join!(tar_transformer, copier); @@ -373,6 +377,7 @@ async fn filter_tar_async( pub async fn write_tar( repo: &ostree::Repo, src: impl tokio::io::AsyncRead + Send + Unpin + 'static, + media_type: oci_image::MediaType, refname: &str, options: Option, ) -> Result { @@ -430,7 +435,8 @@ pub async fn write_tar( let repo_tmpdir = Dir::reopen_dir(&repo.dfd_borrow())? .open_dir("tmp") .context("Getting repo tmpdir")?; - let filtered_result = filter_tar_async(src, child_stdin, &import_config, repo_tmpdir); + let filtered_result = + filter_tar_async(src, media_type, child_stdin, &import_config, repo_tmpdir); let output_copier = async move { // Gather stdout/stderr to buffers let mut child_stdout_buf = String::new(); @@ -585,7 +591,14 @@ mod tests { let mut dest = Vec::new(); let src = tokio::io::BufReader::new(tokio::fs::File::open(rootfs_tar_path).await?); let cap_tmpdir = Dir::open_ambient_dir(&tempd, cap_std::ambient_authority())?; - filter_tar_async(src, &mut dest, &Default::default(), cap_tmpdir).await?; + filter_tar_async( + src, + oci_image::MediaType::ImageLayer, + &mut dest, + &Default::default(), + cap_tmpdir, + ) + .await?; let dest = dest.as_slice(); let mut final_tar = tar::Archive::new(Cursor::new(dest)); let destdir = &tempd.path().join("destdir"); diff --git a/lib/src/tokio_util.rs b/lib/src/tokio_util.rs index d376dee2..e21b142c 100644 --- a/lib/src/tokio_util.rs +++ b/lib/src/tokio_util.rs @@ -73,6 +73,15 @@ where spawn_blocking_cancellable(f).map(flatten_anyhow) } +/// A wrapper around [`tokio::task::spawn_blocking`] that flattens nested results. +pub fn spawn_blocking_flatten(f: F) -> impl Future> +where + F: FnOnce() -> Result + Send + 'static, + T: Send + 'static, +{ + tokio::task::spawn_blocking(f).map(flatten_anyhow) +} + #[cfg(test)] mod tests { use super::*; diff --git a/lib/tests/it/main.rs b/lib/tests/it/main.rs index 97f3efff..dc044804 100644 --- a/lib/tests/it/main.rs +++ b/lib/tests/it/main.rs @@ -385,7 +385,14 @@ async fn test_tar_write() -> Result<()> { let src = fixture.dir.open(tmptar)?; fixture.dir.remove_file(tmptar)?; let src = tokio::fs::File::from_std(src.into_std()); - let r = ostree_ext::tar::write_tar(fixture.destrepo(), src, "layer", None).await?; + let r = ostree_ext::tar::write_tar( + fixture.destrepo(), + src, + oci_image::MediaType::ImageLayer, + "layer", + None, + ) + .await?; let layer_commit = r.commit.as_str(); cmd!( sh, @@ -409,7 +416,14 @@ async fn test_tar_write_tar_layer() -> Result<()> { let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER)); let _n = std::io::copy(&mut dec, &mut v)?; let r = tokio::io::BufReader::new(std::io::Cursor::new(v)); - ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?; + ostree_ext::tar::write_tar( + fixture.destrepo(), + r, + oci_image::MediaType::ImageLayer, + "test", + None, + ) + .await?; Ok(()) }