diff --git a/Cargo.lock b/Cargo.lock index 5f61caf57..439493159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,17 +148,6 @@ dependencies = [ "syn 2.0.37", ] -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi 0.1.19", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.1.0" @@ -752,23 +741,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "clap" -version = "3.2.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" -dependencies = [ - "atty", - "bitflags 1.3.2", - "clap_derive 3.2.25", - "clap_lex 0.2.4", - "indexmap 1.9.3", - "once_cell", - "strsim", - "termcolor", - "textwrap", -] - [[package]] name = "clap" version = "4.4.6" @@ -776,7 +748,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", - "clap_derive 4.4.2", + "clap_derive", ] [[package]] @@ -787,23 +759,10 @@ checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle", - "clap_lex 0.5.1", + "clap_lex", "strsim", ] -[[package]] -name = "clap_derive" -version = "3.2.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008" -dependencies = [ - "heck", - "proc-macro-error", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "clap_derive" version = "4.4.2" @@ -816,15 +775,6 @@ dependencies = [ "syn 2.0.37", ] -[[package]] -name = "clap_lex" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5" -dependencies = [ - "os_str_bytes", -] - [[package]] name = "clap_lex" version = "0.5.1" @@ -1400,15 +1350,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - [[package]] name = "hermit-abi" version = "0.3.3" @@ -1645,7 +1586,7 @@ version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi", "rustix", "windows-sys 0.48.0", ] @@ -1734,7 +1675,7 @@ dependencies = [ "jsonpath_lib", "k8s-openapi", "kube-core", - "pem", + "pem 1.1.1", "pin-project", "rand", "rustls 0.21.7", @@ -1937,7 +1878,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi 0.3.3", + "hermit-abi", "libc", ] @@ -2007,12 +1948,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "os_str_bytes" -version = "6.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac" - [[package]] name = "outref" version = "0.5.1" @@ -2062,30 +1997,22 @@ dependencies = [ ] [[package]] -name = "path-absolutize" -version = "3.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" -dependencies = [ - "path-dedot", -] - -[[package]] -name = "path-dedot" -version = "3.1.1" +name = "pem" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" +checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" dependencies = [ - "once_cell", + "base64 0.13.1", ] [[package]] name = "pem" -version = "1.1.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" +checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" dependencies = [ - "base64 0.13.1", + "base64 0.21.4", + "serde", ] [[package]] @@ -2230,8 +2157,9 @@ dependencies = [ "aws-smithy-types", "aws-types", "buildsys", + "bytes", "chrono", - "clap 4.4.6", + "clap", "coldsnap", "duct", "futures", @@ -2243,7 +2171,6 @@ dependencies = [ "num_cpus", "parse-datetime", "pubsys-config", - "rayon", "reqwest", "semver", "serde", @@ -2284,7 +2211,7 @@ dependencies = [ name = "pubsys-setup" version = "0.1.0" dependencies = [ - "clap 4.4.6", + "clap", "hex", "log", "pubsys-config", @@ -2470,10 +2397,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-rustls 0.24.1", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "winreg", @@ -2872,6 +2801,8 @@ checksum = "e4de37ad025c587a29e8f3f5605c00f70b98715ef90b9061a815b9e59e9042d6" dependencies = [ "backtrace", "doc-comment", + "futures-core", + "pin-project", "snafu-derive", ] @@ -3045,7 +2976,7 @@ dependencies = [ "base64 0.21.4", "bottlerocket-types", "bottlerocket-variant", - "clap 4.4.6", + "clap", "env_logger", "fastrand 2.0.1", "futures", @@ -3114,12 +3045,6 @@ dependencies = [ "topological-sort", ] -[[package]] -name = "textwrap" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" - [[package]] name = "thiserror" version = "1.0.49" @@ -3335,18 +3260,22 @@ checksum = "ea68304e134ecd095ac6c3574494fc62b909f416c4fca77e440530221e549d3d" [[package]] name = "tough" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda3efa9005cf9c1966984c3b9a44c3f37b7ed2c95ba338d6ad51bba70e989a0" +checksum = "d16dc5f42fc7ce7cb51eebc7a6ef91f4d69a6d41bb13f34a09674ec47e454d9b" dependencies = [ + "async-recursion", + "async-trait", + "bytes", "chrono", "dyn-clone", + "futures", + "futures-core", "globset", "hex", "log", "olpc-cjson", - "path-absolutize", - "pem", + "pem 3.0.2", "percent-encoding", "reqwest", "ring", @@ -3355,6 +3284,9 @@ dependencies = [ "serde_plain", "snafu", "tempfile", + "tokio", + "tokio-util", + "typed-path", "untrusted", "url", "walkdir", @@ -3362,13 +3294,13 @@ dependencies = [ [[package]] name = "tough-kms" -version = "0.6.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc49c1a5300e54484604162ec78417fc39306f0c9e2c98166df3ebfa203d6800" +checksum = "99a56a6fa02de987070fdf751b7d761af8f750ed51b269ee0bb8c9f0d98db4c1" dependencies = [ "aws-config", "aws-sdk-kms", - "pem", + "pem 3.0.2", "ring", "snafu", "tokio", @@ -3377,9 +3309,9 @@ dependencies = [ [[package]] name = "tough-ssm" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bcf4932265842607b42840e65f3fde9dde2834eaa97209b994d6c1a7ff9f3fd7" +checksum = "211fc74d544db304d19d4c9030a5e2dd189ce82496b16f1627ad201062351d30" dependencies = [ "aws-config", "aws-sdk-ssm", @@ -3488,19 +3420,21 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" [[package]] name = "tuftool" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "659f5ef4d8b3f2ef48f73df042820dc19e66b375aeca341aa1e8f0b1b989a134" +checksum = "0dd87a609b3ea68aba986684805e99fa2a901e4c0d4bec6705f1bfb0b5b60c7d" dependencies = [ "aws-config", "aws-sdk-kms", "aws-sdk-ssm", "chrono", - "clap 3.2.25", + "clap", + "futures", "hex", "log", "maplit", "olpc-cjson", + "pem 3.0.2", "rayon", "reqwest", "ring", @@ -3509,6 +3443,7 @@ dependencies = [ "simplelog", "snafu", "tempfile", + "tokio", "tough", "tough-kms", "tough-ssm", @@ -3543,7 +3478,7 @@ dependencies = [ "async-recursion", "buildsys", "bytes", - "clap 4.4.6", + "clap", "env_logger", "flate2", "hex", @@ -3562,6 +3497,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "typed-path" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb9d13b8242894ff21f9990082b90a6410a43dcc6029ac4227a1467853ba781" + [[package]] name = "typenum" version = "1.17.0" @@ -3776,6 +3717,19 @@ version = "0.2.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.64" diff --git a/deny.toml b/deny.toml index c4d446227..ba9df0464 100644 --- a/deny.toml +++ b/deny.toml @@ -69,22 +69,18 @@ skip = [ { name = "base64" }, # several dependencies are using an old version of bitflags { name = "bitflags", version = "=1.3" }, - # tuftool is using an older version of clap - { name = "clap", version = "3" }, - { name = "clap_derive", version = "3" }, - { name = "clap_lex", version = "0.2" }, # several dependencies are using an old version of serde_yaml { name = "serde_yaml", version = "=0.8" }, # aws-sdk-rust is using an old version of fastrand { name = "fastrand", version = "=1.9" }, # multiple deps are using an older version of hashbrown { name = "hashbrown", version = "=0.12" }, - # tuftool is using an old clap (v3) which is using old hermit-abi - { name = "hermit-abi", version = "0.1" }, # multiple deps are using an older version of indexmap { name = "indexmap", version = "1" }, # kube-client uses an old version of redox_syscall { name = "redox_syscall", version = "=0.2" }, + # kube-client uses an older version of pem + { name = "pem", version = "=1" }, # hyper and tokio are using different versions of socket2 { name = "socket2", version = "0.4" }, # multiple deps are using an older version of syn diff --git a/tools/pubsys/Cargo.toml b/tools/pubsys/Cargo.toml index 45aaaa2be..37b38729d 100644 --- a/tools/pubsys/Cargo.toml +++ b/tools/pubsys/Cargo.toml @@ -17,6 +17,7 @@ aws-sdk-sts = "0.28" aws-smithy-types = "0.55" aws-types = "0.55" buildsys = { path = "../buildsys", version = "0.1" } +bytes = "1" chrono = { version = "0.4", default-features = false, features = ["std", "clock"] } clap = { version = "4", features = ["derive"] } coldsnap = { version = "0.6", default-features = false, features = ["aws-sdk-rust-rustls"] } @@ -30,7 +31,6 @@ nonzero_ext = "0.3" num_cpus = "1" parse-datetime = { path = "../parse-datetime", version = "0.1" } pubsys-config = { path = "../pubsys-config/", version = "0.1" } -rayon = "1" # Need to bring in reqwest with a TLS feature so tough can support TLS repos. reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "blocking"] } semver = "1" @@ -42,11 +42,11 @@ snafu = "0.7" tabled = "0.10" tempfile = "3" tinytemplate = "1" -tokio = { version = "1", features = ["full"] } # LTS +tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1", features = ["time"] } toml = "0.8" -tough = { version = "0.14", features = ["http"] } -tough-kms = "0.6" -tough-ssm = "0.9" +tough = { version = "0.15", features = ["http"] } +tough-kms = { version = "0.7" } +tough-ssm = { version = "0.10" } update-metadata = { path = "../update-metadata/", version = "0.1" } url = { version = "2", features = ["serde"] } diff --git a/tools/pubsys/src/aws/ami/mod.rs b/tools/pubsys/src/aws/ami/mod.rs index 825f23dc9..abf0700f6 100644 --- a/tools/pubsys/src/aws/ami/mod.rs +++ b/tools/pubsys/src/aws/ami/mod.rs @@ -85,7 +85,9 @@ pub(crate) async fn run(args: &Args, ami_args: &AmiArgs) -> Result<()> { Ok(amis) => { // Write the AMI IDs to file if requested if let Some(ref path) = ami_args.ami_output { - write_amis(path, &amis).context(error::WriteAmisSnafu { path })?; + write_amis(path, &amis) + .await + .context(error::WriteAmisSnafu { path })?; } Ok(()) } diff --git a/tools/pubsys/src/aws/ami/register.rs b/tools/pubsys/src/aws/ami/register.rs index aed614aec..736d95917 100644 --- a/tools/pubsys/src/aws/ami/register.rs +++ b/tools/pubsys/src/aws/ami/register.rs @@ -8,6 +8,7 @@ use buildsys::manifest::{self, ImageFeature}; use coldsnap::{SnapshotUploader, SnapshotWaiter}; use log::{debug, info, warn}; use snafu::{ensure, OptionExt, ResultExt}; +use tokio::fs; const ROOT_DEVICE_NAME: &str = "/dev/xvda"; const DATA_DEVICE_NAME: &str = "/dev/xvdb"; @@ -48,9 +49,11 @@ async fn _register_image( let (os_volume_size, data_volume_size) = image_layout.publish_image_sizes_gib(); let uefi_data = - std::fs::read_to_string(&ami_args.uefi_data).context(error::LoadUefiDataSnafu { - path: &ami_args.uefi_data, - })?; + fs::read_to_string(&ami_args.uefi_data) + .await + .context(error::LoadUefiDataSnafu { + path: &ami_args.uefi_data, + })?; debug!("Uploading images into EBS snapshots in {}", region); let uploader = SnapshotUploader::new(ebs_client); diff --git a/tools/pubsys/src/aws/promote_ssm/mod.rs b/tools/pubsys/src/aws/promote_ssm/mod.rs index 21f4ca1d4..0969e0239 100644 --- a/tools/pubsys/src/aws/promote_ssm/mod.rs +++ b/tools/pubsys/src/aws/promote_ssm/mod.rs @@ -114,6 +114,7 @@ pub(crate) async fn run(args: &Args, promote_args: &PromoteArgs) -> Result<()> { // in their naming let template_parameters = template::get_parameters(&promote_args.template_path, &source_build_context) + .await .context(error::FindTemplatesSnafu)?; if template_parameters.parameters.is_empty() { diff --git a/tools/pubsys/src/aws/publish_ami/mod.rs b/tools/pubsys/src/aws/publish_ami/mod.rs index 578bdee48..b2871e244 100644 --- a/tools/pubsys/src/aws/publish_ami/mod.rs +++ b/tools/pubsys/src/aws/publish_ami/mod.rs @@ -23,9 +23,10 @@ use log::{debug, error, info, trace}; use pubsys_config::InfraConfig; use snafu::{ensure, OptionExt, ResultExt}; use std::collections::{HashMap, HashSet}; -use std::fs::File; use std::iter::FromIterator; use std::path::PathBuf; +use tokio::fs; +use tokio::fs::File; #[derive(Debug, Parser)] #[group(id = "who", required = true, multiple = true)] @@ -82,12 +83,14 @@ pub(crate) async fn run(args: &Args, publish_args: &Who) -> Result<()> { "Using AMI data from path: {}", publish_args.ami_input.display() ); - let file = File::open(&publish_args.ami_input).context(error::FileSnafu { - op: "open", - path: &publish_args.ami_input, - })?; - let mut ami_input: HashMap = - serde_json::from_reader(file).context(error::DeserializeSnafu { + let file = File::open(&publish_args.ami_input) + .await + .context(error::FileSnafu { + op: "open", + path: &publish_args.ami_input, + })?; + let mut ami_input: HashMap = serde_json::from_reader(file.into_std().await) + .context(error::DeserializeSnafu { path: &publish_args.ami_input, })?; trace!("Parsed AMI input: {:?}", ami_input); @@ -218,19 +221,19 @@ pub(crate) async fn run(args: &Args, publish_args: &Who) -> Result<()> { .into_iter() .map(|(region, image)| (region.to_string(), image)) .collect::>(), - )?; + ) + .await?; Ok(()) } -pub(crate) fn write_amis(path: &PathBuf, amis: &HashMap) -> Result<()> { - let file = File::create(path).context(error::FileSnafu { +pub(crate) async fn write_amis(path: &PathBuf, amis: &HashMap) -> Result<()> { + let json = serde_json::to_string_pretty(&amis).context(error::SerializeSnafu { path })?; + fs::write(path, &json).await.context(error::FileSnafu { op: "write AMIs to file", path, })?; - serde_json::to_writer_pretty(file, &amis).context(error::SerializeSnafu { path })?; info!("Wrote AMI data to {}", path.display()); - Ok(()) } diff --git a/tools/pubsys/src/aws/ssm/mod.rs b/tools/pubsys/src/aws/ssm/mod.rs index 82d3685b0..1f160e080 100644 --- a/tools/pubsys/src/aws/ssm/mod.rs +++ b/tools/pubsys/src/aws/ssm/mod.rs @@ -119,6 +119,7 @@ pub(crate) async fn run(args: &Args, ssm_args: &SsmArgs) -> Result<()> { ssm_args.template_path.display() ); let template_parameters = template::get_parameters(&ssm_args.template_path, &build_context) + .await .context(error::FindTemplatesSnafu)?; if template_parameters.parameters.is_empty() { diff --git a/tools/pubsys/src/aws/ssm/template.rs b/tools/pubsys/src/aws/ssm/template.rs index ac60583e3..9b90bcfe7 100644 --- a/tools/pubsys/src/aws/ssm/template.rs +++ b/tools/pubsys/src/aws/ssm/template.rs @@ -8,9 +8,9 @@ use log::trace; use serde::{Deserialize, Serialize}; use snafu::{ensure, ResultExt}; use std::collections::HashMap; -use std::fs; use std::path::Path; use tinytemplate::TinyTemplate; +use tokio::fs; /// Represents a single SSM parameter #[derive(Debug, Deserialize)] @@ -36,14 +36,16 @@ pub(crate) struct TemplateParameters { /// Deserializes template parameters from the template file, taking into account conditional /// parameters that may or may not apply based on our build context. -pub(crate) fn get_parameters( +pub(crate) async fn get_parameters( template_path: &Path, build_context: &BuildContext<'_>, ) -> Result { - let templates_str = fs::read_to_string(template_path).context(error::FileSnafu { - op: "read", - path: &template_path, - })?; + let templates_str = fs::read_to_string(template_path) + .await + .context(error::FileSnafu { + op: "read", + path: &template_path, + })?; let mut template_parameters: TemplateParameters = toml::from_str(&templates_str).context(error::InvalidTomlSnafu { path: &template_path, diff --git a/tools/pubsys/src/aws/validate_ami/mod.rs b/tools/pubsys/src/aws/validate_ami/mod.rs index e827059c6..101b2b9a6 100644 --- a/tools/pubsys/src/aws/validate_ami/mod.rs +++ b/tools/pubsys/src/aws/validate_ami/mod.rs @@ -15,8 +15,8 @@ use log::{error, info, trace}; use pubsys_config::InfraConfig; use snafu::ResultExt; use std::collections::{HashMap, HashSet}; -use std::fs::File; use std::path::PathBuf; +use tokio::fs; /// Validates EC2 images by calling `describe-images` on all images in the file given by /// `expected-amis-path` and ensuring that the returned `public`, `ena-support`, @@ -134,13 +134,13 @@ pub(crate) async fn validate( }; // Write the results as JSON - serde_json::to_writer_pretty( - &File::create(write_results_path).context(error::WriteValidationResultsSnafu { + let json = serde_json::to_string_pretty(&results) + .context(error::SerializeValidationResultsSnafu)?; + fs::write(&write_results_path, &json).await.context( + error::WriteValidationResultsSnafu { path: write_results_path, - })?, - &results, - ) - .context(error::SerializeValidationResultsSnafu)?; + }, + )?; } Ok(validation_results) @@ -199,12 +199,14 @@ pub(crate) async fn parse_expected_amis( expected_amis_path: &PathBuf, ) -> Result>> { // Parse the JSON file as a `HashMap` of region_name, mapped to an `ImageData` struct - let expected_amis: HashMap = serde_json::from_reader( - &File::open(expected_amis_path.clone()).context(error::ReadExpectedImagesFileSnafu { - path: expected_amis_path, - })?, - ) - .context(error::ParseExpectedImagesFileSnafu)?; + let file_bytes = + fs::read(&expected_amis_path) + .await + .context(error::ReadExpectedImagesFileSnafu { + path: expected_amis_path, + })?; + let expected_amis: HashMap = + serde_json::from_slice(&file_bytes).context(error::ParseExpectedImagesFileSnafu)?; // Extract the `Vec` from the `ImageData` structs let vectored_images = expected_amis diff --git a/tools/pubsys/src/aws/validate_ami/results.rs b/tools/pubsys/src/aws/validate_ami/results.rs index 698fbe01a..45a601dbb 100644 --- a/tools/pubsys/src/aws/validate_ami/results.rs +++ b/tools/pubsys/src/aws/validate_ami/results.rs @@ -210,7 +210,7 @@ mod test { (Region::new("us-west-2"), HashSet::from([])), (Region::new("us-east-1"), HashSet::from([])), ])); - let results_filtered = results.get_results_for_status(&vec![ + let results_filtered = results.get_results_for_status(&[ AmiValidationResultStatus::Correct, AmiValidationResultStatus::Incorrect, AmiValidationResultStatus::Missing, @@ -355,7 +355,7 @@ mod test { ), ])); let results_filtered = - results.get_results_for_status(&vec![AmiValidationResultStatus::Correct]); + results.get_results_for_status(&[AmiValidationResultStatus::Correct]); assert_eq!( results_filtered, @@ -525,7 +525,7 @@ mod test { ]), ), ])); - let results_filtered = results.get_results_for_status(&vec![ + let results_filtered = results.get_results_for_status(&[ AmiValidationResultStatus::Correct, AmiValidationResultStatus::Incorrect, ]); @@ -756,7 +756,7 @@ mod test { )]), ), ])); - let results_filtered = results.get_results_for_status(&vec![ + let results_filtered = results.get_results_for_status(&[ AmiValidationResultStatus::Correct, AmiValidationResultStatus::Incorrect, AmiValidationResultStatus::Missing, @@ -1027,7 +1027,7 @@ mod test { ), ])); let results_filtered = - results.get_results_for_status(&vec![AmiValidationResultStatus::Missing]); + results.get_results_for_status(&[AmiValidationResultStatus::Missing]); assert_eq!(results_filtered, HashSet::new()); } diff --git a/tools/pubsys/src/aws/validate_ssm/mod.rs b/tools/pubsys/src/aws/validate_ssm/mod.rs index 3dc5f4ee8..eded93168 100644 --- a/tools/pubsys/src/aws/validate_ssm/mod.rs +++ b/tools/pubsys/src/aws/validate_ssm/mod.rs @@ -14,8 +14,8 @@ use log::{error, info, trace}; use pubsys_config::InfraConfig; use snafu::ResultExt; use std::collections::{HashMap, HashSet}; -use std::fs::File; use std::path::PathBuf; +use tokio::fs; /// Validates SSM parameters and AMIs #[derive(Debug, Parser)] @@ -128,13 +128,13 @@ pub async fn validate( }; // Write the results as JSON - serde_json::to_writer_pretty( - &File::create(write_results_path).context(error::WriteValidationResultsSnafu { + let json = serde_json::to_string_pretty(&results) + .context(error::SerializeValidationResultsSnafu)?; + fs::write(write_results_path, &json) + .await + .context(error::WriteValidationResultsSnafu { path: write_results_path, - })?, - &results, - ) - .context(error::SerializeValidationResultsSnafu)?; + })?; } Ok(validation_results) @@ -206,15 +206,15 @@ type ParameterValue = String; pub(crate) async fn parse_parameters( expected_parameters_file: &PathBuf, ) -> Result>> { + let file_bytes = fs::read(expected_parameters_file.clone()).await.context( + error::ReadExpectedParameterFileSnafu { + path: expected_parameters_file, + }, + )?; // Parse the JSON file as a HashMap of region_name, mapped to a HashMap of parameter_name and // parameter_value let expected_parameters: HashMap> = - serde_json::from_reader(&File::open(expected_parameters_file.clone()).context( - error::ReadExpectedParameterFileSnafu { - path: expected_parameters_file, - }, - )?) - .context(error::ParseExpectedParameterFileSnafu)?; + serde_json::from_slice(&file_bytes).context(error::ParseExpectedParameterFileSnafu)?; // Iterate over the parsed HashMap, converting the nested HashMap into a HashMap of Region // mapped to a HashMap of SsmKey, String diff --git a/tools/pubsys/src/main.rs b/tools/pubsys/src/main.rs index 520ef4e5a..d61aec9f0 100644 --- a/tools/pubsys/src/main.rs +++ b/tools/pubsys/src/main.rs @@ -26,15 +26,19 @@ mod aws; mod repo; mod vmware; +use bytes::Bytes; use clap::Parser; +use futures::Stream; +use futures::TryStreamExt; use semver::Version; use simplelog::{CombinedLogger, Config as LogConfig, ConfigBuilder, LevelFilter, SimpleLogger}; use snafu::ResultExt; use std::path::PathBuf; use std::process; -use tokio::runtime::Runtime; +use std::result::Result as StdResult; +use tough::error::Error as ToughError; -fn run() -> Result<()> { +async fn run() -> Result<()> { // Parse and store the args passed to the program let args = Args::parse(); @@ -72,73 +76,55 @@ fn run() -> Result<()> { } match args.subcommand { - SubCommands::Repo(ref repo_args) => repo::run(&args, repo_args).context(error::RepoSnafu), + SubCommands::Repo(ref repo_args) => { + repo::run(&args, repo_args).await.context(error::RepoSnafu) + } SubCommands::ValidateRepo(ref validate_repo_args) => { - repo::validate_repo::run(&args, validate_repo_args).context(error::ValidateRepoSnafu) + repo::validate_repo::run(&args, validate_repo_args) + .await + .context(error::ValidateRepoSnafu) } SubCommands::CheckRepoExpirations(ref check_expirations_args) => { repo::check_expirations::run(&args, check_expirations_args) + .await .context(error::CheckExpirationsSnafu) } SubCommands::RefreshRepo(ref refresh_repo_args) => { - repo::refresh_repo::run(&args, refresh_repo_args).context(error::RefreshRepoSnafu) - } - SubCommands::Ami(ref ami_args) => { - let rt = Runtime::new().context(error::RuntimeSnafu)?; - rt.block_on(async { - aws::ami::run(&args, ami_args) - .await - .context(error::AmiSnafu) - }) - } - SubCommands::PublishAmi(ref publish_args) => { - let rt = Runtime::new().context(error::RuntimeSnafu)?; - rt.block_on(async { - aws::publish_ami::run(&args, publish_args) - .await - .context(error::PublishAmiSnafu) - }) - } - SubCommands::Ssm(ref ssm_args) => { - let rt = Runtime::new().context(error::RuntimeSnafu)?; - rt.block_on(async { - aws::ssm::run(&args, ssm_args) - .await - .context(error::SsmSnafu) - }) - } - SubCommands::PromoteSsm(ref promote_args) => { - let rt = Runtime::new().context(error::RuntimeSnafu)?; - rt.block_on(async { - aws::promote_ssm::run(&args, promote_args) - .await - .context(error::PromoteSsmSnafu) - }) + repo::refresh_repo::run(&args, refresh_repo_args) + .await + .context(error::RefreshRepoSnafu) } + SubCommands::Ami(ref ami_args) => aws::ami::run(&args, ami_args) + .await + .context(error::AmiSnafu), + SubCommands::PublishAmi(ref publish_args) => aws::publish_ami::run(&args, publish_args) + .await + .context(error::PublishAmiSnafu), + SubCommands::Ssm(ref ssm_args) => aws::ssm::run(&args, ssm_args) + .await + .context(error::SsmSnafu), + SubCommands::PromoteSsm(ref promote_args) => aws::promote_ssm::run(&args, promote_args) + .await + .context(error::PromoteSsmSnafu), SubCommands::ValidateSsm(ref validate_ssm_args) => { - let rt = Runtime::new().context(error::RuntimeSnafu)?; - rt.block_on(async { - aws::validate_ssm::run(&args, validate_ssm_args) - .await - .context(error::ValidateSsmSnafu) - }) + aws::validate_ssm::run(&args, validate_ssm_args) + .await + .context(error::ValidateSsmSnafu) } SubCommands::ValidateAmi(ref validate_ami_args) => { - let rt = Runtime::new().context(error::RuntimeSnafu)?; - rt.block_on(async { - aws::validate_ami::run(&args, validate_ami_args) - .await - .context(error::ValidateAmiSnafu) - }) - } - SubCommands::UploadOva(ref upload_args) => { - vmware::upload_ova::run(&args, upload_args).context(error::UploadOvaSnafu) + aws::validate_ami::run(&args, validate_ami_args) + .await + .context(error::ValidateAmiSnafu) } + SubCommands::UploadOva(ref upload_args) => vmware::upload_ova::run(&args, upload_args) + .await + .context(error::UploadOvaSnafu), } } -fn main() { - if let Err(e) = run() { +#[tokio::main] +async fn main() { + if let Err(e) = run().await { eprintln!("{}", e); process::exit(1); } @@ -188,6 +174,19 @@ pub(crate) fn friendly_version( Version::parse(version_str) } +type BytesResult = StdResult; + +pub(crate) async fn read_stream( + stream: impl Stream + Send, +) -> StdResult, ToughError> { + stream + .try_fold(Vec::new(), |mut acc, bytes| { + acc.extend(bytes.as_ref()); + std::future::ready(Ok(acc)) + }) + .await +} + mod error { use snafu::Snafu; @@ -232,9 +231,6 @@ mod error { source: crate::repo::refresh_repo::Error, }, - #[snafu(display("Failed to create async runtime: {}", source))] - Runtime { source: std::io::Error }, - #[snafu(display("Failed to update SSM: {}", source))] Ssm { source: crate::aws::ssm::Error }, diff --git a/tools/pubsys/src/repo.rs b/tools/pubsys/src/repo.rs index cd564ff3f..c5d7f0c78 100644 --- a/tools/pubsys/src/repo.rs +++ b/tools/pubsys/src/repo.rs @@ -4,7 +4,7 @@ pub(crate) mod check_expirations; pub(crate) mod refresh_repo; pub(crate) mod validate_repo; -use crate::{friendly_version, Args}; +use crate::{friendly_version, read_stream, Args}; use aws_sdk_kms::{config::Region, Client as KmsClient}; use chrono::{DateTime, Utc}; use clap::Parser; @@ -17,10 +17,10 @@ use pubsys_config::{ use semver::Version; use snafu::{ensure, OptionExt, ResultExt}; use std::convert::TryInto; -use std::fs::{self, File}; use std::num::NonZeroU64; use std::path::{Path, PathBuf}; use tempfile::NamedTempFile; +use tokio::fs; use tokio::runtime::Runtime; use tough::{ editor::signed::PathExists, @@ -102,6 +102,12 @@ pub(crate) struct RepoArgs { outdir: PathBuf, } +pub(crate) async fn root_bytes(path: impl AsRef) -> Result> { + fs::read(path.as_ref()).await.context(error::FileSnafu { + path: path.as_ref(), + }) +} + /// Adds update, migrations, and waves to the Manifest fn update_manifest(repo_args: &RepoArgs, manifest: &mut Manifest) -> Result<()> { // Add update =^..^= =^..^= =^..^= =^..^= @@ -229,7 +235,7 @@ fn set_versions(editor: &mut RepositoryEditor) -> Result<()> { } /// Adds targets, expirations, and version to the RepositoryEditor -fn update_editor<'a, P>( +async fn update_editor<'a, P>( repo_args: &'a RepoArgs, editor: &mut RepositoryEditor, targets: impl Iterator, @@ -244,12 +250,16 @@ where debug!("Adding target from path: {}", target_path.display()); editor .add_target_path(target_path) + .await .context(error::AddTargetSnafu { path: &target_path })?; } - let manifest_target = Target::from_path(&manifest_path).context(error::BuildTargetSnafu { - path: manifest_path.as_ref(), - })?; + let manifest_target = + Target::from_path(&manifest_path) + .await + .context(error::BuildTargetSnafu { + path: manifest_path.as_ref(), + })?; debug!("Adding target for manifest.json"); editor .add_target("manifest.json", manifest_target) @@ -329,7 +339,7 @@ fn repo_urls<'a>( /// Builds an editor and manifest; will start from an existing repo if one is specified in the /// configuration. Returns Err if we fail to read from the repo. Returns Ok(None) if we detect /// that the repo does not exist. -fn load_editor_and_manifest<'a, P>( +async fn load_editor_and_manifest<'a, P>( root_role_path: P, metadata_url: &'a Url, targets_url: &'a Url, @@ -338,37 +348,35 @@ where P: AsRef, { let root_role_path = root_role_path.as_ref(); + let root = root_bytes(root_role_path).await?; // Try to load the repo... - let repo_load_result = RepositoryLoader::new( - File::open(root_role_path).context(error::FileSnafu { - path: root_role_path, - })?, - metadata_url.clone(), - targets_url.clone(), - ) - .load(); - - match repo_load_result { + let repo_load_result = + RepositoryLoader::new(&root, metadata_url.clone(), targets_url.clone()).load(); + + match repo_load_result.await { // If we load it successfully, build an editor and manifest from it. Ok(repo) => { let target = "manifest.json"; let target = target .try_into() .context(error::ParseTargetNameSnafu { target })?; - let reader = repo + let stream = repo .read_target(&target) + .await .context(error::ReadTargetSnafu { target: target.raw(), })? .with_context(|| error::NoManifestSnafu { metadata_url: metadata_url.clone(), })?; - let manifest = serde_json::from_reader(reader).context(error::InvalidJsonSnafu { + let bytes = read_stream(stream).await.context(error::StreamSnafu)?; + let manifest = serde_json::from_slice(&bytes).context(error::InvalidJsonSnafu { path: "manifest.json", })?; let editor = RepositoryEditor::from_repo(root_role_path, repo) + .await .context(error::EditorFromRepoSnafu)?; Ok(Some((editor, manifest))) @@ -444,7 +452,7 @@ async fn async_get_client(region: &str) -> KmsClient { } /// Common entrypoint from main() -pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { +pub(crate) async fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { let metadata_out_dir = repo_args .outdir .join(&repo_args.variant) @@ -492,7 +500,8 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { let (mut editor, mut manifest) = if let Some((metadata_url, targets_url)) = maybe_urls.as_ref() { info!("Found metadata and target URLs, loading existing repository"); - match load_editor_and_manifest(&repo_args.root_role_path, metadata_url, targets_url)? { + match load_editor_and_manifest(&repo_args.root_role_path, metadata_url, targets_url).await? + { Some((editor, manifest)) => (editor, manifest), None => { warn!( @@ -501,6 +510,7 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { ); ( RepositoryEditor::new(&repo_args.root_role_path) + .await .context(error::NewEditorSnafu)?, Manifest::default(), ) @@ -509,7 +519,9 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { } else { info!("Did not find metadata and target URLs in infra config, creating a new repository"); ( - RepositoryEditor::new(&repo_args.root_role_path).context(error::NewEditorSnafu)?, + RepositoryEditor::new(&repo_args.root_role_path) + .await + .context(error::NewEditorSnafu)?, Manifest::default(), ) }; @@ -533,7 +545,7 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { ]); let all_targets = copy_targets.iter().chain(link_targets.clone()); - update_editor(repo_args, &mut editor, all_targets, &manifest_path)?; + update_editor(repo_args, &mut editor, all_targets, &manifest_path).await?; // Sign repo =^..^= =^..^= =^..^= =^..^= @@ -555,15 +567,20 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { }) }; - let signed_repo = editor.sign(&[key_source]).context(error::RepoSignSnafu)?; + let signed_repo = editor + .sign(&[key_source]) + .await + .context(error::RepoSignSnafu)?; // Write repo =^..^= =^..^= =^..^= =^..^= // Write targets first so we don't have invalid metadata if targets fail info!("Writing repo targets to: {}", targets_out_dir.display()); - fs::create_dir_all(&targets_out_dir).context(error::CreateDirSnafu { - path: &targets_out_dir, - })?; + fs::create_dir_all(&targets_out_dir) + .await + .context(error::CreateDirSnafu { + path: &targets_out_dir, + })?; // Copy manifest with proper name instead of tempfile name debug!("Copying manifest.json into {}", targets_out_dir.display()); @@ -579,6 +596,7 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { PathExists::Fail, Some(&target), ) + .await .context(error::CopyTargetSnafu { target: &manifest_path, path: &targets_out_dir, @@ -593,6 +611,7 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { ); signed_repo .copy_target(copy_target, &targets_out_dir, PathExists::Skip, None) + .await .context(error::CopyTargetSnafu { target: copy_target, path: &targets_out_dir, @@ -606,6 +625,7 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { ); signed_repo .link_target(link_target, &targets_out_dir, PathExists::Skip, None) + .await .context(error::LinkTargetSnafu { target: link_target, path: &targets_out_dir, @@ -613,11 +633,14 @@ pub(crate) fn run(args: &Args, repo_args: &RepoArgs) -> Result<()> { } info!("Writing repo metadata to: {}", metadata_out_dir.display()); - fs::create_dir_all(&metadata_out_dir).context(error::CreateDirSnafu { - path: &metadata_out_dir, - })?; + fs::create_dir_all(&metadata_out_dir) + .await + .context(error::CreateDirSnafu { + path: &metadata_out_dir, + })?; signed_repo .write(&metadata_out_dir) + .await .context(error::RepoWriteSnafu { path: &repo_args.outdir, })?; @@ -797,6 +820,12 @@ mod error { source: Box, }, + #[snafu(display("Error reading bytes from stream: {}", source))] + Stream { + #[snafu(source(from(tough::error::Error, Box::new)))] + source: Box, + }, + #[snafu(display("Failed to create temporary file: {}", source))] TempFile { source: io::Error }, diff --git a/tools/pubsys/src/repo/check_expirations/mod.rs b/tools/pubsys/src/repo/check_expirations/mod.rs index bebbcd257..f0941b352 100644 --- a/tools/pubsys/src/repo/check_expirations/mod.rs +++ b/tools/pubsys/src/repo/check_expirations/mod.rs @@ -2,7 +2,7 @@ //! checking the metadata expirations of a given TUF repository. use crate::repo::{error as repo_error, repo_urls}; -use crate::Args; +use crate::{repo, Args}; use chrono::{DateTime, Utc}; use clap::Parser; use log::{error, info, trace, warn}; @@ -10,7 +10,6 @@ use parse_datetime::parse_datetime; use pubsys_config::InfraConfig; use snafu::{OptionExt, ResultExt}; use std::collections::HashMap; -use std::fs::File; use std::path::PathBuf; use tough::{ExpirationEnforcement, Repository, RepositoryLoader}; use url::Url; @@ -73,7 +72,7 @@ fn find_upcoming_metadata_expiration( expirations } -fn check_expirations( +async fn check_expirations( root_role_path: &PathBuf, metadata_url: &Url, targets_url: &Url, @@ -81,15 +80,14 @@ fn check_expirations( ) -> Result<()> { // Load the repository let repo = RepositoryLoader::new( - File::open(root_role_path).context(repo_error::FileSnafu { - path: root_role_path, - })?, + &repo::root_bytes(root_role_path).await?, metadata_url.clone(), targets_url.clone(), ) // We're gonna check the expiration ourselves .expiration_enforcement(ExpirationEnforcement::Unsafe) .load() + .await .context(repo_error::RepoLoadSnafu { metadata_base_url: metadata_url.clone(), })?; @@ -128,7 +126,7 @@ fn check_expirations( } /// Common entrypoint from main() -pub(crate) fn run(args: &Args, check_expirations_args: &CheckExpirationsArgs) -> Result<()> { +pub(crate) async fn run(args: &Args, check_expirations_args: &CheckExpirationsArgs) -> Result<()> { // If a lock file exists, use that, otherwise use Infra.toml let infra_config = InfraConfig::from_path_or_lock(&args.infra_config_path, false) .context(repo_error::ConfigSnafu)?; @@ -157,7 +155,8 @@ pub(crate) fn run(args: &Args, check_expirations_args: &CheckExpirationsArgs) -> &repo_urls.0, repo_urls.1, check_expirations_args.expiration_limit, - )?; + ) + .await?; Ok(()) } diff --git a/tools/pubsys/src/repo/refresh_repo/mod.rs b/tools/pubsys/src/repo/refresh_repo/mod.rs index be7078761..f6444692a 100644 --- a/tools/pubsys/src/repo/refresh_repo/mod.rs +++ b/tools/pubsys/src/repo/refresh_repo/mod.rs @@ -4,16 +4,15 @@ use crate::repo::{ error as repo_error, get_signing_key_source, repo_urls, set_expirations, set_versions, }; -use crate::Args; +use crate::{repo, Args}; use chrono::{DateTime, Utc}; use clap::Parser; use lazy_static::lazy_static; use log::{info, trace}; use pubsys_config::{InfraConfig, RepoExpirationPolicy}; use snafu::{ensure, OptionExt, ResultExt}; -use std::fs; -use std::fs::File; use std::path::{Path, PathBuf}; +use tokio::fs; use tough::editor::RepositoryEditor; use tough::key_source::{KeySource, LocalKeySource}; use tough::{ExpirationEnforcement, RepositoryLoader}; @@ -59,7 +58,7 @@ pub(crate) struct RefreshRepoArgs { unsafe_refresh: bool, } -fn refresh_repo( +async fn refresh_repo( root_role_path: &PathBuf, metadata_out_dir: &PathBuf, metadata_url: &Url, @@ -85,18 +84,18 @@ fn refresh_repo( // Load the repository and get the repo editor for it let repo = RepositoryLoader::new( - File::open(root_role_path).context(repo_error::FileSnafu { - path: root_role_path, - })?, + &repo::root_bytes(root_role_path).await?, metadata_url.clone(), targets_url.clone(), ) .expiration_enforcement(expiration_enforcement) .load() + .await .context(repo_error::RepoLoadSnafu { metadata_base_url: metadata_url.clone(), })?; let mut repo_editor = RepositoryEditor::from_repo(root_role_path, repo) + .await .context(repo_error::EditorFromRepoSnafu)?; info!("Loaded TUF repo: {}", metadata_url); @@ -109,15 +108,19 @@ fn refresh_repo( // Sign the repository let signed_repo = repo_editor .sign(&[key_source]) + .await .context(repo_error::RepoSignSnafu)?; // Write out the metadata files for the repository info!("Writing repo metadata to: {}", metadata_out_dir.display()); - fs::create_dir_all(metadata_out_dir).context(repo_error::CreateDirSnafu { - path: &metadata_out_dir, - })?; + fs::create_dir_all(metadata_out_dir) + .await + .context(repo_error::CreateDirSnafu { + path: &metadata_out_dir, + })?; signed_repo .write(metadata_out_dir) + .await .context(repo_error::RepoWriteSnafu { path: &metadata_out_dir, })?; @@ -126,7 +129,7 @@ fn refresh_repo( } /// Common entrypoint from main() -pub(crate) fn run(args: &Args, refresh_repo_args: &RefreshRepoArgs) -> Result<(), Error> { +pub(crate) async fn run(args: &Args, refresh_repo_args: &RefreshRepoArgs) -> Result<(), Error> { // If a lock file exists, use that, otherwise use Infra.toml let infra_config = InfraConfig::from_path_or_lock(&args.infra_config_path, false) .context(repo_error::ConfigSnafu)?; @@ -189,7 +192,8 @@ pub(crate) fn run(args: &Args, refresh_repo_args: &RefreshRepoArgs) -> Result<() key_source, &expiration, refresh_repo_args.unsafe_refresh, - )?; + ) + .await?; Ok(()) } diff --git a/tools/pubsys/src/repo/validate_repo/mod.rs b/tools/pubsys/src/repo/validate_repo/mod.rs index 1734f6bb0..582396e2f 100644 --- a/tools/pubsys/src/repo/validate_repo/mod.rs +++ b/tools/pubsys/src/repo/validate_repo/mod.rs @@ -2,16 +2,17 @@ //! a given TUF repository by attempting to load the repository and download its targets. use crate::repo::{error as repo_error, repo_urls}; -use crate::Args; +use crate::{read_stream, repo, Args}; use clap::Parser; use log::{info, trace}; use pubsys_config::InfraConfig; use snafu::{OptionExt, ResultExt}; -use std::cmp::min; -use std::fs::File; -use std::io; +use std::io::Cursor; use std::path::PathBuf; -use std::sync::mpsc; +use std::sync::Arc; +use tokio::io; +use tokio::sync::{Mutex, Semaphore}; +use tokio::task::JoinSet; use tough::{Repository, RepositoryLoader, TargetName}; use url::Url; @@ -38,52 +39,39 @@ pub(crate) struct ValidateRepoArgs { validate_targets: bool, } -/// If we are on a machine with a large number of cores, then we limit the number of simultaneous -/// downloads to this arbitrarily chosen maximum. -const MAX_DOWNLOAD_THREADS: usize = 16; +/// Retrieves listed targets and attempts to download them for validation purposes. +async fn retrieve_targets(repo: Arc>) -> Result<(), Error> { + let targets = repo.lock().await.targets().signed.targets.clone(); + let mut tasks = JoinSet::new(); -/// Retrieves listed targets and attempts to download them for validation purposes. We use a Rayon -/// thread pool instead of tokio for async execution because `reqwest::blocking` creates a tokio -/// runtime (and multiple tokio runtimes are not supported). -fn retrieve_targets(repo: &Repository) -> Result<(), Error> { - let targets = &repo.targets().signed.targets; - let thread_pool = rayon::ThreadPoolBuilder::new() - .num_threads(min(num_cpus::get(), MAX_DOWNLOAD_THREADS)) - .build() - .context(error::ThreadPoolSnafu)?; - - // create the channels through which our download results will be passed - let (tx, rx) = mpsc::channel(); + // Limit to an arbitrarily chosen maximum number of parallel downloads. + let sem = Arc::new(Semaphore::new(10)); for target in targets.keys() { - let repo = repo.clone(); - let tx = tx.clone(); info!("Downloading target: {}", target.raw()); - let target = target.clone(); - thread_pool.spawn(move || { - tx.send(download_targets(&repo, target)) - // inability to send on this channel is unrecoverable - .unwrap(); - }); + let cloned_sem = sem.clone(); + let cloned_repo = repo.clone(); + let cloned_target = target.clone(); + let fut = async move { + let _permit = cloned_sem.acquire().await.context(error::SemaphoreSnafu)?; + download_target(cloned_repo, cloned_target).await + }; + tasks.spawn(fut); } - // close all senders - drop(tx); - - // block and await all downloads - let results: Vec> = rx.into_iter().collect(); - // check all results and return the first error we see - for result in results { - result?; + while let Some(join_result) = tasks.join_next().await { + // Return an error if there was an error joining the task. + let result = join_result.context(error::JoinSnafu)?; + // Return an error if the download failed. + let _ = result?; } - - // no errors were found, the targets are validated Ok(()) } -fn download_targets(repo: &Repository, target: TargetName) -> Result { - let mut reader = match repo.read_target(&target) { - Ok(Some(reader)) => reader, +async fn download_target(repo: Arc>, target: TargetName) -> Result { + let repo = repo.lock().await; + let stream = match repo.read_target(&target).await { + Ok(Some(stream)) => stream, Ok(None) => { return error::TargetMissingSnafu { target: target.raw(), @@ -96,13 +84,16 @@ fn download_targets(repo: &Repository, target: TargetName) -> Result }) } }; + let mut bytes = Cursor::new(read_stream(stream).await.context(error::StreamSnafu)?); // tough's `Read` implementation validates the target as it's being downloaded - io::copy(&mut reader, &mut io::sink()).context(error::TargetDownloadSnafu { - target: target.raw(), - }) + io::copy(&mut bytes, &mut io::sink()) + .await + .context(error::TargetDownloadSnafu { + target: target.raw(), + }) } -fn validate_repo( +async fn validate_repo( root_role_path: &PathBuf, metadata_url: Url, targets_url: &Url, @@ -110,27 +101,27 @@ fn validate_repo( ) -> Result<(), Error> { // Load the repository let repo = RepositoryLoader::new( - File::open(root_role_path).context(repo_error::FileSnafu { - path: root_role_path, - })?, + &repo::root_bytes(root_role_path).await?, metadata_url.clone(), targets_url.clone(), ) .load() + .await .context(repo_error::RepoLoadSnafu { metadata_base_url: metadata_url.clone(), })?; info!("Loaded TUF repo: {}", metadata_url); + let thread_safe_repo = Arc::new(Mutex::new(repo)); if validate_targets { // Try retrieving listed targets - retrieve_targets(&repo)?; + retrieve_targets(thread_safe_repo).await?; } Ok(()) } /// Common entrypoint from main() -pub(crate) fn run(args: &Args, validate_repo_args: &ValidateRepoArgs) -> Result<(), Error> { +pub(crate) async fn run(args: &Args, validate_repo_args: &ValidateRepoArgs) -> Result<(), Error> { // If a lock file exists, use that, otherwise use Infra.toml let infra_config = InfraConfig::from_path_or_lock(&args.infra_config_path, false) .context(repo_error::ConfigSnafu)?; @@ -160,6 +151,7 @@ pub(crate) fn run(args: &Args, validate_repo_args: &ValidateRepoArgs) -> Result< repo_urls.1, validate_repo_args.validate_targets, ) + .await } mod error { @@ -169,6 +161,9 @@ mod error { #[derive(Debug, Snafu)] #[snafu(visibility(pub(super)))] pub(crate) enum Error { + #[snafu(display("Error running async code: {}", source))] + Async { source: std::io::Error }, + #[snafu(display("Invalid percentage specified: {} is greater than 100", percentage))] InvalidPercentage { percentage: u8 }, @@ -178,9 +173,18 @@ mod error { source: Box, }, + #[snafu(display("Error parallelizing download tasks: {}", source))] + Semaphore { source: tokio::sync::AcquireError }, + + #[snafu(display("Error reading bytes from stream: {}", source))] + Stream { source: tough::error::Error }, + #[snafu(display("Failed to download and write target '{}': {}", target, source))] TargetDownload { target: String, source: io::Error }, + #[snafu(display("Failed to complete download task: {}", source))] + Join { source: tokio::task::JoinError }, + #[snafu(display("Missing target: {}", target))] TargetMissing { target: String }, @@ -190,9 +194,6 @@ mod error { #[snafu(source(from(tough::error::Error, Box::new)))] source: Box, }, - - #[snafu(display("Unable to create thread pool: {}", source))] - ThreadPool { source: rayon::ThreadPoolBuildError }, } } pub(crate) use error::Error; diff --git a/tools/pubsys/src/vmware/upload_ova/mod.rs b/tools/pubsys/src/vmware/upload_ova/mod.rs index 3df0454df..e392437ab 100644 --- a/tools/pubsys/src/vmware/upload_ova/mod.rs +++ b/tools/pubsys/src/vmware/upload_ova/mod.rs @@ -11,10 +11,10 @@ use pubsys_config::vmware::{ use pubsys_config::InfraConfig; use serde::Serialize; use snafu::{ensure, OptionExt, ResultExt}; -use std::fs; use std::path::PathBuf; use tempfile::NamedTempFile; use tinytemplate::TinyTemplate; +use tokio::fs; const SPEC_TEMPLATE_NAME: &str = "spec_template"; @@ -43,7 +43,7 @@ pub(crate) struct UploadArgs { } /// Common entrypoint from main() -pub(crate) fn run(args: &Args, upload_args: &UploadArgs) -> Result<()> { +pub(crate) async fn run(args: &Args, upload_args: &UploadArgs) -> Result<()> { // If a lock file exists, use that, otherwise use Infra.toml or default let infra_config = InfraConfig::from_path_or_lock(&args.infra_config_path, true) .context(error::InfraConfigSnafu)?; @@ -89,10 +89,13 @@ pub(crate) fn run(args: &Args, upload_args: &UploadArgs) -> Result<()> { let dc_common = vmware.common.as_ref(); // Read the import spec as a template - let import_spec_str = fs::read_to_string(&upload_args.spec).context(error::FileSnafu { - action: "read", - path: &upload_args.spec, - })?; + let import_spec_str = + fs::read_to_string(&upload_args.spec) + .await + .context(error::FileSnafu { + action: "read", + path: &upload_args.spec, + })?; let mut tt = TinyTemplate::new(); tt.add_template(SPEC_TEMPLATE_NAME, &import_spec_str) .context(error::AddTemplateSnafu { @@ -129,10 +132,12 @@ pub(crate) fn run(args: &Args, upload_args: &UploadArgs) -> Result<()> { // Render the import spec with this datacenter's details and write to temp file let rendered_spec = render_spec(&tt, &datacenter.network, upload_args.mark_as_template)?; let import_spec = NamedTempFile::new().context(error::TempFileSnafu)?; - fs::write(import_spec.path(), &rendered_spec).context(error::FileSnafu { - action: "write", - path: import_spec.path(), - })?; + fs::write(import_spec.path(), &rendered_spec) + .await + .context(error::FileSnafu { + action: "write", + path: import_spec.path(), + })?; trace!("Import spec: {}", &rendered_spec); if upload_args.mark_as_template { diff --git a/twoliter/src/project.rs b/twoliter/src/project.rs index bee333675..31880b418 100644 --- a/twoliter/src/project.rs +++ b/twoliter/src/project.rs @@ -236,7 +236,7 @@ mod test { let deserialized = Project::load(path).await.unwrap(); // Add checks here as desired to validate deserialization. - assert_eq!(SchemaVersion::<1>::default(), deserialized.schema_version); + assert_eq!(SchemaVersion::<1>, deserialized.schema_version); let sdk_name = deserialized.sdk_name().unwrap(); let toolchain_name = deserialized.toolchain_name().unwrap(); assert_eq!("a.com/b", sdk_name.registry.as_ref().unwrap().as_str());