Skip to content

Commit

Permalink
pull: Improved progress output
Browse files Browse the repository at this point in the history
indicatif has nice support for multiple bars. Instead
of hand-rolling a `[nn/NN]` for the overall layer count,
change things so that we have:

```
Fetching layers [bar...] 8/65
 ostree chunk sha256:29fc11ff03e4b3 [bar] (0 B/s)
```

Signed-off-by: Colin Walters <[email protected]>
  • Loading branch information
cgwalters committed Jun 29, 2024
1 parent 6c81a40 commit d628101
Showing 1 changed file with 84 additions and 23 deletions.
107 changes: 84 additions & 23 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ use cap_std::fs::{Dir, MetadataExt};
use cap_std_ext::cap_std;
use cap_std_ext::dirext::CapStdExtDirExt;
use fn_error_context::context;
use indicatif::ProgressBar;
use ostree::{gio, glib};
use ostree_container::OstreeImageReference;
use ostree_ext::container as ostree_container;
use ostree_ext::container::store::PrepareResult;
use ostree_ext::container::store::{ImportProgress, PrepareResult};
use ostree_ext::oci_spec::image::Descriptor;
use ostree_ext::ostree;
use ostree_ext::ostree::Deployment;
use ostree_ext::sysroot::SysrootLock;
Expand Down Expand Up @@ -112,34 +114,84 @@ pub(crate) fn check_bootc_label(config: &ostree_ext::oci_spec::image::ImageConfi
}
}

fn descriptor_of_progress(p: &ImportProgress) -> &Descriptor {
match p {
ImportProgress::OstreeChunkStarted(l) => l,
ImportProgress::OstreeChunkCompleted(l) => l,
ImportProgress::DerivedLayerStarted(l) => l,
ImportProgress::DerivedLayerCompleted(l) => l,
}
}

fn prefix_of_progress(p: &ImportProgress) -> &'static str {
match p {
ImportProgress::OstreeChunkStarted(_) | ImportProgress::OstreeChunkCompleted(_) => {
"ostree chunk"
}
ImportProgress::DerivedLayerStarted(_) | ImportProgress::DerivedLayerCompleted(_) => {
"layer"
}
}
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
total_layers: usize,
n_layers_fetched: &mut usize,
n_layers_to_fetch: usize,
) {
let start = std::time::Instant::now();
let mut total_read = 0u64;
let bar = indicatif::MultiProgress::new();
let layers_bar = bar.add(indicatif::ProgressBar::new(
n_layers_to_fetch.try_into().unwrap(),
));
let style = indicatif::ProgressStyle::default_bar();
let pb = indicatif::ProgressBar::new(100);
pb.set_style(
layers_bar.set_style(
style
.template("{prefix} {bytes} [{bar:20}] ({eta}) {msg}")
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
.unwrap(),
);
layers_bar.set_prefix("Fetching layers");
layers_bar.set_message("");
let mut byte_bar_holder: Option<ProgressBar> = None;
fn new_byte_bar() -> ProgressBar {
let layers_bar = indicatif::ProgressBar::new(0);
let style = indicatif::ProgressStyle::default_bar();
layers_bar.set_style(
style
.template(
" └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
)
.unwrap(),
);
layers_bar
}
loop {
tokio::select! {
// Always handle layer changes first.
biased;
layer = layers.recv() => {
if let Some(l) = layer {
let layer = descriptor_of_progress(&l);
let layer_size = u64::try_from(layer.size()).unwrap();
if l.is_starting() {
pb.set_position(0);
let pb = byte_bar_holder.take().unwrap_or_else(|| {
let pb = new_byte_bar();
bar.insert_after(&layers_bar, pb)
});
pb.reset();
pb.set_length(layer_size);
let layer_type = prefix_of_progress(&l);
let short_digest = &layer.digest()[0..21];
pb.set_prefix(format!("{layer_type} {short_digest}"));
byte_bar_holder = Some(pb);
} else {
pb.finish();
*n_layers_fetched += 1;
let byte_bar = byte_bar_holder.as_ref().expect("byte progress");
byte_bar.set_position(layer_size);
layers_bar.inc(1);
total_read = total_read.saturating_add(layer_size);
}
pb.set_prefix(format!("[{}/{}]", *n_layers_fetched, total_layers));
pb.set_message(ostree_ext::cli::layer_progress_format(&l));
} else {
// If the receiver is disconnected, then we're done
break
Expand All @@ -150,15 +202,29 @@ async fn handle_layer_progress_print(
// If the receiver is disconnected, then we're done
break
}
// Avoid updating too frequently
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let bytes = layer_bytes.borrow();
let bytes_progress = byte_bar_holder.as_ref().expect("layer progress before bytes progress");
if let Some(bytes) = &*bytes {
pb.set_length(bytes.total);
pb.set_position(bytes.fetched);
bytes_progress.set_position(bytes.fetched);
}
}

}
}
if let Err(e) = bar.clear() {
tracing::warn!("clearing bar: {e}");
}
let end = std::time::Instant::now();
let elapsed = end.duration_since(start);
let persec = total_read as f64 / elapsed.as_secs_f64();
let persec = indicatif::HumanBytes(persec as u64);
println!(
"Fetched layers: {} in {} ({}/s)",
indicatif::HumanBytes(total_read),
indicatif::HumanDuration(elapsed),
persec,
);
}

/// Wrapper for pulling a container image, wiring up status output.
Expand All @@ -182,19 +248,14 @@ pub(crate) async fn pull(
ostree_ext::cli::print_deprecated_warning(warning).await;
}
ostree_ext::cli::print_layer_status(&prep);
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
let n_layers_to_fetch = layers_to_fetch.len();
let printer = (!quiet).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
let total_layers = prep.layers_to_fetch().count();
let mut n_fetched = 0usize;
tokio::task::spawn(async move {
handle_layer_progress_print(
layer_progress,
layer_byte_progress,
total_layers,
&mut n_fetched,
)
.await
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch)
.await
})
});
let import = imp.import(prep).await;
Expand Down

0 comments on commit d628101

Please sign in to comment.