diff --git a/Cargo.lock b/Cargo.lock index 58a8223..83b543c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -401,12 +401,36 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "convert_case" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -1398,6 +1422,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + [[package]] name = "parking_lot" version = "0.12.3" @@ -1739,20 +1769,29 @@ dependencies = [ ] [[package]] -name = "redis-async" -version = "0.17.2" +name = "redis" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44d2172ccb44736798c4cdc8690fb0a3776a74daa502bfe124708ffc144909c" +checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" dependencies = [ + "async-trait", "bytes", - "futures-channel", - "futures-sink", + "combine", "futures-util", - "log", - "pin-project", + "itoa", + "percent-encoding", + "pin-project-lite", + "rustls 0.22.4", + "rustls-native-certs", + "rustls-pemfile", + "rustls-pki-types", + "ryu", + "sha1_smol", "socket2", "tokio", + "tokio-rustls 0.25.0", "tokio-util", + "url", ] [[package]] @@ -1875,6 +1914,20 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rustls" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" +dependencies = [ + "log", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls" version = "0.23.10" @@ -1890,6 +1943,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -1930,6 +1996,15 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "schannel" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -1942,6 +2017,29 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +dependencies = [ + "bitflags 2.5.0", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "1.0.23" @@ -2002,6 +2100,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "sha2" version = "0.10.8" @@ -2290,13 +2394,24 @@ dependencies = [ "whoami", ] +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.4", + "rustls-pki-types", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls", + "rustls 0.23.10", "rustls-pki-types", "tokio", ] @@ -2541,7 +2656,7 @@ dependencies = [ "prost", "prost-build", "rdkafka", - "redis-async", + "redis", "rlimit", "rustls-pemfile", "rustls-pki-types", @@ -2550,7 +2665,7 @@ dependencies = [ "tokio", "tokio-pg-mapper", "tokio-postgres", - "tokio-rustls", + "tokio-rustls 0.26.0", "tokio-stream", "tokio-tungstenite 0.23.0", "tonic", diff --git a/Cargo.toml b/Cargo.toml index 7a20d60..bf63ef0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,9 @@ tokio = { version = "1.34.0", features = ["full"] } tokio-stream = { version = "0.1.15", features = ["full"] } tokio-tungstenite = { version = "0.23.0" } tokio-rustls = "0.26.0" -tokio-postgres = { version = "0.7.10", features = ["with-eui48-1"]} +tokio-postgres = { version = "0.7.10", features = ["with-eui48-1"] } tokio-pg-mapper = "0.2.0" -tungstenite = { version = "0.23.0"} +tungstenite = { version = "0.23.0" } futures-util = { version = "0.3.0", default-features = false } futures-channel = "0.3.0" futures-executor = { version = "0.3.0", optional = true } @@ -23,9 +23,12 @@ rlimit = "0.10.1" tonic = "0.11.0" prost = "0.12" rdkafka = "0.36.2" -eui48 = { version = "1.1.0", features = ["serde"]} +eui48 = { version = "1.1.0", features = ["serde"] } uuid = { version = "1.6.1", features = ["serde"] } -redis-async = "0.17.2" +redis = { version = "0.25.3", features = [ + "tokio-rustls-comp", + "tls-rustls-insecure", +] } warp = "0.3.7" prometheus = { version = "0.13.4", features = ["process"] } lazy_static = "1.4.0" diff --git a/README.md b/README.md index 4df275c..4103186 100644 --- a/README.md +++ b/README.md @@ -74,6 +74,8 @@ CGW_DB_USER - PSQL DB username (credentials) to use upon connect to CGW_DB_PASS - PSQL DB password (credentials) to use upon connect to DB CGW_REDIS_HOST - IP or hostname of remote redis-db server to connect to CGW_REDIS_PORT - PORT of remote redis-db server to connect to +CGW_REDIS_USERNAME - REDIS username (credentials) to use upon connect to +CGW_REDIS_PASSWORD - REDIS password (credentials) to use upon connect to CGW_LOG_LEVEL - Log level to start CGW application with (debug, info) CGW_METRICS_PORT - PORT of metrics to connect to CGW_CERTS_PATH - Path to certificates located on host machine @@ -97,6 +99,8 @@ declare -x CGW_KAFKA_PORT="9092" declare -x CGW_LOG_LEVEL="debug" declare -x CGW_REDIS_HOST="localhost" # Redis server can be found at the local host declare -x CGW_REDIS_PORT="6379" +declare -x CGW_REDIS_USERNAME="cgw" # REDIS login credentials (username) - optional +declare -x CGW_REDIS_PASSWORD="123" # REDIS login credentials (password) - optional declare -x CGW_METRICS_PORT="8080" declare -x CGW_WSS_IP="0.0.0.0" # Accept WSS connections at all interfaces / subnets declare -x CGW_WSS_PORT="15002" diff --git a/run_cgw.sh b/run_cgw.sh index 15f3d75..f831b77 100755 --- a/run_cgw.sh +++ b/run_cgw.sh @@ -66,6 +66,14 @@ export CGW_METRICS_PORT="${CGW_METRICS_PORT:-$DEFAULT_METRICS_PORT}" export CGW_CERTS_PATH="${CGW_CERTS_PATH:-$DEFAULT_CERTS_PATH}" export CGW_ALLOW_CERT_MISMATCH="${CGW_ALLOW_CERT_MISMATCH:-$DEFAULT_ALLOW_CERT_MISMATCH}" +if [ -z "${!CGW_REDIS_USERNAME}" ]; then + export CGW_REDIS_USERNAME="${CGW_REDIS_USERNAME}" +fi + +if [ -z "${!CGW_REDIS_PASSWORD}" ]; then + export CGW_REDIS_PASSWORD="${CGW_REDIS_PASSWORD}" +fi + echo "Starting CGW..." echo "CGW LOG LEVEL : $CGW_LOG_LEVEL" echo "CGW ID : $CGW_ID" @@ -88,30 +96,32 @@ echo "CGW ALLOW CERT MISMATCH : $CGW_ALLOW_CERT_MISMATCH" docker run \ --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ -v $CGW_CERTS_PATH:$CONTAINTER_CERTS_VOLUME \ - -e CGW_LOG_LEVEL \ - -e CGW_ID \ - -e CGW_WSS_IP \ - -e CGW_WSS_PORT \ - -e DEFAULT_WSS_THREAD_NUM \ - -e CGW_WSS_CAS \ - -e CGW_WSS_CERT \ - -e CGW_WSS_KEY \ - -e CGW_GRPC_LISTENING_IP \ - -e CGW_GRPC_LISTENING_PORT \ - -e CGW_GRPC_PUBLIC_HOST \ - -e CGW_GRPC_PUBLIC_PORT \ - -e CGW_KAFKA_HOST \ - -e CGW_KAFKA_PORT \ - -e CGW_KAFKA_CONSUME_TOPIC \ - -e CGW_KAFKA_PRODUCE_TOPIC \ - -e CGW_DB_NAME \ - -e CGW_DB_HOST \ - -e CGW_DB_PORT \ - -e CGW_DB_USERNAME \ - -e CGW_DB_PASSWORD \ - -e CGW_REDIS_HOST \ - -e CGW_REDIS_PORT \ + -e CGW_LOG_LEVEL \ + -e CGW_ID \ + -e CGW_WSS_IP \ + -e CGW_WSS_PORT \ + -e DEFAULT_WSS_THREAD_NUM \ + -e CGW_WSS_CAS \ + -e CGW_WSS_CERT \ + -e CGW_WSS_KEY \ + -e CGW_GRPC_LISTENING_IP \ + -e CGW_GRPC_LISTENING_PORT \ + -e CGW_GRPC_PUBLIC_HOST \ + -e CGW_GRPC_PUBLIC_PORT \ + -e CGW_KAFKA_HOST \ + -e CGW_KAFKA_PORT \ + -e CGW_KAFKA_CONSUME_TOPIC \ + -e CGW_KAFKA_PRODUCE_TOPIC \ + -e CGW_DB_NAME \ + -e CGW_DB_HOST \ + -e CGW_DB_PORT \ + -e CGW_DB_USERNAME \ + -e CGW_DB_PASSWORD \ + -e CGW_REDIS_HOST \ + -e CGW_REDIS_PORT \ + -e CGW_REDIS_USERNAME \ + -e CGW_REDIS_PASSWORD \ -e CGW_FEATURE_TOPOMAP_ENABLE \ - -e CGW_METRICS_PORT \ - -e CGW_ALLOW_CERT_MISMATCH \ + -e CGW_METRICS_PORT \ + -e CGW_ALLOW_CERT_MISMATCH \ -d -t --network=host --name $2 $1 ucentral-cgw diff --git a/src/cgw_app_args.rs b/src/cgw_app_args.rs index 554bff8..5e6ce9f 100644 --- a/src/cgw_app_args.rs +++ b/src/cgw_app_args.rs @@ -304,6 +304,10 @@ pub struct CGWRedisArgs { pub redis_host: String, /// PORT to connect to REDIS pub redis_port: u16, + /// REDIS username + pub redis_username: Option, + /// REDIS password + pub redis_password: Option, } impl CGWRedisArgs { @@ -335,9 +339,33 @@ impl CGWRedisArgs { Err(_) => CGW_DEFAULT_REDIS_PORT, }; + let redis_username: Option = match env::var("CGW_REDIS_USERNAME") { + Ok(username) => { + if username.is_empty() { + None + } else { + Some(username) + } + } + Err(_) => None, + }; + + let redis_password: Option = match env::var("CGW_REDIS_PASSWORD") { + Ok(password) => { + if password.is_empty() { + None + } else { + Some(password) + } + } + Err(_) => None, + }; + Ok(CGWRedisArgs { redis_host, redis_port, + redis_username, + redis_password, }) } } diff --git a/src/cgw_errors.rs b/src/cgw_errors.rs index b93f50d..22b80b9 100644 --- a/src/cgw_errors.rs +++ b/src/cgw_errors.rs @@ -17,6 +17,8 @@ pub enum Error { Tls(String), + Redis(String), + UCentralParser(&'static str), UCentralMessagesQueue(&'static str), @@ -63,9 +65,6 @@ pub enum Error { #[from] InvalidUri(warp::http::uri::InvalidUri), - #[from] - RedisAsync(redis_async::error::Error), - #[from] StaticStr(&'static str), diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs index be98f89..f87ef1e 100644 --- a/src/cgw_remote_discovery.rs +++ b/src/cgw_remote_discovery.rs @@ -1,4 +1,5 @@ use crate::{ + cgw_app_args::CGWRedisArgs, cgw_db_accessor::{CGWDBAccessor, CGWDBInfra, CGWDBInfrastructureGroup}, cgw_device::{CGWDevice, CGWDeviceState}, cgw_devices_cache::CGWDevicesCache, @@ -17,7 +18,10 @@ use std::{ sync::Arc, }; -use redis_async::resp_array; +use redis::{ + aio::MultiplexedConnection, Client, ConnectionInfo, RedisConnectionInfo, RedisResult, + ToRedisArgs, +}; use eui48::MacAddress; @@ -137,12 +141,28 @@ pub struct CGWRemoteIface { #[derive(Clone)] pub struct CGWRemoteDiscovery { db_accessor: Arc, - redis_client: redis_async::client::paired::PairedConnection, + redis_client: MultiplexedConnection, gid_to_cgw_cache: Arc>>, remote_cgws_map: Arc>>, local_shard_id: i32, } +fn cgw_create_redis_client(redis_args: &CGWRedisArgs) -> Result { + let redis_client_info = ConnectionInfo { + addr: redis::ConnectionAddr::Tcp(redis_args.redis_host.clone(), redis_args.redis_port), + redis: RedisConnectionInfo { + username: redis_args.redis_username.clone(), + password: redis_args.redis_password.clone(), + ..Default::default() + }, + }; + + match redis::Client::open(redis_client_info) { + Ok(client) => Ok(client), + Err(e) => Err(Error::Redis(format!("Failed to start Redis Client: {}", e))), + } +} + impl CGWRemoteDiscovery { pub async fn new(app_args: &AppArgs) -> Result { debug!( @@ -150,12 +170,7 @@ impl CGWRemoteDiscovery { app_args.redis_args.redis_host, app_args.redis_args.redis_port ); - let redis_client = match redis_async::client::paired::paired_connect( - app_args.redis_args.redis_host.clone(), - app_args.redis_args.redis_port, - ) - .await - { + let redis_client = match cgw_create_redis_client(&app_args.redis_args) { Ok(c) => c, Err(e) => { error!( @@ -166,6 +181,17 @@ impl CGWRemoteDiscovery { } }; + let redis_client = match redis_client.get_multiplexed_tokio_connection().await { + Ok(conn) => conn, + Err(e) => { + error!( + "Can't create CGW Remote Discovery client: Get Redis async connection failed ({})", + e + ); + return Err(Error::RemoteDiscovery("Redis client create failed")); + } + }; + let db_accessor = match CGWDBAccessor::new(&app_args.db_args).await { Ok(c) => c, Err(e) => { @@ -215,33 +241,26 @@ impl CGWRemoteDiscovery { }; let redis_req_data: Vec = redisdb_shard_info.into(); + let mut con = rc.redis_client.clone(); - if let Err(e) = rc - .redis_client - .send::(resp_array![ - "DEL", - format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id) - ]) - .await - { + let res: RedisResult<()> = redis::cmd("DEL") + .arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id)) + .query_async(&mut con) + .await; + if res.is_err() { warn!( - "Failed to destroy record about shard in REDIS, first launch? ({:?})", - e + "Failed to destroy record about shard in REDIS, first launch? ({})", + res.err().unwrap() ); } - if let Err(e) = rc - .redis_client - .send::( - resp_array![ - "HSET", - format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id) - ] - .append(redis_req_data), - ) - .await - { - error!("Can't create CGW Remote Discovery client: Failed to create record about shard in REDIS: {:?}", e); + let res: RedisResult<()> = redis::cmd("HSET") + .arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", app_args.cgw_id)) + .arg(redis_req_data.to_redis_args()) + .query_async(&mut con) + .await; + if res.is_err() { + error!("Can't create CGW Remote Discovery client: Failed to create record about shard in REDIS: {}", res.err().unwrap()); return Err(Error::RemoteDiscovery( "Failed to create record about shard in REDIS", )); @@ -301,50 +320,41 @@ impl CGWRemoteDiscovery { // Clear hashmap lock.clear(); + let mut con = self.redis_client.clone(); - let redis_keys: Vec = match self - .redis_client - .send::>(resp_array!["KEYS", format!("{}*", REDIS_KEY_GID)]) + let redis_keys: Vec = match redis::cmd("KEYS") + .arg(format!("{REDIS_KEY_GID}*")) + .query_async(&mut con) .await { - Err(_) => { + Err(e) => { + error!("Failed to sync gid to cgw map:\n{}", e); return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS")); } - Ok(r) => r, + Ok(keys) => keys, }; for key in redis_keys { - let gid: i32 = match self - .redis_client - .send::(resp_array!["HGET", &key, REDIS_KEY_GID_VALUE_GID]) + let gid: i32 = match redis::cmd("HGET") + .arg(&key) + .arg(REDIS_KEY_GID_VALUE_GID) + .query_async(&mut con) .await { - Ok(res) => { - match res.parse::() { - Ok(res) => res, - Err(e) => { - warn!("Found proper key '{key}' entry, but failed to parse GID from it:\n{e}"); - continue; - } - } - } + Ok(gid) => gid, Err(e) => { warn!("Found proper key '{key}' entry, but failed to fetch GID from it:\n{e}"); continue; } }; - let shard_id: i32 = match self - .redis_client - .send::(resp_array!["HGET", &key, REDIS_KEY_GID_VALUE_SHARD_ID]) + + let shard_id: i32 = match redis::cmd("HGET") + .arg(&key) + .arg(REDIS_KEY_GID_VALUE_SHARD_ID) + .query_async(&mut con) .await { - Ok(res) => match res.parse::() { - Ok(res) => res, - Err(e) => { - warn!("Found proper key '{key}' entry, but failed to parse SHARD_ID from it:\n{e}"); - continue; - } - }, + Ok(shard_id) => shard_id, Err(e) => { warn!("Found proper key '{key}' entry, but failed to fetch SHARD_ID from it:\n{e}"); continue; @@ -399,20 +409,27 @@ impl CGWRemoteDiscovery { // Clear hashmap lock.clear(); - let redis_keys: Vec = self - .redis_client - .send::>(resp_array![ - "KEYS", - format!("{}*", REDIS_KEY_SHARD_ID_PREFIX) - ]) - .await?; + let mut con = self.redis_client.clone(); + let redis_keys: Vec = match redis::cmd("KEYS") + .arg(format!("{REDIS_KEY_SHARD_ID_PREFIX}*")) + .query_async(&mut con) + .await + { + Ok(keys) => keys, + Err(e) => { + error!( + "Can't sync remote CGW map: Failed to get shard record in REDIS: {}", + e + ); + return Err(Error::RemoteDiscovery("Failed to get KEYS list from REDIS")); + } + }; for key in redis_keys { - match self - .redis_client - .send::>(resp_array!["HGETALL", &key]) - .await - { + let res: RedisResult> = + redis::cmd("HGETALL").arg(&key).query_async(&mut con).await; + + match res { Ok(res) => { let shrd: CGWREDISDBShard = CGWREDISDBShard::from(res); if shrd == CGWREDISDBShard::default() { @@ -464,14 +481,22 @@ impl CGWRemoteDiscovery { async fn increment_cgw_assigned_groups_num(&self, cgw_id: i32) -> Result<()> { debug!("Incrementing assigned groups num cgw_id_{cgw_id}"); - self.redis_client - .send::(resp_array![ - "HINCRBY", - format!("{}{cgw_id}", REDIS_KEY_SHARD_ID_PREFIX), - REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM, - "1" - ]) - .await?; + let mut con = self.redis_client.clone(); + let res: RedisResult<()> = redis::cmd("HINCRBY") + .arg(format!("{}{cgw_id}", REDIS_KEY_SHARD_ID_PREFIX)) + .arg(REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM) + .arg("1") + .query_async(&mut con) + .await; + if res.is_err() { + error!( + "Failed to increment assigned group number:\n{}", + res.err().unwrap() + ); + return Err(Error::RemoteDiscovery( + "Failed to increment assigned group number", + )); + } if cgw_id == self.local_shard_id { CGWMetrics::get_ref().change_counter( @@ -485,14 +510,22 @@ impl CGWRemoteDiscovery { async fn decrement_cgw_assigned_groups_num(&self, cgw_id: i32) -> Result<()> { debug!("Decrementing assigned groups num cgw_id_{cgw_id}"); - self.redis_client - .send::(resp_array![ - "HINCRBY", - format!("{}{cgw_id}", REDIS_KEY_SHARD_ID_PREFIX), - REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM, - "-1" - ]) - .await?; + let mut con = self.redis_client.clone(); + let res: RedisResult<()> = redis::cmd("HINCRBY") + .arg(format!("{}{cgw_id}", REDIS_KEY_SHARD_ID_PREFIX)) + .arg(REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM) + .arg("-1") + .query_async(&mut con) + .await; + if res.is_err() { + error!( + "Failed to decrement assigned group number:\n{}", + res.err().unwrap() + ); + return Err(Error::RemoteDiscovery( + "Failed to decrement assigned groups number", + )); + } if cgw_id == self.local_shard_id { CGWMetrics::get_ref().change_counter( @@ -549,16 +582,27 @@ impl CGWRemoteDiscovery { let dst_cgw_id: i32 = self.get_infra_group_cgw_assignee().await?; - self.redis_client - .send::(resp_array![ - "HSET", - format!("{REDIS_KEY_GID}{gid}"), - REDIS_KEY_GID_VALUE_GID, - gid.to_string(), - REDIS_KEY_GID_VALUE_SHARD_ID, - dst_cgw_id.to_string() - ]) - .await?; + let mut con = self.redis_client.clone(); + let res: RedisResult<()> = redis::cmd("HSET") + .arg(format!("{REDIS_KEY_GID}{gid}")) + .arg(REDIS_KEY_GID_VALUE_GID) + .arg(gid.to_string()) + .arg(REDIS_KEY_GID_VALUE_SHARD_ID) + .arg(dst_cgw_id.to_string()) + .query_async(&mut con) + .await; + + if res.is_err() { + error!( + "Failed to assign infra group {} to cgw {}:\n{}", + gid, + dst_cgw_id, + res.err().unwrap() + ); + return Err(Error::RemoteDiscovery( + "Failed to assign infra group to cgw", + )); + } self.gid_to_cgw_cache.write().await.insert(gid, dst_cgw_id); @@ -568,11 +612,24 @@ impl CGWRemoteDiscovery { } pub async fn deassign_infra_group_to_cgw(&self, gid: i32) -> Result<()> { - self.redis_client - .send::(resp_array!["DEL", format!("{REDIS_KEY_GID}{gid}")]) - .await?; + let mut con = self.redis_client.clone(); + let res: RedisResult<()> = redis::cmd("DEL") + .arg(format!("{REDIS_KEY_GID}{gid}")) + .query_async(&mut con) + .await; + + if res.is_err() { + error!( + "Failed to deassign infra group {}:\n{}", + gid, + res.err().unwrap() + ); + return Err(Error::RemoteDiscovery( + "Failed to deassign infra group to cgw", + )); + } - debug!("REDIS: deassigned gid{gid} from controlled CGW"); + debug!("REDIS: deassigned gid {gid} from controlled CGW"); self.gid_to_cgw_cache.write().await.remove(&gid); @@ -799,18 +856,19 @@ impl CGWRemoteDiscovery { // Clear local cache self.gid_to_cgw_cache.write().await.clear(); + let mut con = self.redis_client.clone(); for (cgw_id, _val) in self.remote_cgws_map.read().await.iter() { - if let Err(e) = self - .redis_client - .send::(resp_array![ - "HSET", - format!("{}{cgw_id}", REDIS_KEY_SHARD_ID_PREFIX), - REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM, - "0" - ]) - .await - { - warn!("Failed to reset CGW{cgw_id} assigned group num count, e:{e}"); + let res: RedisResult<()> = redis::cmd("HSET") + .arg(format!("{}{cgw_id}", REDIS_KEY_SHARD_ID_PREFIX)) + .arg(REDIS_KEY_SHARD_VALUE_ASSIGNED_G_NUM) + .arg("0") + .query_async(&mut con) + .await; + if res.is_err() { + warn!( + "Failed to reset CGW{cgw_id} assigned group num count, e:{}", + res.err().unwrap() + ); } } @@ -835,12 +893,13 @@ impl CGWRemoteDiscovery { pub async fn cleanup_redis(&self) { debug!("Remove from Redis shard id {}", self.local_shard_id); // We are on de-init stage - ignore any errors on Redis clean-up - let _ = self - .redis_client - .send::(resp_array![ - "DEL", - format!("{REDIS_KEY_SHARD_ID_PREFIX}{}", self.local_shard_id) - ]) + let mut con = self.redis_client.clone(); + let _res: RedisResult<()> = redis::cmd("DEL") + .arg(format!( + "{REDIS_KEY_SHARD_ID_PREFIX}{}", + self.local_shard_id + )) + .query_async(&mut con) .await; } }