Skip to content

Commit

Permalink
support replacing nodes by shutting down the old node when a new one …
Browse files Browse the repository at this point in the history
…with the same name spawns (#7)
  • Loading branch information
jobafr authored Nov 8, 2024
1 parent 3589921 commit cb029d1
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 21 deletions.
25 changes: 23 additions & 2 deletions src/client_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ impl ClientApi {
caller_id: &str,
topic: &str,
publisher_apis: &Vec<String>,
) -> anyhow::Result<()> {
) -> anyhow::Result<Value> {
let request = Call::new("publisherUpdate", (caller_id, topic, publisher_apis));
let result = self.client.call::<_, ()>(request).await;
let result = self.client.call::<_, _>(request).await;
Ok(result?)
}

Expand All @@ -67,4 +67,25 @@ impl ClientApi {
let result = self.client.call(request).await;
Ok(result?)
}

/// Requests the node to shut down
///
/// # Arguments
///
/// * `caller_id` - A string slice representing the ID of the caller.
/// * `reason` - Reason for shutting the node down. Will likely show up in logs.
///
/// # Returns
///
/// An `anyhow::Result` indicating whether the request was successful.
pub async fn shutdown(
&self,
caller_id: &str,
reason: &str,
) -> anyhow::Result<()> {
let request = Call::new("shutdown", (caller_id, reason));
let result = self.client.call(request).await;
Ok(result?)
}
}
64 changes: 45 additions & 19 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate dxr;
use dxr_client::{Call, Client, ClientBuilder, Url};
use maplit::hashmap;
use paste::paste;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock};
use tokio::task::JoinSet;
Expand Down Expand Up @@ -168,16 +169,43 @@ impl Handler for RegisterServiceHandler {
.or_default()
.insert(caller_id.clone(), service_api);

self.data
.nodes
.write()
.unwrap()
.insert(caller_id, caller_api);
register_node(&self.data.nodes, &caller_id, &caller_api).await;

Ok((1, String::from(""), 0).try_to_value()?)
}
}

async fn register_node(nodes : &RwLock<Nodes>, caller_id: &str, caller_api : &str) -> () {
let shutdown_api_url;
{
let mut nodes = nodes.write().unwrap();
match nodes.entry(caller_id.to_owned()) {
Entry::Vacant(v) => {
v.insert(caller_api.to_owned());
return
},
Entry::Occupied(mut e) => {
let e = e.get_mut();
if e == caller_api {
return
} else {
shutdown_api_url = std::mem::replace(e, caller_api.to_owned());
}
}
}
}
let res = shutdown_node(&shutdown_api_url, caller_id).await;
if let Err(e) = res {
log::warn!("Error shutting down previous instance of node '{caller_id}': {e:?}. New node will be registered regardless. Check for stray processes.");
}
}

async fn shutdown_node(client_api_url: &str, node_id : &str) -> anyhow::Result<()> {
let client_api = ClientApi::new(client_api_url);
let res = client_api.shutdown("/master", &format!("[{}] Reason: new node registered with same name", node_id)).await;
res
}

/// Handler for unregistering the caller as a provider of the specified service.
///
/// # Parameters
Expand Down Expand Up @@ -268,11 +296,8 @@ impl Handler for RegisterSubscriberHandler {
.entry(topic.clone())
.or_default()
.insert(caller_id.clone());
self.data
.nodes
.write()
.unwrap()
.insert(caller_id, caller_api);

register_node(&self.data.nodes, &caller_id, &caller_api).await;

let publishers = self
.data
Expand Down Expand Up @@ -376,6 +401,8 @@ impl Handler for RegisterPublisherHandler {
}
}

register_node(&self.data.nodes, &caller_id, &caller_api).await;

// TODO(patwie): Maybe holding the lock for a longer time?
// let mut publications = self.data.publications.write().unwrap();
self.data
Expand All @@ -390,11 +417,6 @@ impl Handler for RegisterPublisherHandler {
.write()
.unwrap()
.insert(topic.clone(), topic_type.clone());
self.data
.nodes
.write()
.unwrap()
.insert(caller_id.clone(), caller_api.clone());

let nodes = self.data.nodes.read().unwrap().clone();
let subscribers_api_urls = self
Expand Down Expand Up @@ -435,9 +457,11 @@ impl Handler for RegisterPublisherHandler {
let r = client_api
.publisher_update(&caller_id.as_str(), &topic.as_str(), &publisher_apis)
.await;
if let Err(r) = r {
log::warn!("publisherUpdate call to {} failed: {}", client_api_url, r);
match r {
Err(e) => log::warn!("publisherUpdate call to {} failed: {}", client_api_url, e),
Ok(v) => log::debug!("publisherUpdate call to {} succeeded, returning: {:?}", client_api_url, v)
}

}

return Ok((1, "", subscribers_api_urls).try_to_value()?);
Expand Down Expand Up @@ -1070,6 +1094,8 @@ impl Handler for SubscribeParamHandler {
let (caller_id, caller_api, key) = Request::try_from_params(params)?;
let key = resolve(&caller_id, &key);

register_node(&self.data.nodes, &caller_id, &caller_api).await;

let mut new_subscription = Some(ParamSubscription {
node_id: caller_id.clone(),
param: key.clone(),
Expand Down Expand Up @@ -1132,13 +1158,13 @@ impl Handler for UnSubscribeParamHandler {
async fn handle(&self, params: &[Value], _headers: HeaderMap) -> HandlerResult {
log::debug!("UnSubscribeParamHandler {:?} ", params);
type Request = (String, String, String);
let (caller_id, _caller_api, key) = Request::try_from_params(params)?;
let (caller_id, caller_api, key) = Request::try_from_params(params)?;
let key = resolve(&caller_id, &key);

let mut parameter_subscriptions = self.data.parameter_subscriptions.write().unwrap();
let mut removed = false;
parameter_subscriptions.retain(|subscription| {
if subscription.node_id == caller_id && subscription.param == key {
if subscription.api_uri == caller_api && subscription.param == key {
removed = true;
false
} else {
Expand Down

0 comments on commit cb029d1

Please sign in to comment.