Skip to content

Commit

Permalink
Made most of the daemon generic in algorithm.
Browse files Browse the repository at this point in the history
  • Loading branch information
davidv1992 authored and rnijveld committed Aug 15, 2024
1 parent 9702b38 commit b482cc9
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 55 deletions.
2 changes: 1 addition & 1 deletion ntp-proto/src/algorithm/kalman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
}
}

impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> TimeSyncController
impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncController
for KalmanClockController<C, SourceId>
{
type Clock = C;
Expand Down
4 changes: 3 additions & 1 deletion ntp-proto/src/algorithm/kalman/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,9 @@ impl<SourceId: Copy> KalmanSourceController<SourceId> {
}
}

impl<SourceId: std::fmt::Debug + Copy> SourceController for KalmanSourceController<SourceId> {
impl<SourceId: std::fmt::Debug + Copy + Send + 'static> SourceController
for KalmanSourceController<SourceId>
{
type ControllerMessage = KalmanControllerMessage;
type SourceMessage = KalmanSourceMessage<SourceId>;

Expand Down
14 changes: 7 additions & 7 deletions ntp-proto/src/algorithm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ impl<SourceId, ControllerMessage> Default for StateUpdate<SourceId, ControllerMe
}
}

pub trait TimeSyncController: Sized {
pub trait TimeSyncController: Sized + Send + 'static {
type Clock: NtpClock;
type SourceId;
type AlgorithmConfig: Debug + Copy + DeserializeOwned;
type ControllerMessage: Debug + Clone;
type SourceMessage: Debug + Clone;
type AlgorithmConfig: Debug + Copy + DeserializeOwned + Send;
type ControllerMessage: Debug + Clone + Send + 'static;
type SourceMessage: Debug + Clone + Send + 'static;
type SourceController: SourceController<
ControllerMessage = Self::ControllerMessage,
SourceMessage = Self::SourceMessage,
Expand Down Expand Up @@ -90,9 +90,9 @@ pub trait TimeSyncController: Sized {
fn time_update(&mut self) -> StateUpdate<Self::SourceId, Self::ControllerMessage>;
}

pub trait SourceController: Sized {
type ControllerMessage: Debug + Clone;
type SourceMessage: Debug + Clone;
pub trait SourceController: Sized + Send + 'static {
type ControllerMessage: Debug + Clone + Send + 'static;
type SourceMessage: Debug + Clone + Send + 'static;

fn handle_message(&mut self, message: Self::ControllerMessage);

Expand Down
3 changes: 2 additions & 1 deletion ntp-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ pub(crate) mod exitcode {
mod exports {
pub use super::algorithm::{
AlgorithmConfig, KalmanClockController, KalmanControllerMessage, KalmanSourceController,
KalmanSourceMessage, ObservableSourceTimedata, StateUpdate, TimeSyncController,
KalmanSourceMessage, ObservableSourceTimedata, SourceController, StateUpdate,
TimeSyncController,
};
pub use super::clock::NtpClock;
pub use super::config::{SourceDefaultsConfig, StepThreshold, SynchronizationConfig};
Expand Down
6 changes: 4 additions & 2 deletions ntpd/src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{error::Error, path::PathBuf};

use ::tracing::info;
pub use config::Config;
use ntp_proto::KalmanClockController;
pub use observer::ObservableState;
pub use system::spawn;
use tracing_subscriber::util::SubscriberInitExt;
Expand Down Expand Up @@ -100,8 +101,9 @@ async fn run(options: NtpDaemonOptions) -> Result<(), Box<dyn Error>> {
let clock_config = config::ClockConfig::default();

::tracing::debug!("Configuration loaded, spawning daemon jobs");
let (main_loop_handle, channels) = spawn(
config.synchronization,
let (main_loop_handle, channels) = spawn::<KalmanClockController<_, _>>(
config.synchronization.synchronization_base,
config.synchronization.algorithm,
config.source_defaults,
clock_config,
&config.sources,
Expand Down
48 changes: 24 additions & 24 deletions ntpd/src/daemon/ntp_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use std::{
};

use ntp_proto::{
KalmanControllerMessage, KalmanSourceController, KalmanSourceMessage, NtpClock, NtpInstant,
NtpSource, NtpSourceActionIterator, NtpSourceUpdate, NtpTimestamp, ObservableSourceState,
SystemSourceUpdate,
NtpClock, NtpInstant, NtpSource, NtpSourceActionIterator, NtpSourceUpdate, NtpTimestamp,
ObservableSourceState, SourceController, SystemSourceUpdate,
};
#[cfg(target_os = "linux")]
use timestamped_socket::socket::open_interface_udp;
Expand All @@ -32,27 +31,27 @@ impl Wait for Sleep {

#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum MsgForSystem {
pub enum MsgForSystem<SourceMessage> {
/// Received a Kiss-o'-Death and must demobilize
MustDemobilize(SourceId),
/// Experienced a network issue and must be restarted
NetworkIssue(SourceId),
/// Source is unreachable, and should be restarted with new resolved addr.
Unreachable(SourceId),
/// Update from source
SourceUpdate(SourceId, NtpSourceUpdate<KalmanSourceMessage<SourceId>>),
SourceUpdate(SourceId, NtpSourceUpdate<SourceMessage>),
}

#[derive(Debug)]
pub struct SourceChannels {
pub msg_for_system_sender: tokio::sync::mpsc::Sender<MsgForSystem>,
pub struct SourceChannels<ControllerMessage, SourceMessage> {
pub msg_for_system_sender: tokio::sync::mpsc::Sender<MsgForSystem<SourceMessage>>,
pub system_update_receiver:
tokio::sync::broadcast::Receiver<SystemSourceUpdate<KalmanControllerMessage>>,
tokio::sync::broadcast::Receiver<SystemSourceUpdate<ControllerMessage>>,
pub source_snapshots:
Arc<std::sync::RwLock<HashMap<SourceId, ObservableSourceState<SourceId>>>>,
}

pub(crate) struct SourceTask<C: 'static + NtpClock + Send, T: Wait> {
pub(crate) struct SourceTask<C: 'static + NtpClock + Send, Controller: SourceController, T: Wait> {
_wait: PhantomData<T>,
index: SourceId,
clock: C,
Expand All @@ -61,9 +60,9 @@ pub(crate) struct SourceTask<C: 'static + NtpClock + Send, T: Wait> {
name: String,
source_addr: SocketAddr,
socket: Option<Socket<SocketAddr, Connected>>,
channels: SourceChannels,
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,

source: NtpSource<KalmanSourceController<SourceId>>,
source: NtpSource<Controller>,

// we don't store the real origin timestamp in the packet, because that would leak our
// system time to the network (and could make attacks easier). So instead there is some
Expand All @@ -79,7 +78,7 @@ enum SocketResult {
Abort,
}

impl<C, T> SourceTask<C, T>
impl<C, Controller: SourceController, T> SourceTask<C, Controller, T>
where
C: 'static + NtpClock + Send + Sync,
T: Wait,
Expand Down Expand Up @@ -115,18 +114,18 @@ where
let mut buf = [0_u8; 1024];

#[allow(clippy::large_enum_variant)]
enum SelectResult {
enum SelectResult<Controller: SourceController> {
Timer,
Recv(Result<RecvResult<SocketAddr>, std::io::Error>),
SystemUpdate(
Result<
SystemSourceUpdate<KalmanControllerMessage>,
SystemSourceUpdate<Controller::ControllerMessage>,
tokio::sync::broadcast::error::RecvError,
>,
),
}

let selected = tokio::select! {
let selected: SelectResult<Controller> = tokio::select! {
() = &mut poll_wait => {
SelectResult::Timer
},
Expand Down Expand Up @@ -318,22 +317,22 @@ where
}
}

impl<C> SourceTask<C, Sleep>
impl<C, Controller: SourceController> SourceTask<C, Controller, Sleep>
where
C: 'static + NtpClock + Send + Sync,
{
#[allow(clippy::too_many_arguments)]
#[instrument(skip(clock, channels))]
#[instrument(skip(clock, channels, source))]
pub fn spawn(
index: SourceId,
name: String,
source_addr: SocketAddr,
interface: Option<InterfaceName>,
clock: C,
timestamp_mode: TimestampMode,
channels: SourceChannels,
source: NtpSource<KalmanSourceController<SourceId>>,
initial_actions: NtpSourceActionIterator<KalmanSourceMessage<SourceId>>,
channels: SourceChannels<Controller::ControllerMessage, Controller::SourceMessage>,
source: NtpSource<Controller>,
initial_actions: NtpSourceActionIterator<Controller::SourceMessage>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(
(async move {
Expand Down Expand Up @@ -444,8 +443,9 @@ mod tests {
};

use ntp_proto::{
AlgorithmConfig, KalmanClockController, NoCipher, NtpDuration, NtpLeapIndicator, NtpPacket,
ProtocolVersion, SourceDefaultsConfig, SynchronizationConfig, SystemSnapshot, TimeSnapshot,
AlgorithmConfig, KalmanClockController, KalmanControllerMessage, KalmanSourceController,
KalmanSourceMessage, NoCipher, NtpDuration, NtpLeapIndicator, NtpPacket, ProtocolVersion,
SourceDefaultsConfig, SynchronizationConfig, SystemSnapshot, TimeSnapshot,
};
use timestamped_socket::socket::{open_ip, GeneralTimestampMode, Open};
use tokio::sync::{broadcast, mpsc};
Expand Down Expand Up @@ -574,9 +574,9 @@ mod tests {
async fn test_startup<T: Wait>(
port_base: u16,
) -> (
SourceTask<TestClock, T>,
SourceTask<TestClock, KalmanSourceController<SourceId>, T>,
Socket<SocketAddr, Open>,
mpsc::Receiver<MsgForSystem>,
mpsc::Receiver<MsgForSystem<KalmanSourceMessage<SourceId>>>,
broadcast::Sender<SystemSourceUpdate<KalmanControllerMessage>>,
) {
// Note: Ports must be unique among tests to deal with parallelism, hence
Expand Down
49 changes: 30 additions & 19 deletions ntpd/src/daemon/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ use crate::daemon::spawn::spawner_task;
#[cfg(feature = "unstable_nts-pool")]
use super::spawn::nts_pool::NtsPoolSpawner;
use super::{
config::{
ClockConfig, DaemonSynchronizationConfig, NtpSourceConfig, ServerConfig, TimestampMode,
},
clock::NtpClockWrapper,
config::{ClockConfig, NtpSourceConfig, ServerConfig, TimestampMode},
ntp_source::{MsgForSystem, SourceChannels, SourceTask, Wait},
server::{ServerStats, ServerTask},
spawn::{
Expand All @@ -24,8 +23,8 @@ use std::{
};

use ntp_proto::{
KalmanClockController, KalmanControllerMessage, KeySet, NtpClock, ObservableSourceState,
SourceDefaultsConfig, System, SystemActionIterator, SystemSnapshot, SystemSourceUpdate,
KeySet, NtpClock, ObservableSourceState, SourceDefaultsConfig, SynchronizationConfig, System,
SystemActionIterator, SystemSnapshot, SystemSourceUpdate, TimeSyncController,
};
use timestamped_socket::interface::InterfaceName;
use tokio::{sync::mpsc, task::JoinHandle};
Expand Down Expand Up @@ -86,8 +85,9 @@ pub struct DaemonChannels {
}

/// Spawn the NTP daemon
pub async fn spawn(
synchronization_config: DaemonSynchronizationConfig,
pub async fn spawn<Controller: TimeSyncController<Clock = NtpClockWrapper, SourceId = SourceId>>(
synchronization_config: SynchronizationConfig,
algorithm_config: Controller::AlgorithmConfig,
source_defaults_config: SourceDefaultsConfig,
clock_config: ClockConfig,
source_configs: &[NtpSourceConfig],
Expand All @@ -96,11 +96,12 @@ pub async fn spawn(
) -> std::io::Result<(JoinHandle<std::io::Result<()>>, DaemonChannels)> {
let ip_list = super::local_ip_provider::spawn()?;

let (mut system, channels) = SystemTask::new(
let (mut system, channels) = SystemTask::<_, Controller, _>::new(
clock_config.clock,
clock_config.interface,
clock_config.timestamp_mode,
synchronization_config,
algorithm_config,
source_defaults_config,
keyset,
ip_list,
Expand Down Expand Up @@ -164,20 +165,24 @@ struct SystemSpawnerData {
notify_tx: mpsc::Sender<SystemEvent>,
}

struct SystemTask<C: NtpClock, T: Wait> {
struct SystemTask<
C: NtpClock,
Controller: TimeSyncController<SourceId = SourceId, Clock = C>,
T: Wait,
> {
_wait: PhantomData<SingleshotSleep<T>>,
system: System<SourceId, KalmanClockController<C, SourceId>>,
system: System<SourceId, Controller>,

system_snapshot_sender: tokio::sync::watch::Sender<SystemSnapshot>,
system_update_sender:
tokio::sync::broadcast::Sender<SystemSourceUpdate<KalmanControllerMessage>>,
tokio::sync::broadcast::Sender<SystemSourceUpdate<Controller::ControllerMessage>>,
source_snapshots: Arc<std::sync::RwLock<HashMap<SourceId, ObservableSourceState<SourceId>>>>,
server_data_sender: tokio::sync::watch::Sender<Vec<ServerData>>,
keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
ip_list: tokio::sync::watch::Receiver<Arc<[IpAddr]>>,

msg_for_system_rx: mpsc::Receiver<MsgForSystem>,
msg_for_system_tx: mpsc::Sender<MsgForSystem>,
msg_for_system_rx: mpsc::Receiver<MsgForSystem<Controller::SourceMessage>>,
msg_for_system_tx: mpsc::Sender<MsgForSystem<Controller::SourceMessage>>,
spawn_tx: mpsc::Sender<SpawnEvent>,
spawn_rx: mpsc::Receiver<SpawnEvent>,

Expand All @@ -195,23 +200,29 @@ struct SystemTask<C: NtpClock, T: Wait> {
interface: Option<InterfaceName>,
}

impl<C: NtpClock + Sync, T: Wait> SystemTask<C, T> {
impl<
C: NtpClock + Sync,
Controller: TimeSyncController<Clock = C, SourceId = SourceId>,
T: Wait,
> SystemTask<C, Controller, T>
{
#[allow(clippy::too_many_arguments)]
fn new(
clock: C,
interface: Option<InterfaceName>,
timestamp_mode: TimestampMode,
synchronization_config: DaemonSynchronizationConfig,
synchronization_config: SynchronizationConfig,
algorithm_config: Controller::AlgorithmConfig,
source_defaults_config: SourceDefaultsConfig,
keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
ip_list: tokio::sync::watch::Receiver<Arc<[IpAddr]>>,
have_sources: bool,
) -> (Self, DaemonChannels) {
let Ok(mut system) = System::new(
clock.clone(),
synchronization_config.synchronization_base,
synchronization_config,
source_defaults_config,
synchronization_config.algorithm,
algorithm_config,
ip_list.borrow().clone(),
) else {
tracing::error!("Could not start system");
Expand Down Expand Up @@ -327,7 +338,7 @@ impl<C: NtpClock + Sync, T: Wait> SystemTask<C, T> {

fn handle_state_update(
&mut self,
actions: SystemActionIterator<KalmanControllerMessage>,
actions: SystemActionIterator<Controller::ControllerMessage>,
wait: &mut Pin<&mut SingleshotSleep<T>>,
) {
// Don't care if there is no receiver.
Expand All @@ -349,7 +360,7 @@ impl<C: NtpClock + Sync, T: Wait> SystemTask<C, T> {

async fn handle_source_update(
&mut self,
msg: MsgForSystem,
msg: MsgForSystem<Controller::SourceMessage>,
wait: &mut Pin<&mut SingleshotSleep<T>>,
) -> std::io::Result<()> {
tracing::debug!(?msg, "updating source");
Expand Down

0 comments on commit b482cc9

Please sign in to comment.