From 6080458ecd631a981b193960627fc028ad350aa8 Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Fri, 24 Nov 2023 15:45:30 +0100 Subject: [PATCH 1/9] Conditional requests for the HTTP target. --- src/lib.rs | 1 + src/targets/http.rs | 145 ++++++++++++++++++++++++++++++++------- src/utils/http.rs | 160 ++++++++++++++++++++++++++++++++++++++++++++ src/utils/mod.rs | 1 + 4 files changed, 281 insertions(+), 26 deletions(-) create mode 100644 src/utils/http.rs create mode 100644 src/utils/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 8c6a2da..a6ef894 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,3 +33,4 @@ pub mod metrics; pub mod payload; pub mod targets; pub mod units; +pub mod utils; diff --git a/src/targets/http.rs b/src/targets/http.rs index c088539..697e632 100644 --- a/src/targets/http.rs +++ b/src/targets/http.rs @@ -3,15 +3,20 @@ use std::convert::Infallible; use std::sync::Arc; use arc_swap::ArcSwap; +use chrono::{DateTime, Utc}; use futures::stream; -use hyper::{Body, Method, Request, Response}; +use hyper::{Body, Method, Request, Response, StatusCode}; +use hyper::http::response; use log::debug; +use rpki::rtr::State; use serde::Deserialize; use crate::payload; use crate::comms::Link; use crate::formats::output; use crate::log::ExitError; use crate::manager::Component; +use crate::utils::http::EtagsIter; +use crate::utils::http::{format_http_date, parse_http_date}; //------------ Target -------------------------------------------------------- @@ -43,30 +48,45 @@ impl Target { return None } - if let Some(update) = http_source.set() { - Some( - Response::builder() - .header("Content-Type", format.content_type()) - .body(Body::wrap_stream(stream::iter( - format.stream(update) - .map(Result::<_, Infallible>::Ok) - ))) - .unwrap() - ) + let update = http_source.data(); + let update = match update.as_ref() { + Some(update) => update, + None => { + return Some( + Response::builder() + .status(503) + .header("Content-Type", "text/plain") + .body( + "Initial validation ongoing. \ + Please wait.".into() + ) + .unwrap() + ) + } + }; + + if update.is_not_modified(request) { + return Some(update.not_modified()) } - else { - Some( + + Some( + update.header( Response::builder() - .status(503) - .header("Content-Type", "text/plain") - .body("Initial validation ongoing. Please wait.".into()) - .unwrap() + ).header( + "Content-Type", format.content_type() ) - } + .body(Body::wrap_stream(stream::iter( + format.stream(update.set.clone()) + .map(Result::<_, Infallible>::Ok) + ))) + .unwrap() + ) } ); component.register_http_resource(processor.clone()); + let mut state = State::new(); + loop { debug!("Target {}: link status: {}", component.name(), unit.get_status() @@ -76,33 +96,106 @@ impl Target { "Target {}: Got update ({} entries)", component.name(), update.set().len() ); - source.update(update); + source.update(SourceData::new(update, &mut state)); } } } } - //------------ Source -------------------------------------------------------- /// The date source for an HTTP target. #[derive(Clone, Default)] struct Source { /// The current set of RTR data. - data: Arc>> + data: Arc>> } impl Source { /// Updates the data source from the given update. - fn update(&self, update: payload::Update) { - self.data.store(Some(update.set().clone()).into()) + fn update(&self, data: SourceData) { + self.data.store(Some(data).into()) } - /// Returns the current payload set. - fn set(&self) -> Option { - self.data.load().as_ref().as_ref().cloned() + /// Returns the current payload data. + fn data(&self) -> Arc> { + self.data.load_full() } } +//------------ SourceData ---------------------------------------------------- + +/// The data held by a data source. +/// +struct SourceData { + set: payload::Set, + etag: String, + created: DateTime, +} + +impl SourceData { + fn new(update: payload::Update, state: &mut State) -> Self { + let etag = format!("\"{:x}-{}\"", state.session(), state.serial()); + state.inc(); + Self { + set: update.set().clone(), + etag, + created: Utc::now(), + } + } + + /// Returns whether 304 Not Modified response should be retured. + fn is_not_modified(&self, req: &Request) -> bool { + // First, check If-None-Match. + for value in req.headers().get_all("If-None-Match").iter() { + // Skip ill-formatted values. By being lazy here we may falsely + // return a full response, so this should be fine. + let value = match value.to_str() { + Ok(value) => value, + Err(_) => continue + }; + let value = value.trim(); + if value == "*" { + return true + } + for tag in EtagsIter::new(value) { + if tag.trim() == self.etag { + return true + } + } + } + + // Now, the If-Modified-Since header. + if let Some(value) = req.headers().get("If-Modified-Since") { + let value = match value.to_str() { + Ok(value) => value, + Err(_) => return false, + }; + if let Some(date) = parse_http_date(value) { + if date >= self.created { + return true + } + } + } + + false + } + + fn not_modified(&self) -> Response { + self.header( + response::Builder::new().status( + StatusCode::NOT_MODIFIED + ) + ).body(Body::empty()).expect("broken HTTP response builder") + } + + fn header(&self, builder: response::Builder) -> response::Builder { + builder.header( + "ETag", &self.etag + ).header( + "Last-Modified", format_http_date(self.created) + ) + } +} diff --git a/src/utils/http.rs b/src/utils/http.rs new file mode 100644 index 0000000..5fc78db --- /dev/null +++ b/src/utils/http.rs @@ -0,0 +1,160 @@ +use chrono::{DateTime, Utc}; +use chrono::format::{Item, Fixed, Numeric, Pad}; + + +//------------ Parsing Etags ------------------------------------------------- + +/// An iterator over the etags in an If-Not-Match header value. +/// +/// This does not handle the "*" value. +/// +/// One caveat: The iterator stops when it encounters bad formatting which +/// makes this indistinguishable from reaching the end of a correctly +/// formatted value. As a consequence, we will 304 a request that has the +/// right tag followed by garbage. +pub struct EtagsIter<'a>(&'a str); + +impl<'a> EtagsIter<'a> { + pub fn new(value: &'a str) -> Self { + Self(value) + } +} + +impl<'a> Iterator for EtagsIter<'a> { + type Item = &'a str; + + fn next(&mut self) -> Option { + // Skip white space and check if we are done. + self.0 = self.0.trim_start(); + if self.0.is_empty() { + return None + } + + // We either have to have a lone DQUOTE or one prefixed by W/ + let prefix_len = if self.0.starts_with('"') { + 1 + } + else if self.0.starts_with("W/\"") { + 3 + } + else { + return None + }; + + // Find the end of the tag which is after the next DQUOTE. + let end = match self.0[prefix_len..].find('"') { + Some(index) => index + prefix_len + 1, + None => return None + }; + + let res = &self.0[0..end]; + + // Move past the second DQUOTE and any space. + self.0 = self.0[end..].trim_start(); + + // If we have a comma, skip over that and any space. + if self.0.starts_with(',') { + self.0 = self.0[1..].trim_start(); + } + + Some(res) + } +} + + +//------------ Parsing and Constructing HTTP Dates --------------------------- + +/// Definition of the preferred date format (aka IMF-fixdate). +/// +/// The definition allows for relaxed parsing: It accepts additional white +/// space and ignores case for textual representations. It does, however, +/// construct the correct representation when formatting. +const IMF_FIXDATE: &[Item<'static>] = &[ + Item::Space(""), + Item::Fixed(Fixed::ShortWeekdayName), + Item::Space(""), + Item::Literal(","), + Item::Space(" "), + Item::Numeric(Numeric::Day, Pad::Zero), + Item::Space(" "), + Item::Fixed(Fixed::ShortMonthName), + Item::Space(" "), + Item::Numeric(Numeric::Year, Pad::Zero), + Item::Space(" "), + Item::Numeric(Numeric::Hour, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Minute, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Second, Pad::Zero), + Item::Space(" "), + Item::Literal("GMT"), + Item::Space(""), +]; + +/// Definition of the obsolete RFC850 date format.. +const RFC850_DATE: &[Item<'static>] = &[ + Item::Space(""), + Item::Fixed(Fixed::LongWeekdayName), + Item::Space(""), + Item::Literal(","), + Item::Space(" "), + Item::Numeric(Numeric::Day, Pad::Zero), + Item::Literal("-"), + Item::Fixed(Fixed::ShortMonthName), + Item::Literal("-"), + Item::Numeric(Numeric::YearMod100, Pad::Zero), + Item::Space(" "), + Item::Numeric(Numeric::Hour, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Minute, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Second, Pad::Zero), + Item::Space(" "), + Item::Literal("GMT"), + Item::Space(""), +]; + +/// Definition of the obsolete asctime date format. +const ASCTIME_DATE: &[Item<'static>] = &[ + Item::Space(""), + Item::Fixed(Fixed::ShortWeekdayName), + Item::Space(" "), + Item::Fixed(Fixed::ShortMonthName), + Item::Space(" "), + Item::Numeric(Numeric::Day, Pad::Space), + Item::Space(" "), + Item::Numeric(Numeric::Hour, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Minute, Pad::Zero), + Item::Literal(":"), + Item::Numeric(Numeric::Second, Pad::Zero), + Item::Space(" "), + Item::Numeric(Numeric::Year, Pad::Zero), + Item::Space(""), +]; + +/// Parses an HTTP date. +/// +/// Since all date format allow ASCII characters only, this expects a str. +/// If it cannot parse the date, it simply returns `None`. +#[allow(clippy::question_mark)] // False positive. +pub fn parse_http_date(date: &str) -> Option> { + use chrono::format::{Parsed, parse}; + + let mut parsed = Parsed::new(); + if parse(&mut parsed, date, IMF_FIXDATE.iter()).is_err() { + parsed = Parsed::new(); + if parse(&mut parsed, date, RFC850_DATE.iter()).is_err() { + parsed = Parsed::new(); + if parse(&mut parsed, date, ASCTIME_DATE.iter()).is_err() { + return None + } + } + } + parsed.to_datetime_with_timezone(&Utc).ok() +} + +pub fn format_http_date(date: DateTime) -> String { + date.format_with_items(IMF_FIXDATE.iter()).to_string() +} + diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..3883215 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod http; From ece9fba729f7a769c17c36ca7563fd96d3345fc9 Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Tue, 28 Nov 2023 13:21:50 +0100 Subject: [PATCH 2/9] Switch to non-blocking reqwest. --- Cargo.lock | 279 +++++++++++++++++++++++++--------------------- Cargo.toml | 3 +- src/manager.rs | 2 +- src/units/json.rs | 241 +++++++++++++++++++++++---------------- 4 files changed, 301 insertions(+), 224 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 71b5caf..09ab31d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -72,9 +72,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "bcder" @@ -234,18 +234,18 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] [[package]] name = "futures" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" dependencies = [ "futures-channel", "futures-core", @@ -258,9 +258,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", "futures-sink", @@ -268,15 +268,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" dependencies = [ "futures-core", "futures-task", @@ -285,15 +285,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" dependencies = [ "proc-macro2", "quote", @@ -302,21 +302,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-channel", "futures-core", @@ -332,9 +332,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", "libc", @@ -343,15 +343,15 @@ dependencies = [ [[package]] name = "gimli" -version = "0.28.0" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "h2" -version = "0.3.21" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91fc23aa11be92976ef4729127f1a74adf36d8436f7816b185d18df956790833" +checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" dependencies = [ "bytes", "fnv", @@ -359,7 +359,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 1.9.3", + "indexmap 2.1.0", "slab", "tokio", "tokio-util", @@ -374,9 +374,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" [[package]] name = "hermit-abi" @@ -406,9 +406,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.9" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" dependencies = [ "bytes", "fnv", @@ -455,7 +455,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -464,9 +464,9 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.24.1" +version = "0.24.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http", @@ -501,9 +501,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -521,19 +521,19 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.3", ] [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itoa" @@ -543,18 +543,18 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "cee9c64da59eae3b50095c18d3e74f8b73c0b86d2792824ff01bbce68ba229ca" dependencies = [ "wasm-bindgen", ] [[package]] name = "libc" -version = "0.2.149" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "log" @@ -602,9 +602,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", @@ -662,9 +662,9 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project-lite" @@ -692,9 +692,9 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "proc-macro2" -version = "1.0.69" +version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" +checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b" dependencies = [ "unicode-ident", ] @@ -796,12 +796,26 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", - "untrusted", + "spin 0.5.2", + "untrusted 0.7.1", "web-sys", "winapi", ] +[[package]] +name = "ring" +version = "0.17.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b" +dependencies = [ + "cc", + "getrandom", + "libc", + "spin 0.9.8", + "untrusted 0.9.0", + "windows-sys", +] + [[package]] name = "rpki" version = "0.17.2" @@ -814,12 +828,12 @@ dependencies = [ "chrono", "futures-util", "log", - "ring", + "ring 0.16.20", "serde", "serde_json", "tokio", "tokio-stream", - "untrusted", + "untrusted 0.7.1", "uuid", ] @@ -828,6 +842,7 @@ name = "rtrtr" version = "0.3.0-dev" dependencies = [ "arc-swap", + "bytes", "chrono", "clap", "crossbeam-utils", @@ -863,33 +878,33 @@ checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "rustls" -version = "0.21.7" +version = "0.21.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" +checksum = "629648aced5775d558af50b2b4c7b02983a04b312126d45eeead26e7caa498b9" dependencies = [ "log", - "ring", + "ring 0.17.5", "rustls-webpki", "sct", ] [[package]] name = "rustls-pemfile" -version = "1.0.3" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d3987094b1d07b653b7dfdc3f70ce9a1da9c51ac18c1b06b662e4f9a0e9f4b2" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ "base64", ] [[package]] name = "rustls-webpki" -version = "0.101.6" +version = "0.101.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c7d5dece342910d9ba34d259310cae3e0154b873b35408b787b59bce53d34fe" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" dependencies = [ - "ring", - "untrusted", + "ring 0.17.5", + "untrusted 0.9.0", ] [[package]] @@ -900,28 +915,28 @@ checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "sct" -version = "0.7.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" dependencies = [ - "ring", - "untrusted", + "ring 0.17.5", + "untrusted 0.9.0", ] [[package]] name = "serde" -version = "1.0.189" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.189" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", @@ -930,9 +945,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -941,9 +956,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186" +checksum = "12022b835073e5b11e90a14f86838ceb1c8fb0325b72416845c487ac0fa95e80" dependencies = [ "serde", ] @@ -971,15 +986,15 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.1" +version = "1.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi", @@ -987,9 +1002,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys", @@ -1001,6 +1016,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "strsim" version = "0.10.0" @@ -1009,9 +1030,9 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "2.0.38" +version = "2.0.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" dependencies = [ "proc-macro2", "quote", @@ -1054,9 +1075,9 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" +checksum = "ff1bc3d3f05aff0403e8ac0d92ced918ec05b666a43f83297ccef5bea8a3d449" dependencies = [ "winapi-util", ] @@ -1115,9 +1136,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.33.0" +version = "1.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" +checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" dependencies = [ "backtrace", "bytes", @@ -1125,16 +1146,16 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", - "socket2 0.5.4", + "socket2 0.5.5", "tokio-macros", "windows-sys", ] [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -1164,9 +1185,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.9" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" +checksum = "5419f34732d9eb6ee4c3578b7989078579b7f039cbbb9ca2c4da015749371e15" dependencies = [ "bytes", "futures-core", @@ -1178,9 +1199,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.2" +version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "185d8ab0dfbb35cf1399a6344d8484209c088f75f8f68230da55d48d95d43e3d" +checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35" dependencies = [ "serde", "serde_spanned", @@ -1190,20 +1211,20 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" dependencies = [ "serde", ] [[package]] name = "toml_edit" -version = "0.20.2" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396e4d48bbb2b7554c944bde63101b5ae446cff6ec4a24227428f15eb72ef338" +checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" dependencies = [ - "indexmap 2.0.2", + "indexmap 2.1.0", "serde", "serde_spanned", "toml_datetime", @@ -1218,9 +1239,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.39" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2ef2af84856a50c1d430afce2fdded0a4ec7eda868db86409b4543df0797f9" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "pin-project-lite", "tracing-core", @@ -1268,11 +1289,17 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "untrusted" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" + [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", @@ -1282,9 +1309,9 @@ dependencies = [ [[package]] name = "uuid" -version = "1.4.1" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" [[package]] name = "version_check" @@ -1309,9 +1336,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.87" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7706a72ab36d8cb1f80ffbf0e071533974a60d0a308d01a5d0375bf60499a342" +checksum = "0ed0d4f68a3015cc185aff4db9506a015f4b96f95303897bfa23f846db54064e" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1319,9 +1346,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.87" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ef2b6d3c510e9625e5fe6f509ab07d66a760f0885d858736483c32ed7809abd" +checksum = "1b56f625e64f3a1084ded111c4d5f477df9f8c92df113852fa5a374dbda78826" dependencies = [ "bumpalo", "log", @@ -1334,9 +1361,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.37" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +checksum = "ac36a15a220124ac510204aec1c3e5db8a22ab06fd6706d881dc6149f8ed9a12" dependencies = [ "cfg-if", "js-sys", @@ -1346,9 +1373,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.87" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dee495e55982a3bd48105a7b947fd2a9b4a8ae3010041b9e0faab3f9cd028f1d" +checksum = "0162dbf37223cd2afce98f3d0785506dcb8d266223983e4b5b525859e6e182b2" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1356,9 +1383,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.87" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" +checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", @@ -1369,15 +1396,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.87" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca6ad05a4870b2bf5fe995117d3728437bd27d7cd5f06f13c17443ef369775a1" +checksum = "7ab9b36309365056cd639da3134bf87fa8f3d86008abf99e612384a6eecd459f" [[package]] name = "web-sys" -version = "0.3.64" +version = "0.3.65" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +checksum = "5db499c5f66323272151db0e666cd34f78617522fb0c1604d31a27c50c206a85" dependencies = [ "js-sys", "wasm-bindgen", @@ -1385,9 +1412,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.2" +version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" +checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" [[package]] name = "winapi" @@ -1497,9 +1524,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.17" +version = "0.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" +checksum = "829846f3e3db426d4cee4510841b71a8e58aa2a76b1132579487ae430ccd9c7b" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index acef6e2..3317a5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ readme = "README.md" [dependencies] arc-swap = "1.0" +bytes = "1" chrono = "0.4.31" clap = { version = "3.0", features = [ "cargo" ] } crossbeam-utils = "0.8.4" @@ -29,7 +30,7 @@ rustls-webpki = "0.101.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" slab = "0.4.2" -tokio = { version = "1.6", features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time"]} +tokio = { version = "1.6", features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time"]} tokio-rustls = "0.24.1" tokio-stream = { version = "0.1", features = ["net"] } toml = "0.8.2" diff --git a/src/manager.rs b/src/manager.rs index 71e584b..64a9220 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use log::error; use serde::Deserialize; -use reqwest::blocking::Client as HttpClient; +use reqwest::Client as HttpClient; use tokio::runtime::Runtime; use crate::{http, metrics}; use crate::comms::{Gate, GateAgent, Link}; diff --git a/src/units/json.rs b/src/units/json.rs index 0ce0f64..3d59438 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -1,21 +1,25 @@ //! JSON clients. -use std::{io, thread}; -use std::convert::TryFrom; -use std::fs::File; +use std::{cmp, io}; use std::str::FromStr; use std::time::Duration; +//use chrono::{DateTime, Utc}; +use bytes::{Buf, Bytes, BytesMut}; use log::{debug, warn}; use reqwest::Url; use rpki::rtr::Serial; use serde::Deserialize; -use tokio::sync::oneshot; +use tokio::fs::File; +use tokio::io::AsyncReadExt; +use tokio::task::spawn_blocking; use tokio::time::{Instant, timeout_at}; use crate::payload; use crate::comms::{Gate, Terminated, UnitStatus}; use crate::config::ConfigPath; use crate::formats::json::Set as JsonSet; use crate::manager::Component; +use crate::log::Failed; + //------------ Json ---------------------------------------------------------- @@ -33,7 +37,7 @@ impl Json { pub async fn run( self, component: Component, gate: Gate ) -> Result<(), Terminated> { - JsonRunner::new(self, component, gate).run().await + JsonRunner::new(self, component).run(gate).await } } @@ -43,45 +47,52 @@ impl Json { struct JsonRunner { json: Json, component: Component, - gate: Gate, serial: Serial, status: UnitStatus, current: Option, + /* + last_modified: Option>, + etag: Option, + */ } impl JsonRunner { fn new( - json: Json, component: Component, gate: Gate + json: Json, component: Component ) -> Self { JsonRunner { - json, component, gate, + json, component, serial: Serial::default(), status: UnitStatus::Stalled, current: Default::default(), + /* + last_modified: None, + etag: None, + */ } } - async fn run(mut self) -> Result<(), Terminated> { - self.component.register_metrics(self.gate.metrics()); - self.gate.update_status(self.status).await; + async fn run(mut self, mut gate: Gate) -> Result<(), Terminated> { + self.component.register_metrics(gate.metrics()); + gate.update_status(self.status).await; loop { - self.step().await?; - self.wait().await?; + self.step(&mut gate).await?; + self.wait(&mut gate).await?; } } - async fn step(&mut self) -> Result<(), Terminated> { - match self.load_json().await? { - Some(res) => { + async fn step(&mut self, gate: &mut Gate) -> Result<(), Terminated> { + match gate.process_until(self.fetch_json()).await? { + Ok(res) => { let res = res.into_payload(); if self.current.as_ref() != Some(&res) { self.serial = self.serial.add(1); self.current = Some(res.clone()); if self.status != UnitStatus::Healthy { self.status = UnitStatus::Healthy; - self.gate.update_status(self.status).await + gate.update_status(self.status).await } - self.gate.update_data( + gate.update_data( payload::Update::new(self.serial, res, None) ).await; debug!( @@ -96,10 +107,10 @@ impl JsonRunner { ); } } - None => { + Err(Failed) => { if self.status != UnitStatus::Stalled { self.status = UnitStatus::Stalled; - self.gate.update_status(self.status).await + gate.update_status(self.status).await } debug!("Unit {}: marked as stalled.", self.component.name()); } @@ -107,35 +118,76 @@ impl JsonRunner { Ok(()) } - async fn load_json(&mut self) -> Result, Terminated> { - let (tx, rx) = oneshot::channel(); - let reader = match self.json.uri.reader(&self.component) { - Some(reader) => reader, - None => return Ok(None) - }; - let _ = thread::spawn(move || { - let _ = tx.send(serde_json::from_reader::<_, JsonSet>(reader)); + async fn fetch_json(&mut self) -> Result { + let reader = HttpReader::new(match self.json.uri { + SourceUri::Http(ref url) => { + ReaderSource::Http( + self.component.http_client().get( + url.clone() + ).send().await.map_err(|err| { + warn!( + "Unit {}: HTTP request failed: {}", + self.component.name(), err + ); + Failed + })? + ) + } + SourceUri::File(ref path) => { + match File::open(path).await { + Ok(file) => ReaderSource::File(file), + Err(err) => { + warn!( + "Unit {}: Failed to open file {}: {}.", + self.component.name(), path.display(), err + ); + return Err(Failed) + } + } + } }); - - // XXX I think awaiting rx should never produce an error, so - // unwrapping is the right thing to do. But is it really? - match self.gate.process_until(rx).await?.unwrap() { - Ok(res) => Ok(Some(res)), - Err(err) => { + match spawn_blocking(move || { + serde_json::from_reader::<_, JsonSet>(reader) + }).await { + Ok(Ok(res)) => Ok(res), + Ok(Err(err)) => { + // Joining succeded but JSON parsing didn’t. warn!( "{}: Failed parsing source: {}", self.component.name(), err ); - Ok(None) + Err(Failed) + } + Err(err) => { + // Joining failed. This may either be because the JSON + // parser panicked or because the future was dropped. The + // former probably means the JSON was kaputt in a very + // creative way and the latter can’t really happening. So + // it is probably safe to ignore the JSON as if it were + // broken. + if err.is_panic() { + warn!( + "Unit {}: Failed parsing source: JSON parser panicked.", + self.component.name(), + ); + } + else { + warn!( + "Unit {}: Failed parsing source: parser was dropped \ + (This can't happen.)", + self.component.name(), + ); + } + Err(Failed) } } } - async fn wait(&mut self) -> Result<(), Terminated> { + async fn wait(&mut self, gate: &mut Gate) -> Result<(), Terminated> { let end = Instant::now() + Duration::from_secs(self.json.refresh); while end > Instant::now() { - match timeout_at(end, self.gate.process()).await { + match timeout_at(end, gate.process()).await { Ok(Ok(_status)) => { //self.status = status } @@ -159,32 +211,6 @@ enum SourceUri { File(ConfigPath), } -impl SourceUri { - fn reader(&self, component: &Component) -> Option { - match *self { - SourceUri::Http(ref uri) => { - Some(JsonReader::HttpRequest( - Some(component.http_client().get(uri.clone())) - )) - } - SourceUri::File(ref path) => { - match File::open(path).map(JsonReader::File) { - Ok(some) => Some(some), - Err(err) => { - warn!( - "{}: Failed reading open {}: {}", - component.name(), - path.display(), - err - ); - None - } - } - } - } - } -} - impl TryFrom for SourceUri { type Error = ::Err; @@ -200,45 +226,68 @@ impl TryFrom for SourceUri { } -//------------ JsonReader ---------------------------------------------------- +//------------ HttpReader ---------------------------------------------------- -/// A reader producing the JSON source. -enum JsonReader { +struct HttpReader { + source: ReaderSource, + chunk: Bytes, + rt: tokio::runtime::Handle, +} + +enum ReaderSource { File(File), - HttpRequest(Option), - Http(reqwest::blocking::Response), + Http(reqwest::Response), } -impl io::Read for JsonReader { - fn read(&mut self, buf: &mut [u8]) -> Result { - let http = match *self { - JsonReader::File(ref mut inner) => { - return inner.read(buf) - } - JsonReader::Http(ref mut inner) => { - return inner.read(buf) - } - JsonReader::HttpRequest(ref mut inner) => { - match inner.take() { - Some(inner) => { - inner.send().map_err(|err| { - io::Error::new( - io::ErrorKind::Other, - err - ) - })? - } - None => { - return Err(io::Error::new( - io::ErrorKind::Other, - "already failed to send request" - )) - } +impl HttpReader { + fn new(source: ReaderSource) -> Self { + HttpReader { + source, + chunk: Bytes::new(), + rt: tokio::runtime::Handle::current() + } + } + + fn prepare_chunk(&mut self) -> Result { + if !self.chunk.is_empty() { + return Ok(true) + } + match self.source { + ReaderSource::File(ref mut file) => { + let mut buf = BytesMut::with_capacity(16384); + let read = self.rt.block_on(file.read_buf(&mut buf))?; + if read == 0 { + return Ok(false) } + self.chunk = buf.freeze(); } - }; - *self = JsonReader::Http(http); - self.read(buf) + ReaderSource::Http(ref mut response) => { + let chunk = self.rt.block_on(response.chunk()).map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("failed to read HTTP response: {}", err) + ) + })?; + self.chunk = match chunk { + Some(chunk) => chunk, + None => return Ok(false) + }; + } + } + Ok(true) + } +} + +impl io::Read for HttpReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + if !self.prepare_chunk()? { + return Ok(0) + } + + let len = cmp::min(self.chunk.len(), buf.len()); + buf[..len].copy_from_slice(&self.chunk[..len]); + self.chunk.advance(len); + Ok(len) } } From feb4bc590ba0fb766ad1978c4216ec16809ecedc Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Tue, 28 Nov 2023 15:27:46 +0100 Subject: [PATCH 3/9] Add conditional requests and file modified checks to the json unit. --- src/units/json.rs | 226 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 181 insertions(+), 45 deletions(-) diff --git a/src/units/json.rs b/src/units/json.rs index 3d59438..a500988 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -2,11 +2,13 @@ use std::{cmp, io}; use std::str::FromStr; -use std::time::Duration; -//use chrono::{DateTime, Utc}; +use std::time::{Duration, SystemTime}; +use std::fs::metadata; +use chrono::{DateTime, Utc}; use bytes::{Buf, Bytes, BytesMut}; use log::{debug, warn}; -use reqwest::Url; +use reqwest::header; +use reqwest::{StatusCode, Url}; use rpki::rtr::Serial; use serde::Deserialize; use tokio::fs::File; @@ -19,6 +21,7 @@ use crate::config::ConfigPath; use crate::formats::json::Set as JsonSet; use crate::manager::Component; use crate::log::Failed; +use crate::utils::http::{format_http_date, parse_http_date}; //------------ Json ---------------------------------------------------------- @@ -50,10 +53,6 @@ struct JsonRunner { serial: Serial, status: UnitStatus, current: Option, - /* - last_modified: Option>, - etag: Option, - */ } impl JsonRunner { @@ -65,10 +64,6 @@ impl JsonRunner { serial: Serial::default(), status: UnitStatus::Stalled, current: Default::default(), - /* - last_modified: None, - etag: None, - */ } } @@ -83,7 +78,7 @@ impl JsonRunner { async fn step(&mut self, gate: &mut Gate) -> Result<(), Terminated> { match gate.process_until(self.fetch_json()).await? { - Ok(res) => { + Ok(Some(res)) => { let res = res.into_payload(); if self.current.as_ref() != Some(&res) { self.serial = self.serial.add(1); @@ -107,6 +102,10 @@ impl JsonRunner { ); } } + Ok(None) => { + // Fetching succeeded but there isn’t an update. Nothing + // to do, really. + } Err(Failed) => { if self.status != UnitStatus::Stalled { self.status = UnitStatus::Stalled; @@ -118,38 +117,18 @@ impl JsonRunner { Ok(()) } - async fn fetch_json(&mut self) -> Result { - let reader = HttpReader::new(match self.json.uri { - SourceUri::Http(ref url) => { - ReaderSource::Http( - self.component.http_client().get( - url.clone() - ).send().await.map_err(|err| { - warn!( - "Unit {}: HTTP request failed: {}", - self.component.name(), err - ); - Failed - })? - ) - } - SourceUri::File(ref path) => { - match File::open(path).await { - Ok(file) => ReaderSource::File(file), - Err(err) => { - warn!( - "Unit {}: Failed to open file {}: {}.", - self.component.name(), path.display(), err - ); - return Err(Failed) - } - } - } - }); + async fn fetch_json(&mut self) -> Result, Failed> { + let reader = match HttpReader::open( + &mut self.json.uri, + &self.component, + ).await? { + Some(reader) => reader, + None => return Ok(None) + }; match spawn_blocking(move || { serde_json::from_reader::<_, JsonSet>(reader) }).await { - Ok(Ok(res)) => Ok(res), + Ok(Ok(res)) => Ok(Some(res)), Ok(Err(err)) => { // Joining succeded but JSON parsing didn’t. warn!( @@ -204,11 +183,21 @@ impl JsonRunner { //------------ SourceUri ---------------------------------------------------- /// The URI of the unit’s source. +/// +/// This also contains the runtime status for the source which is perhaps a +/// bit cheeky. #[derive(Clone, Debug, Deserialize)] #[serde(try_from = "String")] enum SourceUri { - Http(Url), - File(ConfigPath), + Http { + url: Url, + last_modified: Option>, + etag: Option, + }, + File { + path: ConfigPath, + last_modified: Option, + } } impl TryFrom for SourceUri { @@ -217,10 +206,18 @@ impl TryFrom for SourceUri { fn try_from(mut src: String) -> Result { if src.starts_with("file:") { let src = src.split_off(5); - Ok(SourceUri::File(src.into())) + Ok(SourceUri::File { + path: src.into(), + last_modified: None, + }) } else { - Url::from_str(&src).map(SourceUri::Http) + let url = Url::from_str(&src)?; + Ok(SourceUri::Http { + url, + last_modified: None, + etag: None + }) } } } @@ -240,6 +237,95 @@ enum ReaderSource { } impl HttpReader { + async fn open( + uri: &mut SourceUri, + component: &Component, + ) -> Result, Failed> { + match uri { + SourceUri::Http { + ref url, ref mut etag, ref mut last_modified + } => { + Self::open_http(url, last_modified, etag, component).await + } + SourceUri::File { ref path, ref mut last_modified } => { + Self::open_file(path, last_modified, component).await + } + } + } + + async fn open_http( + uri: &Url, + last_modified: &mut Option>, + etag: &mut Option, + component: &Component, + ) -> Result, Failed> { + // Create and send the request. + let mut request = component.http_client().get(uri.clone()); + if let Some(etag) = etag.as_ref() { + request = request.header( + header::IF_NONE_MATCH, etag.as_ref() + ); + } + if let Some(ts) = last_modified { + request = request.header( + header::IF_MODIFIED_SINCE, format_http_date(*ts) + ); + } + let response = request.send().await.map_err(|err| { + warn!( + "Unit {}: HTTP request failed: {}", + component.name(), err + ); + Failed + })?; + + // Return early if we anything other than a 200 OK + if response.status() == StatusCode::NOT_MODIFIED { + return Ok(None) + } + else if response.status() != StatusCode::OK { + warn!( + "Unit {}: HTTP request return status {}", + component.name(), response.status() + ); + return Err(Failed) + } + + // Update Etag and Last-Modified. + *etag = Self::parse_etag(&response); + *last_modified = Self::parse_last_modified(&response); + + // And we are good to go! + Ok(Some(Self::new(ReaderSource::Http(response)))) + } + + async fn open_file( + path: &ConfigPath, + last_modified: &mut Option, + component: &Component, + ) -> Result, Failed> { + if let Ok(modified) = metadata(path).and_then(|meta| meta.modified()) { + if let Some(last_modified) = last_modified { + if *last_modified >= modified { + return Ok(None) + } + } + *last_modified = Some(modified) + } + + Ok(Some(Self::new( + ReaderSource::File( + File::open(path).await.map_err(|err| { + warn!( + "Unit {}: Failed to open file {}: {}.", + component.name(), path.display(), err + ); + Failed + })? + ) + ))) + } + fn new(source: ReaderSource) -> Self { HttpReader { source, @@ -276,6 +362,56 @@ impl HttpReader { } Ok(true) } + + fn parse_etag(response: &reqwest::Response) -> Option { + // Take the value of the first Etag header. Return None if there’s + // more than one, just to be safe. + let mut etags = response.headers() + .get_all(header::ETAG) + .into_iter(); + let etag = etags.next()?; + if etags.next().is_some() { + return None + } + let etag = etag.as_bytes(); + + // The tag starts with an optional case-sensitive `W/` followed by + // `"`. Let’s remember where the actual tag starts. + let start = if etag.starts_with(b"W/\"") { + 3 + } + else if etag.first() == Some(&b'"') { + 1 + } + else { + return None + }; + + // We need at least one more character. Empty tags are allowed. + if etag.len() <= start { + return None + } + + // The tag ends with a `"`. + if etag.last() != Some(&b'"') { + return None + } + + Some(Bytes::copy_from_slice(etag)) + } + + fn parse_last_modified( + response: &reqwest::Response + ) -> Option> { + let mut iter = response.headers() + .get_all(header::LAST_MODIFIED) + .into_iter(); + let value = iter.next()?; + if iter.next().is_some() { + return None + } + parse_http_date(value.to_str().ok()?) + } } impl io::Read for HttpReader { From 4d7d4c736498d590d277dd03a4f662947cb6009e Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Wed, 29 Nov 2023 14:45:27 +0100 Subject: [PATCH 4/9] Improve diagnostics. --- src/units/json.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/units/json.rs b/src/units/json.rs index a500988..34a4c1a 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -123,7 +123,10 @@ impl JsonRunner { &self.component, ).await? { Some(reader) => reader, - None => return Ok(None) + None => { + debug!("Unit {}: Source not modified.", self.component.name()); + return Ok(None) + } }; match spawn_blocking(move || { serde_json::from_reader::<_, JsonSet>(reader) @@ -132,7 +135,7 @@ impl JsonRunner { Ok(Err(err)) => { // Joining succeded but JSON parsing didn’t. warn!( - "{}: Failed parsing source: {}", + "Unit {}: Failed parsing source: {}", self.component.name(), err ); From 43254b39b48c903d6e4a8b027f64218059538dc5 Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Tue, 30 Jan 2024 14:48:32 +0100 Subject: [PATCH 5/9] Use HTTP header name constants in the HTTP target. --- src/targets/http.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/targets/http.rs b/src/targets/http.rs index 697e632..01ec773 100644 --- a/src/targets/http.rs +++ b/src/targets/http.rs @@ -6,6 +6,7 @@ use arc_swap::ArcSwap; use chrono::{DateTime, Utc}; use futures::stream; use hyper::{Body, Method, Request, Response, StatusCode}; +use hyper::header::{IF_NONE_MATCH, IF_MODIFIED_SINCE}; use hyper::http::response; use log::debug; use rpki::rtr::State; @@ -149,7 +150,7 @@ impl SourceData { /// Returns whether 304 Not Modified response should be retured. fn is_not_modified(&self, req: &Request) -> bool { // First, check If-None-Match. - for value in req.headers().get_all("If-None-Match").iter() { + for value in req.headers().get_all(IF_NONE_MATCH).iter() { // Skip ill-formatted values. By being lazy here we may falsely // return a full response, so this should be fine. let value = match value.to_str() { @@ -168,7 +169,7 @@ impl SourceData { } // Now, the If-Modified-Since header. - if let Some(value) = req.headers().get("If-Modified-Since") { + if let Some(value) = req.headers().get(IF_MODIFIED_SINCE) { let value = match value.to_str() { Ok(value) => value, Err(_) => return false, From 4dcd41f8e3cf4ac00f148db405911e0d88e75403 Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Thu, 8 Feb 2024 16:10:41 +0100 Subject: [PATCH 6/9] =?UTF-8?q?Don=E2=80=99t=20consider=20If-Modified-Sinc?= =?UTF-8?q?e=20if=20there=20was=20If-None-Match.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/targets/http.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/targets/http.rs b/src/targets/http.rs index 01ec773..5f13e27 100644 --- a/src/targets/http.rs +++ b/src/targets/http.rs @@ -150,7 +150,10 @@ impl SourceData { /// Returns whether 304 Not Modified response should be retured. fn is_not_modified(&self, req: &Request) -> bool { // First, check If-None-Match. + let mut found_if_none_match = false; for value in req.headers().get_all(IF_NONE_MATCH).iter() { + found_if_none_match = true; + // Skip ill-formatted values. By being lazy here we may falsely // return a full response, so this should be fine. let value = match value.to_str() { @@ -168,7 +171,13 @@ impl SourceData { } } - // Now, the If-Modified-Since header. + // If there was at least one If-None-Match, we are supposed to + // ignore If-Modified-Since. + if found_if_none_match { + return false + } + + // Check the If-Modified-Since header. if let Some(value) = req.headers().get(IF_MODIFIED_SINCE) { let value = match value.to_str() { Ok(value) => value, From 96a23f4641f23cf68cf2ea4e16bd0a7df567bddc Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Thu, 8 Feb 2024 16:10:52 +0100 Subject: [PATCH 7/9] Spelling fixes. --- src/units/json.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/units/json.rs b/src/units/json.rs index 34a4c1a..d675f50 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -145,7 +145,7 @@ impl JsonRunner { // Joining failed. This may either be because the JSON // parser panicked or because the future was dropped. The // former probably means the JSON was kaputt in a very - // creative way and the latter can’t really happening. So + // creative way and the latter can’t really happen. So // it is probably safe to ignore the JSON as if it were // broken. if err.is_panic() { @@ -282,7 +282,7 @@ impl HttpReader { Failed })?; - // Return early if we anything other than a 200 OK + // Return early if we receive anything other than a 200 OK if response.status() == StatusCode::NOT_MODIFIED { return Ok(None) } From 97c1d4caef7ca29171e95b47003250a22eeedff8 Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Thu, 8 Feb 2024 16:23:24 +0100 Subject: [PATCH 8/9] Only update last_modified time after successfully opening file. --- src/units/json.rs | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/units/json.rs b/src/units/json.rs index d675f50..afa870d 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -307,6 +307,15 @@ impl HttpReader { last_modified: &mut Option, component: &Component, ) -> Result, Failed> { + let modified = metadata(path).and_then(|meta| meta.modified()).ok(); + if let (Some(modified), Some(last_modified)) = + (modified, last_modified.as_ref()) + { + if *last_modified >= modified { + return Ok(None) + } + } + if let Ok(modified) = metadata(path).and_then(|meta| meta.modified()) { if let Some(last_modified) = last_modified { if *last_modified >= modified { @@ -316,7 +325,7 @@ impl HttpReader { *last_modified = Some(modified) } - Ok(Some(Self::new( + let res = Self::new( ReaderSource::File( File::open(path).await.map_err(|err| { warn!( @@ -326,7 +335,14 @@ impl HttpReader { Failed })? ) - ))) + ); + + // Just assigning here should be fine -- if we failed to get the + // modification time then clearing the stored value is probably a + // good idea, anyway. + *last_modified = modified; + + Ok(Some(res)) } fn new(source: ReaderSource) -> Self { From 1a49f3f6aed7459ff202ef54a97aeb3f5f36966e Mon Sep 17 00:00:00 2001 From: Martin Hoffmann Date: Mon, 12 Feb 2024 16:12:56 +0100 Subject: [PATCH 9/9] Remove duplicate code. --- src/targets/http.rs | 4 ++-- src/units/json.rs | 9 --------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/targets/http.rs b/src/targets/http.rs index 5f13e27..ea2f490 100644 --- a/src/targets/http.rs +++ b/src/targets/http.rs @@ -55,7 +55,7 @@ impl Target { None => { return Some( Response::builder() - .status(503) + .status(StatusCode::SERVICE_UNAVAILABLE) .header("Content-Type", "text/plain") .body( "Initial validation ongoing. \ @@ -147,7 +147,7 @@ impl SourceData { } } - /// Returns whether 304 Not Modified response should be retured. + /// Returns whether 304 Not Modified response should be returned. fn is_not_modified(&self, req: &Request) -> bool { // First, check If-None-Match. let mut found_if_none_match = false; diff --git a/src/units/json.rs b/src/units/json.rs index eb4abea..dd7956a 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -310,15 +310,6 @@ impl HttpReader { } } - if let Ok(modified) = metadata(path).and_then(|meta| meta.modified()) { - if let Some(last_modified) = last_modified { - if *last_modified >= modified { - return Ok(None) - } - } - *last_modified = Some(modified) - } - let res = Self::new( ReaderSource::File( File::open(path).await.map_err(|err| {