Skip to content

Commit

Permalink
Merge pull request #96 from Telecominfraproject/dev-redis-health-stat…
Browse files Browse the repository at this point in the history
…e-update

Update Redis Health state once detected broken connectiuon with server
  • Loading branch information
Cahb authored Oct 22, 2024
2 parents e374a99 + 916505d commit 678aa34
Showing 1 changed file with 110 additions and 49 deletions.
159 changes: 110 additions & 49 deletions src/cgw_remote_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,20 +307,23 @@ impl CGWRemoteDiscovery {
.arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id))
.query_async(&mut con)
.await;
if res.is_err() {
warn!(
"Failed to destroy record about shard in REDIS! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to destroy record about shard in REDIS! Error: {e}");
}

let res: RedisResult<()> = redis::cmd("HSET")
.arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id))
.arg(redis_req_data.to_redis_args())
.query_async(&mut con)
.await;
if res.is_err() {
error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {}", res.err().unwrap());
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Can't create CGW Remote Discovery client! Failed to create record about shard in REDIS! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to create record about shard in REDIS",
));
Expand Down Expand Up @@ -385,6 +388,9 @@ impl CGWRemoteDiscovery {
.await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to sync gid to cgw map! Error: {e}");
return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS"));
}
Expand All @@ -400,6 +406,9 @@ impl CGWRemoteDiscovery {
{
Ok(gid) => gid,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Found proper key '{key}' entry, but failed to fetch GID from it! Error: {e}");
continue;
}
Expand All @@ -413,6 +422,9 @@ impl CGWRemoteDiscovery {
{
Ok(shard_id) => shard_id,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Found proper key '{key}' entry, but failed to fetch SHARD_ID from it! Error: {e}");
continue;
}
Expand Down Expand Up @@ -457,6 +469,9 @@ impl CGWRemoteDiscovery {
{
Ok(keys) => keys,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Can't sync remote CGW map! Failed to get shard record in REDIS! Error: {e}"
);
Expand Down Expand Up @@ -487,6 +502,9 @@ impl CGWRemoteDiscovery {
lock.insert(cgw_iface.shard.id, cgw_iface);
}
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Found proper key '{key}' entry, but failed to fetch Shard info from it! Error: {e}");
continue;
}
Expand Down Expand Up @@ -529,11 +547,11 @@ impl CGWRemoteDiscovery {
.arg("1")
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to increment assigned groups number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to increment assigned groups number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to increment assigned groups number",
));
Expand All @@ -558,11 +576,11 @@ impl CGWRemoteDiscovery {
.arg("-1")
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to decrement assigned groups number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to decrement assigned groups number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to decrement assigned groups number",
));
Expand Down Expand Up @@ -592,11 +610,11 @@ impl CGWRemoteDiscovery {
.arg(&incremet_value.to_string())
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to increment assigned infras number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to increment assigned infras number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to increment assigned infras number",
));
Expand Down Expand Up @@ -627,11 +645,11 @@ impl CGWRemoteDiscovery {
.arg(&(-decremet_value).to_string())
.query_async(&mut con)
.await;
if res.is_err() {
error!(
"Failed to decrement assigned infras number! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to decrement assigned infras number! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to decrement assigned infras number",
));
Expand Down Expand Up @@ -734,13 +752,11 @@ impl CGWRemoteDiscovery {
.query_async(&mut con)
.await;

if res.is_err() {
error!(
"Failed to assign infra group {} to cgw {}! Error: {}",
gid,
dst_cgw_id,
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to assign infra group {gid} to cgw {dst_cgw_id}! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to assign infra group to cgw",
));
Expand All @@ -760,12 +776,11 @@ impl CGWRemoteDiscovery {
.query_async(&mut con)
.await;

if res.is_err() {
error!(
"Failed to deassign infra group {}! Error: {}",
gid,
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to deassign infra group {gid}! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to deassign infra group to cgw",
));
Expand Down Expand Up @@ -1143,11 +1158,11 @@ impl CGWRemoteDiscovery {
.arg("0")
.query_async(&mut con)
.await;
if res.is_err() {
warn!(
"Failed to reset CGW{cgw_id} assigned group num count! Error: {}",
res.err().unwrap()
);
if let Err(e) = res {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to reset CGW{cgw_id} assigned group num count! Error: {e}");
}
}

Expand Down Expand Up @@ -1213,10 +1228,15 @@ impl CGWRemoteDiscovery {
"Successfully cleaned up Redis for shard id {}",
self.local_shard_id
),
Err(e) => error!(
"Failed to cleanup Redis for shard id {}! Error: {}",
self.local_shard_id, e
),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to cleanup Redis for shard id {}! Error: {}",
self.local_shard_id, e
);
}
}
}

Expand All @@ -1231,6 +1251,9 @@ impl CGWRemoteDiscovery {
{
Ok(cap) => cap,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to get infras capacity for GID {gid}! Ereor: {e}");
return Err(Error::RemoteDiscovery("Failed to get infras capacity"));
}
Expand All @@ -1250,6 +1273,9 @@ impl CGWRemoteDiscovery {
{
Ok(cap) => cap,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to get infras assigned number for GID {gid}! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to get group infras assigned number",
Expand All @@ -1270,6 +1296,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Switched to Redis Database {database_id}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to switch to Redis Database {database_id}! Error: {e}");
return Err(Error::RemoteDiscovery("Failed to switch Redis Database"));
}
Expand All @@ -1291,6 +1320,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Added device to Redis cache: {device_json}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed to add device to Redis cache! Error: {e}");
return Err(Error::RemoteDiscovery(
"Failed to add device to Redis cache",
Expand All @@ -1310,6 +1342,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Removed device from Redis cache: {}", device_mac),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!(
"Failed to remove device {} from Redis cache! Error: {e}",
device_mac.to_hex_string()
Expand Down Expand Up @@ -1362,6 +1397,9 @@ impl CGWRemoteDiscovery {
let redis_keys: Vec<String> = match redis::cmd("KEYS").arg(&key).query_async(&mut con).await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get devices cache from Redis for shard id {}, Error: {}",
self.local_shard_id, e
Expand All @@ -1377,6 +1415,9 @@ impl CGWRemoteDiscovery {
let device_str: String = match redis::cmd("GET").arg(&key).query_async(&mut con).await {
Ok(dev) => dev,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get devices cache from Redis for shard id {}, Error: {}",
self.local_shard_id, e
Expand Down Expand Up @@ -1449,6 +1490,9 @@ impl CGWRemoteDiscovery {
.await
{
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!(
"Failed to get devices cache from Redis for shard id {}, Error: {}",
self.local_shard_id, e
Expand Down Expand Up @@ -1498,6 +1542,9 @@ impl CGWRemoteDiscovery {
match res {
Ok(_) => debug!("Updated Redis timestamp: {timestamp}"),
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
warn!("Failed update Redis timestamp! Error: {e}");
return Err(Error::RemoteDiscovery("Failed update Redis timestamp"));
}
Expand All @@ -1516,6 +1563,9 @@ impl CGWRemoteDiscovery {
{
Ok(timestamp) => timestamp,
Err(e) => {
if e.is_io_error() {
Self::set_redis_health_state_not_ready(e.to_string()).await;
}
error!("Failed to get Redis last update timestamp! Error: {}", e);
return Err(Error::RemoteDiscovery(
"Failed to get Redis last update timestamp",
Expand All @@ -1525,4 +1575,15 @@ impl CGWRemoteDiscovery {

Ok(last_update_timestamp)
}

pub async fn set_redis_health_state_not_ready(error_message: String) {
tokio::spawn(async move {
CGWMetrics::get_ref()
.change_component_health_status(
CGWMetricsHealthComponent::RedisConnection,
CGWMetricsHealthComponentStatus::NotReady(error_message),
)
.await;
});
}
}

0 comments on commit 678aa34

Please sign in to comment.