Avoid admin roles in local cluster runner (#2026)
* Avoid admin roles in local cluster runner

For the time being, we only want to run the admin role on the metadata node, which
will also be allowed to bootstrap.

* Check admins are ready before bringing up more nodes

* Fix doc comments
jackkleeman authored Oct 7, 2024
1 parent 7756aad commit 060a5ed
Showing 4 changed files with 61 additions and 49 deletions.
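
In effect, a test now asks only for worker roles and relies on the runner's dedicated metadata node for the admin role. Below is a minimal sketch of that usage; it is not code from this commit. The `restate_*` import paths, `Configuration::default()`, and the builder calls other than `temp_base_dir` are assumptions inferred from the diff.

```rust
use std::time::Duration;

use enumset::enum_set;
use restate_local_cluster_runner::{
    cluster::Cluster,
    node::{BinarySource, Node},
};
// Import paths for Configuration and Role are assumptions; adjust to the workspace layout.
use restate_types::{config::Configuration, nodes_config::Role};

async fn two_worker_cluster() {
    let base_config = Configuration::default();

    // Only ask for worker roles: the runner adds a dedicated "metadata-node"
    // that carries Admin + MetadataStore and is the only node allowed to bootstrap.
    let nodes = Node::new_test_nodes_with_metadata(
        base_config,
        BinarySource::CargoTest,
        enum_set!(Role::Worker),
        2,
    );

    // Builder calls other than temp_base_dir are assumptions based on the
    // typed-builder pattern visible in the diff.
    let cluster = Cluster::builder()
        .cluster_name("two-workers".to_owned())
        .nodes(nodes)
        .temp_base_dir()
        .build()
        .start()
        .await
        .expect("cluster should start");

    // Cluster::start already waited for the metadata node's admin endpoint;
    // this additionally waits for the remaining admin and ingress endpoints.
    assert!(cluster.wait_healthy(Duration::from_secs(30)).await);
}
```

With this shape, only the metadata node can bootstrap the cluster, which is what the start loop in the diff below enforces by waiting for its admin endpoint before launching further nodes.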
@@ -25,7 +25,7 @@ async fn main() {
let nodes = Node::new_test_nodes_with_metadata(
base_config,
BinarySource::CargoTest,
enum_set!(Role::Admin | Role::Worker),
enum_set!(Role::Worker),
2,
);

42 changes: 25 additions & 17 deletions crates/local-cluster-runner/src/cluster/mod.rs
@@ -26,9 +26,9 @@ pub struct Cluster {
}

impl<C, N> ClusterBuilder<(C, N, ())> {
// Use a tempdir as the basedir; this will be removed on Cluster/StartedCluster drop.
// You may set LOCAL_CLUSTER_RUNNER_RETAIN_TEMPDIR=true to instead log it out and retain
// it.
/// Use a tempdir as the basedir; this will be removed on Cluster/StartedCluster drop.
/// You may set LOCAL_CLUSTER_RUNNER_RETAIN_TEMPDIR=true to instead log it out and retain
/// it.
pub fn temp_base_dir(self) -> ClusterBuilder<(C, N, (MaybeTempDir,))> {
let maybe_temp_dir = tempfile::tempdir().expect("to create a tempdir").into();
let base_dir = (maybe_temp_dir,);
@@ -52,6 +52,8 @@ fn default_cluster_name() -> String {
pub enum ClusterStartError {
#[error("Failed to start node {0}: {1}")]
NodeStartError(usize, NodeStartError),
#[error("Admin node is not healthy after waiting 60 seconds")]
AdminUnhealthy,
#[error("Failed to create cluster base directory: {0}")]
CreateDirectory(io::Error),
#[error("Failed to create metadata client: {0}")]
@@ -86,11 +88,17 @@ impl Cluster {
);

for (i, node) in nodes.into_iter().enumerate() {
started_nodes.push(
node.start_clustered(base_dir.as_path(), &cluster_name)
.await
.map_err(|err| ClusterStartError::NodeStartError(i, err))?,
)
let node = node
.start_clustered(base_dir.as_path(), &cluster_name)
.await
.map_err(|err| ClusterStartError::NodeStartError(i, err))?;
if node.admin_address().is_some() {
// admin nodes are needed for later nodes to bootstrap. we should wait until they are serving
if !node.wait_admin_healthy(Duration::from_secs(30)).await {
return Err(ClusterStartError::AdminUnhealthy);
}
}
started_nodes.push(node)
}

Ok(StartedCluster {
@@ -116,31 +124,31 @@ impl StartedCluster {
&self.cluster_name
}

// Send a SIGKILL to every node in the cluster
/// Send a SIGKILL to every node in the cluster
pub async fn kill(&mut self) -> io::Result<()> {
future::try_join_all(self.nodes.iter_mut().map(|n| n.kill()))
.await
.map(drop)
}

// Send a SIGTERM to every node in the cluster
/// Send a SIGTERM to every node in the cluster
pub fn terminate(&self) -> io::Result<()> {
for node in &self.nodes {
node.terminate()?
}
Ok(())
}

// Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
// otherwise send a SIGKILL to nodes that are still running.
/// Send a SIGTERM to every node in the cluster, then wait for `dur` for them to exit,
/// otherwise send a SIGKILL to nodes that are still running.
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<()> {
future::try_join_all(self.nodes.iter_mut().map(|n| n.graceful_shutdown(dur)))
.await
.map(drop)
}

// For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
// to respond to health checks, otherwise return false.
/// For every node in the cluster with an admin role, wait for up to dur for the admin endpoint
/// to respond to health checks, otherwise return false.
pub async fn wait_admins_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
@@ -153,8 +161,8 @@ impl StartedCluster {
.all(|b| b)
}

// For every node in the cluster with an ingress role, wait for up to dur for the admin endpoint
// to respond to health checks, otherwise return false.
/// For every node in the cluster with an ingress role, wait for up to dur for the admin endpoint
/// to respond to health checks, otherwise return false.
pub async fn wait_ingresses_healthy(&self, dur: Duration) -> bool {
future::join_all(
self.nodes
@@ -167,7 +175,7 @@ impl StartedCluster {
.all(|b| b)
}

// Wait for all ingress and admin endpoints in the cluster to be healthy
/// Wait for all ingress and admin endpoints in the cluster to be healthy
pub async fn wait_healthy(&self, dur: Duration) -> bool {
future::join(
self.wait_admins_healthy(dur),
62 changes: 33 additions & 29 deletions crates/local-cluster-runner/src/node/mod.rs
@@ -137,7 +137,7 @@ impl Node {
}

// Creates a group of Nodes with a single metadata node "metadata-node", and a given number
// of other nodes ["node-1", ..] each with the provided roles. Node name, roles,
// of other nodes ["node-1", ..] each with the provided roles. Node name, roles,
// bind/advertise addresses, and the metadata address from the base_config will all be overwritten.
pub fn new_test_nodes_with_metadata(
base_config: Configuration,
@@ -148,18 +148,22 @@
let mut nodes = Vec::with_capacity((size + 1) as usize);

{
let mut base_config = base_config.clone();
base_config.common.allow_bootstrap = true;
nodes.push(Self::new_test_node(
"metadata-node",
base_config.clone(),
base_config,
binary_source.clone(),
enum_set!(Role::Admin | Role::MetadataStore),
));
}

for node in 1..=size {
let mut base_config = base_config.clone();
base_config.common.allow_bootstrap = false;
nodes.push(Self::new_test_node(
format!("node-{node}"),
base_config.clone(),
base_config,
binary_source.clone(),
roles,
));
@@ -168,10 +172,10 @@
nodes
}

// Start this Node, providing the base_dir and the cluster_name of the cluster its
// expected to attach to. All relative file paths addresses specified in the node config
// (eg, nodename/node.sock) will be absolutized against the base path, and the base dir
// and cluster name present in config will be overwritten.
/// Start this Node, providing the base_dir and the cluster_name of the cluster it's
/// expected to attach to. All relative file paths and addresses specified in the node config
/// (eg, nodename/node.sock) will be absolutized against the base path, and the base dir
/// and cluster name present in config will be overwritten.
pub async fn start_clustered(
mut self,
base_dir: impl Into<PathBuf>,
@@ -206,8 +210,8 @@ impl Node {
self.start().await
}

// Start this node with the current config. A subprocess will be created, and a tokio task
// spawned to process output logs and watch for exit.
/// Start this node with the current config. A subprocess will be created, and a tokio task
/// spawned to process output logs and watch for exit.
pub async fn start(self) -> Result<StartedNode, NodeStartError> {
let Self {
base_config,
@@ -350,13 +354,13 @@
pub enum BinarySource {
Path(OsString),
EnvVar(String),
// Suitable when called from a `cargo run` command, except examples.
// This will attempt to find a `restate-server` binary in the same directory
// as the current binary
/// Suitable when called from a `cargo run` command, except examples.
/// This will attempt to find a `restate-server` binary in the same directory
/// as the current binary
CargoRun,
// Suitable when called from a `cargo test` or `cargo run --example` command;
// this will attempt to find a `restate-server` binary in the parent directory of
// the current binary.
/// Suitable when called from a `cargo test` or `cargo run --example` command;
/// this will attempt to find a `restate-server` binary in the parent directory of
/// the current binary.
CargoTest,
}

@@ -457,7 +461,7 @@ impl Future for StartedNodeStatus {
}

impl StartedNode {
// Send a SIGKILL to the current process, if it is running, and await for its exit
/// Send a SIGKILL to the current process, if it is running, and await for its exit
pub async fn kill(&mut self) -> io::Result<ExitStatus> {
match self.status {
StartedNodeStatus::Exited(status) => Ok(status),
@@ -479,7 +483,7 @@ impl StartedNode {
}
}

// Send a SIGTERM to the current process, if it is running
/// Send a SIGTERM to the current process, if it is running
pub fn terminate(&self) -> io::Result<()> {
match self.status {
StartedNodeStatus::Exited(_) => Ok(()),
@@ -499,7 +503,7 @@ impl StartedNode {
}
}

// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
/// Send a SIGTERM, then wait for `dur` for exit, otherwise send a SIGKILL
pub async fn graceful_shutdown(&mut self, dur: Duration) -> io::Result<ExitStatus> {
match self.status {
StartedNodeStatus::Exited(status) => Ok(status),
@@ -526,15 +530,15 @@ impl StartedNode {
}
}

// Get the pid of the subprocess. Returns none after it has exited.
/// Get the pid of the subprocess. Returns none after it has exited.
pub fn pid(&self) -> Option<u32> {
match self.status {
StartedNodeStatus::Exited { .. } | StartedNodeStatus::Failed { .. } => None,
StartedNodeStatus::Running { pid, .. } => Some(pid),
}
}

// Wait for the node to exit and report its exist status
/// Wait for the node to exit and report its exit status
pub async fn status(&mut self) -> io::Result<ExitStatus> {
(&mut self.status).await
}
@@ -575,8 +579,8 @@ impl StartedNode {
}
}

// Obtain a stream of loglines matching this pattern. The stream will end
// when the stdout and stderr files on the process close.
/// Obtain a stream of loglines matching this pattern. The stream will end
/// when the stdout and stderr files on the process close.
pub fn lines(&self, pattern: Regex) -> impl Stream<Item = String> + '_ {
match self.status {
StartedNodeStatus::Exited { .. } => futures::stream::empty().left_stream(),
@@ -588,7 +592,7 @@
}
}

// Obtain a metadata client based on this nodes client config.
/// Obtain a metadata client based on this node's client config.
pub async fn metadata_client(
&self,
) -> Result<restate_metadata_store::MetadataStoreClient, GenericError> {
@@ -598,7 +602,7 @@
.await
}

// Check to see if the admin address is healthy. Returns false if this node has no admin role.
/// Check to see if the admin address is healthy. Returns false if this node has no admin role.
pub async fn admin_healthy(&self) -> bool {
if let Some(address) = self.admin_address() {
match reqwest::get(format!("http://{address}/health")).await {
@@ -610,8 +614,8 @@
}
}

// Check every 250ms to see if the admin address is healthy, waiting for up to `timeout`.
// Returns false if this node has no admin role.
/// Check every 250ms to see if the admin address is healthy, waiting for up to `timeout`.
/// Returns false if this node has no admin role.
pub async fn wait_admin_healthy(&self, timeout: Duration) -> bool {
let mut attempts = 1;
if tokio::time::timeout(timeout, async {
@@ -637,7 +641,7 @@
}
}

// Check to see if the ingress address is healthy. Returns false if this node has no ingress role.
/// Check to see if the ingress address is healthy. Returns false if this node has no ingress role.
pub async fn ingress_healthy(&self) -> bool {
if let Some(address) = self.ingress_address() {
match reqwest::get(format!("http://{address}/restate/health")).await {
Expand All @@ -649,8 +653,8 @@ impl StartedNode {
}
}

// Check every 250ms to see if the ingress address is healthy, waiting for up to `timeout`.
// Returns false if this node has no ingress role.
/// Check every 250ms to see if the ingress address is healthy, waiting for up to `timeout`.
/// Returns false if this node has no ingress role.
pub async fn wait_ingress_healthy(&self, timeout: Duration) -> bool {
let mut attempts = 1;
if tokio::time::timeout(timeout, async {
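
The node-level health checks touched above are what the new cluster start loop relies on. For orientation only, here is a hedged sketch (not part of this commit) of driving them directly from a test; the crate path and the `start_and_wait` helper are hypothetical, while the methods called are the ones shown in the diff.

```rust
use std::time::Duration;

// Hypothetical crate path; the types come from crates/local-cluster-runner.
use restate_local_cluster_runner::node::{Node, StartedNode};

/// Hypothetical test helper: start one node and block until its admin/ingress
/// endpoints answer health checks, mirroring what Cluster::start now does for
/// the metadata/admin node.
async fn start_and_wait(node: Node) -> StartedNode {
    let started = node.start().await.expect("node should start");

    // Only nodes carrying the admin role expose an admin address;
    // wait_admin_healthy polls the /health endpoint every 250ms up to the timeout.
    if started.admin_address().is_some() {
        assert!(
            started.wait_admin_healthy(Duration::from_secs(30)).await,
            "admin endpoint never became healthy"
        );
    }

    // Same pattern for nodes carrying the ingress role.
    if started.ingress_address().is_some() {
        assert!(
            started.wait_ingress_healthy(Duration::from_secs(30)).await,
            "ingress endpoint never became healthy"
        );
    }

    started
}
```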
4 changes: 2 additions & 2 deletions server/tests/cluster.rs
@@ -16,7 +16,7 @@ async fn node_id_mismatch() {
let nodes = Node::new_test_nodes_with_metadata(
base_config.clone(),
BinarySource::CargoTest,
enum_set!(Role::Admin | Role::Worker),
enum_set!(Role::Worker),
1,
);

@@ -64,7 +64,7 @@ async fn cluster_name_mismatch() {
let nodes = Node::new_test_nodes_with_metadata(
base_config.clone(),
BinarySource::CargoTest,
enum_set!(Role::Admin | Role::Worker),
enum_set!(Role::Worker),
1,
);

