This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Remove decompress_bridge and move decompression inline #677

Merged
merged 1 commit on Oct 31, 2024
17 changes: 12 additions & 5 deletions lib/src/container/store.rs
@@ -716,7 +716,7 @@ impl ImageImporter {
p.send(ImportProgress::OstreeChunkStarted(layer.layer.clone()))
.await?;
}
let (blob, driver) = fetch_layer_decompress(
let (blob, driver, media_type) = fetch_layer(
&self.proxy,
&self.proxy_img,
&import.manifest,
@@ -733,6 +733,7 @@ impl ImageImporter {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_object_set(&repo);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
let mut archive = tar::Archive::new(blob);
importer.import_objects(&mut archive, Some(cancellable))?;
let commit = if write_refs {
@@ -761,7 +762,7 @@ impl ImageImporter {
))
.await?;
}
let (blob, driver) = fetch_layer_decompress(
let (blob, driver, media_type) = fetch_layer(
&self.proxy,
&self.proxy_img,
&import.manifest,
@@ -778,6 +779,7 @@ impl ImageImporter {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_commit(&repo, remote);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
let mut archive = tar::Archive::new(blob);
importer.import_commit(&mut archive, Some(cancellable))?;
let commit = importer.finish_import_commit();
@@ -873,7 +875,7 @@ impl ImageImporter {
p.send(ImportProgress::DerivedLayerStarted(layer.layer.clone()))
.await?;
}
let (blob, driver) = super::unencapsulate::fetch_layer_decompress(
let (blob, driver, media_type) = super::unencapsulate::fetch_layer(
&proxy,
&proxy_img,
&import.manifest,
@@ -891,8 +893,13 @@ impl ImageImporter {
allow_nonusr: root_is_transient,
retain_var: self.ostree_v2024_3,
};
let r =
crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
let r = crate::tar::write_tar(
&self.repo,
blob,
media_type,
layer.ostree_ref.as_str(),
Some(opts),
);
let r = super::unencapsulate::join_fetch(r, driver)
.await
.with_context(|| format!("Parsing layer blob {}", layer.layer.digest()))?;
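For orientation, here is the new call pattern from this file consolidated into one crate-internal sketch; the helper name and signature are illustrative, while `SyncIoBridge`, `decompressor`, and the media-type argument mirror the diff above. As in the importer, this is intended to run inside a `spawn_blocking` task.

```rust
use std::io::Read;

use oci_spec::image as oci_image;
use tokio::io::AsyncBufRead;

// Illustrative helper mirroring the pattern above: bridge the async blob to
// synchronous I/O, wrap it in the media-type-driven decompressor, and hand
// the result to tar. Intended to be called from a spawn_blocking task.
fn open_layer_archive(
    blob: impl AsyncBufRead + Send + Unpin + 'static,
    media_type: &oci_image::MediaType,
) -> anyhow::Result<tar::Archive<Box<dyn Read + Send>>> {
    let blob = tokio_util::io::SyncIoBridge::new(blob);
    let blob = crate::container::decompressor(media_type, blob)?;
    Ok(tar::Archive::new(blob))
}
```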
98 changes: 23 additions & 75 deletions lib/src/container/unencapsulate.rs
@@ -34,11 +34,11 @@
use crate::container::store::LayerProgress;

use super::*;
use anyhow::Context as _;
use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::{Future, FutureExt, TryFutureExt as _};
use futures_util::{Future, FutureExt};
use oci_spec::image::{self as oci_image, Digest};
use std::io::Read;
use std::sync::{Arc, Mutex};
use tokio::{
io::{AsyncBufRead, AsyncRead},
@@ -191,80 +191,30 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
importer.unencapsulate().await
}

/// Take an async AsyncBufRead and handle decompression for it, returning
/// a wrapped AsyncBufRead implementation.
/// This is implemented with a background thread using a pipe-to-self,
/// and so there is an additional Future object returned that is a "driver"
/// task and must also be checked for errors.
pub(crate) fn decompress_bridge(
src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
is_zstd: bool,
) -> Result<(
// This one is the input reader
impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
// And this represents the worker thread doing copying
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
// We use a plain unix pipe() because it's just a very convenient
// way to bridge arbitrarily between sync and async with a worker
// thread. Yes, it involves going through the kernel, but
// eventually we'll replace all this logic with podman anyways.
let (tx, rx) = tokio::net::unix::pipe::pipe()?;
let task = tokio::task::spawn_blocking(move || -> Result<()> {
// Convert the write half of the pipe() into a regular blocking file descriptor
let tx = tx.into_blocking_fd()?;
let mut tx = std::fs::File::from(tx);
// Convert the async input back to synchronous.
let src = tokio_util::io::SyncIoBridge::new(src);
let bufr = std::io::BufReader::new(src);
// Wrap the input in a decompressor; I originally tried to make
// this function take a function pointer, but yeah that was painful
// with the type system.
let mut src: Box<dyn std::io::Read> = if is_zstd {
Box::new(zstd::stream::read::Decoder::new(bufr)?)
} else {
Box::new(flate2::bufread::GzDecoder::new(bufr))
};
// We don't care about the number of bytes copied
let _n: u64 = std::io::copy(&mut src, &mut tx).context("Copying for decompression")?;
Ok(())
})
// Flatten the nested Result<Result<>>
.map(crate::tokio_util::flatten_anyhow);
// And return the pair of futures
Ok((tokio::io::BufReader::new(rx), task))
}

/// Create a decompressor for this MIME type, given a stream of input.
fn new_async_decompressor(
pub(crate) fn decompressor(
media_type: &oci_image::MediaType,
src: impl AsyncBufRead + Send + Unpin + 'static,
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
let r: (
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
) = match media_type {
src: impl Read + Send + 'static,
) -> Result<Box<dyn Read + Send + 'static>> {
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
let (r, driver) = decompress_bridge(src, is_zstd)?;
(Box::new(r), Box::new(driver) as _)
}
oci_image::MediaType::ImageLayer => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
if matches!(m, oci_image::MediaType::ImageLayerZstd) {
Box::new(zstd::stream::read::Decoder::new(src)?)
} else {
Box::new(flate2::bufread::GzDecoder::new(std::io::BufReader::new(
src,
)))
}
}
oci_image::MediaType::ImageLayer => Box::new(src),
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
o => anyhow::bail!("Unhandled layer type: {}", o),
};
Ok(r)
}

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
pub(crate) async fn fetch_layer_decompress<'a>(
pub(crate) async fn fetch_layer<'a>(
proxy: &'a ImageProxy,
img: &OpenedImage,
manifest: &oci_image::ImageManifest,
@@ -275,12 +225,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin>,
impl Future<Output = Result<()>> + 'a,
oci_image::MediaType,
)> {
use futures_util::future::Either;
tracing::debug!("fetching {}", layer.digest());
let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
let (blob, driver, size);
let media_type: &oci_image::MediaType;
let media_type: oci_image::MediaType;
match transport_src {
Transport::ContainerStorage => {
let layer_info = layer_info
@@ -290,12 +241,12 @@ pub(crate) async fn fetch_layer_decompress<'a>(
anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
})?;
size = layer_blob.size;
media_type = &layer_blob.media_type;
media_type = layer_blob.media_type.clone();
(blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
}
_ => {
size = layer.size();
media_type = layer.media_type();
media_type = layer.media_type().clone();
(blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
}
};
@@ -316,13 +267,10 @@ pub(crate) async fn fetch_layer_decompress<'a>(
progress.send_replace(Some(status));
}
};
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
let driver = driver.and_then(|()| compression_driver);
let reader = Box::new(readprogress);
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
Ok((reader, Either::Left(driver), media_type))
} else {
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
let driver = driver.and_then(|()| compression_driver);
Ok((blob, Either::Right(driver)))
Ok((Box::new(blob), Either::Right(driver), media_type))
}
}
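For reference, a minimal crate-internal sketch of using the new synchronous `decompressor` on an in-memory gzip layer; the wrapper function and its input are illustrative, while the `decompressor` signature and media types come from this diff.

```rust
use std::io::Read;

use oci_spec::image as oci_image;

// Illustrative helper: let `decompressor` pick the right synchronous decoder
// (gzip here) and read the decompressed bytes to the end.
fn decompress_gzip_layer(compressed: Vec<u8>) -> anyhow::Result<Vec<u8>> {
    let src = std::io::Cursor::new(compressed);
    let mut reader =
        crate::container::decompressor(&oci_image::MediaType::ImageLayerGzip, src)?;
    let mut out = Vec::new();
    reader.read_to_end(&mut out)?;
    Ok(out)
}
```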
23 changes: 18 additions & 5 deletions lib/src/tar/write.rs
@@ -15,6 +15,7 @@ use cap_std::io_lifetimes;
use cap_std_ext::cap_std::fs::Dir;
use cap_std_ext::cmdext::CapStdExtCommandExt;
use cap_std_ext::{cap_std, cap_tempfile};
use containers_image_proxy::oci_spec::image as oci_image;
use fn_error_context::context;
use ostree::gio;
use ostree::prelude::FileExt;
@@ -337,6 +338,7 @@ pub(crate) fn filter_tar(
#[context("Filtering tar stream")]
async fn filter_tar_async(
src: impl AsyncRead + Send + 'static,
media_type: oci_image::MediaType,
mut dest: impl AsyncWrite + Send + Unpin,
config: &TarImportConfig,
repo_tmpdir: Dir,
@@ -345,12 +347,14 @@ async fn filter_tar_async(
// The source must be moved to the heap so we know it is stable for passing to the worker thread
let src = Box::pin(src);
let config = config.clone();
let tar_transformer = tokio::task::spawn_blocking(move || {
let mut src = tokio_util::io::SyncIoBridge::new(src);
let tar_transformer = crate::tokio_util::spawn_blocking_flatten(move || {
let src = tokio_util::io::SyncIoBridge::new(src);
let mut src = crate::container::decompressor(&media_type, src)?;
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);

let r = filter_tar(&mut src, dest, &config, &repo_tmpdir);
// Pass ownership of the input stream back to the caller - see below.
(r, src)
Ok((r, src))
});
let copier = tokio::io::copy(&mut rx_buf, &mut dest);
let (r, v) = tokio::join!(tar_transformer, copier);
@@ -373,6 +377,7 @@ async fn filter_tar_async(
pub async fn write_tar(
repo: &ostree::Repo,
src: impl tokio::io::AsyncRead + Send + Unpin + 'static,
media_type: oci_image::MediaType,
refname: &str,
options: Option<WriteTarOptions>,
) -> Result<WriteTarResult> {
@@ -430,7 +435,8 @@ pub async fn write_tar(
let repo_tmpdir = Dir::reopen_dir(&repo.dfd_borrow())?
.open_dir("tmp")
.context("Getting repo tmpdir")?;
let filtered_result = filter_tar_async(src, child_stdin, &import_config, repo_tmpdir);
let filtered_result =
filter_tar_async(src, media_type, child_stdin, &import_config, repo_tmpdir);
let output_copier = async move {
// Gather stdout/stderr to buffers
let mut child_stdout_buf = String::new();
@@ -585,7 +591,14 @@ mod tests {
let mut dest = Vec::new();
let src = tokio::io::BufReader::new(tokio::fs::File::open(rootfs_tar_path).await?);
let cap_tmpdir = Dir::open_ambient_dir(&tempd, cap_std::ambient_authority())?;
filter_tar_async(src, &mut dest, &Default::default(), cap_tmpdir).await?;
filter_tar_async(
src,
oci_image::MediaType::ImageLayer,
&mut dest,
&Default::default(),
cap_tmpdir,
)
.await?;
let dest = dest.as_slice();
let mut final_tar = tar::Archive::new(Cursor::new(dest));
let destdir = &tempd.path().join("destdir");
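To show the updated public signature in one place, a hedged sketch of an external caller passing the layer media type through `write_tar`; the helper, ref name, and file source are illustrative, and it assumes the caller also depends on `containers-image-proxy` for the `MediaType` enum.

```rust
use anyhow::Result;
use containers_image_proxy::oci_spec::image as oci_image;

// Illustrative caller: import a gzip-compressed layer tarball from a file.
// Decompression now happens inside filter_tar_async, keyed off the media type.
async fn import_gzip_layer(repo: &ostree::Repo, src: tokio::fs::File) -> Result<String> {
    let r = ostree_ext::tar::write_tar(
        repo,
        src,
        oci_image::MediaType::ImageLayerGzip,
        "example/layer",
        None,
    )
    .await?;
    Ok(r.commit)
}
```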
9 changes: 9 additions & 0 deletions lib/src/tokio_util.rs
@@ -73,6 +73,15 @@
spawn_blocking_cancellable(f).map(flatten_anyhow)
}

/// A wrapper around [`tokio::task::spawn_blocking`] that flattens nested results.
pub fn spawn_blocking_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
where
F: FnOnce() -> Result<T> + Send + 'static,
T: Send + 'static,
{
tokio::task::spawn_blocking(f).map(flatten_anyhow)
}

#[cfg(test)]
mod tests {
use super::*;
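A small crate-internal sketch of the new helper's behavior; the parsing closure is a stand-in for real blocking work such as the decompress-and-filter step in `filter_tar_async`.

```rust
// Illustrative usage: the closure's anyhow::Result and the JoinError from
// tokio's blocking task are flattened into a single Result, so the caller
// awaits one fallible future.
async fn parse_on_blocking_thread(input: String) -> anyhow::Result<u32> {
    crate::tokio_util::spawn_blocking_flatten(move || {
        input.trim().parse::<u32>().map_err(anyhow::Error::from)
    })
    .await
}
```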
18 changes: 16 additions & 2 deletions lib/tests/it/main.rs
@@ -385,7 +385,14 @@ async fn test_tar_write() -> Result<()> {
let src = fixture.dir.open(tmptar)?;
fixture.dir.remove_file(tmptar)?;
let src = tokio::fs::File::from_std(src.into_std());
let r = ostree_ext::tar::write_tar(fixture.destrepo(), src, "layer", None).await?;
let r = ostree_ext::tar::write_tar(
fixture.destrepo(),
src,
oci_image::MediaType::ImageLayer,
"layer",
None,
)
.await?;
let layer_commit = r.commit.as_str();
cmd!(
sh,
@@ -409,7 +416,14 @@ async fn test_tar_write_tar_layer() -> Result<()> {
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
let _n = std::io::copy(&mut dec, &mut v)?;
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
ostree_ext::tar::write_tar(
fixture.destrepo(),
r,
oci_image::MediaType::ImageLayer,
"test",
None,
)
.await?;
Ok(())
}
