diff --git a/Cargo.lock b/Cargo.lock index 1874873d7..d95d50d80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -333,7 +333,7 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "agent" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-debug-echo", "akri-discovery-utils", @@ -402,7 +402,7 @@ dependencies = [ [[package]] name = "akri-debug-echo" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -422,7 +422,7 @@ dependencies = [ [[package]] name = "akri-discovery-utils" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-shared", "anyhow", @@ -444,7 +444,7 @@ dependencies = [ [[package]] name = "akri-onvif" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -472,7 +472,7 @@ dependencies = [ [[package]] name = "akri-opcua" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -496,7 +496,7 @@ dependencies = [ [[package]] name = "akri-shared" -version = "0.11.4" +version = "0.11.5" dependencies = [ "anyhow", "async-trait", @@ -525,7 +525,7 @@ dependencies = [ [[package]] name = "akri-udev" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "anyhow", @@ -1043,7 +1043,7 @@ checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" [[package]] name = "controller" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-shared", "anyhow", @@ -1243,7 +1243,7 @@ dependencies = [ [[package]] name = "debug-echo-discovery-handler" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-debug-echo", "akri-discovery-utils", @@ -2540,7 +2540,7 @@ checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3" [[package]] name = "onvif-discovery-handler" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "akri-onvif", @@ -2590,7 +2590,7 @@ dependencies = [ [[package]] name = "opcua-discovery-handler" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "akri-opcua", @@ -4189,7 +4189,7 @@ dependencies = [ [[package]] name = "udev-discovery-handler" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-discovery-utils", "akri-udev", @@ -4200,7 +4200,7 @@ dependencies = [ [[package]] name = "udev-video-broker" -version = "0.11.4" +version = "0.11.5" dependencies = [ "akri-shared", "env_logger", @@ -4477,7 +4477,7 @@ dependencies = [ [[package]] name = "webhook-configuration" -version = "0.11.4" +version = "0.11.5" dependencies = [ "actix", "actix-rt 2.7.0", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index dc910aa21..f4a2974de 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "agent" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring ", ""] edition = "2018" rust-version = "1.68.1" diff --git a/agent/src/util/crictl_containers.rs b/agent/src/util/crictl_containers.rs index a6d257176..241690f44 100644 --- a/agent/src/util/crictl_containers.rs +++ b/agent/src/util/crictl_containers.rs @@ -1,5 +1,6 @@ -use akri_shared::akri::AKRI_SLOT_ANNOTATION_NAME_PREFIX; -use std::collections::{HashMap, HashSet}; +use akri_shared::akri::{instance::device_usage::NodeUsage, AKRI_SLOT_ANNOTATION_NAME_PREFIX}; +use std::collections::HashMap; +use std::str::FromStr; /// Output from crictl query #[derive(Serialize, Deserialize, Clone, Debug)] @@ -16,19 +17,21 @@ struct CriCtlContainer { } /// This gets the usage slots for an instance by getting the annotations that were stored at id `AKRI_SLOT_ANNOTATION_NAME_PREFIX` during allocate. -pub fn get_container_slot_usage(crictl_output: &str) -> HashSet { +pub fn get_container_slot_usage(crictl_output: &str) -> HashMap { match serde_json::from_str::(crictl_output) { Ok(crictl_output_parsed) => crictl_output_parsed .containers .iter() .flat_map(|container| &container.annotations) .filter_map(|(key, value)| { - if key.starts_with(AKRI_SLOT_ANNOTATION_NAME_PREFIX) - && value.eq(key + if key.starts_with(AKRI_SLOT_ANNOTATION_NAME_PREFIX) { + let slot_id = key .strip_prefix(AKRI_SLOT_ANNOTATION_NAME_PREFIX) - .unwrap_or_default()) - { - Some(value.clone()) + .unwrap_or_default(); + match NodeUsage::from_str(value) { + Ok(node_usage) => Some((slot_id.to_string(), node_usage)), + Err(_) => None, + } } else { None } @@ -40,7 +43,7 @@ pub fn get_container_slot_usage(crictl_output: &str) -> HashSet { e, &crictl_output ); - HashSet::default() + HashMap::default() } } } @@ -48,6 +51,7 @@ pub fn get_container_slot_usage(crictl_output: &str) -> HashSet { #[cfg(test)] mod tests { use super::*; + use akri_shared::akri::instance::device_usage::DeviceUsageKind; fn get_container_str(annotation: &str) -> String { format!("{{ \ @@ -86,35 +90,44 @@ mod tests { let _ = env_logger::builder().is_test(true).try_init(); // Empty output - assert_eq!(HashSet::::new(), get_container_slot_usage(r#""#)); + assert_eq!( + HashMap::::new(), + get_container_slot_usage(r#""#) + ); // Empty json output - assert_eq!(HashSet::::new(), get_container_slot_usage(r#"{}"#)); + assert_eq!( + HashMap::::new(), + get_container_slot_usage(r#"{}"#) + ); // Expected output with no containers assert_eq!( - HashSet::::new(), + HashMap::::new(), get_container_slot_usage(r#"{\"containers\": []}"#) ); // Output with syntax error assert_eq!( - HashSet::::new(), + HashMap::::new(), get_container_slot_usage(r#"{ddd}"#) ); // syntax error // Expected output with no slot assert_eq!( - HashSet::::new(), + HashMap::::new(), get_container_slot_usage(&format!( "{{ \"containers\": [ {} ] }}", &get_container_str("") )) ); // Expected output with slot (including unexpected property) - let mut expected = HashSet::new(); - expected.insert("foo".to_string()); + let mut expected = HashMap::new(); + expected.insert( + "foo".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); assert_eq!( expected, get_container_slot_usage(&format!( "{{ \"ddd\": \"\", \"containers\": [ {} ] }}", - &get_container_str("\"akri.agent.slot-foo\": \"foo\",") + &get_container_str("\"akri.agent.slot-foo\": \"node-a\",") )) ); // Expected output with slot @@ -122,19 +135,25 @@ mod tests { expected, get_container_slot_usage(&format!( "{{ \"containers\": [ {} ] }}", - &get_container_str("\"akri.agent.slot-foo\": \"foo\",") + &get_container_str("\"akri.agent.slot-foo\": \"node-a\",") )) ); // Expected output with multiple containers - let mut expected_2 = HashSet::new(); - expected_2.insert("foo1".to_string()); - expected_2.insert("foo2".to_string()); + let mut expected_2 = HashMap::new(); + expected_2.insert( + "foo1".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); + expected_2.insert( + "foo2".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-b").unwrap(), + ); assert_eq!( expected_2, get_container_slot_usage(&format!( "{{ \"containers\": [ {}, {} ] }}", - &get_container_str("\"akri.agent.slot-foo1\": \"foo1\","), - &get_container_str("\"akri.agent.slot-foo2\": \"foo2\","), + &get_container_str("\"akri.agent.slot-foo1\": \"node-a\","), + &get_container_str("\"akri.agent.slot-foo2\": \"node-b\","), )) ); } diff --git a/agent/src/util/device_plugin_service.rs b/agent/src/util/device_plugin_service.rs index ccc4e19ba..34a4e4876 100644 --- a/agent/src/util/device_plugin_service.rs +++ b/agent/src/util/device_plugin_service.rs @@ -11,6 +11,7 @@ use akri_discovery_utils::discovery::v0::Device; use akri_shared::{ akri::{ configuration::ConfigurationSpec, + instance::device_usage::{DeviceUsageKind, NodeUsage}, instance::InstanceSpec, retry::{random_delay, MAX_INSTANCE_UPDATE_TRIES}, AKRI_SLOT_ANNOTATION_NAME_PREFIX, @@ -23,7 +24,7 @@ use log::{error, info, trace}; use mock_instant::Instant; #[cfg(not(test))] use std::time::Instant; -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; use tokio::{ sync::{broadcast, mpsc, RwLock}, time::timeout, @@ -66,6 +67,21 @@ pub enum DevicePluginBehavior { Instance(InstanceDevicePlugin), } +#[derive(PartialEq, Clone, Debug)] +pub enum DeviceUsageStatus { + /// Free + Free, + /// Reserved by Configuration Device Plugin on current node + ReservedByConfiguration(String), + /// Reserved by Instance Device Plugin on current node + ReservedByInstance, + /// Reserved by other nodes + ReservedByOtherNode, + /// Unknown, insufficient information to determine the status, + /// mostly due to the device usage slot is not found from the instance map + Unknown, +} + /// Kubernetes Device-Plugin for an Instance. /// /// `DevicePluginService` implements Kubernetes Device-Plugin v1beta1 API specification @@ -97,7 +113,7 @@ pub struct DevicePluginService { pub list_and_watch_message_sender: broadcast::Sender, /// Upon send, terminates function that acts as the shutdown signal for this service pub server_ender_sender: mpsc::Sender<()>, - /// enum object that defines the behavior of the device plugin + /// Enum object that defines the behavior of the device plugin pub device_plugin_behavior: DevicePluginBehavior, } @@ -247,10 +263,27 @@ impl InstanceDevicePlugin { dps.instance_name ); - let virtual_devices = - build_list_and_watch_response(dps.clone(), kube_interface.clone()) - .await - .unwrap(); + let device_usage_states = get_instance_device_usage_states( + &dps.node_name, + &dps.instance_name, + &dps.config_namespace, + &dps.config.capacity, + kube_interface.clone(), + ) + .await; + + let virtual_devices = device_usage_states + .into_iter() + .map(|(id, state)| v1beta1::Device { + id, + health: match state { + DeviceUsageStatus::Free | DeviceUsageStatus::ReservedByInstance => { + HEALTHY.to_string() + } + _ => UNHEALTHY.to_string(), + }, + }) + .collect::>(); // Only send the virtual devices if the list has changed if !(prev_virtual_devices .iter() @@ -261,6 +294,10 @@ impl InstanceDevicePlugin { let resp = v1beta1::ListAndWatchResponse { devices: virtual_devices, }; + info!( + "InstanceDevicePlugin::list_and_watch - for device plugin {}, response = {:?}", + dps.instance_name, resp + ); // Send virtual devices list back to kubelet if let Err(e) = kubelet_update_sender.send(Ok(resp)).await { trace!( @@ -292,13 +329,23 @@ impl InstanceDevicePlugin { "InstanceDevicePlugin::list_and_watch - for Instance {} received message to end", dps.instance_name ); - let devices = build_unhealthy_virtual_devices( - dps.config.capacity, - &dps.instance_name, - ); - kubelet_update_sender.send(Ok(v1beta1::ListAndWatchResponse { devices })) - .await - .unwrap(); + let devices = prev_virtual_devices + .iter() + .map(|d| v1beta1::Device { + id: d.id.clone(), + health: UNHEALTHY.into() + }) + .collect::>(); + if !devices.is_empty() { + let resp = v1beta1::ListAndWatchResponse { devices }; + info!( + "InstanceDevicePlugin::list_and_watch - for device plugin {}, end response = {:?}", + dps.instance_name, resp + ); + kubelet_update_sender.send(Ok(resp)) + .await + .unwrap(); + } dps.server_ender_sender.clone().send(()).await.unwrap(); keep_looping = false; } @@ -324,7 +371,7 @@ impl InstanceDevicePlugin { kube_interface: Arc, ) -> Result, Status> { let mut container_responses: Vec = Vec::new(); - // suffix to add to each device property + // Suffix to add to each device property let device_property_suffix = self.instance_id.to_uppercase(); for request in requests.into_inner().container_requests { @@ -335,6 +382,7 @@ impl InstanceDevicePlugin { ); let mut akri_annotations = HashMap::new(); let mut akri_device_properties = HashMap::new(); + let mut akri_devices = HashMap::::new(); for device_usage_id in request.devices_i_ds { trace!( "InstanceDevicePlugin::allocate - for Instance {} processing request for device usage slot id {}", @@ -342,12 +390,31 @@ impl InstanceDevicePlugin { device_usage_id ); + if let Err(e) = try_update_instance_device_usage( + &device_usage_id, + &dps.node_name, + &dps.instance_name, + &dps.config_namespace, + DeviceUsageKind::Instance, + kube_interface.clone(), + ) + .await + { + trace!("InstanceDevicePlugin::allocate - could not assign {} slot to {} node ... forcing list_and_watch to continue", device_usage_id, &dps.node_name); + dps.list_and_watch_message_sender + .send(ListAndWatchMessageKind::Continue) + .unwrap(); + return Err(e); + } + + let node_usage = + NodeUsage::create(&DeviceUsageKind::Instance, &dps.node_name).unwrap(); akri_annotations.insert( format!("{}{}", AKRI_SLOT_ANNOTATION_NAME_PREFIX, &device_usage_id), - device_usage_id.clone(), + node_usage.to_string(), ); - // add suffix _ to each device property + // Add suffix _ to each device property let converted_properties = self .device .properties @@ -360,22 +427,7 @@ impl InstanceDevicePlugin { }) .collect::>(); akri_device_properties.extend(converted_properties); - - if let Err(e) = try_update_instance_device_usage( - &device_usage_id, - &dps.node_name, - &dps.instance_name, - &dps.config_namespace, - kube_interface.clone(), - ) - .await - { - trace!("InstanceDevicePlugin::allocate - could not assign {} slot to {} node ... forcing list_and_watch to continue", device_usage_id, &dps.node_name); - dps.list_and_watch_message_sender - .send(ListAndWatchMessageKind::Continue) - .unwrap(); - return Err(e); - } + akri_devices.insert(dps.instance_name.clone(), self.device.clone()); trace!( "InstanceDevicePlugin::allocate - finished processing device_usage_id {}", @@ -389,7 +441,7 @@ impl InstanceDevicePlugin { let response = build_container_allocate_response( broker_properties, akri_annotations, - &self.device, + &akri_devices.into_values().collect(), ); container_responses.push(response); } @@ -403,56 +455,82 @@ impl InstanceDevicePlugin { } } -/// This returns true if this node can reserve a `device_usage_id` slot for an instance -/// and false if it is already reserved. -/// # More details -/// Cases based on the usage slot (`device_usage_id`) value -/// 1. device_usage\[id\] == "" ... this means that the device is available for use -/// * (ACTION) return true -/// 2. device_usage\[id\] == self.nodeName ... this means THIS node previously used id, but the DevicePluginManager knows that this is no longer true -/// * (ACTION) return false -/// 3. device_usage\[id\] == (some other node) ... this means that we believe this device is in use by another node and should be marked unhealthy -/// * (ACTION) return error -/// 4. No corresponding id found ... this is an unknown error condition (BAD) -/// * (ACTION) return error -fn slot_available_to_reserve( - device_usage_id: &str, +/// This returns device usage status of all slots for an Instance on a given node +/// if the Instance doesn't exist or fail to parse device usage of its slots return +/// DeviceUsageStatus::Unknown since insufficient information to decide the usage state +pub async fn get_instance_device_usage_states( node_name: &str, - instance: &InstanceSpec, -) -> Result { - if let Some(allocated_node) = instance.device_usage.get(device_usage_id) { - if allocated_node.is_empty() { - Ok(true) - } else if allocated_node == node_name { - Ok(false) - } else { - trace!("slot_available_to_reserve - request for device slot {} previously claimed by a diff node {} than this one {} ... indicates the device on THIS node must be marked unhealthy, invoking ListAndWatch ... returning failure, next scheduling should succeed!", device_usage_id, allocated_node, node_name); - Err(Status::new( - Code::Unknown, - "Requested device already in use", - )) + instance_name: &str, + instance_namespace: &str, + capacity: &i32, + kube_interface: Arc, +) -> Vec<(String, DeviceUsageStatus)> { + let mut device_usage_states = Vec::new(); + match kube_interface + .find_instance(instance_name, instance_namespace) + .await + { + Ok(kube_akri_instance) => { + for (device_name, device_usage_string) in kube_akri_instance.spec.device_usage { + let device_usage_status = match NodeUsage::from_str(&device_usage_string) { + Ok(node_usage) => get_device_usage_state(&node_usage, node_name), + Err(_) => { + error!( + "get_instance_device_usage_states - fail to parse device usage {}", + device_usage_string + ); + DeviceUsageStatus::Unknown + } + }; + device_usage_states.push((device_name.clone(), device_usage_status)); + } + device_usage_states } - } else { - // No corresponding id found - trace!( - "slot_available_to_reserve - could not find {} id in device_usage", - device_usage_id - ); - Err(Status::new( - Code::Unknown, - "Could not find device usage slot", - )) + Err(_) => (0..*capacity) + .map(|x| { + ( + format!("{}-{}", instance_name, x), + DeviceUsageStatus::Unknown, + ) + }) + .collect(), + } +} + +/// This returns device usage status of a `device_usage_id` slot for an instance on a given node +/// # More details +/// Cases based on the device usage value +/// 1. DeviceUsageKind::Free ... this means that the device is available for use +/// * (ACTION) return DeviceUsageStatus::Free +/// 2. node_usage.node_name == node_name ... this means node_name previously used device_usage +/// * (ACTION) return previously reserved kind, DeviceUsageStatus::ReservedByConfiguration or DeviceUsageStatus::ReservedByInstance +/// 3. node_usage.node_name == (some other node) ... this means that we believe this device is in use by another node +/// * (ACTION) return DeviceUsageStatus::ReservedByOtherNode +fn get_device_usage_state(node_usage: &NodeUsage, node_name: &str) -> DeviceUsageStatus { + let device_usage_state = match node_usage.get_kind() { + DeviceUsageKind::Free => DeviceUsageStatus::Free, + DeviceUsageKind::Configuration(vdev_id) => { + DeviceUsageStatus::ReservedByConfiguration(vdev_id) + } + DeviceUsageKind::Instance => DeviceUsageStatus::ReservedByInstance, + }; + if device_usage_state != DeviceUsageStatus::Free && !node_usage.is_same_node(node_name) { + return DeviceUsageStatus::ReservedByOtherNode; } + device_usage_state } /// This tries up to `MAX_INSTANCE_UPDATE_TRIES` to update the requested slot of the Instance with the this node's name. -/// It cannot be assumed that this will successfully update Instance on first try since Device Plugins on other nodes may be simultaneously trying to update the Instance. -/// This returns an error if slot does not need to be updated or `MAX_INSTANCE_UPDATE_TRIES` attempted. +/// It cannot be assumed that this will successfully update Instance on first try since Device Plugins on other nodes +/// may be simultaneously trying to update the Instance. +/// This returns an error if slot already be reserved by other nodes or device plugins, +/// cannot be updated or `MAX_INSTANCE_UPDATE_TRIES` attempted. async fn try_update_instance_device_usage( device_usage_id: &str, node_name: &str, instance_name: &str, instance_namespace: &str, + desired_device_usage_kind: DeviceUsageKind, kube_interface: Arc, ) -> Result<(), Status> { let mut instance: InstanceSpec; @@ -476,27 +554,98 @@ async fn try_update_instance_device_usage( } // Update the instance to reserve this slot for this node iff it is available and not already reserved for this node. - if slot_available_to_reserve(device_usage_id, node_name, &instance)? { - instance - .device_usage - .insert(device_usage_id.to_string(), node_name.to_string()); + let current_device_usage_string = instance.device_usage.get(device_usage_id); + if current_device_usage_string.is_none() { + // No corresponding id found + trace!( + "try_update_instance_device_usage - could not find {} id in device_usage", + device_usage_id + ); + return Err(Status::new( + Code::Unknown, + "Could not find device usage slot", + )); + } - if let Err(e) = kube_interface - .update_instance(&instance, instance_name, instance_namespace) - .await - { - if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { - trace!("try_update_instance_device_usage - update_instance returned error [{}] after max tries ... returning error", e); - return Err(Status::new(Code::Unknown, "Could not update Instance")); + let current_device_usage = NodeUsage::from_str(current_device_usage_string.unwrap()) + .map_err(|_| { + Status::new( + Code::Unknown, + format!( + "Fails to parse {} to DeviceUsage ", + current_device_usage_string.unwrap() + ), + ) + })?; + // Call get_device_usage_state to check current device usage to see if the slot can be reserved. + // A device usage slot can be reserved if it's free or already reserved by this node and the desired usage kind matches. + // For slots owned by this node, get_device_usage_state returns ReservedByConfiguration or ReservedByInstance. + // For slots owned by other nodes (by Configuration or Instance), get_device_usage_state returns ReservedByOtherNode. + match get_device_usage_state(¤t_device_usage, node_name) { + DeviceUsageStatus::Free => { + let new_device_usage = NodeUsage::create(&desired_device_usage_kind, node_name) + .map_err(|e| { + Status::new( + Code::Unknown, + format!("Fails to create DeviceUsage - {}", e), + ) + })?; + instance + .device_usage + .insert(device_usage_id.to_string(), new_device_usage.to_string()); + + if let Err(e) = kube_interface + .update_instance(&instance, instance_name, instance_namespace) + .await + { + if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { + trace!("try_update_instance_device_usage - update_instance returned error [{}] after max tries ... returning error", e); + return Err(Status::new(Code::Unknown, "Could not update Instance")); + } + random_delay().await; + } else { + return Ok(()); } - random_delay().await; - } else { - return Ok(()); } - } else { - // Instance slot already reserved for this node - return Ok(()); - } + DeviceUsageStatus::ReservedByConfiguration(_) => { + if matches!(desired_device_usage_kind, DeviceUsageKind::Configuration(_)) { + return Ok(()); + } else { + return Err(Status::new( + Code::Unknown, + "Requested device already in use", + )); + } + } + DeviceUsageStatus::ReservedByInstance => { + if matches!(desired_device_usage_kind, DeviceUsageKind::Instance) { + return Ok(()); + } else { + return Err(Status::new( + Code::Unknown, + "Requested device already in use", + )); + } + } + DeviceUsageStatus::ReservedByOtherNode => { + trace!("try_update_instance_device_usage - request for device slot {} previously claimed by a diff node {} than this one {} ... indicates the device on THIS node must be marked unhealthy, invoking ListAndWatch ... returning failure, next scheduling should succeed!", + device_usage_id, current_device_usage.get_node_name(), node_name); + return Err(Status::new( + Code::Unknown, + "Requested device already in use", + )); + } + DeviceUsageStatus::Unknown => { + trace!( + "try_update_instance_device_usage - request for device slot {} status unknown!", + device_usage_id + ); + return Err(Status::new( + Code::Unknown, + "Requested device usage status unknown", + )); + } + }; } Ok(()) } @@ -505,35 +654,41 @@ async fn try_update_instance_device_usage( fn build_container_allocate_response( broker_properties: HashMap, annotations: HashMap, - device: &Device, + devices: &Vec, ) -> v1beta1::ContainerAllocateResponse { - // Cast v0 discovery Mount and DeviceSpec types to v1beta1 DevicePlugin types - let mounts: Vec = device - .mounts - .clone() - .into_iter() - .map(|mount| Mount { - container_path: mount.container_path, - host_path: mount.host_path, - read_only: mount.read_only, - }) - .collect(); - let device_specs: Vec = device - .device_specs - .clone() - .into_iter() - .map(|device_spec| DeviceSpec { - container_path: device_spec.container_path, - host_path: device_spec.host_path, - permissions: device_spec.permissions, - }) - .collect(); + let mut total_mounts = Vec::new(); + let mut total_device_specs = Vec::new(); + for device in devices { + // Cast v0 discovery Mount and DeviceSpec types to v1beta1 DevicePlugin types + let mounts: Vec = device + .mounts + .clone() + .into_iter() + .map(|mount| Mount { + container_path: mount.container_path, + host_path: mount.host_path, + read_only: mount.read_only, + }) + .collect(); + total_mounts.extend(mounts); + let device_specs: Vec = device + .device_specs + .clone() + .into_iter() + .map(|device_spec| DeviceSpec { + container_path: device_spec.container_path, + host_path: device_spec.host_path, + permissions: device_spec.permissions, + }) + .collect(); + total_device_specs.extend(device_specs); + } // Create response, setting environment variables to be an instance's properties. v1beta1::ContainerAllocateResponse { annotations, - mounts, - devices: device_specs, + mounts: total_mounts, + devices: total_device_specs, envs: broker_properties, } } @@ -560,7 +715,12 @@ async fn try_create_instance( } let device_usage: std::collections::HashMap = (0..dps.config.capacity) - .map(|x| (format!("{}-{}", dps.instance_name, x), "".to_string())) + .map(|x| { + ( + format!("{}-{}", dps.instance_name, x), + NodeUsage::default().to_string(), + ) + }) .collect(); let instance = InstanceSpec { configuration_name: dps.config_name.clone(), @@ -658,125 +818,6 @@ async fn try_create_instance( Ok(()) } -/// Returns list of "virtual" Devices and their health. -/// If the instance is offline, returns all unhealthy virtual Devices. -async fn build_list_and_watch_response( - dps: Arc, - kube_interface: Arc, -) -> Result, Box> { - info!( - "build_list_and_watch_response -- for Instance {} entered", - dps.instance_name - ); - - // If instance has been removed from map, send back all unhealthy device slots - if !dps - .instance_map - .read() - .await - .contains_key(&dps.instance_name) - { - trace!("build_list_and_watch_response - Instance {} removed from map ... returning unhealthy devices", dps.instance_name); - return Ok(build_unhealthy_virtual_devices( - dps.config.capacity, - &dps.instance_name, - )); - } - // If instance is offline, send back all unhealthy device slots - if dps - .instance_map - .read() - .await - .get(&dps.instance_name) - .unwrap() - .connectivity_status - != InstanceConnectivityStatus::Online - { - trace!("build_list_and_watch_response - device for Instance {} is offline ... returning unhealthy devices", dps.instance_name); - return Ok(build_unhealthy_virtual_devices( - dps.config.capacity, - &dps.instance_name, - )); - } - - trace!( - "build_list_and_watch_response -- device for Instance {} is online", - dps.instance_name - ); - - match kube_interface - .find_instance(&dps.instance_name, &dps.config_namespace) - .await - { - Ok(kube_akri_instance) => Ok(build_virtual_devices( - &kube_akri_instance.spec.device_usage, - kube_akri_instance.spec.shared, - &dps.node_name, - )), - Err(_) => { - trace!("build_list_and_watch_response - could not find instance {} so returning unhealthy devices", dps.instance_name); - Ok(build_unhealthy_virtual_devices( - dps.config.capacity, - &dps.instance_name, - )) - } - } -} - -/// This builds a list of unhealthy virtual Devices. -fn build_unhealthy_virtual_devices(capacity: i32, instance_name: &str) -> Vec { - let mut devices: Vec = Vec::new(); - for x in 0..capacity { - let device = v1beta1::Device { - id: format!("{}-{}", instance_name, x), - health: UNHEALTHY.to_string(), - }; - trace!( - "build_unhealthy_virtual_devices -- for Instance {} reporting unhealthy devices for device with name [{}] and health: [{}]", - instance_name, - device.id, - device.health, - ); - devices.push(device); - } - devices -} - -/// This builds a list of virtual Devices, determining the health of each virtual Device as follows: -/// Healthy if it is available to be used by this node or Unhealthy if it is already taken by another node. -fn build_virtual_devices( - device_usage: &HashMap, - shared: bool, - node_name: &str, -) -> Vec { - let mut devices: Vec = Vec::new(); - for (device_name, allocated_node) in device_usage { - // Throw error if unshared resource is reserved by another node - if !shared && !allocated_node.is_empty() && allocated_node != node_name { - panic!("build_virtual_devices - unshared device reserved by a different node"); - } - // Advertise the device as Unhealthy if it is - // USED by !this_node && SHARED - let unhealthy = shared && !allocated_node.is_empty() && allocated_node != node_name; - let health = if unhealthy { - UNHEALTHY.to_string() - } else { - HEALTHY.to_string() - }; - trace!( - "build_virtual_devices - [shared = {}] device with name [{}] and health: [{}]", - shared, - device_name, - health - ); - devices.push(v1beta1::Device { - id: device_name.clone(), - health, - }); - } - devices -} - /// This sends message to end `list_and_watch` and removes instance from InstanceMap. /// Called when an instance has been offline for too long. pub async fn terminate_device_plugin_service( @@ -879,6 +920,37 @@ mod device_plugin_service_tests { }); } + fn setup_find_instance_with_mock_instances( + mock: &mut MockKubeInterface, + instance_namespace: &str, + mock_instances: Vec<(String, Instance)>, + ) { + for (instance_name, kube_instance) in mock_instances { + let instance_namespace = instance_namespace.to_string(); + mock.expect_find_instance() + .times(1) + .withf(move |name: &str, namespace: &str| { + namespace == instance_namespace && name == instance_name + }) + .returning(move |_, _| Ok(kube_instance.clone())); + } + } + + fn setup_find_instance_with_not_found_err( + mock: &mut MockKubeInterface, + instance_name: &str, + instance_namespace: &str, + ) { + let instance_name = instance_name.to_string(); + let instance_namespace = instance_namespace.to_string(); + mock.expect_find_instance() + .times(1) + .withf(move |name: &str, namespace: &str| { + namespace == instance_namespace && name == instance_name + }) + .returning(move |_, _| Err(get_kube_not_found_error().into())); + } + fn create_device_plugin_service( _device_plugin_kind: DevicePluginKind, connectivity_status: InstanceConnectivityStatus, @@ -939,21 +1011,6 @@ mod device_plugin_service_tests { ) } - fn check_devices(instance_name: String, devices: Vec) { - let capacity: usize = 5; - // update_virtual_devices_health returns devices in jumbled order (ie 2, 4, 1, 5, 3) - let expected_device_ids: Vec = (0..capacity) - .map(|x| format!("{}-{}", instance_name, x)) - .collect(); - assert_eq!(devices.len(), capacity); - for device in expected_device_ids { - assert!(devices - .iter() - .map(|device| device.id.clone()) - .any(|d| d == device)); - } - } - fn get_kube_not_found_error() -> kube::Error { // Mock error thrown when instance not found kube::Error::Api(kube::error::ErrorResponse { @@ -997,6 +1054,79 @@ mod device_plugin_service_tests { assert_eq!(all_properties.get("OVERWRITE").unwrap(), "222"); } + // Test correct device usage status is returned when a device usage slot is used on the same node + #[test] + fn test_get_device_usage_state_same_node() { + let _ = env_logger::builder().is_test(true).try_init(); + let this_node = "node-a"; + let vdev_id = "vdev_0"; + // Free + assert_eq!( + get_device_usage_state( + &NodeUsage::create(&DeviceUsageKind::Free, "").unwrap(), + this_node + ), + DeviceUsageStatus::Free + ); + // Used by Configuration + assert_eq!( + get_device_usage_state( + &NodeUsage::create( + &DeviceUsageKind::Configuration(vdev_id.to_string()), + this_node + ) + .unwrap(), + this_node + ), + DeviceUsageStatus::ReservedByConfiguration(vdev_id.to_string()) + ); + // Used by Instance + assert_eq!( + get_device_usage_state( + &NodeUsage::create(&DeviceUsageKind::Instance, this_node).unwrap(), + this_node + ), + DeviceUsageStatus::ReservedByInstance + ); + } + + // Test DeviceUsageStatus::ReservedByOtherNode is returned when a device usage slot is used on a different node + #[test] + fn test_get_device_usage_state_different_node() { + let _ = env_logger::builder().is_test(true).try_init(); + let this_node = "node-a"; + let that_node = "node-b"; + let vdev_id = "vdev_0"; + // Free + assert_eq!( + get_device_usage_state( + &NodeUsage::create(&DeviceUsageKind::Free, "").unwrap(), + this_node + ), + DeviceUsageStatus::Free + ); + // Used by Configuration + assert_eq!( + get_device_usage_state( + &NodeUsage::create( + &DeviceUsageKind::Configuration(vdev_id.to_string()), + that_node + ) + .unwrap(), + this_node + ), + DeviceUsageStatus::ReservedByOtherNode + ); + // Used by Instance + assert_eq!( + get_device_usage_state( + &NodeUsage::create(&DeviceUsageKind::Instance, that_node).unwrap(), + this_node + ), + DeviceUsageStatus::ReservedByOtherNode + ); + } + fn configure_find_configuration( mock: &mut MockKubeInterface, config_name: String, @@ -1297,137 +1427,6 @@ mod device_plugin_service_tests { }; } - #[tokio::test] - async fn test_build_virtual_devices() { - let mut device_usage: HashMap = HashMap::new(); - let mut expected_devices_nodea: HashMap = HashMap::new(); - let mut expected_devices_nodeb: HashMap = HashMap::new(); - let instance_name = "s0meH@sH"; - for x in 0..5 { - if x % 2 == 0 { - device_usage.insert(format!("{}-{}", instance_name, x), "nodeA".to_string()); - expected_devices_nodea - .insert(format!("{}-{}", instance_name, x), HEALTHY.to_string()); - expected_devices_nodeb - .insert(format!("{}-{}", instance_name, x), UNHEALTHY.to_string()); - } else { - device_usage.insert(format!("{}-{}", instance_name, x), "".to_string()); - expected_devices_nodea - .insert(format!("{}-{}", instance_name, x), HEALTHY.to_string()); - expected_devices_nodeb - .insert(format!("{}-{}", instance_name, x), HEALTHY.to_string()); - } - } - - // Test shared all healthy - let mut devices: Vec = build_virtual_devices(&device_usage, true, "nodeA"); - for device in devices { - assert_eq!( - expected_devices_nodea.get(&device.id).unwrap(), - &device.health - ); - } - - // Test unshared all healthy - devices = build_virtual_devices(&device_usage, false, "nodeA"); - for device in devices { - assert_eq!( - expected_devices_nodea.get(&device.id).unwrap(), - &device.health - ); - } - - // Test shared some unhealthy (taken by another node) - devices = build_virtual_devices(&device_usage, true, "nodeB"); - for device in devices { - assert_eq!( - expected_devices_nodeb.get(&device.id).unwrap(), - &device.health - ); - } - - // Test unshared panic. A different node should never be listed under any device usage slots - let result = - std::panic::catch_unwind(|| build_virtual_devices(&device_usage, false, "nodeB")); - assert!(result.is_err()); - } - - // Tests when InstanceConnectivityStatus is offline and unhealthy devices are returned - #[tokio::test] - async fn test_build_list_and_watch_response_offline() { - let _ = env_logger::builder().is_test(true).try_init(); - let (device_plugin_service, _device_plugin_service_receivers) = - create_device_plugin_service( - DevicePluginKind::Instance, - InstanceConnectivityStatus::Offline(Instant::now()), - true, - ); - let mock = MockKubeInterface::new(); - let devices = - build_list_and_watch_response(Arc::new(device_plugin_service), Arc::new(mock)) - .await - .unwrap(); - devices - .into_iter() - .for_each(|device| assert!(device.health == UNHEALTHY)); - } - - // Tests when instance has not yet been created for this device, all devices are returned as UNHEALTHY - #[tokio::test] - async fn test_build_list_and_watch_response_no_instance() { - let _ = env_logger::builder().is_test(true).try_init(); - let (device_plugin_service, _device_plugin_service_receivers) = - create_device_plugin_service( - DevicePluginKind::Instance, - InstanceConnectivityStatus::Online, - true, - ); - let instance_name = device_plugin_service.instance_name.clone(); - let instance_namespace = device_plugin_service.config_namespace.clone(); - let mut mock = MockKubeInterface::new(); - mock.expect_find_instance() - .times(1) - .withf(move |name: &str, namespace: &str| { - namespace == instance_namespace && name == instance_name - }) - .returning(move |_, _| Err(get_kube_not_found_error().into())); - let devices = - build_list_and_watch_response(Arc::new(device_plugin_service), Arc::new(mock)) - .await - .unwrap(); - devices - .into_iter() - .for_each(|device| assert!(device.health == UNHEALTHY)); - } - - // Test when instance has already been created and includes this node - #[tokio::test] - async fn test_build_list_and_watch_response_no_instance_update() { - let _ = env_logger::builder().is_test(true).try_init(); - let (device_plugin_service, _device_plugin_service_receivers) = - create_device_plugin_service( - DevicePluginKind::Instance, - InstanceConnectivityStatus::Online, - true, - ); - let instance_name = device_plugin_service.instance_name.clone(); - let instance_namespace = device_plugin_service.config_namespace.clone(); - let mut mock = MockKubeInterface::new(); - configure_find_instance( - &mut mock, - "../test/json/local-instance.json", - instance_name.clone(), - instance_namespace.clone(), - String::new(), - NodeName::ThisNode, - ); - let devices = - build_list_and_watch_response(Arc::new(device_plugin_service), Arc::new(mock)) - .await - .unwrap(); - check_devices(instance_name, devices); - } - fn setup_internal_allocate_tests( mock: &mut MockKubeInterface, device_plugin_service: &DevicePluginService, @@ -1644,4 +1643,138 @@ mod device_plugin_service_tests { ListAndWatchMessageKind::Continue ); } + + // Tests correct device usage is returned when an Instance is found + // Expected behavior: should return correct device usage state for all usage slots + #[tokio::test] + async fn test_get_instance_device_usage_state() { + let _ = env_logger::builder().is_test(true).try_init(); + let node_name = "node-a"; + let instance_name = "instance-1"; + let instance_namespace = "test-namespace"; + let mock_device_usages = vec![(DeviceUsageKind::Free, "".to_string()); 5]; + let capacity = mock_device_usages.len() as i32; + let mut kube_instance_builder = KubeInstanceBuilder::new(instance_name, instance_namespace); + kube_instance_builder.add_node(node_name); + kube_instance_builder.add_device_usages(instance_name, mock_device_usages); + let kube_instance = kube_instance_builder.build(); + let mock_instances = vec![(instance_name.to_string(), kube_instance)]; + let mut mock = MockKubeInterface::new(); + setup_find_instance_with_mock_instances(&mut mock, instance_namespace, mock_instances); + + let device_usage_states = get_instance_device_usage_states( + node_name, + instance_name, + instance_namespace, + &capacity, + Arc::new(mock), + ) + .await; + assert!(device_usage_states + .into_iter() + .all(|(_, v)| { v == DeviceUsageStatus::Free })); + } + + // Tests correct device usage is returned when an Instance is not found + // Expected behavior: should return DeviceUsageStatus::Unknown for all usage slots + #[tokio::test] + async fn test_get_instance_device_usage_state_no_instance() { + let _ = env_logger::builder().is_test(true).try_init(); + let node_name = "node-a"; + let instance_name = "instance-1"; + let instance_namespace = "test-namespace"; + let capacity = 5i32; + let mut mock = MockKubeInterface::new(); + setup_find_instance_with_not_found_err(&mut mock, instance_name, instance_namespace); + + let device_usage_states = get_instance_device_usage_states( + node_name, + instance_name, + instance_namespace, + &capacity, + Arc::new(mock), + ) + .await; + assert!(device_usage_states + .into_iter() + .all(|(_, v)| { v == DeviceUsageStatus::Unknown })); + } + + #[derive(Clone)] + struct KubeInstanceBuilder { + name: String, + namespace: String, + configuration_name: String, + nodes: Vec, + shared: bool, + device_usages: HashMap>, + } + + impl KubeInstanceBuilder { + pub fn new(name: &str, namespace: &str) -> Self { + Self { + name: name.to_string(), + namespace: namespace.to_string(), + configuration_name: String::default(), + nodes: Vec::new(), + shared: true, + device_usages: HashMap::new(), + } + } + + pub fn add_node(&mut self, node: &str) -> &mut Self { + self.nodes.push(node.to_string()); + self + } + + pub fn add_device_usages( + &mut self, + instance_name: &str, + device_usages: Vec<(DeviceUsageKind, String)>, + ) -> &mut Self { + self.device_usages + .entry(instance_name.to_string()) + .or_insert(Vec::new()) + .extend(device_usages); + self + } + + pub fn build(&self) -> Instance { + let instance_json = format!( + r#"{{ + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": {{ + "name": "{}", + "namespace": "{}", + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }}, + "spec": {{ + "configurationName": "", + "nodes": [], + "shared": true, + "deviceUsage": {{ + }} + }} + }} + "#, + self.name, self.namespace + ); + let mut instance: Instance = serde_json::from_str(&instance_json).unwrap(); + instance.spec.configuration_name = self.configuration_name.clone(); + instance.spec.nodes = self.nodes.clone(); + instance.spec.shared = self.shared; + instance.spec.device_usage = self + .device_usages + .iter() + .flat_map(|(instance_name, usages)| { + usages.iter().enumerate().map(move |(pos, (kind, node))| { + let key = format!("{}-{}", instance_name, pos); + (key, NodeUsage::create(kind, node).unwrap().to_string()) + }) + }) + .collect::>(); + instance + } + } } diff --git a/agent/src/util/slot_reconciliation.rs b/agent/src/util/slot_reconciliation.rs index f678dd61d..a7c7c335f 100644 --- a/agent/src/util/slot_reconciliation.rs +++ b/agent/src/util/slot_reconciliation.rs @@ -1,4 +1,5 @@ use super::{constants::SLOT_RECONCILIATION_CHECK_DELAY_SECS, crictl_containers}; +use akri_shared::akri::instance::device_usage::NodeUsage; use akri_shared::{akri::instance::InstanceSpec, k8s::KubeInterface}; use async_trait::async_trait; use k8s_openapi::api::core::v1::PodStatus; @@ -6,12 +7,14 @@ use k8s_openapi::api::core::v1::PodStatus; use mockall::{automock, predicate::*}; use std::{ collections::{HashMap, HashSet}, + str::FromStr, sync::{Arc, Mutex}, time::{Duration, Instant}, }; use tokio::process::Command; -type SlotQueryResult = Result, Box>; +type SlotQueryResult = + Result, Box>; #[cfg_attr(test, automock)] #[async_trait] @@ -100,10 +103,13 @@ impl DevicePluginSlotReconciler { ); // Any slot found in use should be scrubbed from our list - node_slot_usage.iter().for_each(|slot| { - trace!("reconcile - remove slot from tracked slots: {:?}", slot); - self.removal_slot_map.lock().unwrap().remove(slot); - }); + { + let mut removal_slot_map_guard = self.removal_slot_map.lock().unwrap(); + node_slot_usage.iter().for_each(|(slot, _)| { + trace!("reconcile - remove slot from tracked slots: {:?}", slot); + removal_slot_map_guard.remove(slot); + }); + } trace!( "reconcile - removal_slot_map after removing node_slot_usage: {:?}", self.removal_slot_map @@ -163,16 +169,22 @@ impl DevicePluginSlotReconciler { .device_usage .iter() .filter_map(|(k, v)| { - if v != node_name && node_slot_usage.contains(k) { + let same_node_name = match NodeUsage::from_str(v) { + Ok(node_usage) => node_usage.is_same_node(node_name), + Err(_) => false, + }; + if !same_node_name { // We need to add node_name to this slot IF // the slot is not labeled with node_name AND // there is a container using that slot on this node - Some(k.to_string()) + node_slot_usage + .get_key_value(k) + .map(|(slot, node_usage)| (slot.to_string(), node_usage.clone())) } else { None } }) - .collect::>(); + .collect::>(); // Check Instance to find slots that are registered to this node, but // there is no actual pod using the slot. We should update the Instance @@ -185,7 +197,11 @@ impl DevicePluginSlotReconciler { .device_usage .iter() .filter_map(|(k, v)| { - if v == node_name && !node_slot_usage.contains(k) { + let same_node_name = match NodeUsage::from_str(v) { + Ok(usage) => usage.is_same_node(node_name), + Err(_) => false, + }; + if same_node_name && !node_slot_usage.contains_key(k) { // We need to clean this slot IF // this slot is handled by this node AND // there are no containers using that slot on this node @@ -233,22 +249,22 @@ impl DevicePluginSlotReconciler { .spec .device_usage .iter() - .map(|(slot, node)| { + .map(|(slot, usage)| { ( slot.to_string(), - if slots_missing_this_node_name.contains(slot) { - // Set this to node_name because there have been + if slots_missing_this_node_name.contains_key(slot) { + // Restore usage because there have been // cases where a Pod is running (which corresponds // to an Allocate call, but the Instance slot is empty. - node_name.into() + slots_missing_this_node_name.get(slot).unwrap().to_string() } else if slots_to_clean.contains(slot) { - // Set this to empty string because there is no + // Set usage to free because there is no // Deallocate message from kubelet for us to know // when a slot is no longer in use - "".into() + NodeUsage::default().to_string() } else { // This slot remains unchanged. - node.into() + usage.into() }, ) }) @@ -349,11 +365,16 @@ pub async fn periodic_slot_reconciliation( #[cfg(test)] mod reconcile_tests { use super::*; + use akri_shared::akri::instance::device_usage::DeviceUsageKind; use akri_shared::{akri::instance::InstanceList, k8s::MockKubeInterface, os::file}; use k8s_openapi::api::core::v1::Pod; use kube::api::ObjectList; - fn configure_get_node_slots(mock: &mut MockSlotQuery, result: HashSet, error: bool) { + fn configure_get_node_slots( + mock: &mut MockSlotQuery, + result: HashMap, + error: bool, + ) { mock.expect_get_node_slots().times(1).returning(move || { if !error { Ok(result.clone()) @@ -387,7 +408,7 @@ mod reconcile_tests { } struct NodeSlots { - node_slots: HashSet, + node_slots: HashMap, node_slots_error: bool, } @@ -466,7 +487,7 @@ mod reconcile_tests { }; configure_scnenario( NodeSlots { - node_slots: HashSet::new(), + node_slots: HashMap::new(), node_slots_error: false, }, "../test/json/shared-instance-list.json", @@ -486,7 +507,7 @@ mod reconcile_tests { }; configure_scnenario( NodeSlots { - node_slots: HashSet::new(), + node_slots: HashMap::new(), node_slots_error: true, }, "", @@ -506,9 +527,15 @@ mod reconcile_tests { }; let grace_period = Duration::from_millis(100); - let mut node_slots = HashSet::new(); - node_slots.insert("config-a-359973-3".to_string()); - node_slots.insert("config-a-359973-5".to_string()); + let mut node_slots = HashMap::new(); + node_slots.insert( + "config-a-359973-3".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); + node_slots.insert( + "config-a-359973-5".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); configure_scnenario( // slot_query to identify one slot used by this node NodeSlots { @@ -545,8 +572,11 @@ mod reconcile_tests { }; let grace_period = Duration::from_millis(100); - let mut node_slots = HashSet::new(); - node_slots.insert("config-a-359973-3".to_string()); + let mut node_slots = HashMap::new(); + node_slots.insert( + "config-a-359973-3".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); configure_scnenario( // slot_query to identify one slot used by this node NodeSlots { @@ -605,8 +635,11 @@ mod reconcile_tests { }; let grace_period = Duration::from_millis(100); - let mut node_slots = HashSet::new(); - node_slots.insert("config-a-359973-3".to_string()); + let mut node_slots = HashMap::new(); + node_slots.insert( + "config-a-359973-3".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); configure_scnenario( // slot_query to identify one slot used by this node NodeSlots { @@ -634,9 +667,15 @@ mod reconcile_tests { std::thread::sleep(grace_period); std::thread::sleep(grace_period); - let mut node_slots_added = HashSet::new(); - node_slots_added.insert("config-a-359973-3".to_string()); - node_slots_added.insert("config-a-359973-5".to_string()); + let mut node_slots_added = HashMap::new(); + node_slots_added.insert( + "config-a-359973-3".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); + node_slots_added.insert( + "config-a-359973-5".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); configure_scnenario( // slot_query to identify one slot used by this node NodeSlots { @@ -668,8 +707,11 @@ mod reconcile_tests { }; let grace_period = Duration::from_millis(100); - let mut node_slots = HashSet::new(); - node_slots.insert("config-a-359973-3".to_string()); + let mut node_slots = HashMap::new(); + node_slots.insert( + "config-a-359973-3".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); configure_scnenario( // slot_query to identify one slot used by this node NodeSlots { @@ -697,9 +739,15 @@ mod reconcile_tests { std::thread::sleep(grace_period); std::thread::sleep(grace_period); - let mut node_slots_added = HashSet::new(); - node_slots_added.insert("config-a-359973-1".to_string()); - node_slots_added.insert("config-a-359973-3".to_string()); + let mut node_slots_added = HashMap::new(); + node_slots_added.insert( + "config-a-359973-1".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); + node_slots_added.insert( + "config-a-359973-3".to_string(), + NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(), + ); configure_scnenario( // slot_query to identify two slots used by this node NodeSlots { diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 684b23bb2..8678d5cb8 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "0.11.4" +version = "0.11.5" authors = ["", ""] edition = "2018" rust-version = "1.68.1" diff --git a/controller/src/util/node_watcher.rs b/controller/src/util/node_watcher.rs index 98079dcab..0b669d089 100644 --- a/controller/src/util/node_watcher.rs +++ b/controller/src/util/node_watcher.rs @@ -1,5 +1,6 @@ use akri_shared::{ akri::{ + instance::device_usage::NodeUsage, instance::{Instance, InstanceSpec}, retry::{random_delay, MAX_INSTANCE_UPDATE_TRIES}, }, @@ -13,6 +14,7 @@ use kube_runtime::watcher::{default_backoff, watcher, Event}; use kube_runtime::WatchStreamExt; use log::{error, info, trace}; use std::collections::HashMap; +use std::str::FromStr; /// Node states that NodeWatcher is interested in /// @@ -295,13 +297,18 @@ impl NodeWatcher { .spec .device_usage .iter() - .map(|(slot, node)| { + .map(|(slot, usage)| { + let same_node_name = match NodeUsage::from_str(usage) { + Ok(node_usage) => node_usage.is_same_node(vanished_node_name), + Err(_) => false, + }; + ( slot.to_string(), - if vanished_node_name == node { - "".into() + if same_node_name { + NodeUsage::default().to_string() } else { - node.into() + usage.into() }, ) }) diff --git a/deployment/helm/Chart.yaml b/deployment/helm/Chart.yaml index f4fc3e6a1..7de5d3145 100644 --- a/deployment/helm/Chart.yaml +++ b/deployment/helm/Chart.yaml @@ -16,9 +16,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.11.4 +version: 0.11.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. -appVersion: 0.11.4 +appVersion: 0.11.5 diff --git a/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml b/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml index 0c0bbe9f1..4fa171bdc 100644 --- a/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "debug-echo-discovery-handler" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handler-modules/onvif-discovery-handler/Cargo.toml b/discovery-handler-modules/onvif-discovery-handler/Cargo.toml index e60660808..470dd7bcd 100644 --- a/discovery-handler-modules/onvif-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/onvif-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "onvif-discovery-handler" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handler-modules/opcua-discovery-handler/Cargo.toml b/discovery-handler-modules/opcua-discovery-handler/Cargo.toml index 06531ad7e..dd6c2641e 100644 --- a/discovery-handler-modules/opcua-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/opcua-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opcua-discovery-handler" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handler-modules/udev-discovery-handler/Cargo.toml b/discovery-handler-modules/udev-discovery-handler/Cargo.toml index f49750c77..481601e2c 100644 --- a/discovery-handler-modules/udev-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/udev-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "udev-discovery-handler" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handlers/debug-echo/Cargo.toml b/discovery-handlers/debug-echo/Cargo.toml index bca84d1c2..1b22e32da 100644 --- a/discovery-handlers/debug-echo/Cargo.toml +++ b/discovery-handlers/debug-echo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-debug-echo" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handlers/onvif/Cargo.toml b/discovery-handlers/onvif/Cargo.toml index 0f8be3bb7..7c59106dc 100644 --- a/discovery-handlers/onvif/Cargo.toml +++ b/discovery-handlers/onvif/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-onvif" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handlers/opcua/Cargo.toml b/discovery-handlers/opcua/Cargo.toml index 17d8bba6a..ccf0df3e8 100644 --- a/discovery-handlers/opcua/Cargo.toml +++ b/discovery-handlers/opcua/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-opcua" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-handlers/udev/Cargo.toml b/discovery-handlers/udev/Cargo.toml index c2108c9ef..42b818d94 100644 --- a/discovery-handlers/udev/Cargo.toml +++ b/discovery-handlers/udev/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-udev" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/discovery-utils/Cargo.toml b/discovery-utils/Cargo.toml index c3aa9139f..598a8ce51 100644 --- a/discovery-utils/Cargo.toml +++ b/discovery-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-discovery-utils" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring "] edition = "2018" rust-version = "1.68.1" diff --git a/samples/brokers/udev-video-broker/Cargo.toml b/samples/brokers/udev-video-broker/Cargo.toml index d55442d60..5dc7bdc98 100644 --- a/samples/brokers/udev-video-broker/Cargo.toml +++ b/samples/brokers/udev-video-broker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "udev-video-broker" -version = "0.11.4" +version = "0.11.5" authors = ["Kate Goldenring ", ""] edition = "2018" rust-version = "1.68.1" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 76091b125..51fb22d67 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-shared" -version = "0.11.4" +version = "0.11.5" authors = [""] edition = "2018" rust-version = "1.68.1" diff --git a/shared/src/akri/instance.rs b/shared/src/akri/instance.rs index a436907b1..f50bd6aeb 100644 --- a/shared/src/akri/instance.rs +++ b/shared/src/akri/instance.rs @@ -323,6 +323,107 @@ fn default_shared() -> bool { false } +pub mod device_usage { + #[derive(PartialEq, Clone, Debug, Default)] + pub enum DeviceUsageKind { + /// Device is free + #[default] + Free, + /// Device is reserved by Instance Device Plugin + Instance, + /// Device is reserved by Configuration Device Plugin + Configuration(String), + } + #[derive(Debug, PartialEq, Eq)] + pub struct ParseNodeUsageError; + #[derive(PartialEq, Clone, Debug, Default)] + pub struct NodeUsage { + kind: DeviceUsageKind, + node_name: String, + } + + impl std::fmt::Display for NodeUsage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self.kind { + DeviceUsageKind::Free => write!(f, ""), + DeviceUsageKind::Configuration(vdev_id) => { + write!(f, "C:{}:{}", vdev_id, self.node_name) + } + DeviceUsageKind::Instance => write!(f, "{}", self.node_name), + } + } + } + + impl std::str::FromStr for NodeUsage { + type Err = ParseNodeUsageError; + fn from_str(s: &str) -> Result { + if s.is_empty() { + return Ok(NodeUsage { + kind: DeviceUsageKind::Free, + node_name: s.to_string(), + }); + } + + // Format "C::" + if let Some((vdev_id, node_name)) = s.strip_prefix("C:").and_then(|s| s.split_once(':')) + { + if node_name.is_empty() { + return Err(ParseNodeUsageError); + } + return Ok(NodeUsage { + kind: DeviceUsageKind::Configuration(vdev_id.to_string()), + node_name: node_name.to_string(), + }); + } + + // Format "" + Ok(NodeUsage { + kind: DeviceUsageKind::Instance, + node_name: s.to_string(), + }) + } + } + + impl NodeUsage { + pub fn create(kind: &DeviceUsageKind, node_name: &str) -> Result { + match kind { + DeviceUsageKind::Free => { + if !node_name.is_empty() { + return Err(anyhow::anyhow!( + "Invalid input parameter, node name: {} provided for free node usage", + node_name + )); + }; + } + _ => { + if node_name.is_empty() { + return Err(anyhow::anyhow!( + "Invalid input parameter, no node name provided for node usage" + )); + }; + } + }; + + Ok(Self { + kind: kind.clone(), + node_name: node_name.into(), + }) + } + + pub fn get_kind(&self) -> DeviceUsageKind { + self.kind.clone() + } + + pub fn get_node_name(&self) -> String { + self.node_name.clone() + } + + pub fn is_same_node(&self, node_name: &str) -> bool { + self.node_name == node_name + } + } +} + #[cfg(test)] mod crd_serializeation_tests { use super::super::super::os::file; diff --git a/version.txt b/version.txt index 35ad34429..62d5dbdf3 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.11.4 +0.11.5 diff --git a/webhooks/validating/configuration/Cargo.toml b/webhooks/validating/configuration/Cargo.toml index 4a9108488..eab872d15 100644 --- a/webhooks/validating/configuration/Cargo.toml +++ b/webhooks/validating/configuration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webhook-configuration" -version = "0.11.4" +version = "0.11.5" authors = ["DazWilkin "] edition = "2018" rust-version = "1.68.1"