Skip to content

Commit

Permalink
Fallback to V4 should V5 not work for some reason.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 authored and rnijveld committed Nov 2, 2024
1 parent 6a6b18c commit 3022e5b
Showing 1 changed file with 144 additions and 5 deletions.
149 changes: 144 additions & 5 deletions ntp-proto/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use tracing::{debug, trace, warn};
const MAX_STRATUM: u8 = 16;
const POLL_WINDOW: std::time::Duration = std::time::Duration::from_secs(5);
const STARTUP_TRIES_THRESHOLD: usize = 3;
#[cfg(feature = "ntpv5")]
const AFTER_UPGRADE_TRIES_THRESHOLD: u32 = 2;

pub struct SourceNtsData {
pub(crate) cookies: CookieStash,
Expand Down Expand Up @@ -301,6 +303,8 @@ pub enum ProtocolVersion {
tries_left: u8,
},
#[cfg(feature = "ntpv5")]
UpgradedToV5,
#[cfg(feature = "ntpv5")]
V5,
}

Expand All @@ -311,7 +315,7 @@ impl ProtocolVersion {
#[cfg(feature = "ntpv5")]
ProtocolVersion::V4UpgradingToV5 { .. } => incoming_version == 4,
#[cfg(feature = "ntpv5")]
ProtocolVersion::V5 => incoming_version == 5,
ProtocolVersion::UpgradedToV5 | ProtocolVersion::V5 => incoming_version == 5,
}
}
}
Expand Down Expand Up @@ -508,6 +512,14 @@ impl<Controller: SourceController> NtpSource<Controller> {
return actions!(NtpSourceAction::Reset);
}

#[cfg(feature = "ntpv5")]
if matches!(self.protocol_version, ProtocolVersion::UpgradedToV5)
&& self.reach.unanswered_polls() >= AFTER_UPGRADE_TRIES_THRESHOLD
{
// For some reason V5 communication isn't working, even though we and the server support it. Fall back.
self.protocol_version = ProtocolVersion::V4;
}

self.reach.poll();
self.tries = self.tries.saturating_add(1);

Expand All @@ -530,7 +542,9 @@ impl<Controller: SourceController> NtpSource<Controller> {
NtpPacket::nts_poll_message(&cookie, new_cookies, poll_interval)
}
#[cfg(feature = "ntpv5")]
ProtocolVersion::V4UpgradingToV5 { .. } | ProtocolVersion::V5 => {
ProtocolVersion::V4UpgradingToV5 { .. }
| ProtocolVersion::V5
| ProtocolVersion::UpgradedToV5 => {
NtpPacket::nts_poll_message_v5(&cookie, new_cookies, poll_interval)
}
}
Expand All @@ -542,7 +556,9 @@ impl<Controller: SourceController> NtpSource<Controller> {
NtpPacket::poll_message_upgrade_request(poll_interval)
}
#[cfg(feature = "ntpv5")]
ProtocolVersion::V5 => NtpPacket::poll_message_v5(poll_interval),
ProtocolVersion::UpgradedToV5 | ProtocolVersion::V5 => {
NtpPacket::poll_message_v5(poll_interval)
}
},
};
self.current_request_identifier = Some((identifier, NtpInstant::now() + POLL_WINDOW));
Expand Down Expand Up @@ -637,14 +653,16 @@ impl<Controller: SourceController> NtpSource<Controller> {
let tries_left = tries_left.saturating_sub(1);
if message.is_upgrade() {
debug!("Received a valid upgrade response, switching to NTPv5!");
self.protocol_version = ProtocolVersion::V5;
self.protocol_version = ProtocolVersion::UpgradedToV5;
} else if tries_left == 0 {
debug!("Server does not support NTPv5, stopping the upgrade process");
self.protocol_version = ProtocolVersion::V4;
} else {
debug!(tries_left, "Server did not yet respond with upgrade code");
self.protocol_version = ProtocolVersion::V4UpgradingToV5 { tries_left };
};
} else if let ProtocolVersion::UpgradedToV5 = self.protocol_version {
self.protocol_version = ProtocolVersion::V5;
}
}

Expand Down Expand Up @@ -1493,7 +1511,10 @@ mod test {
}

// We should have received a upgrade response and updated to NTPv5
assert!(matches!(source.protocol_version, ProtocolVersion::V5));
assert!(matches!(
source.protocol_version,
ProtocolVersion::UpgradedToV5
));

let actions = source.handle_timer();
let mut outgoingbuf = None;
Expand All @@ -1509,6 +1530,124 @@ mod test {
let poll = outgoingbuf.unwrap();
let (poll, _) = NtpPacket::deserialize(&poll, &NoCipher).unwrap();
assert_eq!(poll.version(), 5);

let response = NtpPacket::timestamp_response(
&SystemSnapshot::default(),
poll,
NtpTimestamp::default(),
&clock,
);
let response = response
.serialize_without_encryption_vec(Some(poll_len))
.unwrap();

let actions = source.handle_incoming(
&response,
NtpInstant::now(),
NtpTimestamp::default(),
NtpTimestamp::default(),
);
for action in actions {
assert!(!matches!(
action,
NtpSourceAction::Demobilize | NtpSourceAction::Reset
));
}

// NtpV5 is confirmed to work now
assert!(matches!(source.protocol_version, ProtocolVersion::V5));
}

#[cfg(feature = "ntpv5")]
#[test]
fn upgrade_state_machine_does_fallback_after_upgrade() {
let mut source = NtpSource::test_ntp_source(NoopController);
let clock = TestClock {};

assert!(matches!(
source.protocol_version,
ProtocolVersion::V4UpgradingToV5 { .. }
));

let actions = source.handle_timer();
let mut outgoingbuf = None;
for action in actions {
assert!(!matches!(
action,
NtpSourceAction::Reset | NtpSourceAction::Demobilize
));
if let NtpSourceAction::Send(buf) = action {
outgoingbuf = Some(buf);
}
}
let poll = outgoingbuf.unwrap();

let poll_len = poll.len();
let (poll, _) = NtpPacket::deserialize(&poll, &NoCipher).unwrap();
assert_eq!(poll.version(), 4);
assert!(poll.is_upgrade());

let response = NtpPacket::timestamp_response(
&SystemSnapshot::default(),
poll,
NtpTimestamp::default(),
&clock,
);
let response = response
.serialize_without_encryption_vec(Some(poll_len))
.unwrap();

let actions = source.handle_incoming(
&response,
NtpInstant::now(),
NtpTimestamp::default(),
NtpTimestamp::default(),
);
for action in actions {
assert!(!matches!(
action,
NtpSourceAction::Demobilize | NtpSourceAction::Reset
));
}

// We should have received a upgrade response and updated to NTPv5
assert!(matches!(
source.protocol_version,
ProtocolVersion::UpgradedToV5
));

for _ in 0..2 {
let actions = source.handle_timer();
let mut outgoingbuf = None;
for action in actions {
assert!(!matches!(
action,
NtpSourceAction::Reset | NtpSourceAction::Demobilize
));
if let NtpSourceAction::Send(buf) = action {
outgoingbuf = Some(buf);
}
}
let poll = outgoingbuf.unwrap();
let (poll, _) = NtpPacket::deserialize(&poll, &NoCipher).unwrap();
assert_eq!(poll.version(), 5);
}

let actions = source.handle_timer();
let mut outgoingbuf = None;
for action in actions {
assert!(!matches!(
action,
NtpSourceAction::Reset | NtpSourceAction::Demobilize
));
if let NtpSourceAction::Send(buf) = action {
outgoingbuf = Some(buf);
}
}
let poll = outgoingbuf.unwrap();
let (poll, _) = NtpPacket::deserialize(&poll, &NoCipher).unwrap();
assert!(matches!(source.protocol_version, ProtocolVersion::V4));
assert_eq!(poll.version(), 4);
}

#[cfg(feature = "ntpv5")]
Expand Down

0 comments on commit 3022e5b

Please sign in to comment.