From e39b72769993b1dde1548149da7814c360ba3cda Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 31 Aug 2023 15:55:34 -0400 Subject: [PATCH 1/2] container: Add a trace log for when we discard "broken pipe" error I don't think we're hitting this in https://github.com/coreos/rpm-ostree/issues/4567 but it'd be useful to have a trace message in case. --- lib/src/container/unencapsulate.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/src/container/unencapsulate.rs b/lib/src/container/unencapsulate.rs index e4f37334..1e162bfd 100644 --- a/lib/src/container/unencapsulate.rs +++ b/lib/src/container/unencapsulate.rs @@ -165,6 +165,7 @@ pub(crate) async fn join_fetch( (Err(worker), Err(driver)) => { let text = driver.root_cause().to_string(); if text.ends_with("broken pipe") { + tracing::trace!("Ignoring broken pipe failure from driver"); Err(worker) } else { Err(worker.context(format!("proxy failure: {} and client error", text))) From 13455cce4923ac0c3237d3983c2d4044842fb139 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Thu, 31 Aug 2023 17:18:21 -0400 Subject: [PATCH 2/2] tar: Hold open input stream as long as possible I'm hoping this will help us debug https://github.com/coreos/rpm-ostree/issues/4567 ``` [2023-08-30T15:00:16.554Z] Aug 30 15:00:15 qemu0 kola-runext-container-image[1957]: error: Importing: Parsing layer blob sha256:00623c39da63781bdd3fb00fedb36f8b9ec95e42cdb4d389f692457f24c67144: Failed to invoke skopeo proxy method FinishPipe: remote error: write |1: broken pipe ``` I haven't been able to reproduce it outside of CI yet, but we had a prior ugly hack for this in https://github.com/ostreedev/ostree-rs-ext/commit/a27dac83831297a6e83bd25c5b6b1b842249ad4d As the comments say - the goal is to hold open the input stream as long as feasibly possible. --- lib/src/tar/write.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/lib/src/tar/write.rs b/lib/src/tar/write.rs index 29e0d82c..df147c90 100644 --- a/lib/src/tar/write.rs +++ b/lib/src/tar/write.rs @@ -259,16 +259,28 @@ async fn filter_tar_async( mut dest: impl AsyncWrite + Send + Unpin, ) -> Result> { let (tx_buf, mut rx_buf) = tokio::io::duplex(8192); + // The source must be moved to the heap so we know it is stable for passing to the worker thread let src = Box::pin(src); - let tar_transformer = tokio::task::spawn_blocking(move || -> Result<_> { - let src = tokio_util::io::SyncIoBridge::new(src); + let tar_transformer = tokio::task::spawn_blocking(move || { + let mut src = tokio_util::io::SyncIoBridge::new(src); let dest = tokio_util::io::SyncIoBridge::new(tx_buf); - filter_tar(src, dest) + let r = filter_tar(&mut src, dest); + // Pass ownership of the input stream back to the caller - see below. + (r, src) }); let copier = tokio::io::copy(&mut rx_buf, &mut dest); let (r, v) = tokio::join!(tar_transformer, copier); let _v: u64 = v?; - r? + let (r, src) = r?; + // Note that the worker thread took temporary ownership of the input stream; we only close + // it at this point, after we're sure we've done all processing of the input. The reason + // for this is that both the skopeo process *or* us could encounter an error (see join_fetch). + // By ensuring we hold the stream open as long as possible, it ensures that we're going to + // see a remote error first, instead of the remote skopeo process seeing us close the pipe + // because we found an error. + drop(src); + // And pass back the result + r } /// Write the contents of a tarball as an ostree commit.