Force a partition leadership change to new node-3 after snapshot restore
pcholakov committed Jan 7, 2025
1 parent 60dbc4e commit 8fb6ab9
Showing 3 changed files with 173 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions server/Cargo.toml
@@ -72,6 +72,7 @@ restate-core = { workspace = true, features = ["test-util"] }
restate-local-cluster-runner = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
restate-metadata-store = { workspace = true }
mock-service-endpoint = { workspace = true }

anyhow = { workspace = true }
218 changes: 171 additions & 47 deletions server/tests/trim_gap_handling.rs
@@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::time::Duration;

@@ -22,29 +23,38 @@ use url::Url;

use restate_admin::cluster_controller::protobuf::cluster_ctrl_svc_client::ClusterCtrlSvcClient;
use restate_admin::cluster_controller::protobuf::{
ClusterStateRequest, CreatePartitionSnapshotRequest, TrimLogRequest,
ChainExtension, ClusterStateRequest, CreatePartitionSnapshotRequest, ListLogsRequest,
ListNodesRequest, SealAndExtendChainRequest, TrimLogRequest,
};
use restate_core::network::net_util::{
create_tonic_channel_from_advertised_address, CommonClientConnectionOptions,
};
use restate_local_cluster_runner::cluster::StartedCluster;
use restate_local_cluster_runner::{
cluster::Cluster,
node::{BinarySource, Node},
};
use restate_types::cluster_controller::SchedulingPlan;
use restate_types::config::{LogFormat, MetadataStoreClient};
use restate_types::logs::metadata::ProviderKind::Replicated;
use restate_types::identifiers::PartitionId;
use restate_types::logs::metadata::{Logs, ProviderKind};
use restate_types::logs::{LogId, LogletId};
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::nodes_config::NodesConfiguration;
use restate_types::protobuf::cluster::node_state::State;
use restate_types::protobuf::cluster::RunMode;
use restate_types::replicated_loglet::ReplicatedLogletParams;
use restate_types::retries::RetryPolicy;
use restate_types::{config::Configuration, nodes_config::Role};
use restate_types::storage::StorageCodec;
use restate_types::{config::Configuration, nodes_config::Role, PlainNodeId};

mod common;

#[test_log::test(tokio::test)]
async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
let mut base_config = Configuration::default();
base_config.common.bootstrap_num_partitions = 1.try_into()?;
base_config.bifrost.default_provider = Replicated;
base_config.bifrost.default_provider = ProviderKind::Replicated;
base_config.common.log_filter = "restate=debug,warn".to_owned();
base_config.common.log_format = LogFormat::Compact;

@@ -90,6 +100,10 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {

any_partition_active(&mut client, Duration::from_secs(5)).await?;

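// Metadata store client configuration pointing at the first node's embedded
// metadata store; reused when starting the additional worker node below.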
let metadata_config = MetadataStoreClient::Embedded {
address: cluster.nodes[0].node_address().clone(),
};

let addr: SocketAddr = "127.0.0.1:9080".parse()?;
tokio::spawn(async move {
info!("Starting mock service on http://{}", addr);
@@ -143,9 +157,7 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
BinarySource::CargoTest,
enum_set!(Role::HttpIngress | Role::Worker),
);
*worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded {
address: cluster.nodes[0].node_address().clone(),
};
*worker_3.metadata_store_client_mut() = metadata_config.clone();

let mut trim_gap_encountered =
worker_3.lines("Partition processor stopped due to a log trim gap, and no snapshot repository is configured".parse()?);
@@ -168,14 +180,8 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
BinarySource::CargoTest,
enum_set!(Role::HttpIngress | Role::Worker),
);
*worker_3.metadata_store_client_mut() = MetadataStoreClient::Embedded {
address: cluster.nodes[0].node_address().clone(),
};
*worker_3.metadata_store_client_mut() = metadata_config.clone();

let ingress_url = format!(
"http://{}/Counter/0/get",
worker_3.config().ingress.bind_address
);
let mut worker_3_imported_snapshot = worker_3.lines(
format!(
"Importing partition store snapshot.*{}",
@@ -192,15 +198,44 @@ async fn fast_forward_over_trim_gap() -> googletest::Result<()> {
.is_ok()
);

// todo(pavel): promote node 3 to be the leader for partition 0 and invoke the service again
// right now, all we are asserting is that the new node is applying newly appended log records
// Make the new node the sequencer for the partition log - this makes it the
// preferred candidate to become the next partition processor leader.
let extension =
Some(replicated_loglet_extension(&mut client, LogId::new(0), PlainNodeId::new(3)).await?);
let reconfigure_response = client
.seal_and_extend_chain(SealAndExtendChainRequest {
log_id: 0,
min_version: None,
extension,
})
.await?
.into_inner();
assert!(reconfigure_response.sealed_segment.is_some());

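// Override the scheduling plan so that node 3 becomes the leader for partition 0,
// and wait for the leadership change to take effect.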
force_promote_partition_leader(
&cluster,
&mut client,
PartitionId::from(0),
PlainNodeId::new(3),
Duration::from_secs(10),
)
.await?;

// Verify that node 3 can process the incoming request while being the partition leader:
let ingress_url = format!(
"http://{}/Counter/0/get",
worker_3.config().ingress.bind_address
);
assert!(http_client
.post(ingress_url)
.send()
.await?
.status()
.is_success());
applied_lsn_converged(&mut client, Duration::from_secs(5), 3, 0).await?;
assert_eq!(
effective_partition_leader(&mut client, PartitionId::from(0)).await?,
Some(PlainNodeId::from(3))
);

worker_3.graceful_shutdown(Duration::from_secs(1)).await?;
cluster.graceful_shutdown(Duration::from_secs(1)).await?;
@@ -272,63 +307,152 @@ async fn trim_log(
Ok(())
}

async fn applied_lsn_converged(
async fn force_promote_partition_leader(
cluster: &StartedCluster,
client: &mut ClusterCtrlSvcClient<Channel>,
partition_id: PartitionId,
leader: PlainNodeId,
timeout: Duration,
expected_processors: usize,
partition_id: u32,
) -> googletest::Result<()> {
assert!(expected_processors > 0);
info!(
"Waiting for {} partition processors to converge on the same applied LSN",
expected_processors
);
let metadata_client = cluster.nodes[0]
.metadata_client()
.await
.expect("can create metadata client");

let deadline = tokio::time::Instant::now() + timeout;
loop {
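// Ensure the scheduling plan in the metadata store names the desired node as
// leader for this partition; write it via read-modify-write if it does not already.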
let plan = metadata_client
.get::<SchedulingPlan>(SCHEDULING_PLAN_KEY.clone())
.await?;
if plan.is_none_or(|p| {
p.get(&partition_id)
.is_none_or(|p| p.leader != Some(leader))
}) {
metadata_client
.read_modify_write(
SCHEDULING_PLAN_KEY.clone(),
|scheduling_plan: Option<SchedulingPlan>| {
let mut plan_builder = scheduling_plan.unwrap().into_builder();
plan_builder.modify_partition(&partition_id, |partition| {
if partition.leader == Some(leader) {
return false;
}
partition.leader = Some(leader);
true
});
anyhow::Ok::<SchedulingPlan>(plan_builder.build())
},
)
.await?;
}
tokio::time::sleep(Duration::from_millis(500)).await;

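// Check whether the target node now reports itself as the effective leader
// (RunMode::Leader) for the partition in the observed cluster state.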
let cluster_state = client
.get_cluster_state(ClusterStateRequest {})
.await?
.into_inner()
.cluster_state
.unwrap();

let applied_lsn: Vec<_> = cluster_state
if cluster_state
.nodes
.values()
.filter_map(|n| {
n.state
.as_ref()
.map(|s| match s {
State::Alive(s) => s
.partitions
.get(&partition_id)
.map(|p| p.last_applied_log_lsn)
.unwrap_or_default()
.map(|lsn| (partition_id, lsn.value)),
_ => None,
})
.unwrap_or_default()
.get(&leader.into())
.is_some_and(|ns| match ns.state.as_ref() {
Some(State::Alive(n)) => n.partitions.get(&partition_id.into()).is_some_and(|p| {
RunMode::try_from(p.effective_mode).is_ok_and(|m| m == RunMode::Leader)
}),
_ => false,
})
.collect();

if applied_lsn.len() == expected_processors
&& applied_lsn.iter().all(|(_, lsn)| *lsn == applied_lsn[0].1)
{
info!(
"Node {:#} became leader for partition {:#}",
leader, partition_id
);
break;
}

if tokio::time::Instant::now() > deadline {
fail!(
"Partition processors did not converge on the same applied LSN within {:?}: {:?}",
timeout,
applied_lsn
"Node {:#} did not become leader for partition {:#} within {:?}",
leader,
partition_id,
timeout
)?;
}
tokio::time::sleep(Duration::from_millis(250)).await;
}
Ok(())
}

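/// Returns the node that currently reports itself as the effective leader
/// (RunMode::Leader) for the given partition, if any.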
async fn effective_partition_leader(
client: &mut ClusterCtrlSvcClient<Channel>,
partition_id: PartitionId,
) -> googletest::Result<Option<PlainNodeId>> {
let cluster_state = client
.get_cluster_state(ClusterStateRequest {})
.await?
.into_inner()
.cluster_state
.unwrap();

Ok(cluster_state
.nodes
.iter()
.find(|(_, ns)| match &ns.state {
Some(State::Alive(n)) => n
.partitions
.get(&partition_id.into())
.map(|p| RunMode::try_from(p.effective_mode).is_ok_and(|m| m == RunMode::Leader))
.unwrap_or(false),
_ => false,
})
.map(|(id, _)| PlainNodeId::from(*id)))
}

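/// Builds a ChainExtension for the given log that reuses the tail segment's nodeset
/// and replication settings, but designates `new_sequencer` as the sequencer for the
/// new tail segment.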
async fn replicated_loglet_extension(
client: &mut ClusterCtrlSvcClient<Channel>,
log_id: LogId,
new_sequencer: PlainNodeId,
) -> googletest::Result<ChainExtension> {
let mut logs_response = client.list_logs(ListLogsRequest {}).await?.into_inner();

let logs = StorageCodec::decode::<Logs, _>(&mut logs_response.logs)?;
let chain = logs.chain(&log_id).expect("log exists");

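// The new loglet id combines the log id with the segment index that will follow
// the current tail once the chain is extended.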
let tail_index = chain.tail_index();
let loglet_id = LogletId::new(log_id, tail_index.next());
let tail_segment = chain.tail();

let last_params =
ReplicatedLogletParams::deserialize_from(tail_segment.config.params.as_bytes())?;

// Resolve the proposed sequencer's generational node id from the current nodes configuration.
let mut nodes_response = client.list_nodes(ListNodesRequest {}).await?.into_inner();
let nodes_configuration =
StorageCodec::decode::<NodesConfiguration, _>(&mut nodes_response.nodes_configuration)?;
let nodes = nodes_configuration.iter().collect::<BTreeMap<_, _>>();

// Construct the replicated loglet params for the new tail segment.
let mut nodeset = last_params.nodeset.clone();
nodeset.insert(new_sequencer); // ensure the proposed sequencer is part of the nodeset

let params = ReplicatedLogletParams {
loglet_id,
nodeset,
replication: last_params.replication.clone(),
sequencer: nodes
.get(&new_sequencer)
.expect("proposed sequencer node exists")
.current_generation,
};

Ok(ChainExtension {
provider: ProviderKind::Replicated.to_string(),
segment_index: None,
params: params.serialize()?,
})
}

struct TestNetworkOptions {
connect_timeout: u64,
request_timeout: u64,
