[WIP] Partition placement based on nodeset selector #2545

Closed

wants to merge 2 commits
81 changes: 41 additions & 40 deletions crates/admin/src/cluster_controller/logs_controller.rs
@@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::iter;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
@@ -19,11 +18,11 @@ use rand::prelude::IteratorRandom;
use rand::thread_rng;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, trace, trace_span, Instrument};
use tracing::{debug, error, info, trace, trace_span, Instrument};

use restate_bifrost::{Bifrost, Error as BifrostError};
use restate_core::metadata_store::{Precondition, WriteError};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenterFutureExt};
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
use restate_types::live::Pinned;
@@ -37,7 +36,7 @@ use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY;
use restate_types::nodes_config::{NodeConfig, NodesConfiguration, Role, StorageState};
use restate_types::partition_table::PartitionTable;
use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams};
use restate_types::replication::{NodeSetSelector, NodeSetSelectorOptions};
use restate_types::replication::{NodeSet, NodeSetSelector, NodeSetSelectorOptions};
use restate_types::retries::{RetryIter, RetryPolicy};
use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned};

@@ -418,7 +417,7 @@ pub fn build_new_replicated_loglet_configuration(
debug_assert!(nodeset.len() >= replication.num_copies() as usize);
if replication.num_copies() > 1 && nodeset.len() == replication.num_copies() as usize {
warn!(
?log_id,
%log_id,
%replication,
generated_nodeset_size = nodeset.len(),
"The number of writeable log-servers is too small for the configured \
@@ -433,7 +432,7 @@
})
}
Err(err) => {
warn!(?log_id, "Cannot select node-set for log: {err}");
warn!(%log_id, "Cannot select node-set for log: {err}");
None
}
}
@@ -515,6 +514,18 @@ impl LogletConfiguration {
return false;
};

if params.replication != config.replication_property {
// The replication property has changed; we need to reconfigure.
debug!(
%log_id,
loglet_id = ?params.loglet_id,
current_replication = %params.replication,
new_replication = %config.replication_property,
"Replicated loglet default replication has can changed, will attempt reconfiguration"
);
return true;
}

// todo 1: This is an over-simplifying check, ideally we'd want to see if the new nodeset
// improves our safety-margin (fault-tolerance) or not by running nodeset checker with higher
// replication on every scope, or any other reasonable metric that let us decide
Expand Down Expand Up @@ -598,15 +609,13 @@ impl LogletConfiguration {
}
}

fn node_set_iter(&self) -> impl Iterator<Item = &PlainNodeId> {
fn nodeset(&self) -> Option<&NodeSet> {
match self {
#[cfg(feature = "replicated-loglet")]
LogletConfiguration::Replicated(configuration) => {
itertools::Either::Left(configuration.nodeset.iter())
}
LogletConfiguration::Local(_) => itertools::Either::Right(iter::empty()),
LogletConfiguration::Replicated(configuration) => Some(&configuration.nodeset),
LogletConfiguration::Local(_) => None,
#[cfg(any(test, feature = "memory-loglet"))]
LogletConfiguration::Memory(_) => itertools::Either::Right(iter::empty()),
LogletConfiguration::Memory(_) => None,
}
}

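A minimal sketch (not part of this diff) of how a caller that previously consumed node_set_iter() could adapt to the new Option<&NodeSet> accessor; the helper name is an assumption, and the types are the ones already in scope in this file:

// Illustrative helper: recover iterator-style access from the new accessor.
// Local and memory loglets have no nodeset, so they simply yield nothing.
fn nodeset_members(config: &LogletConfiguration) -> impl Iterator<Item = &PlainNodeId> + '_ {
    config.nodeset().into_iter().flat_map(|nodeset| nodeset.iter())
}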
@@ -756,7 +765,7 @@ impl LogsControllerInner {
)?;

if let Some(logs) = builder.build_if_modified() {
debug!("Proposing new logs configuration: {logs:?}");
debug!("Proposing new log configuration: {logs:?}");
self.logs_write_in_progress = Some(logs.version());
let logs = Arc::new(logs);
effects.push(Effect::WriteLogs {
@@ -1176,7 +1185,7 @@ impl LogsController {
self.async_operations.spawn(async move {
if let Some(debounce) = &mut debounce {
let delay = debounce.next().unwrap_or(FALLBACK_MAX_RETRY_DELAY);
debug!(?delay, %previous_version, "Wait before attempting to write logs");
trace!(?delay, %previous_version, "Wait before attempting to write logs");
tokio::time::sleep(delay).await;
}

@@ -1190,30 +1199,23 @@
{
return match err {
WriteError::FailedPrecondition(err) => {
debug!("Detected a concurrent modification of logs. Fetching the latest logs now. {err}");
// There was a concurrent modification of the logs. Fetch the latest version.
match metadata_writer.metadata_store_client()
.get::<Logs>(BIFROST_CONFIG_KEY.clone())
.await
{
Ok(result) => {
let logs = result.expect("should be present");
// we are only failing if we are shutting down
let _ = metadata_writer.update(Arc::new(logs)).await;
Event::NewLogs
}
Err(err) => {
debug!("failed committing new logs: {err}");
Event::WriteLogsFailed {
logs,
previous_version,
debounce,
}
}
}
debug!(
?err,
"Detected a concurrent modification of the log chain. Fetching latest"
);
// Sync will not request metadata from the metadata store if we already know
// about a newer version of the log metadata, so no direct get is needed here
let _ = Metadata::current().sync(
restate_core::MetadataKind::Logs,
TargetVersion::Version(previous_version.next())
).await;
Event::NewLogs
}
err => {
debug!("failed writing new logs: {err}");
info!(
%err,
"Failed writing new log chain to metadata store, will retry later"
);
Event::WriteLogsFailed {
logs,
previous_version,
@@ -1225,7 +1227,6 @@

// we don't update metadata with the new logs here because it would cause a race
// condition with the outer on_logs_updated signal

let version = logs.version();
Event::WriteLogsSucceeded(version)
}.in_current_tc());
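A minimal sketch (not part of this diff) of the conflict-handling pattern above, factored into a hypothetical helper; it reuses only the calls that appear in the diff:

// Illustrative helper: after a FailedPrecondition from the metadata store,
// ask the metadata subsystem to catch up to at least the next Logs version
// instead of performing a direct get. If a newer version is already known
// locally, the sync resolves without contacting the metadata store.
async fn refresh_logs_after_conflict(previous_version: Version) {
    // The result is deliberately ignored, mirroring the diff: a failed sync
    // only means the controller will observe the new version, or retry, later.
    let _ = Metadata::current()
        .sync(
            restate_core::MetadataKind::Logs,
            TargetVersion::Version(previous_version.next()),
        )
        .await;
}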
@@ -1308,7 +1309,7 @@ pub struct LogsBasedPartitionProcessorPlacementHints<'a> {
impl<'a> scheduler::PartitionProcessorPlacementHints
for LogsBasedPartitionProcessorPlacementHints<'a>
{
fn preferred_nodes(&self, partition_id: &PartitionId) -> impl Iterator<Item = &PlainNodeId> {
fn preferred_nodes(&self, partition_id: &PartitionId) -> Option<&NodeSet> {
let log_id = LogId::from(*partition_id);

self.logs_controller
@@ -1318,12 +1319,12 @@ impl<'a> scheduler::PartitionProcessorPlacementHints
.and_then(|log_state| match log_state {
LogState::Available { configuration, .. } => configuration
.as_ref()
.map(|configuration| itertools::Either::Left(configuration.node_set_iter())),
.map(|configuration| configuration.nodeset()),
LogState::Sealing { .. }
| LogState::Sealed { .. }
| LogState::Provisioning { .. } => None,
})
.unwrap_or_else(|| itertools::Either::Right(iter::empty()))
.unwrap_or_default()
}

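A minimal sketch (not part of this diff) of how a scheduler-side consumer might use the new Option<&NodeSet> hint; the function is hypothetical, and the real integration presumably lives in the scheduler changes of this PR:

// Illustrative only: None means "no placement preference", so a caller can
// fall back to the full candidate set without special-casing empty iterators.
fn placement_candidates<'a>(
    hint: Option<&'a NodeSet>,
    all_candidates: &'a NodeSet,
) -> impl Iterator<Item = &'a PlainNodeId> + 'a {
    hint.unwrap_or(all_candidates).iter()
}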
fn preferred_leader(&self, partition_id: &PartitionId) -> Option<PlainNodeId> {