Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drops keeping serial number and diff with updates. #96

Merged
merged 2 commits into from
Jan 26, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Drop keeping a serial number with updates.
partim committed Nov 23, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 1fccd3ebec2edd73850c7ff1d94f0398886b280c
14 changes: 1 addition & 13 deletions src/comms.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::atomic;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicUsize};
use std::sync::atomic::AtomicUsize;
use chrono::{DateTime, Utc};
use crossbeam_utils::atomic::AtomicCell;
use futures::pin_mut;
@@ -277,9 +277,6 @@ pub struct GateMetrics {
/// The current unit status.
status: AtomicCell<UnitStatus>,

/// The serial number of the last update.
serial: AtomicU32,

/// The number of payload items in the last update.
count: AtomicUsize,

@@ -292,7 +289,6 @@ pub struct GateMetrics {
impl GateMetrics {
/// Updates the metrics to match the given update.
fn update(&self, update: &payload::Update) {
self.serial.store(update.serial().into(), atomic::Ordering::Relaxed);
self.count.store(update.set().len(), atomic::Ordering::Relaxed);
self.update.store(Some(Utc::now()));
}
@@ -308,10 +304,6 @@ impl GateMetrics {
"unit_status", "the operational status of the unit",
MetricType::Text, MetricUnit::Info
);
const SERIAL_METRIC: Metric = Metric::new(
"gate_serial", "the serial number of the unit's updates",
MetricType::Counter, MetricUnit::Info
);
const COUNT_METRIC: Metric = Metric::new(
"vrps", "the number of VRPs in the last update",
MetricType::Gauge, MetricUnit::Total
@@ -335,10 +327,6 @@ impl metrics::Source for GateMetrics {
target.append_simple(
&Self::STATUS_METRIC, Some(unit_name), self.status.load()
);
target.append_simple(
&Self::SERIAL_METRIC, Some(unit_name),
self.serial.load(atomic::Ordering::Relaxed)
);
target.append_simple(
&Self::COUNT_METRIC, Some(unit_name),
self.count.load(atomic::Ordering::Relaxed)
31 changes: 5 additions & 26 deletions src/payload.rs
Original file line number Diff line number Diff line change
@@ -57,7 +57,6 @@ use std::sync::Arc;
use rpki::rtr::client::PayloadError;
use rpki::rtr::payload::{Action, Payload, PayloadRef};
use rpki::rtr::server::{PayloadDiff, PayloadSet};
use rpki::rtr::state::Serial;


//------------ Pack ----------------------------------------------------------
@@ -1136,9 +1135,6 @@ impl DiffBuilder {
/// be copied cheaply.
#[derive(Clone, Debug)]
pub struct Update {
/// The serial number of this update.
serial: Serial,

/// The new payload set.
set: Set,

@@ -1149,14 +1145,9 @@ pub struct Update {
impl Update {
/// Creates a new update.
pub fn new(
serial: Serial, set: Set, diff: Option<Diff>
set: Set, diff: Option<Diff>
) -> Self {
Update { serial, set, diff }
}

/// Returns the serial number of the update.
pub fn serial(&self) -> Serial {
self.serial
Update { set, diff }
}

/// Returns the payload set of the update.
@@ -1169,24 +1160,12 @@ impl Update {
self.set
}

/// Returns the diff if it can be used for the given serial.
///
/// The method will return the diff if it is preset and if the given
/// serial is one less than the update’s serial.
pub fn get_usable_diff(&self, serial: Serial) -> Option<&Diff> {
self.diff.as_ref().and_then(|diff| {
if serial.add(1) == self.serial {
Some(diff)
}
else {
None
}
})
/// Returns the diff if it is available.
pub fn diff(&self) -> Option<&Diff> {
self.diff.as_ref()
}

/// Applies a diff to the update.
///
/// The update will retain its current serial number.
pub fn apply_diff_relaxed(&mut self, diff: &Diff) {
self.set = diff.apply_relaxed(&self.set);
self.diff = None;
7 changes: 1 addition & 6 deletions src/targets/rtr.rs
Original file line number Diff line number Diff line change
@@ -281,14 +281,13 @@ impl Source {
None => {
SourceData {
state: data.state,
unit_serial: update.serial(),
current: Some(update.set().clone()),
diffs: Vec::new(),
timing: Timing::default(),
}
}
Some(current) => {
let diff = match update.get_usable_diff(data.unit_serial) {
let diff = match update.diff() {
Some(diff) => diff.clone(),
None => update.set().diff_from(current),
};
@@ -313,7 +312,6 @@ impl Source {
state.inc();
SourceData {
state,
unit_serial: update.serial(),
current: Some(update.set().clone()),
diffs,
timing: Timing::default(),
@@ -369,9 +367,6 @@ struct SourceData {
/// The current RTR state of the target.
state: State,

/// The current serial of the source unit.
unit_serial: Serial,

/// The current set of RTR data.
current: Option<payload::Set>,

6 changes: 1 addition & 5 deletions src/units/json.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ use std::str::FromStr;
use std::time::Duration;
use log::{debug, warn};
use reqwest::Url;
use rpki::rtr::Serial;
use serde::Deserialize;
use tokio::sync::oneshot;
use tokio::time::{Instant, timeout_at};
@@ -44,7 +43,6 @@ struct JsonRunner {
json: Json,
component: Component,
gate: Gate,
serial: Serial,
status: UnitStatus,
current: Option<payload::Set>,
}
@@ -55,7 +53,6 @@ impl JsonRunner {
) -> Self {
JsonRunner {
json, component, gate,
serial: Serial::default(),
status: UnitStatus::Stalled,
current: Default::default(),
}
@@ -75,14 +72,13 @@ impl JsonRunner {
Some(res) => {
let res = res.into_payload();
if self.current.as_ref() != Some(&res) {
self.serial = self.serial.add(1);
self.current = Some(res.clone());
if self.status != UnitStatus::Healthy {
self.status = UnitStatus::Healthy;
self.gate.update_status(self.status).await
}
self.gate.update_data(
payload::Update::new(self.serial, res, None)
payload::Update::new(res, None)
).await;
debug!(
"Unit {}: successfully updated.",
21 changes: 5 additions & 16 deletions src/units/rtr.rs
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ use log::{debug, error, info, warn};
use pin_project_lite::pin_project;
use rpki::rtr::client::{Client, PayloadError, PayloadTarget, PayloadUpdate};
use rpki::rtr::payload::{Action, Payload, Timing};
use rpki::rtr::state::{Serial, State};
use rpki::rtr::state::State;
use serde::Deserialize;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
@@ -255,9 +255,6 @@ struct RtrClient<Connect> {
/// Our gate status.
status: GateStatus,

/// Our current serial.
serial: Serial,

/// The unit’s metrics.
metrics: Arc<RtrMetrics>,
}
@@ -269,7 +266,6 @@ impl<Connect> RtrClient<Connect> {
connect,
retry,
status: Default::default(),
serial: Serial::default(),
metrics,
}
}
@@ -343,7 +339,6 @@ where
}
};
if let Some(update) = update {
this.serial = update.serial();
client.target_mut().current = update.set().clone();
gate.update_data(update).await;
}
@@ -414,14 +409,13 @@ where
async fn update(
&mut self, client: &mut Client<Socket, Target>, gate: &mut Gate
) -> Result<Result<Option<payload::Update>, io::Error>, Terminated> {
let next_serial = self.serial.add(1);
let update_fut = async {
let update = client.update().await?;
let state = client.state();
if update.is_definitely_empty() {
return Ok((state, None))
}
match update.into_update(next_serial) {
match update.into_update() {
Ok(res) => Ok((state, Some(res))),
Err(err) => {
client.send_error(err).await?;
@@ -571,20 +565,15 @@ impl TargetUpdate {
/// Converts the target update into a payload update.
///
/// This will fail if the diff of a serial update doesn’t apply cleanly.
///
/// Upon success, the returned payload update will have the given serial
/// number.
fn into_update(
self, serial: Serial
) -> Result<payload::Update, PayloadError> {
fn into_update(self) -> Result<payload::Update, PayloadError> {
match self {
TargetUpdate::Reset(pack) => {
Ok(payload::Update::new(serial, pack.finalize().into(), None))
Ok(payload::Update::new(pack.finalize().into(), None))
}
TargetUpdate::Serial { set, diff } => {
let diff = diff.finalize();
let set = diff.apply(&set)?;
Ok(payload::Update::new(serial, set, Some(diff)))
Ok(payload::Update::new(set, Some(diff)))
}
}
}
3 changes: 1 addition & 2 deletions src/units/slurm.rs
Original file line number Diff line number Diff line change
@@ -121,7 +121,6 @@ impl ExceptionSet {
}

fn apply(&self, unit: &str, update: payload::Update) -> payload::Update {
let serial = update.serial();
let mut set = update.into_set();

for (path, file) in
@@ -131,7 +130,7 @@ impl ExceptionSet {

}

payload::Update::new(serial, set, None)
payload::Update::new(set, None)
}

async fn notified(&self) {