From 81c6b57b09dd18addf1b54fb201899f1df8a471d Mon Sep 17 00:00:00 2001
From: Muhamad Awad
Date: Wed, 22 Jan 2025 17:10:42 +0100
Subject: [PATCH] Option to disable `LogsController`

Summary: Support disabling the logs controller via a configuration flag.
---
 .../src/cluster_controller/logs_controller.rs | 198 ++++++++++++++----
 .../admin/src/cluster_controller/scheduler.rs |  11 +-
 .../src/cluster_controller/service/state.rs   |   8 +-
 crates/types/src/config/admin.rs              |   6 +
 4 files changed, 180 insertions(+), 43 deletions(-)

diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs
index 4eeeb4efb8..9682a6f65e 100644
--- a/crates/admin/src/cluster_controller/logs_controller.rs
+++ b/crates/admin/src/cluster_controller/logs_controller.rs
@@ -928,7 +928,7 @@ pub struct LogTailUpdate {
 pub type LogsTailUpdates = HashMap<LogId, LogTailUpdate>;
 
 /// Runs the inner logs controller and processes the [`Effect`].
-pub struct LogsController {
+pub struct Controller {
     effects: Option<Vec<Effect>>,
     inner: LogsControllerInner,
     bifrost: Bifrost,
@@ -937,7 +937,7 @@ pub struct LogsController {
     metadata_writer: MetadataWriter,
     find_logs_tail_semaphore: Arc<Semaphore>,
 }
 
-impl LogsController {
+impl Controller {
     pub fn new(
         bifrost: Bifrost,
         metadata_writer: MetadataWriter,
@@ -1217,51 +1217,179 @@ impl LogsController {
 }
 
 /// Placement hints for the [`scheduler::Scheduler`] based on the logs configuration
+/// If the logs controller is enabled, the hints are derived from its internal
+/// state to pick the preferred nodes and leader for a partition; otherwise
+/// they fall back to the logs configuration in the currently known metadata.
 #[derive(derive_more::From)]
 pub struct LogsBasedPartitionProcessorPlacementHints<'a> {
     logs_controller: &'a LogsController,
 }
 
-impl<'a> scheduler::PartitionProcessorPlacementHints
-    for LogsBasedPartitionProcessorPlacementHints<'a>
-{
-    fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
+impl<'a> LogsBasedPartitionProcessorPlacementHints<'a> {
+    fn preferred_nodes_from_logs(
+        &self,
+        partition_id: &PartitionId,
+    ) -> impl Iterator<Item = PlainNodeId> {
+        use itertools::Either;
         let log_id = LogId::from(*partition_id);
-
-        self.logs_controller
-            .inner
-            .logs_state
-            .get(&log_id)
-            .and_then(|log_state| match log_state {
-                LogState::Available { configuration, .. } => configuration
-                    .as_ref()
-                    .map(|configuration| itertools::Either::Left(configuration.node_set_iter())),
-                LogState::Sealing { .. }
-                | LogState::Sealed { .. }
-                | LogState::Provisioning { .. } => None,
-            })
-            .unwrap_or_else(|| itertools::Either::Right(iter::empty()))
+        let logs = Metadata::with_current(|m| m.logs_ref());
+        let Some(segment) = logs.chain(&log_id).map(|chain| chain.head()) else {
+            return Either::Right(iter::empty());
+        };
+
+        match segment.config.kind {
+            ProviderKind::InMemory | ProviderKind::Local => Either::Right(iter::empty()),
+            ProviderKind::Replicated => {
+                let Ok(params) =
+                    ReplicatedLogletParams::deserialize_from(segment.config.params.as_bytes())
+                else {
+                    return Either::Right(iter::empty());
+                };
+
+                Either::Left(params.nodeset.into_iter())
+            }
+        }
     }
 
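+    /// Leader hint derived from the log chain itself: the sequencer node of
+    /// the head segment's replicated loglet, if there is one.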
-    fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
+    fn preferred_leader_from_logs(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
         let log_id = LogId::from(*partition_id);
-
-        self.logs_controller
-            .inner
-            .logs_state
-            .get(&log_id)
-            .and_then(|log_state| match log_state {
-                LogState::Available { configuration, .. } => {
-                    configuration.as_ref().and_then(|configuration| {
-                        configuration
-                            .sequencer_node()
-                            .map(GenerationalNodeId::as_plain)
-                    })
-                }
-                LogState::Sealing { .. }
-                | LogState::Sealed { .. }
-                | LogState::Provisioning { .. } => None,
-            })
-    }
-}
+        let logs = Metadata::with_current(|m| m.logs_ref());
+        let segment = logs.chain(&log_id).map(|chain| chain.head())?;
+
+        match segment.config.kind {
+            ProviderKind::InMemory | ProviderKind::Local => None,
+            ProviderKind::Replicated => {
+                let Ok(params) =
+                    ReplicatedLogletParams::deserialize_from(segment.config.params.as_bytes())
+                else {
+                    return None;
+                };
+
+                Some(params.sequencer.into())
+            }
+        }
+    }
+}
+
+impl<'a> scheduler::PartitionProcessorPlacementHints
+    for LogsBasedPartitionProcessorPlacementHints<'a>
+{
+    fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = PlainNodeId> {
+        use itertools::Either;
+
+        match self.logs_controller {
+            LogsController::Disabled => Either::Right(self.preferred_nodes_from_logs(partition_id)),
+            LogsController::Enabled(ref controller) => {
+                let log_id = LogId::from(*partition_id);
+                controller
+                    .inner
+                    .logs_state
+                    .get(&log_id)
+                    .and_then(|log_state| match log_state {
+                        LogState::Available { configuration, .. } => {
+                            configuration.as_ref().map(|configuration| {
+                                Either::Left(Either::Left(configuration.node_set_iter().cloned()))
+                            })
+                        }
+                        LogState::Sealing { .. }
+                        | LogState::Sealed { .. }
+                        | LogState::Provisioning { .. } => None,
+                    })
+                    .unwrap_or_else(|| Either::Left(Either::Right(iter::empty())))
+            }
+        }
+    }
+
+    fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
+        match self.logs_controller {
+            LogsController::Disabled => self.preferred_leader_from_logs(partition_id),
+            LogsController::Enabled(ref controller) => {
+                let log_id = LogId::from(*partition_id);
+
+                controller
+                    .inner
+                    .logs_state
+                    .get(&log_id)
+                    .and_then(|log_state| match log_state {
+                        LogState::Available { configuration, .. } => {
+                            configuration.as_ref().and_then(|configuration| {
+                                configuration
+                                    .sequencer_node()
+                                    .map(GenerationalNodeId::as_plain)
+                            })
+                        }
+                        LogState::Sealing { .. }
+                        | LogState::Sealed { .. }
+                        | LogState::Provisioning { .. } => None,
+                    })
+            }
+        }
+    }
+}
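+
+/// Logs controller that can be switched off via the `admin.disable_logs_controller`
+/// configuration flag: `Disabled` makes every operation a no-op, while
+/// `Enabled` delegates to the inner [`Controller`].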
+pub enum LogsController {
+    Disabled,
+    Enabled(Controller),
+}
+
+impl LogsController {
+    pub fn disabled() -> LogsController {
+        Self::Disabled
+    }
+
+    pub fn enabled(
+        bifrost: Bifrost,
+        metadata_writer: MetadataWriter,
+    ) -> Result<LogsController> {
+        Ok(Self::Enabled(Controller::new(bifrost, metadata_writer)?))
+    }
+
+    pub fn on_observed_cluster_state_update(
+        &mut self,
+        nodes_config: &NodesConfiguration,
+        observed_cluster_state: &ObservedClusterState,
+        node_set_selector_hints: impl NodeSetSelectorHints,
+    ) -> Result<(), anyhow::Error> {
+        match self {
+            Self::Disabled => Ok(()),
+            Self::Enabled(ref mut controller) => controller.on_observed_cluster_state_update(
+                nodes_config,
+                observed_cluster_state,
+                node_set_selector_hints,
+            ),
+        }
+    }
+
+    pub fn find_logs_tail(&mut self) {
+        match self {
+            Self::Disabled => {}
+            Self::Enabled(ref mut controller) => {
+                controller.find_logs_tail();
+            }
+        }
+    }
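+
+    /// Never resolves while the controller is disabled, so a caller selecting
+    /// over this future will simply never observe a completion from it.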
+    pub async fn run_async_operations(&mut self) -> Result<Never> {
+        match self {
+            Self::Disabled => futures::future::pending().await,
+            Self::Enabled(ref mut controller) => controller.run_async_operations().await,
+        }
+    }
+
+    pub fn on_logs_update(&mut self, logs: Pinned<Logs>) -> Result<()> {
+        match self {
+            Self::Disabled => Ok(()),
+            Self::Enabled(ref mut controller) => controller.on_logs_update(logs),
+        }
+    }
+
+    pub fn on_partition_table_update(&mut self, partition_table: &PartitionTable) {
+        match self {
+            Self::Disabled => {}
+            Self::Enabled(ref mut controller) => {
+                controller.on_partition_table_update(partition_table)
+            }
+        }
+    }
+}
diff --git a/crates/admin/src/cluster_controller/scheduler.rs b/crates/admin/src/cluster_controller/scheduler.rs
index ec2b97c616..159e4c8ee8 100644
--- a/crates/admin/src/cluster_controller/scheduler.rs
+++ b/crates/admin/src/cluster_controller/scheduler.rs
@@ -60,13 +60,13 @@ pub enum Error {
 /// Placement hints for the [`Scheduler`]. The hints can specify which nodes should be chosen for
 /// the partition processor placement and on which node the leader should run.
 pub trait PartitionProcessorPlacementHints {
-    fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId>;
+    fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = PlainNodeId>;
 
     fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId>;
 }
 
 impl<T: PartitionProcessorPlacementHints> PartitionProcessorPlacementHints for &T {
-    fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
+    fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = PlainNodeId> {
         (*self).preferred_nodes(partition_id)
     }
 
@@ -290,8 +290,7 @@ impl<T: TransportConnect> Scheduler<T> {
             // todo: Implement cleverer strategies
             // randomly choose from the preferred workers nodes first
             let new_nodes = preferred_worker_nodes
-                .filter(|node_id| !target_state.node_set.contains(**node_id))
-                .cloned()
+                .filter(|node_id| !target_state.node_set.contains(*node_id))
                 .choose_multiple(
                     &mut rng,
                     replication_factor - target_state.node_set.len(),
@@ -314,7 +313,7 @@
             }
         } else if target_state.node_set.len() > replication_factor {
             let preferred_worker_nodes: HashSet<PlainNodeId> =
-                preferred_worker_nodes.cloned().collect();
+                preferred_worker_nodes.collect();
 
             // first remove the not preferred nodes
             for node_id in target_state
@@ -588,7 +587,7 @@ mod tests {
         fn preferred_nodes(
             &self,
             _partition_id: &PartitionId,
-        ) -> impl Iterator<Item = &PlainNodeId> {
+        ) -> impl Iterator<Item = PlainNodeId> {
             iter::empty()
         }
 
diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs
index 7623910916..c94999e8fc 100644
--- a/crates/admin/src/cluster_controller/service/state.rs
+++ b/crates/admin/src/cluster_controller/service/state.rs
@@ -161,8 +161,12 @@ where
         let scheduler =
             Scheduler::new(service.metadata_writer.clone(), service.networking.clone());
 
-        let logs_controller =
-            LogsController::new(service.bifrost.clone(), service.metadata_writer.clone())?;
+        let logs_controller = if configuration.admin.disable_logs_controller {
+            info!("Logs controller is disabled");
+            LogsController::disabled()
+        } else {
+            LogsController::enabled(service.bifrost.clone(), service.metadata_writer.clone())?
+        };
 
         let (log_trim_interval, log_trim_threshold) =
             create_log_trim_interval(&configuration.admin);
diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs
index e699609390..1586703f06 100644
--- a/crates/types/src/config/admin.rs
+++ b/crates/types/src/config/admin.rs
@@ -77,6 +77,11 @@ pub struct AdminOptions {
 
     #[cfg(any(test, feature = "test-util"))]
     pub disable_cluster_controller: bool,
+
+    /// # Disable logs controller
+    ///
+    /// If set to true, the cluster controller will not run the logs controller.
+    ///
+    /// Default: false
+    pub disable_logs_controller: bool,
 }
 
 impl AdminOptions {
@@ -116,6 +121,7 @@ impl Default for AdminOptions {
             #[cfg(any(test, feature = "test-util"))]
             disable_cluster_controller: false,
             log_tail_update_interval: Duration::from_secs(5 * 60).into(),
+            disable_logs_controller: false,
         }
     }
 }
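The mechanical core of the patch is the two-variant `LogsController` wrapper: the `Disabled` arm makes every callback a no-op and turns the one long-running future, `run_async_operations`, into a future that never resolves, so the service's select-style driver loop needs no special-casing. Below is a minimal, self-contained sketch of that pattern for readers who want to try it in isolation; it is not Restate code, and the names `Toggle` and `Inner` (plus the tokio-based demo harness) are made up for illustration.

use std::future;
use std::time::Duration;

// Stand-in for the real `Controller` doing the actual work.
struct Inner;

impl Inner {
    async fn run(&mut self) -> Result<(), String> {
        // Pretend to complete one round of async work.
        Ok(())
    }
}

// Stand-in for the `LogsController` enum in the patch.
enum Toggle {
    Disabled,
    Enabled(Inner),
}

impl Toggle {
    async fn run(&mut self) -> Result<(), String> {
        match self {
            // A disabled controller simply goes quiet forever.
            Toggle::Disabled => future::pending().await,
            Toggle::Enabled(inner) => inner.run().await,
        }
    }
}

#[tokio::main]
async fn main() {
    let mut enabled = Toggle::Enabled(Inner);
    assert!(enabled.run().await.is_ok());

    let mut disabled = Toggle::Disabled;
    // The disabled variant never finishes; bail out via a timeout instead.
    let timed_out = tokio::time::timeout(Duration::from_millis(10), disabled.run())
        .await
        .is_err();
    assert!(timed_out);
}

Operationally, the flag is read once when the cluster controller builds its state (`configuration.admin.disable_logs_controller`); in a TOML node configuration it would presumably sit under the `admin` section as `disable-logs-controller = true`, following the kebab-case rendering of the neighbouring admin options.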