diff --git a/src/comms.rs b/src/comms.rs index a1cf721..40c7dd0 100644 --- a/src/comms.rs +++ b/src/comms.rs @@ -16,7 +16,7 @@ use std::fmt; use std::sync::atomic; use std::sync::Arc; -use std::sync::atomic::{AtomicU32, AtomicUsize}; +use std::sync::atomic::AtomicUsize; use chrono::{DateTime, Utc}; use crossbeam_utils::atomic::AtomicCell; use futures::pin_mut; @@ -277,9 +277,6 @@ pub struct GateMetrics { /// The current unit status. status: AtomicCell, - /// The serial number of the last update. - serial: AtomicU32, - /// The number of payload items in the last update. count: AtomicUsize, @@ -292,7 +289,6 @@ pub struct GateMetrics { impl GateMetrics { /// Updates the metrics to match the given update. fn update(&self, update: &payload::Update) { - self.serial.store(update.serial().into(), atomic::Ordering::Relaxed); self.count.store(update.set().len(), atomic::Ordering::Relaxed); self.update.store(Some(Utc::now())); } @@ -308,10 +304,6 @@ impl GateMetrics { "unit_status", "the operational status of the unit", MetricType::Text, MetricUnit::Info ); - const SERIAL_METRIC: Metric = Metric::new( - "gate_serial", "the serial number of the unit's updates", - MetricType::Counter, MetricUnit::Info - ); const COUNT_METRIC: Metric = Metric::new( "vrps", "the number of VRPs in the last update", MetricType::Gauge, MetricUnit::Total @@ -335,10 +327,6 @@ impl metrics::Source for GateMetrics { target.append_simple( &Self::STATUS_METRIC, Some(unit_name), self.status.load() ); - target.append_simple( - &Self::SERIAL_METRIC, Some(unit_name), - self.serial.load(atomic::Ordering::Relaxed) - ); target.append_simple( &Self::COUNT_METRIC, Some(unit_name), self.count.load(atomic::Ordering::Relaxed) diff --git a/src/payload.rs b/src/payload.rs index 049a462..a40af55 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -57,7 +57,6 @@ use std::sync::Arc; use rpki::rtr::client::PayloadError; use rpki::rtr::payload::{Action, Payload, PayloadRef}; use rpki::rtr::server::{PayloadDiff, PayloadSet}; -use rpki::rtr::state::Serial; //------------ Pack ---------------------------------------------------------- @@ -1136,27 +1135,16 @@ impl DiffBuilder { /// be copied cheaply. #[derive(Clone, Debug)] pub struct Update { - /// The serial number of this update. - serial: Serial, - /// The new payload set. set: Set, - - /// The optional diff from the previous update. - diff: Option, } impl Update { /// Creates a new update. pub fn new( - serial: Serial, set: Set, diff: Option + set: Set ) -> Self { - Update { serial, set, diff } - } - - /// Returns the serial number of the update. - pub fn serial(&self) -> Serial { - self.serial + Update { set } } /// Returns the payload set of the update. @@ -1169,27 +1157,9 @@ impl Update { self.set } - /// Returns the diff if it can be used for the given serial. - /// - /// The method will return the diff if it is preset and if the given - /// serial is one less than the update’s serial. - pub fn get_usable_diff(&self, serial: Serial) -> Option<&Diff> { - self.diff.as_ref().and_then(|diff| { - if serial.add(1) == self.serial { - Some(diff) - } - else { - None - } - }) - } - /// Applies a diff to the update. - /// - /// The update will retain its current serial number. pub fn apply_diff_relaxed(&mut self, diff: &Diff) { self.set = diff.apply_relaxed(&self.set); - self.diff = None; } } diff --git a/src/targets/rtr.rs b/src/targets/rtr.rs index e296e29..b0a96c6 100644 --- a/src/targets/rtr.rs +++ b/src/targets/rtr.rs @@ -281,17 +281,13 @@ impl Source { None => { SourceData { state: data.state, - unit_serial: update.serial(), current: Some(update.set().clone()), diffs: Vec::new(), timing: Timing::default(), } } Some(current) => { - let diff = match update.get_usable_diff(data.unit_serial) { - Some(diff) => diff.clone(), - None => update.set().diff_from(current), - }; + let diff = update.set().diff_from(current); if diff.is_empty() { // If there is no change in data, don’t update. return @@ -313,7 +309,6 @@ impl Source { state.inc(); SourceData { state, - unit_serial: update.serial(), current: Some(update.set().clone()), diffs, timing: Timing::default(), @@ -369,9 +364,6 @@ struct SourceData { /// The current RTR state of the target. state: State, - /// The current serial of the source unit. - unit_serial: Serial, - /// The current set of RTR data. current: Option, diff --git a/src/units/json.rs b/src/units/json.rs index 0ce0f64..4fe0452 100644 --- a/src/units/json.rs +++ b/src/units/json.rs @@ -7,7 +7,6 @@ use std::str::FromStr; use std::time::Duration; use log::{debug, warn}; use reqwest::Url; -use rpki::rtr::Serial; use serde::Deserialize; use tokio::sync::oneshot; use tokio::time::{Instant, timeout_at}; @@ -44,7 +43,6 @@ struct JsonRunner { json: Json, component: Component, gate: Gate, - serial: Serial, status: UnitStatus, current: Option, } @@ -55,7 +53,6 @@ impl JsonRunner { ) -> Self { JsonRunner { json, component, gate, - serial: Serial::default(), status: UnitStatus::Stalled, current: Default::default(), } @@ -75,14 +72,13 @@ impl JsonRunner { Some(res) => { let res = res.into_payload(); if self.current.as_ref() != Some(&res) { - self.serial = self.serial.add(1); self.current = Some(res.clone()); if self.status != UnitStatus::Healthy { self.status = UnitStatus::Healthy; self.gate.update_status(self.status).await } self.gate.update_data( - payload::Update::new(self.serial, res, None) + payload::Update::new(res) ).await; debug!( "Unit {}: successfully updated.", diff --git a/src/units/rtr.rs b/src/units/rtr.rs index ecec343..f2d8c28 100644 --- a/src/units/rtr.rs +++ b/src/units/rtr.rs @@ -21,7 +21,7 @@ use log::{debug, error, info, warn}; use pin_project_lite::pin_project; use rpki::rtr::client::{Client, PayloadError, PayloadTarget, PayloadUpdate}; use rpki::rtr::payload::{Action, Payload, Timing}; -use rpki::rtr::state::{Serial, State}; +use rpki::rtr::state::State; use serde::Deserialize; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; @@ -255,9 +255,6 @@ struct RtrClient { /// Our gate status. status: GateStatus, - /// Our current serial. - serial: Serial, - /// The unit’s metrics. metrics: Arc, } @@ -269,7 +266,6 @@ impl RtrClient { connect, retry, status: Default::default(), - serial: Serial::default(), metrics, } } @@ -343,7 +339,6 @@ where } }; if let Some(update) = update { - this.serial = update.serial(); client.target_mut().current = update.set().clone(); gate.update_data(update).await; } @@ -414,14 +409,13 @@ where async fn update( &mut self, client: &mut Client, gate: &mut Gate ) -> Result, io::Error>, Terminated> { - let next_serial = self.serial.add(1); let update_fut = async { let update = client.update().await?; let state = client.state(); if update.is_definitely_empty() { return Ok((state, None)) } - match update.into_update(next_serial) { + match update.into_update() { Ok(res) => Ok((state, Some(res))), Err(err) => { client.send_error(err).await?; @@ -571,20 +565,15 @@ impl TargetUpdate { /// Converts the target update into a payload update. /// /// This will fail if the diff of a serial update doesn’t apply cleanly. - /// - /// Upon success, the returned payload update will have the given serial - /// number. - fn into_update( - self, serial: Serial - ) -> Result { + fn into_update(self) -> Result { match self { TargetUpdate::Reset(pack) => { - Ok(payload::Update::new(serial, pack.finalize().into(), None)) + Ok(payload::Update::new(pack.finalize().into())) } TargetUpdate::Serial { set, diff } => { let diff = diff.finalize(); let set = diff.apply(&set)?; - Ok(payload::Update::new(serial, set, Some(diff))) + Ok(payload::Update::new(set)) } } } diff --git a/src/units/slurm.rs b/src/units/slurm.rs index 21f295e..b522f49 100644 --- a/src/units/slurm.rs +++ b/src/units/slurm.rs @@ -121,7 +121,6 @@ impl ExceptionSet { } fn apply(&self, unit: &str, update: payload::Update) -> payload::Update { - let serial = update.serial(); let mut set = update.into_set(); for (path, file) in @@ -131,7 +130,7 @@ impl ExceptionSet { } - payload::Update::new(serial, set, None) + payload::Update::new(set) } async fn notified(&self) {