Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

install: Disable fsync() in repo when pulling && improved pull progress #655

Merged
merged 3 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions lib/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
}
}
} else {
let fetched = crate::deploy::pull(sysroot, imgref, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, imgref, opts.quiet).await?;
let kargs = crate::kargs::get_kargs(repo, &booted_deployment, fetched.as_ref())?;
let staged_digest = staged_image.as_ref().map(|s| s.image_digest.as_str());
let fetched_digest = fetched.manifest_digest.as_str();
Expand Down Expand Up @@ -630,7 +630,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
}
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;

let fetched = crate::deploy::pull(sysroot, &target, opts.quiet).await?;
let fetched = crate::deploy::pull(repo, &target, opts.quiet).await?;
let kargs = crate::kargs::get_kargs(repo, &booted_deployment, fetched.as_ref())?;

if !opts.retain {
Expand Down Expand Up @@ -664,6 +664,8 @@ async fn rollback(_opts: RollbackOpts) -> Result<()> {
#[context("Editing spec")]
async fn edit(opts: EditOpts) -> Result<()> {
let sysroot = &get_locked_sysroot().await?;
let repo = &sysroot.repo();

let (booted_deployment, _deployments, host) =
crate::status::get_status_require_booted(sysroot)?;
let new_host: Host = if let Some(filename) = opts.filename {
Expand All @@ -690,8 +692,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
return crate::deploy::rollback(sysroot).await;
}

let fetched = crate::deploy::pull(sysroot, new_spec.image, opts.quiet).await?;
let repo = &sysroot.repo();
let fetched = crate::deploy::pull(repo, new_spec.image, opts.quiet).await?;
let kargs = crate::kargs::get_kargs(repo, &booted_deployment, fetched.as_ref())?;

// TODO gc old layers here
Expand Down
104 changes: 77 additions & 27 deletions lib/src/deploy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use fn_error_context::context;
use ostree::{gio, glib};
use ostree_container::OstreeImageReference;
use ostree_ext::container as ostree_container;
use ostree_ext::container::store::PrepareResult;
use ostree_ext::container::store::{ImportProgress, PrepareResult};
use ostree_ext::oci_spec::image::Descriptor;
use ostree_ext::ostree;
use ostree_ext::ostree::Deployment;
use ostree_ext::sysroot::SysrootLock;
Expand Down Expand Up @@ -112,34 +113,76 @@ pub(crate) fn check_bootc_label(config: &ostree_ext::oci_spec::image::ImageConfi
}
}

fn descriptor_of_progress(p: &ImportProgress) -> &Descriptor {
match p {
ImportProgress::OstreeChunkStarted(l) => l,
ImportProgress::OstreeChunkCompleted(l) => l,
ImportProgress::DerivedLayerStarted(l) => l,
ImportProgress::DerivedLayerCompleted(l) => l,
}
}

fn prefix_of_progress(p: &ImportProgress) -> &'static str {
match p {
ImportProgress::OstreeChunkStarted(_) | ImportProgress::OstreeChunkCompleted(_) => {
"ostree chunk"
}
ImportProgress::DerivedLayerStarted(_) | ImportProgress::DerivedLayerCompleted(_) => {
"layer"
}
}
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
total_layers: usize,
n_layers_fetched: &mut usize,
n_layers_to_fetch: usize,
) {
let style = indicatif::ProgressStyle::default_bar();
let pb = indicatif::ProgressBar::new(100);
pb.set_style(
style
.template("{prefix} {bytes} [{bar:20}] ({eta}) {msg}")
let start = std::time::Instant::now();
let mut total_read = 0u64;
let bar = indicatif::MultiProgress::new();
let layers_bar = bar.add(indicatif::ProgressBar::new(
n_layers_to_fetch.try_into().unwrap(),
));
let byte_bar = bar.add(indicatif::ProgressBar::new(0));
// let byte_bar = indicatif::ProgressBar::new(0);
// byte_bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
layers_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template("{prefix} {bar} {pos}/{len} {wide_msg}")
.unwrap(),
);
layers_bar.set_prefix("Fetching layers");
layers_bar.set_message("");
byte_bar.set_prefix("Fetching");
byte_bar.set_style(
indicatif::ProgressStyle::default_bar()
.template(
" └ {prefix} {bar} {binary_bytes}/{binary_total_bytes} ({binary_bytes_per_sec}) {wide_msg}",
)
.unwrap()
);
loop {
tokio::select! {
// Always handle layer changes first.
biased;
layer = layers.recv() => {
if let Some(l) = layer {
let layer = descriptor_of_progress(&l);
let layer_size = u64::try_from(layer.size()).unwrap();
if l.is_starting() {
pb.set_position(0);
byte_bar.reset_elapsed();
byte_bar.reset_eta();
byte_bar.set_length(layer_size);
let layer_type = prefix_of_progress(&l);
let short_digest = &layer.digest()[0..21];
byte_bar.set_message(format!("{layer_type} {short_digest}"));
} else {
pb.finish();
*n_layers_fetched += 1;
byte_bar.set_position(layer_size);
layers_bar.inc(1);
total_read = total_read.saturating_add(layer_size);
}
pb.set_prefix(format!("[{}/{}]", *n_layers_fetched, total_layers));
pb.set_message(ostree_ext::cli::layer_progress_format(&l));
} else {
// If the receiver is disconnected, then we're done
break
Expand All @@ -152,23 +195,35 @@ async fn handle_layer_progress_print(
}
let bytes = layer_bytes.borrow();
if let Some(bytes) = &*bytes {
pb.set_length(bytes.total);
pb.set_position(bytes.fetched);
byte_bar.set_position(bytes.fetched);
}
}

}
}
byte_bar.finish_and_clear();
layers_bar.finish_and_clear();
if let Err(e) = bar.clear() {
tracing::warn!("clearing bar: {e}");
}
let end = std::time::Instant::now();
let elapsed = end.duration_since(start);
let persec = total_read as f64 / elapsed.as_secs_f64();
let persec = indicatif::HumanBytes(persec as u64);
println!(
"Fetched layers: {} in {} ({}/s)",
indicatif::HumanBytes(total_read),
indicatif::HumanDuration(elapsed),
persec,
);
}

/// Wrapper for pulling a container image, wiring up status output.
#[context("Pulling")]
pub(crate) async fn pull(
sysroot: &SysrootLock,
repo: &ostree::Repo,
imgref: &ImageReference,
quiet: bool,
) -> Result<Box<ImageState>> {
let repo = &sysroot.repo();
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
let mut imp = new_importer(repo, ostree_imgref).await?;
let prep = match imp.prepare().await? {
Expand All @@ -183,19 +238,14 @@ pub(crate) async fn pull(
ostree_ext::cli::print_deprecated_warning(warning).await;
}
ostree_ext::cli::print_layer_status(&prep);
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
let n_layers_to_fetch = layers_to_fetch.len();
let printer = (!quiet).then(|| {
let layer_progress = imp.request_progress();
let layer_byte_progress = imp.request_layer_progress();
let total_layers = prep.layers_to_fetch().count();
let mut n_fetched = 0usize;
tokio::task::spawn(async move {
handle_layer_progress_print(
layer_progress,
layer_byte_progress,
total_layers,
&mut n_fetched,
)
.await
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch)
.await
})
});
let import = imp.import(prep).await;
Expand Down
11 changes: 11 additions & 0 deletions lib/src/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use serde::{Deserialize, Serialize};
use self::baseline::InstallBlockDeviceOpts;
use crate::containerenv::ContainerExecutionInfo;
use crate::mount::Filesystem;
use crate::spec::ImageReference;
use crate::task::Task;
use crate::utils::sigpolicy_from_opts;

Expand Down Expand Up @@ -651,6 +652,16 @@ async fn initialize_ostree_root_from_self(
imgref: src_imageref,
};

// Pull the container image into the target root filesystem. Since this is
// an install path, we don't need to fsync() individual layers.
{
let spec_imgref = ImageReference::from(src_imageref.clone());
let repo = &sysroot.repo();
repo.set_disable_fsync(true);
crate::deploy::pull(repo, &spec_imgref, false).await?;
repo.set_disable_fsync(false);
}

// Load the kargs from the /usr/lib/bootc/kargs.d from the running root,
// which should be the same as the filesystem we'll deploy.
let kargsd = crate::kargs::get_kargs_in_root(container_rootfs, std::env::consts::ARCH)?;
Expand Down
Loading