diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index e4f37334..1e162bfd 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -165,6 +165,7 @@ pub(crate) async fn join_fetch( (Err(worker), Err(driver)) => { let text = driver.root_cause().to_string(); if text.ends_with("broken pipe") { + tracing::trace!("Ignoring broken pipe failure from driver"); Err(worker) } else { Err(worker.context(format!("proxy failure: {} and client error", text))) diff --git a/lib/src/tar/write.rs b/lib/src/tar/write.rs index 29e0d82c..df147c90 100644 --- a/lib/src/tar/write.rs +++ b/lib/src/tar/write.rs @@ -259,16 +259,28 @@ async fn filter_tar_async( mut dest: impl AsyncWrite + Send + Unpin, ) -> Result> { let (tx_buf, mut rx_buf) = tokio::io::duplex(8192); + // The source must be moved to the heap so we know it is stable for passing to the worker thread let src = Box::pin(src); - let tar_transformer = tokio::task::spawn_blocking(move || -> Result<_> { - let src = tokio_util::io::SyncIoBridge::new(src); + let tar_transformer = tokio::task::spawn_blocking(move || { + let mut src = tokio_util::io::SyncIoBridge::new(src); let dest = tokio_util::io::SyncIoBridge::new(tx_buf); - filter_tar(src, dest) + let r = filter_tar(&mut src, dest); + // Pass ownership of the input stream back to the caller - see below. + (r, src) }); let copier = tokio::io::copy(&mut rx_buf, &mut dest); let (r, v) = tokio::join!(tar_transformer, copier); let _v: u64 = v?; - r? + let (r, src) = r?; + // Note that the worker thread took temporary ownership of the input stream; we only close + // it at this point, after we're sure we've done all processing of the input. The reason + // for this is that both the skopeo process *or* us could encounter an error (see join_fetch). + // By ensuring we hold the stream open as long as possible, it ensures that we're going to + // see a remote error first, instead of the remote skopeo process seeing us close the pipe + // because we found an error. + drop(src); + // And pass back the result + r } /// Write the contents of a tarball as an ostree commit.