Skip to content

Commit

Permalink
Option to disable LogsController
Browse files Browse the repository at this point in the history
Summary:
Support disabling logs controller via a configuration flag
  • Loading branch information
muhamadazmy committed Jan 22, 2025
1 parent 7377bf3 commit 81c6b57
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 43 deletions.
198 changes: 163 additions & 35 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ pub struct LogTailUpdate {
pub type LogsTailUpdates = HashMap<LogId, LogTailUpdate>;

/// Runs the inner logs controller and processes the [`Effect`].
pub struct LogsController {
pub struct Controller {
effects: Option<Vec<Effect>>,
inner: LogsControllerInner,
bifrost: Bifrost,
Expand All @@ -937,7 +937,7 @@ pub struct LogsController {
find_logs_tail_semaphore: Arc<Semaphore>,
}

impl LogsController {
impl Controller {
pub fn new(
bifrost: Bifrost,
metadata_writer: MetadataWriter,
Expand Down Expand Up @@ -1217,51 +1217,179 @@ impl LogsController {
}

/// Placement hints for the [`scheduler::Scheduler`] based on the logs configuration
/// If logs controller is enabled, it uses the data of the logs controller to collect
/// the preferred nodes and leader for a partition. Otherwise
/// it uses the current known metadata instead.
#[derive(derive_more::From)]
pub struct LogsBasedPartitionProcessorPlacementHints<'a> {
logs_controller: &'a LogsController,
}

impl<'a> scheduler::PartitionProcessorPlacementHints
for LogsBasedPartitionProcessorPlacementHints<'a>
{
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
impl<'a> LogsBasedPartitionProcessorPlacementHints<'a> {
fn preferred_nodes_from_logs(
&self,
partition_id: &PartitionId,
) -> impl Iterator<Item = PlainNodeId> {
use itertools::Either;
let log_id = LogId::from(*partition_id);
let logs = Metadata::with_current(|m| m.logs_ref());
let Some(segment) = logs.chain(&log_id).map(|chain| chain.head()) else {
return Either::Right(iter::empty());
};

self.logs_controller
.inner
.logs_state
.get(&log_id)
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => configuration
.as_ref()
.map(|configuration| itertools::Either::Left(configuration.node_set_iter())),
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
.unwrap_or_else(|| itertools::Either::Right(iter::empty()))
match segment.config.kind {
ProviderKind::InMemory | ProviderKind::Local => itertools::Either::Right(iter::empty()),
ProviderKind::Replicated => {
let Ok(params) =
ReplicatedLogletParams::deserialize_from(segment.config.params.as_bytes())
else {
return itertools::Either::Right(iter::empty());
};

Either::Left(params.nodeset.into_iter())
}
}
}

fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
fn preferred_leader_from_logs(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
let log_id = LogId::from(*partition_id);
let logs = Metadata::with_current(|m| m.logs_ref());
let segment = logs.chain(&log_id).map(|chain| chain.head())?;

match segment.config.kind {
ProviderKind::InMemory | ProviderKind::Local => None,
ProviderKind::Replicated => {
let Ok(params) =
ReplicatedLogletParams::deserialize_from(segment.config.params.as_bytes())
else {
return None;
};

Some(params.sequencer.into())
}
}
}
}

self.logs_controller
.inner
.logs_state
.get(&log_id)
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => {
configuration.as_ref().and_then(|configuration| {
configuration
.sequencer_node()
.map(GenerationalNodeId::as_plain)
impl<'a> scheduler::PartitionProcessorPlacementHints
for LogsBasedPartitionProcessorPlacementHints<'a>
{
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = PlainNodeId> {
use itertools::Either;

match self.logs_controller {
LogsController::Disabled => Either::Right(self.preferred_nodes_from_logs(partition_id)),
LogsController::Enabled(ref controller) => {
let log_id = LogId::from(*partition_id);
controller
.inner
.logs_state
.get(&log_id)
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => {
configuration.as_ref().map(|configuration| {
Either::Left(Either::Left(configuration.node_set_iter().cloned()))
})
}
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
}
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
.unwrap_or_else(|| Either::Left(Either::Right(iter::empty())))
}
}
}

fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {
match self.logs_controller {
LogsController::Disabled => self.preferred_leader_from_logs(partition_id),
LogsController::Enabled(ref controller) => {
let log_id = LogId::from(*partition_id);

controller
.inner
.logs_state
.get(&log_id)
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => {
configuration.as_ref().and_then(|configuration| {
configuration
.sequencer_node()
.map(GenerationalNodeId::as_plain)
})
}
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
}
}
}
}

/// A logs controller that can be switched off via configuration.
///
/// `Disabled` turns every controller operation into a no-op, while `Enabled`
/// delegates to the inner [`Controller`] implementation. The variant is
/// selected at service start-up from the admin configuration (see the
/// `disable_logs_controller` flag).
pub enum LogsController {
    /// No-op variant: all event handlers do nothing and async operations
    /// pend forever.
    Disabled,
    /// Active variant wrapping the real controller implementation.
    Enabled(Controller),
}

impl LogsController {
    /// Creates a no-op controller.
    ///
    /// Every event handler on the returned value does nothing and
    /// [`Self::run_async_operations`] pends forever.
    pub fn disabled() -> LogsController {
        Self::Disabled
    }

    /// Creates an active controller backed by [`Controller`].
    ///
    /// # Errors
    ///
    /// Returns a [`LogsControllerError`] if the inner controller fails to
    /// initialize.
    pub fn enabled(
        bifrost: Bifrost,
        metadata_writer: MetadataWriter,
    ) -> Result<LogsController, LogsControllerError> {
        Ok(Self::Enabled(Controller::new(bifrost, metadata_writer)?))
    }

    /// Forwards an observed cluster state update to the inner controller.
    ///
    /// Returns `Ok(())` without doing anything when the controller is
    /// disabled.
    pub fn on_observed_cluster_state_update(
        &mut self,
        nodes_config: &NodesConfiguration,
        observed_cluster_state: &ObservedClusterState,
        node_set_selector_hints: impl NodeSetSelectorHints,
    ) -> Result<(), anyhow::Error> {
        match self {
            Self::Disabled => Ok(()),
            // Match ergonomics: matching through `&mut self` already binds
            // `controller` as `&mut Controller`; no explicit `ref mut` needed.
            Self::Enabled(controller) => controller.on_observed_cluster_state_update(
                nodes_config,
                observed_cluster_state,
                node_set_selector_hints,
            ),
        }
    }

    /// Asks the inner controller to refresh the tail state of all logs.
    /// No-op when disabled.
    pub fn find_logs_tail(&mut self) {
        match self {
            Self::Disabled => {}
            Self::Enabled(controller) => {
                controller.find_logs_tail();
            }
        }
    }

    /// Drives the controller's pending async operations.
    ///
    /// Never resolves when the controller is disabled (callers typically
    /// `select!` over this future, so pending forever is the correct no-op).
    pub async fn run_async_operations(&mut self) -> Result<Never> {
        match self {
            Self::Disabled => futures::future::pending().await,
            Self::Enabled(controller) => controller.run_async_operations().await,
        }
    }

    /// Notifies the inner controller about an updated logs metadata version.
    /// Returns `Ok(())` without doing anything when disabled.
    pub fn on_logs_update(&mut self, logs: Pinned<Logs>) -> Result<()> {
        match self {
            Self::Disabled => Ok(()),
            Self::Enabled(controller) => controller.on_logs_update(logs),
        }
    }

    /// Notifies the inner controller about an updated partition table.
    /// No-op when disabled.
    pub fn on_partition_table_update(&mut self, partition_table: &PartitionTable) {
        match self {
            Self::Disabled => {}
            Self::Enabled(controller) => {
                controller.on_partition_table_update(partition_table)
            }
        }
    }
}

Expand Down
11 changes: 5 additions & 6 deletions crates/admin/src/cluster_controller/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ pub enum Error {
/// Placement hints for the [`Scheduler`]. The hints can specify which nodes should be chosen for
/// the partition processor placement and on which node the leader should run.
pub trait PartitionProcessorPlacementHints {
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId>;
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = PlainNodeId>;

fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId>;
}

impl<T: PartitionProcessorPlacementHints> PartitionProcessorPlacementHints for &T {
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = PlainNodeId> {
(*self).preferred_nodes(partition_id)
}

Expand Down Expand Up @@ -290,8 +290,7 @@ impl<T: TransportConnect> Scheduler<T> {
// todo: Implement cleverer strategies
// randomly choose from the preferred workers nodes first
let new_nodes = preferred_worker_nodes
.filter(|node_id| !target_state.node_set.contains(**node_id))
.cloned()
.filter(|node_id| !target_state.node_set.contains(*node_id))
.choose_multiple(
&mut rng,
replication_factor - target_state.node_set.len(),
Expand All @@ -314,7 +313,7 @@ impl<T: TransportConnect> Scheduler<T> {
}
} else if target_state.node_set.len() > replication_factor {
let preferred_worker_nodes: HashSet<PlainNodeId> =
preferred_worker_nodes.cloned().collect();
preferred_worker_nodes.collect();

// first remove the not preferred nodes
for node_id in target_state
Expand Down Expand Up @@ -588,7 +587,7 @@ mod tests {
fn preferred_nodes(
&self,
_partition_id: &PartitionId,
) -> impl Iterator<Item = &PlainNodeId> {
) -> impl Iterator<Item = PlainNodeId> {
iter::empty()
}

Expand Down
8 changes: 6 additions & 2 deletions crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,12 @@ where

let scheduler = Scheduler::new(service.metadata_writer.clone(), service.networking.clone());

let logs_controller =
LogsController::new(service.bifrost.clone(), service.metadata_writer.clone())?;
let logs_controller = if configuration.admin.disable_logs_controller {
info!("Log controller is disabled");
LogsController::disabled()
} else {
LogsController::enabled(service.bifrost.clone(), service.metadata_writer.clone())?
};

let (log_trim_interval, log_trim_threshold) =
create_log_trim_interval(&configuration.admin);
Expand Down
6 changes: 6 additions & 0 deletions crates/types/src/config/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ pub struct AdminOptions {

#[cfg(any(test, feature = "test-util"))]
pub disable_cluster_controller: bool,

/// # Disable logs controller
///
/// Default: false
pub disable_logs_controller: bool,
}

impl AdminOptions {
Expand Down Expand Up @@ -116,6 +121,7 @@ impl Default for AdminOptions {
#[cfg(any(test, feature = "test-util"))]
disable_cluster_controller: false,
log_tail_update_interval: Duration::from_secs(5 * 60).into(),
disable_logs_controller: false,
}
}
}

0 comments on commit 81c6b57

Please sign in to comment.