Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Commit

Permalink
store: Only close self-pipe when we're done
Browse files Browse the repository at this point in the history
This took a crazy long time to debug but after lots of false
starts I think this is right. Basically what's going on is
we have async tasks that are talking over a `pipe()` inside
our own process.

We must not close the read side of the pipe until the
writer is done.

I believe this is dependent on tokio task scheduling order,
and it's way easier to reproduce when pinned to a single CPU.

Closes: #657
Signed-off-by: Colin Walters <[email protected]>
  • Loading branch information
cgwalters committed Oct 23, 2024
1 parent 993a583 commit 3b2c098
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
24 changes: 16 additions & 8 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -730,8 +730,8 @@ impl ImageImporter {
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_object_set(&repo);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let mut archive = tar::Archive::new(blob);
let mut blob = tokio_util::io::SyncIoBridge::new(blob);
let mut archive = tar::Archive::new(&mut blob);
importer.import_objects(&mut archive, Some(cancellable))?;
let commit = if write_refs {
let commit = importer.finish_import_object_set()?;
Expand All @@ -742,10 +742,14 @@ impl ImageImporter {
None
};
txn.commit(Some(cancellable))?;
Ok::<_, anyhow::Error>(commit)
// Pass back ownership, see below
Ok::<_, anyhow::Error>((blob.into_inner(), commit))
})
.map_err(|e| e.context(format!("Layer {}", layer.layer.digest())));
let commit = super::unencapsulate::join_fetch(import_task, driver).await?;
let (blob, commit) = super::unencapsulate::join_fetch(import_task, driver).await?;
// We can't close the read side until we've completed the rest of the processing
// to avoid breaking our own pipe-to-self. See https://github.com/ostreedev/ostree-rs-ext/issues/657
drop(blob);
layer.commit = commit;
if let Some(p) = self.layer_progress.as_ref() {
p.send(ImportProgress::OstreeChunkCompleted(layer.layer.clone()))
Expand Down Expand Up @@ -775,8 +779,8 @@ impl ImageImporter {
crate::tokio_util::spawn_blocking_cancellable_flatten(move |cancellable| {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_commit(&repo, remote);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let mut archive = tar::Archive::new(blob);
let mut blob = tokio_util::io::SyncIoBridge::new(blob);
let mut archive = tar::Archive::new(&mut blob);
importer.import_commit(&mut archive, Some(cancellable))?;
let commit = importer.finish_import_commit();
if write_refs {
Expand All @@ -785,9 +789,13 @@ impl ImageImporter {
}
repo.mark_commit_partial(&commit, false)?;
txn.commit(Some(cancellable))?;
Ok::<_, anyhow::Error>(commit)
// Pass back ownership, see below
Ok::<_, anyhow::Error>((blob.into_inner(), commit))
});
let commit = super::unencapsulate::join_fetch(import_task, driver).await?;
let (blob, commit) = super::unencapsulate::join_fetch(import_task, driver).await?;
// We can't close the read side until we've completed the rest of the processing
// to avoid breaking our own pipe-to-self. See https://github.com/ostreedev/ostree-rs-ext/issues/657
drop(blob);
import.ostree_commit_layer.commit = Some(commit);
if let Some(p) = self.layer_progress.as_ref() {
p.send(ImportProgress::OstreeChunkCompleted(
Expand Down
2 changes: 1 addition & 1 deletion lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub struct Import {
/// to see if the worker function had an error *and* if the proxy
/// had an error, but if the proxy's error ends in `broken pipe`
/// then it means the real only error is from the worker.
pub(crate) async fn join_fetch<T: std::fmt::Debug>(
pub(crate) async fn join_fetch<T>(
worker: impl Future<Output = Result<T>>,
driver: impl Future<Output = Result<()>>,
) -> Result<T> {
Expand Down

0 comments on commit 3b2c098

Please sign in to comment.