From 1c6d38336635253d1a8e9b0d309ad09b29225dfa Mon Sep 17 00:00:00 2001 From: Nicolas Belouin Date: Mon, 11 Sep 2023 13:22:33 +0200 Subject: [PATCH] fix(agent): correct agent's behavior on Instance deletion (#654) * fix(agent): correct agent's behavior on Instance deletion Make the agent remove itself from the instance nodes list instead of deleting the instance if it's not the last node with access to the device. Refactored the way we get the AGENT_NODE_NAME to only fetch it once at startup. This fixes #650. Signed-off-by: Nicolas Belouin * Update patch version Signed-off-by: github-actions[bot] --------- Signed-off-by: Nicolas Belouin Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- Cargo.lock | 28 +- agent/Cargo.toml | 2 +- agent/src/main.rs | 12 +- agent/src/util/config_action.rs | 15 + agent/src/util/device_plugin_builder.rs | 19 +- agent/src/util/discovery_operator.rs | 311 ++++++++++++++---- controller/Cargo.toml | 2 +- deployment/helm/Chart.yaml | 4 +- .../debug-echo-discovery-handler/Cargo.toml | 2 +- .../onvif-discovery-handler/Cargo.toml | 2 +- .../opcua-discovery-handler/Cargo.toml | 2 +- .../udev-discovery-handler/Cargo.toml | 2 +- discovery-handlers/debug-echo/Cargo.toml | 2 +- discovery-handlers/onvif/Cargo.toml | 2 +- discovery-handlers/opcua/Cargo.toml | 2 +- discovery-handlers/udev/Cargo.toml | 2 +- discovery-utils/Cargo.toml | 2 +- samples/brokers/udev-video-broker/Cargo.toml | 2 +- shared/Cargo.toml | 2 +- version.txt | 2 +- webhooks/validating/configuration/Cargo.toml | 2 +- 21 files changed, 321 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dca5c20f4..59031e586 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -298,7 +298,7 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "agent" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-debug-echo", "akri-discovery-utils", @@ -358,7 +358,7 @@ dependencies = [ [[package]] name = "akri-debug-echo" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -376,7 +376,7 @@ dependencies = [ [[package]] name = "akri-discovery-utils" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-shared", "anyhow", @@ -398,7 +398,7 @@ dependencies = [ [[package]] name = "akri-onvif" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "anyhow", @@ -427,7 +427,7 @@ dependencies = [ [[package]] name = "akri-opcua" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "anyhow", @@ -446,7 +446,7 @@ dependencies = [ [[package]] name = "akri-shared" -version = "0.12.7" +version = "0.12.8" dependencies = [ "anyhow", "async-trait", @@ -471,7 +471,7 @@ dependencies = [ [[package]] name = "akri-udev" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "anyhow", @@ -986,7 +986,7 @@ checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" [[package]] name = "controller" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-shared", "anyhow", @@ -1172,7 +1172,7 @@ dependencies = [ [[package]] name = "debug-echo-discovery-handler" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-debug-echo", "akri-discovery-utils", @@ -2469,7 +2469,7 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "onvif-discovery-handler" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "akri-onvif", @@ -2519,7 +2519,7 @@ dependencies = [ [[package]] name = "opcua-discovery-handler" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "akri-opcua", @@ -4118,7 +4118,7 @@ dependencies = [ [[package]] name = "udev-discovery-handler" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-discovery-utils", "akri-udev", @@ -4129,7 +4129,7 @@ dependencies = [ [[package]] name = "udev-video-broker" -version = "0.12.7" +version = "0.12.8" dependencies = [ "akri-shared", "env_logger", @@ -4406,7 +4406,7 @@ dependencies = [ [[package]] name = "webhook-configuration" -version = "0.12.7" +version = "0.12.8" dependencies = [ "actix-rt 2.7.0", "actix-web", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 860b60061..aab6a81bd 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "agent" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring ", ""] edition = "2018" diff --git a/agent/src/main.rs b/agent/src/main.rs index ea7568b87..c5dc99210 100644 --- a/agent/src/main.rs +++ b/agent/src/main.rs @@ -12,6 +12,7 @@ use log::{info, trace}; use prometheus::{HistogramVec, IntGaugeVec}; use std::{ collections::HashMap, + env, sync::{Arc, Mutex}, time::Duration, }; @@ -52,6 +53,7 @@ async fn main() -> Result<(), Box ); let mut tasks = Vec::new(); + let node_name = env::var("AGENT_NODE_NAME")?; // Start server for Prometheus metrics tasks.push(tokio::spawn(async move { @@ -83,9 +85,13 @@ async fn main() -> Result<(), Box })); tasks.push(tokio::spawn(async move { - config_action::do_config_watch(discovery_handler_map, new_discovery_handler_sender_clone) - .await - .unwrap() + config_action::do_config_watch( + discovery_handler_map, + new_discovery_handler_sender_clone, + node_name, + ) + .await + .unwrap() })); futures::future::try_join_all(tasks).await?; diff --git a/agent/src/util/config_action.rs b/agent/src/util/config_action.rs index 4ac2bc6d8..f97d542b0 100644 --- a/agent/src/util/config_action.rs +++ b/agent/src/util/config_action.rs @@ -50,6 +50,7 @@ pub struct ConfigInfo { pub async fn do_config_watch( discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { info!("do_config_watch - enter"); let config_map: ConfigMap = Arc::new(RwLock::new(HashMap::new())); @@ -63,6 +64,7 @@ pub async fn do_config_watch( let discovery_handler_map = discovery_handler_map.clone(); let new_discovery_handler_sender = new_discovery_handler_sender.clone(); let new_kube_interface = kube_interface.clone(); + let new_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { handle_config_add( new_kube_interface, @@ -70,6 +72,7 @@ pub async fn do_config_watch( config_map, discovery_handler_map, new_discovery_handler_sender, + new_node_name, ) .await .unwrap(); @@ -83,6 +86,7 @@ pub async fn do_config_watch( config_map, discovery_handler_map, new_discovery_handler_sender, + node_name, ) .await .unwrap(); @@ -99,6 +103,7 @@ async fn watch_for_config_changes( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { trace!("watch_for_config_changes - start"); let resource = Api::::all(kube_interface.get_kube_client()); @@ -121,6 +126,7 @@ async fn watch_for_config_changes( config_map.clone(), discovery_handler_map.clone(), new_discovery_handler_sender, + node_name.clone(), ) .await? } @@ -135,6 +141,7 @@ async fn handle_config( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> anyhow::Result<()> { trace!("handle_config - something happened to a configuration"); match event { @@ -149,6 +156,7 @@ async fn handle_config( config_map, discovery_handler_map, new_discovery_handler_sender, + node_name, ) .await?; } @@ -186,6 +194,7 @@ async fn handle_config( config_map.clone(), discovery_handler_map.clone(), new_discovery_handler_sender.clone(), + node_name.clone(), ) .await?; } @@ -200,6 +209,7 @@ async fn handle_config_apply( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> anyhow::Result<()> { // Applied events can either be newly added Configurations or modified Configurations. // If modified delete all associated instances and device plugins and then recreate them to reflect updated config @@ -231,6 +241,7 @@ async fn handle_config_apply( config_map, discovery_handler_map, new_discovery_handler_sender, + node_name, ) .await .unwrap(); @@ -246,6 +257,7 @@ async fn handle_config_add( config_map: ConfigMap, discovery_handler_map: RegisteredDiscoveryHandlerMap, new_discovery_handler_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { let config_id: ConfigId = ( config.metadata.namespace.clone().unwrap(), @@ -276,6 +288,7 @@ async fn handle_config_add( stop_discovery_sender, &mut finished_discovery_sender, kube_interface, + node_name, ) .await .unwrap(); @@ -446,6 +459,7 @@ mod config_action_tests { config_map.clone(), dh_map.clone(), tx.clone(), + "node-a".to_string(), ) .await .is_ok()); @@ -461,6 +475,7 @@ mod config_action_tests { config_map.clone(), dh_map, tx, + "node-a".to_string(), ) .await .is_ok()); diff --git a/agent/src/util/device_plugin_builder.rs b/agent/src/util/device_plugin_builder.rs index 3c4eccb74..c9dd41b88 100644 --- a/agent/src/util/device_plugin_builder.rs +++ b/agent/src/util/device_plugin_builder.rs @@ -4,8 +4,8 @@ use super::{ KUBELET_SOCKET, LIST_AND_WATCH_MESSAGE_CHANNEL_CAPACITY, }, device_plugin_service::{ - ConfigurationDevicePlugin, DevicePluginBehavior, DevicePluginContext, DevicePluginService, - InstanceDevicePlugin, ListAndWatchMessageKind, + get_device_instance_name, ConfigurationDevicePlugin, DevicePluginBehavior, + DevicePluginContext, DevicePluginService, InstanceDevicePlugin, ListAndWatchMessageKind, }, v1beta1, v1beta1::{ @@ -24,7 +24,7 @@ use log::{info, trace}; #[cfg(test)] use mockall::{automock, predicate::*}; use std::sync::Arc; -use std::{convert::TryFrom, env, path::Path, time::SystemTime}; +use std::{convert::TryFrom, path::Path, time::SystemTime}; use tokio::{ net::UnixListener, net::UnixStream, @@ -39,12 +39,12 @@ use tower::service_fn; pub trait DevicePluginBuilderInterface: Send + Sync { async fn build_device_plugin( &self, - instance_name: String, instance_id: String, config: &Configuration, shared: bool, device_plugin_context: Arc>, device: Device, + node_name: String, ) -> Result<(), Box>; async fn build_configuration_device_plugin( @@ -52,6 +52,7 @@ pub trait DevicePluginBuilderInterface: Send + Sync { device_plugin_name: String, config: &Configuration, device_plugin_context: Arc>, + node_name: String, ) -> Result< broadcast::Sender, Box, @@ -66,13 +67,15 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { /// This creates a new DevicePluginService for an instance and registers it with the kubelet async fn build_device_plugin( &self, - instance_name: String, instance_id: String, config: &Configuration, shared: bool, device_plugin_context: Arc>, device: Device, + node_name: String, ) -> Result<(), Box> { + let instance_name = + get_device_instance_name(&instance_id, config.metadata.name.as_ref().unwrap()); info!("build_device_plugin - entered for device {}", instance_name); let device_plugin_behavior = DevicePluginBehavior::Instance(InstanceDevicePlugin { instance_id: instance_id.clone(), @@ -87,6 +90,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { device_plugin_context, device_plugin_behavior, list_and_watch_message_sender, + node_name, ) .await } @@ -97,6 +101,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { device_plugin_name: String, config: &Configuration, device_plugin_context: Arc>, + node_name: String, ) -> Result< broadcast::Sender, Box, @@ -115,6 +120,7 @@ impl DevicePluginBuilderInterface for DevicePluginBuilder { device_plugin_context, device_plugin_behavior, list_and_watch_message_sender.clone(), + node_name, ) .await?; Ok(list_and_watch_message_sender) @@ -129,6 +135,7 @@ impl DevicePluginBuilder { device_plugin_context: Arc>, device_plugin_behavior: DevicePluginBehavior, list_and_watch_message_sender: broadcast::Sender, + node_name: String, ) -> Result<(), Box> { let capability_id: String = format!("{}/{}", AKRI_PREFIX, device_plugin_name); let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; @@ -147,7 +154,7 @@ impl DevicePluginBuilder { config_name: config.metadata.name.clone().unwrap(), config_uid: config.metadata.uid.as_ref().unwrap().clone(), config_namespace: config.metadata.namespace.as_ref().unwrap().clone(), - node_name: env::var("AGENT_NODE_NAME")?, + node_name, device_plugin_context, list_and_watch_message_sender, server_ender_sender: server_ender_sender.clone(), diff --git a/agent/src/util/discovery_operator.rs b/agent/src/util/discovery_operator.rs index e06fc5112..ebdd0e4ff 100644 --- a/agent/src/util/discovery_operator.rs +++ b/agent/src/util/discovery_operator.rs @@ -17,11 +17,13 @@ use akri_discovery_utils::discovery::v0::{ DiscoverResponse, }; use akri_shared::{ - akri::configuration::{ - Configuration, DiscoveryProperty, DiscoveryPropertyKeySelector, DiscoveryPropertySource, + akri::{ + configuration::{ + Configuration, DiscoveryProperty, DiscoveryPropertyKeySelector, DiscoveryPropertySource, + }, + retry::MAX_INSTANCE_UPDATE_TRIES, }, k8s, - os::env_var::{ActualEnvVarQuery, EnvVarQuery}, }; use blake2::{ digest::{Update, VariableOutput}, @@ -240,6 +242,7 @@ impl DiscoveryOperator { kube_interface: Arc, dh_details: &'a DiscoveryDetails, stream: &'a mut dyn StreamingExt, + node_name: String, ) -> anyhow::Result<()> { // clone objects for thread let discovery_operator = Arc::new(self.clone()); @@ -273,6 +276,7 @@ impl DiscoveryOperator { response.devices, dh_details.shared, Box::new(DevicePluginBuilder{}), + node_name.clone(), ) .await?; } @@ -288,6 +292,7 @@ impl DiscoveryOperator { pub async fn delete_offline_instances( &self, kube_interface: Arc, + node_name: String, ) -> Result<(), Box> { trace!( "delete_offline_instances - entered for configuration {:?}", @@ -309,10 +314,11 @@ impl DiscoveryOperator { ) .await .unwrap(); - k8s::try_delete_instance( + try_delete_instance( kube_interface_clone.as_ref(), &instance, self.config.metadata.namespace.as_ref().unwrap(), + node_name.clone(), ) .await?; } @@ -331,6 +337,7 @@ impl DiscoveryOperator { discovery_results: Vec, shared: bool, device_plugin_builder: Box, + node_name: String, ) -> anyhow::Result<()> { let config_name = self.config.metadata.name.clone().unwrap(); trace!( @@ -341,7 +348,7 @@ impl DiscoveryOperator { let currently_visible_instances: HashMap = discovery_results .iter() .map(|discovery_result| { - let id = generate_instance_digest(&discovery_result.id, shared); + let id = generate_instance_digest(&discovery_result.id, shared, &node_name); let instance_name = get_device_instance_name(&id, &config_name); (instance_name, discovery_result.clone()) }) @@ -361,13 +368,14 @@ impl DiscoveryOperator { kube_interface, currently_visible_instances, shared, + node_name.clone(), ) .await?; // If there are newly visible instances associated with a Config, make a device plugin and Instance CR for them if !new_discovery_results.is_empty() { for discovery_result in new_discovery_results { - let id = generate_instance_digest(&discovery_result.id, shared); + let id = generate_instance_digest(&discovery_result.id, shared, &node_name); let instance_name = get_device_instance_name(&id, &config_name); trace!( "handle_discovery_results - new instance {} came online", @@ -376,12 +384,12 @@ impl DiscoveryOperator { let device_plugin_context = self.device_plugin_context.clone(); if let Err(e) = device_plugin_builder .build_device_plugin( - instance_name, id, &self.config, shared, device_plugin_context, discovery_result.clone(), + node_name.clone(), ) .await { @@ -403,6 +411,7 @@ impl DiscoveryOperator { kube_interface: Arc, currently_visible_instances: HashMap, shared: bool, + node_name: String, ) -> anyhow::Result<()> { let instance_map = self.device_plugin_context.read().await.clone().instances; for (instance, instance_info) in instance_map { @@ -495,10 +504,11 @@ impl DiscoveryOperator { ) .await .unwrap(); - k8s::try_delete_instance( + try_delete_instance( kube_interface.as_ref(), &instance, self.config.metadata.namespace.as_ref().unwrap(), + node_name.clone(), ) .await .unwrap(); @@ -582,6 +592,66 @@ impl DiscoveryOperator { } } +async fn try_delete_instance( + kube_interface: &dyn k8s::KubeInterface, + instance_name: &str, + instance_namespace: &str, + node_name: String, +) -> Result<(), anyhow::Error> { + for x in 0..MAX_INSTANCE_UPDATE_TRIES { + // First check if instance still exists + if let Ok(mut instance) = kube_interface + .find_instance(instance_name, instance_namespace) + .await + { + if instance.spec.nodes.contains(&node_name) { + instance.spec.nodes.retain(|node| node != &node_name); + } + if instance.spec.nodes.is_empty() { + match k8s::try_delete_instance(kube_interface, instance_name, instance_namespace) + .await + { + Ok(()) => { + trace!("try_delete_instance - deleted Instance {}", instance_name); + break; + } + Err(e) => { + trace!("try_delete_instance - call to delete_instance returned with error {} on try # {} of {}", e, x, MAX_INSTANCE_UPDATE_TRIES); + if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { + return Err(e); + } + } + } + } else { + match kube_interface + .update_instance( + &instance.spec, + &instance.metadata.name.unwrap(), + instance_namespace, + ) + .await + { + Ok(()) => { + trace!( + "try_delete_instance - updated Instance {} to remove {}", + instance_name, + node_name + ); + break; + } + Err(e) => { + trace!("try_delete_instance - call to update_instance returned with error {} on try # {} of {}", e, x, MAX_INSTANCE_UPDATE_TRIES); + if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { + return Err(e); + } + } + }; + } + } + } + Ok(()) +} + /// This provides a mockable way to query configMap and secret #[cfg_attr(test, automock)] #[tonic::async_trait] @@ -750,6 +820,7 @@ pub mod start_discovery { stop_all_discovery_sender: broadcast::Sender<()>, finished_all_discovery_sender: &mut mpsc::Sender<()>, kube_interface: Arc, + node_name: String, ) -> Result<(), Box> { internal_start_discovery( discovery_operator, @@ -758,6 +829,7 @@ pub mod start_discovery { finished_all_discovery_sender, kube_interface, Box::new(DevicePluginBuilder {}), + node_name, ) .await } @@ -769,6 +841,7 @@ pub mod start_discovery { finished_all_discovery_sender: &mut mpsc::Sender<()>, kube_interface: Arc, device_plugin_builder: Box, + node_name: String, ) -> Result<(), Box> { let config = discovery_operator.get_config(); info!( @@ -791,6 +864,7 @@ pub mod start_discovery { config_dp_name, &config, device_plugin_context.clone(), + node_name.clone(), ) .await { @@ -811,21 +885,28 @@ pub mod start_discovery { // Call discover on already registered Discovery Handlers requested by this Configuration's let known_dh_discovery_operator = discovery_operator.clone(); let known_dh_kube_interface = kube_interface.clone(); + let known_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { - do_discover(known_dh_discovery_operator, known_dh_kube_interface) - .await - .unwrap(); + do_discover( + known_dh_discovery_operator, + known_dh_kube_interface, + known_node_name, + ) + .await + .unwrap(); })); // Listen for new discovery handlers to call discover on let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe(); let mut new_discovery_handler_receiver = new_discovery_handler_sender.subscribe(); let new_dh_discovery_operator = discovery_operator.clone(); + let new_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { listen_for_new_discovery_handlers( new_dh_discovery_operator, &mut new_discovery_handler_receiver, &mut stop_all_discovery_receiver, + new_node_name, ) .await .unwrap(); @@ -836,10 +917,11 @@ pub mod start_discovery { let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe(); let offline_dh_discovery_operator = discovery_operator.clone(); let offline_dh_kube_interface = kube_interface.clone(); + let offline_node_name = node_name.clone(); tasks.push(tokio::spawn(async move { loop { offline_dh_discovery_operator - .delete_offline_instances(offline_dh_kube_interface.clone()) + .delete_offline_instances(offline_dh_kube_interface.clone(), offline_node_name.clone()) .await .unwrap(); if tokio::time::timeout( @@ -864,6 +946,7 @@ pub mod start_discovery { discovery_operator: Arc, new_discovery_handler_receiver: &mut broadcast::Receiver, stop_all_discovery_receiver: &mut broadcast::Receiver<()>, + node_name: String, ) -> Result<(), Box> { let mut discovery_tasks = Vec::new(); loop { @@ -879,8 +962,9 @@ pub mod start_discovery { if discovery_handler_name == discovery_operator.get_config().spec.discovery_handler.name { trace!("listen_for_new_discovery_handlers - received new registered discovery handler for configuration {:?}", discovery_operator.get_config().metadata.name); let new_discovery_operator = discovery_operator.clone(); + let node_name = node_name.clone(); discovery_tasks.push(tokio::spawn(async move { - do_discover(new_discovery_operator, Arc::new(k8s::KubeImpl::new().await.unwrap())).await.unwrap(); + do_discover(new_discovery_operator, Arc::new(k8s::KubeImpl::new().await.unwrap()), node_name.clone()).await.unwrap(); })); } } @@ -899,6 +983,7 @@ pub mod start_discovery { pub async fn do_discover( discovery_operator: Arc, kube_interface: Arc, + node_name: String, ) -> Result<(), Box> { let mut discovery_tasks = Vec::new(); let config = discovery_operator.get_config(); @@ -927,12 +1012,14 @@ pub mod start_discovery { ); let discovery_operator = discovery_operator.clone(); let kube_interface = kube_interface.clone(); + let node_name = node_name.clone(); discovery_tasks.push(tokio::spawn(async move { do_discover_on_discovery_handler( discovery_operator.clone(), kube_interface.clone(), &endpoint, &dh_details, + node_name.clone(), ) .await .unwrap(); @@ -949,6 +1036,7 @@ pub mod start_discovery { kube_interface: Arc, endpoint: &'a DiscoveryHandlerEndpoint, dh_details: &'a DiscoveryDetails, + node_name: String, ) -> anyhow::Result<()> { loop { if let Some(stream_type) = discovery_operator @@ -958,7 +1046,12 @@ pub mod start_discovery { match stream_type { StreamType::External(mut stream) => { match discovery_operator - .internal_do_discover(kube_interface.clone(), dh_details, &mut stream) + .internal_do_discover( + kube_interface.clone(), + dh_details, + &mut stream, + node_name.clone(), + ) .await { Ok(_) => { @@ -974,6 +1067,7 @@ pub mod start_discovery { kube_interface.clone(), std::collections::HashMap::new(), dh_details.shared, + node_name.clone(), ) .await?; } else { @@ -984,6 +1078,7 @@ pub mod start_discovery { kube_interface.clone(), std::collections::HashMap::new(), dh_details.shared, + node_name.clone(), ) .await?; } @@ -996,7 +1091,12 @@ pub mod start_discovery { #[cfg(any(test, feature = "agent-full"))] StreamType::Embedded(mut stream) => { discovery_operator - .internal_do_discover(kube_interface.clone(), dh_details, &mut stream) + .internal_do_discover( + kube_interface.clone(), + dh_details, + &mut stream, + node_name.clone(), + ) .await?; // Embedded discovery should only return okay if signaled to stop. Otherwise, bubble up error. break; @@ -1041,24 +1141,11 @@ pub mod start_discovery { /// to the same instance name (which is suffixed with this digest). /// However, local devices' Instances should have unique hashes even if they have the same id. /// To ensure this, the node's name is added to the id before it is hashed. -pub fn generate_instance_digest(id_to_digest: &str, shared: bool) -> String { - let env_var_query = ActualEnvVarQuery {}; - inner_generate_instance_digest(id_to_digest, shared, &env_var_query) -} - -pub fn inner_generate_instance_digest( - id_to_digest: &str, - shared: bool, - query: &impl EnvVarQuery, -) -> String { +pub fn generate_instance_digest(id_to_digest: &str, shared: bool, node_name: &str) -> String { let mut id_to_digest = id_to_digest.to_string(); // For local devices, include node hostname in id_to_digest so instances have unique names if !shared { - id_to_digest = format!( - "{}{}", - &id_to_digest, - query.get_env_var("AGENT_NODE_NAME").unwrap() - ); + id_to_digest = format!("{}{}", &id_to_digest, node_name,); } let mut digest = String::new(); let mut hasher = VarBlake2b::new(3).unwrap(); @@ -1251,25 +1338,18 @@ pub mod tests { #[test] fn test_generate_instance_digest() { - let mut mock_env_var_a = MockEnvVarQuery::new(); - mock_env_var_a - .expect_get_env_var() - .returning(|_| Ok("node-a".to_string())); let id = "video1"; - let first_unshared_video_digest = - inner_generate_instance_digest(id, false, &mock_env_var_a); - let first_shared_video_digest = inner_generate_instance_digest(id, true, &mock_env_var_a); - let mut mock_env_var_b = MockEnvVarQuery::new(); - mock_env_var_b - .expect_get_env_var() - .returning(|_| Ok("node-b".to_string())); - let second_unshared_video_digest = - inner_generate_instance_digest(id, false, &mock_env_var_b); - let second_shared_video_digest = inner_generate_instance_digest(id, true, &mock_env_var_b); + let first_unshared_video_digest = generate_instance_digest(id, false, "node-a"); + let first_shared_video_digest = generate_instance_digest(id, true, "node-a"); + + let second_unshared_video_digest = generate_instance_digest(id, false, "node-b"); + let second_shared_video_digest = generate_instance_digest(id, true, "node-b"); // unshared instances visible to different nodes should NOT have the same digest assert_ne!(first_unshared_video_digest, second_unshared_video_digest); // shared instances visible to different nodes should have the same digest assert_eq!(first_shared_video_digest, second_shared_video_digest); + // shared and unshared instance for same node should NOT have the same digest + assert_ne!(first_unshared_video_digest, first_shared_video_digest); } #[tokio::test] @@ -1307,7 +1387,12 @@ pub mod tests { let local_do1 = discovery_operator1.clone(); let discover1 = tokio::spawn(async move { discovery_operator1 - .internal_do_discover(mock_kube_interface1, &dh_details1, &mut rx1) + .internal_do_discover( + mock_kube_interface1, + &dh_details1, + &mut rx1, + "node-a".to_string(), + ) .await .unwrap() }); @@ -1325,7 +1410,12 @@ pub mod tests { )); let discover2 = tokio::spawn(async move { discovery_operator2 - .internal_do_discover(mock_kube_interface2, &dh_details2, &mut rx2) + .internal_do_discover( + mock_kube_interface2, + &dh_details2, + &mut rx2, + "node-a".to_string(), + ) .await .unwrap() }); @@ -1461,7 +1551,7 @@ pub mod tests { mock_discovery_operator .expect_delete_offline_instances() .times(1) - .returning(move |_| Ok(())); + .returning(move |_, _| Ok(())); if terminate { let stop_dh_discovery_sender = discovery_handler_map .lock() @@ -1494,7 +1584,7 @@ pub mod tests { mock_device_plugin_builder .expect_build_configuration_device_plugin() .times(1) - .returning(move |_, _, _| { + .returning(move |_, _, _, _| { let (sender, _) = broadcast::channel(2); Ok(sender) }); @@ -1507,6 +1597,7 @@ pub mod tests { &mut finished_discovery_sender, mock_kube_interface, mock_device_plugin_builder, + "node-a".to_string(), ) .await .unwrap(); @@ -1544,18 +1635,20 @@ pub mod tests { mock_discovery_operator .expect_internal_do_discover() .times(1) - .returning(|_, _, _| Ok(())); + .returning(|_, _, _, _| Ok(())); let mock_kube_interface: Arc = Arc::new(MockKubeInterface::new()); - start_discovery::do_discover(Arc::new(mock_discovery_operator), mock_kube_interface) - .await - .unwrap(); + start_discovery::do_discover( + Arc::new(mock_discovery_operator), + mock_kube_interface, + "node-a".to_string(), + ) + .await + .unwrap(); } #[tokio::test] async fn test_handle_discovery_results() { let _ = env_logger::builder().is_test(true).try_init(); - // Set node name for generating instance id - std::env::set_var("AGENT_NODE_NAME", "node-a"); let mock_kube_interface: Arc = Arc::new(MockKubeInterface::new()); let discovery_handler_map: RegisteredDiscoveryHandlerMap = Arc::new(std::sync::Mutex::new(HashMap::new())); @@ -1595,10 +1688,10 @@ pub mod tests { discovery_results, true, Box::new(mock_device_plugin_builder), + "node-a".to_string(), ) .await .unwrap(); - assert_eq!( INSTANCE_COUNT_METRIC .with_label_values(&[&config_name, "true"]) @@ -1651,6 +1744,32 @@ pub mod tests { } } + fn get_test_instance(nodes: Vec<&str>) -> akri_shared::akri::instance::Instance { + let nodes = nodes.into_iter().map(|e| e.to_string()).collect(); + let mut instance: akri_shared::akri::instance::Instance = serde_json::from_str( + r#" + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": "foo", + "namespace": "bar", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "", + "nodes": [], + "shared": true, + "deviceUsage": {} + } + } + "#, + ) + .unwrap(); + instance.spec.nodes = nodes; + instance + } + #[tokio::test] async fn test_delete_offline_instances() { let _ = env_logger::builder().is_test(true).try_init(); @@ -1677,7 +1796,7 @@ pub mod tests { device_plugin_context, )); discovery_operator - .delete_offline_instances(Arc::new(mock)) + .delete_offline_instances(Arc::new(mock), "node-a".to_string()) .await .unwrap(); @@ -1698,7 +1817,7 @@ pub mod tests { device_plugin_context, )); discovery_operator - .delete_offline_instances(Arc::new(mock)) + .delete_offline_instances(Arc::new(mock), "node-a".to_string()) .await .unwrap(); @@ -1713,6 +1832,10 @@ pub mod tests { ) .await; let mut mock = MockKubeInterface::new(); + let instance = get_test_instance(vec![]); + mock.expect_find_instance() + .times(2) + .returning(move |_, _| Ok(instance.clone())); mock.expect_delete_instance() .times(2) .returning(move |_, _| Ok(())); @@ -1722,9 +1845,10 @@ pub mod tests { device_plugin_context.clone(), )); discovery_operator - .delete_offline_instances(Arc::new(mock)) + .delete_offline_instances(Arc::new(mock), "node-a".to_string()) .await .unwrap(); + // Make sure all instances are deleted from map. Note, first 3 arguments are ignored. check_status_or_empty_loop( InstanceConnectivityStatus::Online, @@ -1843,6 +1967,10 @@ pub mod tests { ) .await; let mut mock = MockKubeInterface::new(); + let instance = get_test_instance(vec![]); + mock.expect_find_instance() + .times(2) + .returning(move |_, _| Ok(instance.clone())); mock.expect_delete_instance() .times(2) .returning(move |_, _| Ok(())); @@ -1869,6 +1997,10 @@ pub mod tests { // 4: Assert that local devices that go offline are removed from the instance map // let mut mock = MockKubeInterface::new(); + let instance = get_test_instance(vec![]); + mock.expect_find_instance() + .times(2) + .returning(move |_, _| Ok(instance.clone())); mock.expect_delete_instance() .times(2) .returning(move |_, _| Ok(())); @@ -1918,6 +2050,7 @@ pub mod tests { Arc::new(mock), currently_visible_instances, shared, + "node-a".to_string(), ) .await .unwrap(); @@ -2725,4 +2858,66 @@ pub mod tests { get_discovery_property_value_from_config_map(&mock_kube_client, &selector).await; assert_eq!(result.unwrap().unwrap(), expected_result); } + + #[tokio::test] + async fn test_try_delete_instance() { + let _ = env_logger::builder().is_test(true).try_init(); + // should do nothing for non existing instance + let mut kube_interface = MockKubeInterface::new(); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Err(anyhow::format_err!("Not Found"))); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + + // should delete instance with already empty node list + let mut kube_interface = MockKubeInterface::new(); + let instance = get_test_instance(vec![]); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(instance.clone())); + kube_interface + .expect_delete_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(())); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + + // should delete instance with then empty node list + let mut kube_interface = MockKubeInterface::new(); + let instance = get_test_instance(vec!["node-a"]); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(instance.clone())); + kube_interface + .expect_delete_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(())); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + + // should update instance with non empty node list + let mut kube_interface = MockKubeInterface::new(); + let instance = get_test_instance(vec!["node-a", "node-b"]); + kube_interface + .expect_find_instance() + .with(eq("foo"), eq("bar")) + .returning(move |_, _| Ok(instance.clone())); + kube_interface + .expect_update_instance() + .times(1) + .withf(move |instance, name, namespace| { + name == "foo" && namespace == "bar" && instance.nodes == vec!["node-b"] + }) + .returning(move |_, _, _| Ok(())); + try_delete_instance(&kube_interface, "foo", "bar", "node-a".to_string()) + .await + .unwrap(); + } } diff --git a/controller/Cargo.toml b/controller/Cargo.toml index f56228c0b..8d7bb4e36 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["", ""] edition = "2018" diff --git a/deployment/helm/Chart.yaml b/deployment/helm/Chart.yaml index f30c566f0..b5b7d2450 100644 --- a/deployment/helm/Chart.yaml +++ b/deployment/helm/Chart.yaml @@ -16,9 +16,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.12.7 +version: 0.12.8 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. -appVersion: 0.12.7 +appVersion: 0.12.8 diff --git a/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml b/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml index cf7a5bb2e..afe8b2be7 100644 --- a/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "debug-echo-discovery-handler" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handler-modules/onvif-discovery-handler/Cargo.toml b/discovery-handler-modules/onvif-discovery-handler/Cargo.toml index 6baf39820..81e237cbc 100644 --- a/discovery-handler-modules/onvif-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/onvif-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "onvif-discovery-handler" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handler-modules/opcua-discovery-handler/Cargo.toml b/discovery-handler-modules/opcua-discovery-handler/Cargo.toml index 15072b5ef..7f2856b6c 100644 --- a/discovery-handler-modules/opcua-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/opcua-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opcua-discovery-handler" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handler-modules/udev-discovery-handler/Cargo.toml b/discovery-handler-modules/udev-discovery-handler/Cargo.toml index ad42361a3..9318fa9ce 100644 --- a/discovery-handler-modules/udev-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/udev-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "udev-discovery-handler" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/debug-echo/Cargo.toml b/discovery-handlers/debug-echo/Cargo.toml index d23eb69b6..1c5dc5d39 100644 --- a/discovery-handlers/debug-echo/Cargo.toml +++ b/discovery-handlers/debug-echo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-debug-echo" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/onvif/Cargo.toml b/discovery-handlers/onvif/Cargo.toml index 91003b0e4..f5d2d59e3 100644 --- a/discovery-handlers/onvif/Cargo.toml +++ b/discovery-handlers/onvif/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-onvif" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/opcua/Cargo.toml b/discovery-handlers/opcua/Cargo.toml index d8b8c3170..a7fe628a6 100644 --- a/discovery-handlers/opcua/Cargo.toml +++ b/discovery-handlers/opcua/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-opcua" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/udev/Cargo.toml b/discovery-handlers/udev/Cargo.toml index aacbfd374..4e64cdcbf 100644 --- a/discovery-handlers/udev/Cargo.toml +++ b/discovery-handlers/udev/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-udev" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-utils/Cargo.toml b/discovery-utils/Cargo.toml index 3da36d8c5..dcdcbaa78 100644 --- a/discovery-utils/Cargo.toml +++ b/discovery-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-discovery-utils" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring "] edition = "2018" diff --git a/samples/brokers/udev-video-broker/Cargo.toml b/samples/brokers/udev-video-broker/Cargo.toml index 5d611bcee..67055fa7a 100644 --- a/samples/brokers/udev-video-broker/Cargo.toml +++ b/samples/brokers/udev-video-broker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "udev-video-broker" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["Kate Goldenring ", ""] edition = "2018" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 5aeeecbf8..855ba6178 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-shared" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = [""] edition = "2018" diff --git a/version.txt b/version.txt index e2e3067dd..7bfd8360b 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.12.7 +0.12.8 diff --git a/webhooks/validating/configuration/Cargo.toml b/webhooks/validating/configuration/Cargo.toml index ae8e3fc47..2a34706ce 100644 --- a/webhooks/validating/configuration/Cargo.toml +++ b/webhooks/validating/configuration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webhook-configuration" -version = "0.12.7" +version = "0.12.8" license = "Apache-2.0" authors = ["DazWilkin "] edition = "2018"