Skip to content

Commit

Permalink
Drops keeping serial number and diff with updates. (#96)
Browse files Browse the repository at this point in the history
This PR removes the serial number and optional diff from payload::Update.

The serial number was never actually used anywhere so has always been
unnecessary. However, it did provide a sort of safety net for using the
optional diff in case an update was missed somehow. This, however, only ever
happened with the RTR target, so dropping the diff and re-calculating is
probably acceptable both from a performance and reliability perspective.
  • Loading branch information
partim authored Jan 26, 2024
1 parent f707d64 commit 87f73ee
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 77 deletions.
14 changes: 1 addition & 13 deletions src/comms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize};
use std::sync::atomic::AtomicUsize;
use chrono::{DateTime, Utc};
use crossbeam_utils::atomic::AtomicCell;
use futures::pin_mut;
Expand Down Expand Up @@ -277,9 +277,6 @@ pub struct GateMetrics {
/// The current unit status.
status: AtomicCell<UnitStatus>,

/// The serial number of the last update.
serial: AtomicU32,

/// The number of payload items in the last update.
count: AtomicUsize,

Expand All @@ -292,7 +289,6 @@ pub struct GateMetrics {
impl GateMetrics {
/// Updates the metrics to match the given update.
fn update(&self, update: &payload::Update) {
self.serial.store(update.serial().into(), atomic::Ordering::Relaxed);
self.count.store(update.set().len(), atomic::Ordering::Relaxed);
self.update.store(Some(Utc::now()));
}
Expand All @@ -308,10 +304,6 @@ impl GateMetrics {
"unit_status", "the operational status of the unit",
MetricType::Text, MetricUnit::Info
);
const SERIAL_METRIC: Metric = Metric::new(
"gate_serial", "the serial number of the unit's updates",
MetricType::Counter, MetricUnit::Info
);
const COUNT_METRIC: Metric = Metric::new(
"vrps", "the number of VRPs in the last update",
MetricType::Gauge, MetricUnit::Total
Expand All @@ -335,10 +327,6 @@ impl metrics::Source for GateMetrics {
target.append_simple(
&Self::STATUS_METRIC, Some(unit_name), self.status.load()
);
target.append_simple(
&Self::SERIAL_METRIC, Some(unit_name),
self.serial.load(atomic::Ordering::Relaxed)
);
target.append_simple(
&Self::COUNT_METRIC, Some(unit_name),
self.count.load(atomic::Ordering::Relaxed)
Expand Down
34 changes: 2 additions & 32 deletions src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ use std::sync::Arc;
use rpki::rtr::client::PayloadError;
use rpki::rtr::payload::{Action, Payload, PayloadRef};
use rpki::rtr::server::{PayloadDiff, PayloadSet};
use rpki::rtr::state::Serial;


//------------ Pack ----------------------------------------------------------
Expand Down Expand Up @@ -1136,27 +1135,16 @@ impl DiffBuilder {
/// be copied cheaply.
#[derive(Clone, Debug)]
pub struct Update {
/// The serial number of this update.
serial: Serial,

/// The new payload set.
set: Set,

/// The optional diff from the previous update.
diff: Option<Diff>,
}

impl Update {
/// Creates a new update.
pub fn new(
serial: Serial, set: Set, diff: Option<Diff>
set: Set
) -> Self {
Update { serial, set, diff }
}

/// Returns the serial number of the update.
pub fn serial(&self) -> Serial {
self.serial
Update { set }
}

/// Returns the payload set of the update.
Expand All @@ -1169,27 +1157,9 @@ impl Update {
self.set
}

/// Returns the diff if it can be used for the given serial.
///
/// The method will return the diff if it is preset and if the given
/// serial is one less than the update’s serial.
pub fn get_usable_diff(&self, serial: Serial) -> Option<&Diff> {
self.diff.as_ref().and_then(|diff| {
if serial.add(1) == self.serial {
Some(diff)
}
else {
None
}
})
}

/// Applies a diff to the update.
///
/// The update will retain its current serial number.
pub fn apply_diff_relaxed(&mut self, diff: &Diff) {
self.set = diff.apply_relaxed(&self.set);
self.diff = None;
}
}

Expand Down
10 changes: 1 addition & 9 deletions src/targets/rtr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,13 @@ impl Source {
None => {
SourceData {
state: data.state,
unit_serial: update.serial(),
current: Some(update.set().clone()),
diffs: Vec::new(),
timing: Timing::default(),
}
}
Some(current) => {
let diff = match update.get_usable_diff(data.unit_serial) {
Some(diff) => diff.clone(),
None => update.set().diff_from(current),
};
let diff = update.set().diff_from(current);
if diff.is_empty() {
// If there is no change in data, don’t update.
return
Expand All @@ -313,7 +309,6 @@ impl Source {
state.inc();
SourceData {
state,
unit_serial: update.serial(),
current: Some(update.set().clone()),
diffs,
timing: Timing::default(),
Expand Down Expand Up @@ -369,9 +364,6 @@ struct SourceData {
/// The current RTR state of the target.
state: State,

/// The current serial of the source unit.
unit_serial: Serial,

/// The current set of RTR data.
current: Option<payload::Set>,

Expand Down
6 changes: 1 addition & 5 deletions src/units/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::str::FromStr;
use std::time::Duration;
use log::{debug, warn};
use reqwest::Url;
use rpki::rtr::Serial;
use serde::Deserialize;
use tokio::sync::oneshot;
use tokio::time::{Instant, timeout_at};
Expand Down Expand Up @@ -44,7 +43,6 @@ struct JsonRunner {
json: Json,
component: Component,
gate: Gate,
serial: Serial,
status: UnitStatus,
current: Option<payload::Set>,
}
Expand All @@ -55,7 +53,6 @@ impl JsonRunner {
) -> Self {
JsonRunner {
json, component, gate,
serial: Serial::default(),
status: UnitStatus::Stalled,
current: Default::default(),
}
Expand All @@ -75,14 +72,13 @@ impl JsonRunner {
Some(res) => {
let res = res.into_payload();
if self.current.as_ref() != Some(&res) {
self.serial = self.serial.add(1);
self.current = Some(res.clone());
if self.status != UnitStatus::Healthy {
self.status = UnitStatus::Healthy;
self.gate.update_status(self.status).await
}
self.gate.update_data(
payload::Update::new(self.serial, res, None)
payload::Update::new(res)
).await;
debug!(
"Unit {}: successfully updated.",
Expand Down
21 changes: 5 additions & 16 deletions src/units/rtr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use log::{debug, error, info, warn};
use pin_project_lite::pin_project;
use rpki::rtr::client::{Client, PayloadError, PayloadTarget, PayloadUpdate};
use rpki::rtr::payload::{Action, Payload, Timing};
use rpki::rtr::state::{Serial, State};
use rpki::rtr::state::State;
use serde::Deserialize;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
Expand Down Expand Up @@ -255,9 +255,6 @@ struct RtrClient<Connect> {
/// Our gate status.
status: GateStatus,

/// Our current serial.
serial: Serial,

/// The unit’s metrics.
metrics: Arc<RtrMetrics>,
}
Expand All @@ -269,7 +266,6 @@ impl<Connect> RtrClient<Connect> {
connect,
retry,
status: Default::default(),
serial: Serial::default(),
metrics,
}
}
Expand Down Expand Up @@ -343,7 +339,6 @@ where
}
};
if let Some(update) = update {
this.serial = update.serial();
client.target_mut().current = update.set().clone();
gate.update_data(update).await;
}
Expand Down Expand Up @@ -414,14 +409,13 @@ where
async fn update(
&mut self, client: &mut Client<Socket, Target>, gate: &mut Gate
) -> Result<Result<Option<payload::Update>, io::Error>, Terminated> {
let next_serial = self.serial.add(1);
let update_fut = async {
let update = client.update().await?;
let state = client.state();
if update.is_definitely_empty() {
return Ok((state, None))
}
match update.into_update(next_serial) {
match update.into_update() {
Ok(res) => Ok((state, Some(res))),
Err(err) => {
client.send_error(err).await?;
Expand Down Expand Up @@ -571,20 +565,15 @@ impl TargetUpdate {
/// Converts the target update into a payload update.
///
/// This will fail if the diff of a serial update doesn’t apply cleanly.
///
/// Upon success, the returned payload update will have the given serial
/// number.
fn into_update(
self, serial: Serial
) -> Result<payload::Update, PayloadError> {
fn into_update(self) -> Result<payload::Update, PayloadError> {
match self {
TargetUpdate::Reset(pack) => {
Ok(payload::Update::new(serial, pack.finalize().into(), None))
Ok(payload::Update::new(pack.finalize().into()))
}
TargetUpdate::Serial { set, diff } => {
let diff = diff.finalize();
let set = diff.apply(&set)?;
Ok(payload::Update::new(serial, set, Some(diff)))
Ok(payload::Update::new(set))
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/units/slurm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ impl ExceptionSet {
}

fn apply(&self, unit: &str, update: payload::Update) -> payload::Update {
let serial = update.serial();
let mut set = update.into_set();

for (path, file) in
Expand All @@ -131,7 +130,7 @@ impl ExceptionSet {

}

payload::Update::new(serial, set, None)
payload::Update::new(set)
}

async fn notified(&self) {
Expand Down

0 comments on commit 87f73ee

Please sign in to comment.