From 916505d5b243025307b3851d3a3fcac22edd7034 Mon Sep 17 00:00:00 2001 From: Sviatoslav Boichuk Date: Mon, 21 Oct 2024 15:29:32 +0300 Subject: [PATCH] Update Redis Health state once detected broken connection with server --- src/cgw_remote_discovery.rs | 159 +++++++++++++++++++++++++----------- 1 file changed, 110 insertions(+), 49 deletions(-) diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index f6f7d70..3905b90 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -307,11 +307,11 @@ impl CGWRemoteDiscovery { .arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id)) .query_async(&mut con) .await; - if res.is_err() { - warn!( - "Failed to destroy record about shard in REDIS! Error: {}", - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + warn!("Failed to destroy record about shard in REDIS! Error: {e}"); } let res: RedisResult<()> = redis::cmd("HSET") @@ -319,8 +319,11 @@ impl CGWRemoteDiscovery { .arg(redis_req_data.to_redis_args()) .query_async(&mut con) .await; - if res.is_err() { - error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {}", res.err().unwrap()); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to create record about shard in REDIS", )); @@ -385,6 +388,9 @@ impl CGWRemoteDiscovery { .await { Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } error!("Failed to sync gid to cgw map! 
Error: {e}"); return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS")); } @@ -400,6 +406,9 @@ impl CGWRemoteDiscovery { { Ok(gid) => gid, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Found proper key '{key}' entry, but failed to fetch GID from it! Error: {e}"); continue; } @@ -413,6 +422,9 @@ impl CGWRemoteDiscovery { { Ok(shard_id) => shard_id, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Found proper key '{key}' entry, but failed to fetch SHARD_ID from it! Error: {e}"); continue; } @@ -457,6 +469,9 @@ impl CGWRemoteDiscovery { { Ok(keys) => keys, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } error!( "Can't sync remote CGW map! Failed to get shard record in REDIS! Error: {e}" ); @@ -487,6 +502,9 @@ impl CGWRemoteDiscovery { lock.insert(cgw_iface.shard.id, cgw_iface); } Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Found proper key '{key}' entry, but failed to fetch Shard info from it! Error: {e}"); continue; } @@ -529,11 +547,11 @@ impl CGWRemoteDiscovery { .arg("1") .query_async(&mut con) .await; - if res.is_err() { - error!( - "Failed to increment assigned groups number! Error: {}", - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Failed to increment assigned groups number! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to increment assigned groups number", )); @@ -558,11 +576,11 @@ impl CGWRemoteDiscovery { .arg("-1") .query_async(&mut con) .await; - if res.is_err() { - error!( - "Failed to decrement assigned groups number! 
Error: {}", - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Failed to decrement assigned groups number! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to decrement assigned groups number", )); @@ -592,11 +610,11 @@ impl CGWRemoteDiscovery { .arg(&incremet_value.to_string()) .query_async(&mut con) .await; - if res.is_err() { - error!( - "Failed to increment assigned infras number! Error: {}", - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Failed to increment assigned infras number! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to increment assigned infras number", )); @@ -627,11 +645,11 @@ impl CGWRemoteDiscovery { .arg(&(-decremet_value).to_string()) .query_async(&mut con) .await; - if res.is_err() { - error!( - "Failed to decrement assigned infras number! Error: {}", - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Failed to decrement assigned infras number! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to decrement assigned infras number", )); @@ -734,13 +752,11 @@ impl CGWRemoteDiscovery { .query_async(&mut con) .await; - if res.is_err() { - error!( - "Failed to assign infra group {} to cgw {}! Error: {}", - gid, - dst_cgw_id, - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Failed to assign infra group {gid} to cgw {dst_cgw_id}! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to assign infra group to cgw", )); @@ -760,12 +776,11 @@ impl CGWRemoteDiscovery { .query_async(&mut con) .await; - if res.is_err() { - error!( - "Failed to deassign infra group {}! 
Error: {}", - gid, - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!("Failed to deassign infra group {gid}! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to deassign infra group to cgw", )); @@ -1143,11 +1158,11 @@ impl CGWRemoteDiscovery { .arg("0") .query_async(&mut con) .await; - if res.is_err() { - warn!( - "Failed to reset CGW{cgw_id} assigned group num count! Error: {}", - res.err().unwrap() - ); + if let Err(e) = res { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + warn!("Failed to reset CGW{cgw_id} assigned group num count! Error: {e}"); } } @@ -1213,10 +1228,15 @@ impl CGWRemoteDiscovery { "Successfully cleaned up Redis for shard id {}", self.local_shard_id ), - Err(e) => error!( - "Failed to cleanup Redis for shard id {}! Error: {}", - self.local_shard_id, e - ), + Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } + error!( + "Failed to cleanup Redis for shard id {}! Error: {}", + self.local_shard_id, e + ); + } } } @@ -1231,6 +1251,9 @@ impl CGWRemoteDiscovery { { Ok(cap) => cap, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Failed to get infras capacity for GID {gid}! Ereor: {e}"); return Err(Error::RemoteDiscovery("Failed to get infras capacity")); } @@ -1250,6 +1273,9 @@ impl CGWRemoteDiscovery { { Ok(cap) => cap, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Failed to get infras assigned number for GID {gid}! 
Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to get group infras assigned number", @@ -1270,6 +1296,9 @@ impl CGWRemoteDiscovery { match res { Ok(_) => debug!("Switched to Redis Database {database_id}"), Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Failed to switch to Redis Database {database_id}! Error: {e}"); return Err(Error::RemoteDiscovery("Failed to switch Redis Database")); } @@ -1291,6 +1320,9 @@ impl CGWRemoteDiscovery { match res { Ok(_) => debug!("Added device to Redis cache: {device_json}"), Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Failed to add device to Redis cache! Error: {e}"); return Err(Error::RemoteDiscovery( "Failed to add device to Redis cache", @@ -1310,6 +1342,9 @@ impl CGWRemoteDiscovery { match res { Ok(_) => debug!("Removed device from Redis cache: {}", device_mac), Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!( "Failed to remove device {} from Redis cache! 
Error: {e}", device_mac.to_hex_string() @@ -1362,6 +1397,9 @@ impl CGWRemoteDiscovery { let redis_keys: Vec = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await { Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } error!( "Failed to get devices cache from Redis for shard id {}, Error: {}", self.local_shard_id, e @@ -1377,6 +1415,9 @@ impl CGWRemoteDiscovery { let device_str: String = match redis::cmd("GET").arg(&key).query_async(&mut con).await { Ok(dev) => dev, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } error!( "Failed to get devices cache from Redis for shard id {}, Error: {}", self.local_shard_id, e @@ -1449,6 +1490,9 @@ impl CGWRemoteDiscovery { .await { Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } error!( "Failed to get devices cache from Redis for shard id {}, Error: {}", self.local_shard_id, e @@ -1498,6 +1542,9 @@ impl CGWRemoteDiscovery { match res { Ok(_) => debug!("Updated Redis timestamp: {timestamp}"), Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } warn!("Failed update Redis timestamp! Error: {e}"); return Err(Error::RemoteDiscovery("Failed update Redis timestamp")); } @@ -1516,6 +1563,9 @@ impl CGWRemoteDiscovery { { Ok(timestamp) => timestamp, Err(e) => { + if e.is_io_error() { + Self::set_redis_health_state_not_ready(e.to_string()).await; + } error!("Failed to get Redis last update timestamp! 
Error: {}", e); return Err(Error::RemoteDiscovery( "Failed to get Redis last update timestamp", @@ -1525,4 +1575,15 @@ impl CGWRemoteDiscovery { Ok(last_update_timestamp) } + + pub async fn set_redis_health_state_not_ready(error_message: String) { + tokio::spawn(async move { + CGWMetrics::get_ref() + .change_component_health_status( + CGWMetricsHealthComponent::RedisConnection, + CGWMetricsHealthComponentStatus::NotReady(error_message), + ) + .await; + }); + } }