diff --git a/src/cgw_connection_server.rs b/src/cgw_connection_server.rs index b401b64..8635ebe 100644 --- a/src/cgw_connection_server.rs +++ b/src/cgw_connection_server.rs @@ -394,11 +394,18 @@ impl CGWConnectionServer { queue_lock.start_queue_timeout_manager().await; }); - // Sync RAM cache with PostgressDB. - server + if let Err(e) = server.cgw_remote_discovery.sync_devices_cache().await { + error!("Failed to sync infras with Redis devices cache! Error: {e}"); + } + + // Sync RAM cache with Redis. + if let Err(e) = server .cgw_remote_discovery - .sync_device_to_gid_cache(server.devices_cache.clone()) - .await; + .sync_devices_cache_with_redis(server.devices_cache.clone()) + .await + { + error!("Failed to sync Device cache! Error: {e}"); + } tokio::spawn(async move { CGWMetrics::get_ref() @@ -630,6 +637,17 @@ impl CGWConnectionServer { // This only means that original capacity of all buffers is allocated to <100>, // it can still increase on demand or need automatically (upon insert, push_back etc) let cgw_buf_prealloc_size = 100; + let mut last_update_timestamp: i64 = match self + .cgw_remote_discovery + .get_redis_last_update_timestamp() + .await + { + Ok(timestamp) => timestamp, + Err(e) => { + error!("{e}"); + 0i64 + } + }; loop { if num_of_msg_read < buf_capacity { @@ -658,6 +676,40 @@ impl CGWConnectionServer { }; num_of_msg_read += rd_num; + if rd_num == 0 { + let curretn_timestamp: i64 = match self + .cgw_remote_discovery + .get_redis_last_update_timestamp() + .await + { + Ok(timestamp) => timestamp, + Err(e) => { + error!("{e}"); + 0i64 + } + }; + + if last_update_timestamp != curretn_timestamp { + if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await { + error!("process_internal_nb_api_mbox: failed to sync GID to CGW map! Error: {e}"); + } + + if let Err(e) = self.cgw_remote_discovery.sync_devices_cache().await { + error!("process_internal_nb_api_mbox: failed to sync Devices cache! Error: {e}"); + } + + if let Err(e) = self + .cgw_remote_discovery + .sync_devices_cache_with_redis(self.devices_cache.clone()) + .await + { + error!("Failed to sync Device cache! Error: {e}"); + } + + last_update_timestamp = curretn_timestamp; + } + } + // We read some messages, try to continue and read more // If none read - break from recv, process all buffers that've // been filled-up so far (both local and remote). @@ -676,9 +728,6 @@ impl CGWConnectionServer { // This is done to ensure that we don't fallback for redis too much, // but still somewhat fully rely on it. // - if let Err(e) = self.cgw_remote_discovery.sync_gid_to_cgw_map().await { - error!("process_internal_nb_api_mbox: failed to sync GID to CGW map Error: {e}"); - } // TODO: rework to avoid re-allocating these buffers on each loop iteration // (get mut slice of vec / clear when done?) @@ -1529,6 +1578,20 @@ impl CGWConnectionServer { } } device.update_device_capabilities(&caps); + match serde_json::to_string(device) { + Ok(device_json) => { + if let Err(e) = self + .cgw_remote_discovery + .add_device_to_redis_cache(&device_mac, &device_json) + .await + { + error!("{e}"); + } + } + Err(e) => { + error!("Failed to serialize device to json string! Error: {e}"); + } + } } else { let default_caps: CGWDeviceCapabilities = Default::default(); let changes = cgw_detect_device_chages(&default_caps, &caps); @@ -1552,16 +1615,29 @@ impl CGWConnectionServer { } } - devices_cache.add_device( - &device_mac, - &CGWDevice::new( - device_type, - CGWDeviceState::CGWDeviceConnected, - 0, - false, - caps, - ), + let device: CGWDevice = CGWDevice::new( + device_type, + CGWDeviceState::CGWDeviceConnected, + 0, + false, + caps, ); + devices_cache.add_device(&device_mac, &device); + + match serde_json::to_string(&device) { + Ok(device_json) => { + if let Err(e) = self + .cgw_remote_discovery + .add_device_to_redis_cache(&device_mac, &device_json) + .await + { + error!("{e}"); + } + } + Err(e) => { + error!("Failed to serialize device to json string! Error: {e}"); + } + } if let Ok(resp) = cgw_construct_unassigned_infra_connection_msg( device_mac, @@ -1613,8 +1689,30 @@ impl CGWConnectionServer { device_group_id = device.get_device_group_id(); if device.get_device_remains_in_db() { device.set_device_state(CGWDeviceState::CGWDeviceDisconnected); + + match serde_json::to_string(device) { + Ok(device_json) => { + if let Err(e) = self + .cgw_remote_discovery + .add_device_to_redis_cache(&device_mac, &device_json) + .await + { + error!("{e}"); + } + } + Err(e) => { + error!("Failed to serialize device to json string! Error: {e}"); + } + } } else { devices_cache.del_device(&device_mac); + if let Err(e) = self + .cgw_remote_discovery + .del_device_from_redis_cache(&device_mac) + .await + { + error!("{e}"); + } } } diff --git a/src/cgw_devices_cache.rs b/src/cgw_devices_cache.rs index dfcc0b3..e426c94 100644 --- a/src/cgw_devices_cache.rs +++ b/src/cgw_devices_cache.rs @@ -88,6 +88,10 @@ impl CGWDevicesCache { } } + pub fn flush_all(&mut self) { + self.cache.clear(); + } + pub fn dump_devices_cache(&self) { // Debug print - simply ignore errors if any! if let Ok(json_output) = serde_json::to_string_pretty(&self) { diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index 4867694..f6f7d70 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -16,6 +16,7 @@ use crate::{ use std::{ collections::HashMap, net::{Ipv4Addr, SocketAddr}, + str::FromStr, sync::Arc, time::Duration, }; @@ -29,6 +30,8 @@ use eui48::MacAddress; use tokio::sync::RwLock; +use chrono::Utc; + // Used in remote lookup static REDIS_KEY_SHARD_ID_PREFIX: &str = "shard_id_"; static REDIS_KEY_SHARD_ID_FIELDS_NUM: usize = 12; @@ -41,6 +44,9 @@ static REDIS_KEY_GID_VALUE_SHARD_ID: &str = "shard_id"; static REDIS_KEY_GID_VALUE_INFRAS_CAPACITY: &str = "infras_capacity"; static REDIS_KEY_GID_VALUE_INFRAS_ASSIGNED: &str = "infras_assigned"; +const CGW_REDIS_DEFAULT_DB: u32 = 0; +const CGW_REDIS_DEVICES_CACHE_DB: u32 = 1; + #[derive(Clone, Debug, Default, PartialEq)] pub struct CGWREDISDBShard { id: i32, @@ -248,6 +254,11 @@ impl CGWRemoteDiscovery { remote_cgws_map: Arc::new(RwLock::new(HashMap::new())), }; + if let Err(e) = rc.set_redis_last_update_timestamp().await { + error!("Can't create CGW Remote Discovery client! Failed to update Redis timestamp! Error: {e}"); + return Err(Error::RemoteDiscovery("Failed to update Redis timestamp")); + } + let assigned_groups_num: i32 = match rc.sync_gid_to_cgw_map().await { Ok(assigned_groups) => assigned_groups, Err(e) => { @@ -432,31 +443,6 @@ impl CGWRemoteDiscovery { Ok(local_cgw_gid_num as i32) } - pub async fn sync_device_to_gid_cache(&self, cache: Arc>) { - if let Some(groups_infra) = self.db_accessor.get_all_infras().await { - let mut devices_cache = cache.write().await; - for item in groups_infra.iter() { - devices_cache.add_device( - &item.mac, - &CGWDevice::new( - CGWDeviceType::default(), - CGWDeviceState::CGWDeviceDisconnected, - item.infra_group_id, - true, - Default::default(), - ), - ); - CGWMetrics::get_ref() - .change_group_counter( - item.infra_group_id, - CGWMetricsCounterType::GroupInfrasAssignedNum, - CGWMetricsCounterOpType::Inc, - ) - .await; - } - } - } - async fn sync_remote_cgw_map(&self) -> Result<()> { let mut lock = self.remote_cgws_map.write().await; @@ -715,7 +701,7 @@ impl CGWRemoteDiscovery { ) -> Result { // Delete key (if exists), recreate with new owner if let Err(e) = self.deassign_infra_group_to_cgw(gid).await { - error!("destroy_infra_group: failed to deassign infra group to CGW! Error: {e}"); + error!("assign_infra_group_to_cgw: failed to deassign infra group to CGW! Error: {e}"); } // Sync CGWs to get lates data @@ -814,7 +800,15 @@ impl CGWRemoteDiscovery { } }; - self.increment_cgw_assigned_groups_num(shard_id).await?; + if let Err(e) = self.increment_cgw_assigned_groups_num(shard_id).await { + error!( + "create_infra_group: failed to decrement assigned groups num to CGW! Error: {e}" + ); + }; + + if let Err(e) = self.set_redis_last_update_timestamp().await { + error!("create_infra_group: failed update Redis timestamp! Error: {e}"); + } Ok(shard_id) } @@ -832,18 +826,24 @@ impl CGWRemoteDiscovery { if let Err(e) = self.decrement_cgw_assigned_groups_num(id).await { error!("destroy_infra_group: failed to decrement assigned groups num to CGW! Error: {e}"); } + + if let Err(e) = self.set_redis_last_update_timestamp().await { + error!("destroy_infra_group: failed to update Redis timestamp! Error: {e}"); + } } //TODO: transaction-based insert/assigned_group_num update (DB) self.db_accessor.delete_infra_group(gid).await?; let mut devices_to_remove: Vec = Vec::new(); + let mut devices_to_update: Vec<(MacAddress, CGWDevice)> = Vec::new(); let mut device_cache = cache.write().await; for (key, device) in device_cache.iter_mut() { if device.get_device_group_id() == gid { if device.get_device_state() == CGWDeviceState::CGWDeviceConnected { device.set_device_remains_in_db(false); device.set_device_group_id(0); + devices_to_update.push((key.clone(), device.clone())); } else { devices_to_remove.push(*key); } @@ -852,6 +852,22 @@ impl CGWRemoteDiscovery { for key in devices_to_remove.iter() { device_cache.del_device(key); + if let Err(e) = self.del_device_from_redis_cache(key).await { + error!("{e}"); + } + } + + for (mac, device) in devices_to_update { + match serde_json::to_string(&device) { + Ok(device_json) => { + if let Err(e) = self.add_device_to_redis_cache(&mac, &device_json).await { + error!("{e}"); + } + } + Err(e) => { + error!("Failed to serialize device to json string! Error: {e}"); + } + } } CGWMetrics::get_ref().delete_group_counter(gid).await; @@ -920,17 +936,43 @@ impl CGWRemoteDiscovery { if let Some(device) = devices_cache.get_device_mut(&device_mac) { device.set_device_group_id(gid); device.set_device_remains_in_db(true); + + match serde_json::to_string(device) { + Ok(device_json) => { + if let Err(e) = self + .add_device_to_redis_cache(&device_mac, &device_json) + .await + { + error!("{e}"); + } + } + Err(e) => { + error!("Failed to serialize device to json string! Error: {e}"); + } + } } else { - devices_cache.add_device( - &device_mac, - &CGWDevice::new( - CGWDeviceType::default(), - CGWDeviceState::CGWDeviceDisconnected, - gid, - true, - Default::default(), - ), + let device = CGWDevice::new( + CGWDeviceType::default(), + CGWDeviceState::CGWDeviceDisconnected, + gid, + true, + Default::default(), ); + devices_cache.add_device(&device_mac, &device); + + match serde_json::to_string(&device) { + Ok(device_json) => { + if let Err(e) = self + .add_device_to_redis_cache(&device_mac, &device_json) + .await + { + error!("{e}"); + } + } + Err(e) => { + error!("Failed to serialize device to json string! Error: {e}"); + } + } } assigned_infras_num += 1; } @@ -991,8 +1033,28 @@ impl CGWRemoteDiscovery { if device.get_device_state() == CGWDeviceState::CGWDeviceConnected { device.set_device_remains_in_db(false); device.set_device_group_id(0); + + match serde_json::to_string(device) { + Ok(device_json) => { + if let Err(e) = self + .add_device_to_redis_cache(&device_mac, &device_json) + .await + { + error!("{e}"); + } + } + Err(e) => { + error!( + "Failed to serialize device to json string! Error: {e}" + ); + } + } } else { devices_cache.del_device(&device_mac); + if let Err(e) = self.del_device_from_redis_cache(&device_mac).await + { + error!("{e}"); + } } } removed_infras += 1; @@ -1121,6 +1183,10 @@ impl CGWRemoteDiscovery { } } + if let Err(e) = self.set_redis_last_update_timestamp().await { + error!("rebalance_all_groups: failed update Redis timestamp! Error: {e}"); + } + if let Err(e) = self.sync_remote_cgw_map().await { error!("rebalance_all_groups: failed to sync remote CGW map! Error: {e}"); } @@ -1193,4 +1259,270 @@ impl CGWRemoteDiscovery { Ok(infras_assigned) } + + async fn switch_database(&self, database_id: u32) -> Result<()> { + let mut con = self.redis_client.clone(); + + let res: RedisResult<()> = redis::cmd("SELECT") + .arg(database_id.to_string()) + .query_async(&mut con) + .await; + match res { + Ok(_) => debug!("Switched to Redis Database {database_id}"), + Err(e) => { + warn!("Failed to switch to Redis Database {database_id}! Error: {e}"); + return Err(Error::RemoteDiscovery("Failed to switch Redis Database")); + } + }; + + Ok(()) + } + + async fn add_device_to_redis(&self, device_mac: &MacAddress, device_json: &str) -> Result<()> { + let mut con = self.redis_client.clone(); + + let key = format!("shard_id_{}|{}", self.local_shard_id, device_mac); + let res: RedisResult<()> = redis::cmd("SET") + .arg(&key) + .arg(device_json) + .query_async(&mut con) + .await; + + match res { + Ok(_) => debug!("Added device to Redis cache: {device_json}"), + Err(e) => { + warn!("Failed to add device to Redis cache! Error: {e}"); + return Err(Error::RemoteDiscovery( + "Failed to add device to Redis cache", + )); + } + }; + + Ok(()) + } + + async fn del_device_from_redis(&self, device_mac: &MacAddress) -> Result<()> { + let mut con = self.redis_client.clone(); + + let key = format!("shard_id_{}|{}", self.local_shard_id, device_mac); + let res: RedisResult<()> = redis::cmd("DEL").arg(&key).query_async(&mut con).await; + + match res { + Ok(_) => debug!("Removed device from Redis cache: {}", device_mac), + Err(e) => { + warn!( + "Failed to remove device {} from Redis cache! Error: {e}", + device_mac.to_hex_string() + ); + return Err(Error::RemoteDiscovery( + "Failed to update Redis devices cache", + )); + } + }; + + Ok(()) + } + + pub async fn add_device_to_redis_cache( + &self, + device_mac: &MacAddress, + device_json: &str, + ) -> Result<()> { + self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; + + self.add_device_to_redis(device_mac, device_json).await?; + + self.switch_database(CGW_REDIS_DEFAULT_DB).await?; + + Ok(()) + } + + pub async fn del_device_from_redis_cache(&self, device_mac: &MacAddress) -> Result<()> { + self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; + + self.del_device_from_redis(device_mac).await?; + + self.switch_database(CGW_REDIS_DEFAULT_DB).await?; + + Ok(()) + } + + pub async fn sync_devices_cache_with_redis( + &self, + cache: Arc>, + ) -> Result<()> { + // flush cache + let mut devices_cache = cache.write().await; + devices_cache.flush_all(); + + self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; + + let mut con = self.redis_client.clone(); + let key = format!("shard_id_{}|*", self.local_shard_id); + let redis_keys: Vec = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await + { + Err(e) => { + error!( + "Failed to get devices cache from Redis for shard id {}, Error: {}", + self.local_shard_id, e + ); + return Err(Error::RemoteDiscovery( + "Failed to get devices cache from Redis", + )); + } + Ok(keys) => keys, + }; + + for key in redis_keys { + let device_str: String = match redis::cmd("GET").arg(&key).query_async(&mut con).await { + Ok(dev) => dev, + Err(e) => { + error!( + "Failed to get devices cache from Redis for shard id {}, Error: {}", + self.local_shard_id, e + ); + return Err(Error::RemoteDiscovery( + "Failed to get devices cache from Redis", + )); + } + }; + + let mut splitted_key = key.split_terminator("|"); + let _shard_id = splitted_key.next(); + let device_mac = match splitted_key.next() { + Some(mac) => match MacAddress::from_str(mac) { + Ok(mac_address) => mac_address, + Err(e) => { + error!( + "Failed to parse device mac address from key {}! Error: {}", + self.local_shard_id, e + ); + return Err(Error::RemoteDiscovery( + "Failed to parse device mac address from key", + )); + } + }, + None => { + error!( + "Failed to get device mac address from key {}!", + self.local_shard_id, + ); + return Err(Error::RemoteDiscovery( + "Failed to get device mac address from key", + )); + } + }; + + match serde_json::from_str(&device_str) { + Ok(dev) => { + devices_cache.add_device(&device_mac, &dev); + CGWMetrics::get_ref() + .change_group_counter( + dev.get_device_group_id(), + CGWMetricsCounterType::GroupInfrasAssignedNum, + CGWMetricsCounterOpType::Inc, + ) + .await; + } + Err(e) => { + error!("Failed to deserialize device from Redis cache! Error: {e}"); + return Err(Error::RemoteDiscovery( + "Failed to deserialize device from Redis cache", + )); + } + }; + } + + self.switch_database(CGW_REDIS_DEFAULT_DB).await?; + + Ok(()) + } + + pub async fn sync_devices_cache(&self) -> Result<()> { + if let Some(infras_list) = self.db_accessor.get_all_infras().await { + self.switch_database(CGW_REDIS_DEVICES_CACHE_DB).await?; + + let mut con = self.redis_client.clone(); + let mut redis_keys: Vec = match redis::cmd("KEYS") + .arg(&format!("shard_id_{}|*", self.local_shard_id)) + .query_async(&mut con) + .await + { + Err(e) => { + error!( + "Failed to get devices cache from Redis for shard id {}, Error: {}", + self.local_shard_id, e + ); + return Err(Error::RemoteDiscovery( + "Failed to get devices cache from Redis", + )); + } + Ok(keys) => keys, + }; + + for infra in infras_list { + redis_keys.retain(|key| { + !key.contains(&format!("shard_id_{}|{}", self.local_shard_id, infra.mac)) + }); + } + + for key in redis_keys { + let res: RedisResult<()> = redis::cmd("DEL").arg(&key).query_async(&mut con).await; + if res.is_err() { + warn!( + "Failed to delete cache entry {}! Error: {}", + key, + res.err().unwrap() + ); + } + } + + self.switch_database(CGW_REDIS_DEFAULT_DB).await?; + } + + Ok(()) + } + + pub async fn set_redis_last_update_timestamp(&self) -> Result<()> { + // Generate current UTC timestamp + let mut con = self.redis_client.clone(); + let now = Utc::now(); + let timestamp = now.timestamp(); // Get seconds since the UNIX epoch + + let res: RedisResult<()> = redis::cmd("SET") + .arg("last_update_timestamp") + .arg(timestamp) + .query_async(&mut con) + .await; + + match res { + Ok(_) => debug!("Updated Redis timestamp: {timestamp}"), + Err(e) => { + warn!("Failed update Redis timestamp! Error: {e}"); + return Err(Error::RemoteDiscovery("Failed update Redis timestamp")); + } + }; + + Ok(()) + } + + pub async fn get_redis_last_update_timestamp(&self) -> Result { + let mut con = self.redis_client.clone(); + + let last_update_timestamp: i64 = match redis::cmd("GET") + .arg("last_update_timestamp") + .query_async(&mut con) + .await + { + Ok(timestamp) => timestamp, + Err(e) => { + error!("Failed to get Redis last update timestamp! Error: {}", e); + return Err(Error::RemoteDiscovery( + "Failed to get Redis last update timestamp", + )); + } + }; + + Ok(last_update_timestamp) + } }