From 429d19930ac2a1ea244b67e36cfcf344454925ea Mon Sep 17 00:00:00 2001
From: Sviatoslav Boichuk
Date: Thu, 16 Jan 2025 16:59:16 +0200
Subject: [PATCH] Create/update kafka topics on CGW init

---
 src/cgw_errors.rs                           |   3 +
 src/cgw_kafka_init.rs                       | 234 ++++++++++++++++++++
 src/cgw_remote_discovery.rs                 |   2 +-
 src/main.rs                                 |   8 +
 utils/docker/docker-compose-template.yml.j2 |  33 +--
 5 files changed, 257 insertions(+), 23 deletions(-)
 create mode 100644 src/cgw_kafka_init.rs

diff --git a/src/cgw_errors.rs b/src/cgw_errors.rs
index 80601a3..d5d563e 100644
--- a/src/cgw_errors.rs
+++ b/src/cgw_errors.rs
@@ -32,6 +32,8 @@ pub enum Error {
 
     Runtime(String),
 
+    KafkaInit(String),
+
     // -- Externals
     #[from]
     Io(std::io::Error),
@@ -89,6 +91,7 @@ impl std::fmt::Display for Error {
             | Error::Tls(message)
             | Error::ConnectionServer(message)
             | Error::Runtime(message)
+            | Error::KafkaInit(message)
             | Error::Redis(message)
             | Error::Tcp(message)
             | Error::UCentralMessagesQueue(message)
diff --git a/src/cgw_kafka_init.rs b/src/cgw_kafka_init.rs
new file mode 100644
index 0000000..d15bf43
--- /dev/null
+++ b/src/cgw_kafka_init.rs
@@ -0,0 +1,234 @@
+use crate::cgw_app_args::{CGWKafkaArgs, CGWRedisArgs};
+use crate::cgw_errors::{Error, Result};
+use crate::cgw_remote_discovery::cgw_create_redis_client;
+
+use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions, NewTopic, TopicReplication};
+use rdkafka::client::DefaultClientContext;
+use rdkafka::config::ClientConfig;
+
+use std::time::Duration;
+
+const CGW_KAFKA_TOPICS_LIST: [&str; 6] = [
+    "CnC",
+    "CnC_Res",
+    "Connection",
+    "Infra_Realtime",
+    "State",
+    "Topology",
+];
+
+async fn cgw_get_active_cgw_number(redis_args: &CGWRedisArgs) -> Result<usize> {
+    let redis_client = match cgw_create_redis_client(redis_args).await {
+        Ok(client) => client,
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to create redis client! Error: {e}"
+            )));
+        }
+    };
+
+    let mut redis_connection = match redis_client
+        .get_multiplexed_tokio_connection_with_response_timeouts(
+            Duration::from_secs(1),
+            Duration::from_secs(5),
+        )
+        .await
+    {
+        Ok(conn) => conn,
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to get redis connection! Error: {e}"
+            )));
+        }
+    };
+
+    let redis_keys: Vec<String> = match redis::cmd("KEYS")
+        .arg("shard_id_*".to_string())
+        .query_async(&mut redis_connection)
+        .await
+    {
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to get shard_id list from REDIS! Error: {e}"
+            )));
+        }
+        Ok(keys) => keys,
+    };
+
+    Ok(redis_keys.len())
+}
+
+fn cgw_create_kafka_admin(kafka_args: &CGWKafkaArgs) -> Result<AdminClient<DefaultClientContext>> {
+    let admin_client: AdminClient<DefaultClientContext> = match ClientConfig::new()
+        .set(
+            "bootstrap.servers",
+            format!("{}:{}", kafka_args.kafka_host, kafka_args.kafka_port),
+        )
+        .create()
+    {
+        Ok(client) => client,
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to create kafka admin client! Error: {e}"
+            )));
+        }
+    };
+
+    Ok(admin_client)
+}
+
+fn cgw_get_kafka_topics(
+    admin_client: &AdminClient<DefaultClientContext>,
+) -> Result<Vec<(String, usize)>> {
+    let metadata = match admin_client
+        .inner()
+        .fetch_metadata(None, Duration::from_millis(2000))
+    {
+        Ok(data) => data,
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to get kafka topics metadata! Error: {e}"
+            )));
+        }
+    };
+
+    let existing_topics: Vec<(String, usize)> = metadata
+        .topics()
+        .iter()
+        .map(|t| (t.name().to_string(), t.partitions().len()))
+        .collect();
+
+    Ok(existing_topics)
+}
+
+async fn cgw_create_kafka_topics(admin_client: &AdminClient<DefaultClientContext>) -> Result<()> {
+    let mut new_topics: Vec<NewTopic<'_>> = Vec::new();
+    let default_replication: i32 = 1;
+    let default_topic_partitions_num: i32 = 2;
+    let default_cnc_topic_partitions_num: i32 = 1;
+
+    for topic_name in CGW_KAFKA_TOPICS_LIST {
+        new_topics.push(NewTopic::new(
+            topic_name,
+            if topic_name == "CnC" {
+                default_cnc_topic_partitions_num
+            } else {
+                default_topic_partitions_num
+            },
+            TopicReplication::Fixed(default_replication),
+        ));
+    }
+
+    match admin_client
+        .create_topics(&new_topics, &AdminOptions::new())
+        .await
+    {
+        Ok(results) => {
+            for result in results {
+                match result {
+                    Ok(topic) => info!("Successfully created topic: {}", topic),
+                    Err((topic, err)) => {
+                        return Err(Error::KafkaInit(format!(
+                            "Failed to create topic {topic}! Error: {err}"
+                        )));
+                    }
+                }
+            }
+        }
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to create kafka topics! Error: {e}"
+            )));
+        }
+    }
+
+    Ok(())
+}
+
+async fn cgw_update_kafka_topics_partitions(
+    admin_client: &AdminClient<DefaultClientContext>,
+    topic_name: &str,
+    partitions_num: usize,
+) -> Result<()> {
+    match admin_client
+        .create_partitions(
+            &[NewPartitions::new(topic_name, partitions_num)],
+            &AdminOptions::new(),
+        )
+        .await
+    {
+        Ok(results) => {
+            for result in results {
+                match result {
+                    Ok(topic) => {
+                        info!("Successfully increased partitions for topic: {}", topic)
+                    }
+                    Err((topic, e)) => {
+                        return Err(Error::KafkaInit(format!(
+                            "Failed to update partitions num for {topic} topic! Error: {e}"
+                        )));
+                    }
+                }
+            }
+        }
+        Err(e) => {
+            return Err(Error::KafkaInit(format!(
+                "Failed to update partitions num for {topic_name} topic! Error: {e}"
+            )));
+        }
+    }
+
+    Ok(())
+}
+
+pub async fn cgw_init_kafka_topics(
+    kafka_args: &CGWKafkaArgs,
+    redis_args: &CGWRedisArgs,
+) -> Result<()> {
+    // Kafka topics are created early during CGW startup.
+    // At this point the shard info record has not yet been created in Redis,
+    // so simply add 1 to the number of active CGW instances found there.
+    let active_cgw_number = cgw_get_active_cgw_number(redis_args).await? + 1;
+    let admin_client = cgw_create_kafka_admin(kafka_args)?;
+    let existing_topics: Vec<(String, usize)> = cgw_get_kafka_topics(&admin_client)?;
+
+    if existing_topics.is_empty() {
+        info!("Creating kafka topics");
+        cgw_create_kafka_topics(&admin_client).await?;
+    } else {
+        // Find missing topics
+        let missing_topics: Vec<&str> = CGW_KAFKA_TOPICS_LIST
+            .iter()
+            .filter(|topic| !existing_topics.iter().any(|(name, _)| name == *topic))
+            .copied()
+            .collect();
+
+        if !missing_topics.is_empty() {
+            return Err(Error::KafkaInit(format!(
+                "Failed to init kafka topics! Missing kafka topics: {}",
+                missing_topics.join(", ")
+            )));
+        }
+
+        match existing_topics.iter().find(|(key, _)| key == "CnC") {
+            Some((topic_name, partitions_num)) => {
+                if active_cgw_number > *partitions_num {
+                    info!("Updating number of partitions for CnC topic!");
+                    cgw_update_kafka_topics_partitions(
+                        &admin_client,
+                        topic_name,
+                        active_cgw_number,
+                    )
+                    .await?;
+                }
+            }
+            None => {
+                return Err(Error::KafkaInit(
+                    "Failed to find CnC topic in existing topics list!".to_string(),
+                ));
+            }
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/cgw_remote_discovery.rs b/src/cgw_remote_discovery.rs
index 450774e..4e824ea 100644
--- a/src/cgw_remote_discovery.rs
+++ b/src/cgw_remote_discovery.rs
@@ -157,7 +157,7 @@ pub struct CGWRemoteDiscovery {
     local_shard_id: i32,
 }
 
-async fn cgw_create_redis_client(redis_args: &CGWRedisArgs) -> Result<redis::Client> {
+pub async fn cgw_create_redis_client(redis_args: &CGWRedisArgs) -> Result<redis::Client> {
     let redis_client_info = ConnectionInfo {
         addr: match redis_args.redis_tls {
             true => redis::ConnectionAddr::TcpTls {
diff --git a/src/main.rs b/src/main.rs
index 4763308..d1220ae 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,6 +6,7 @@ mod cgw_db_accessor;
 mod cgw_device;
 mod cgw_devices_cache;
 mod cgw_errors;
+mod cgw_kafka_init;
 mod cgw_metrics;
 mod cgw_nb_api_listener;
 mod cgw_remote_client;
@@ -26,6 +27,7 @@ extern crate log;
 extern crate lazy_static;
 
 use cgw_app_args::AppArgs;
+use cgw_kafka_init::cgw_init_kafka_topics;
 use cgw_runtime::cgw_initialize_runtimes;
 
 use nix::sys::socket::{setsockopt, sockopt};
@@ -356,6 +358,12 @@ async fn main() -> Result<()> {
     // Configure logger
     setup_logger(args.log_level);
 
+    // Initialize Kafka topics
+    if let Err(e) = cgw_init_kafka_topics(&args.kafka_args, &args.redis_args).await {
+        error!("Failed to initialize kafka topics! Error: {e}");
+        return Err(e);
+    }
+
     // Initialize runtimes
     if let Err(e) = cgw_initialize_runtimes(args.wss_args.wss_t_num) {
         error!("Failed to initialize CGW runtimes! Error: {e}");
diff --git a/utils/docker/docker-compose-template.yml.j2 b/utils/docker/docker-compose-template.yml.j2
index 17979db..504126f 100644
--- a/utils/docker/docker-compose-template.yml.j2
+++ b/utils/docker/docker-compose-template.yml.j2
@@ -15,7 +15,7 @@ services:
       - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@docker-broker-1:9093
       - ALLOW_PLAINTEXT_LISTENER=yes
       - KAFKA_CFG_NODE_ID=1
-      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
+      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
       - BITNAMI_DEBUG=yes
       - KAFKA_CFG_NUM_PARTITIONS=2
     healthcheck:
@@ -54,27 +54,6 @@ services:
       - ALLOW_EMPTY_PASSWORD=yes
     networks:
       - cgw_multi_instances_network
-  init-broker-container:
-    image: docker.io/bitnami/kafka:latest
-    depends_on:
-      - broker
-    entrypoint: [ '/bin/sh', '-c' ]
-    command: |
-      "
-      # rather than giving sleep 15 use this
-      # to block init container to wait for Kafka broker to be ready
-      kafka-topics --bootstrap-server broker:9092 --list
-
-      # create CnC, CnC_Res, Connection, State, Infra_Realtime and Topology topics
-      kafka-topics.sh --create --bootstrap-server broker:9092 --partitions {{ cgw_instances_num }} --topic CnC
-      kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic CnC_Res
-      kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic Connection
-      kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic State
-      kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic Infra_Realtime
-      kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic Topology
-      "
-    networks:
-      - cgw_multi_instances_network
 
 {% for i in range(0, cgw_instances_num) %}
   cgw_instance_{{ i }}:
@@ -124,11 +103,21 @@ services:
     depends_on:
       broker:
         condition: service_healthy
+      {% if i != 0 %}
+      cgw_instance_{{ i - 1 }}:
+        condition: service_healthy
+      {% endif %}
     volumes:
      - {{ default_certs_path }}:{{ container_certs_volume }}
      - {{ default_certs_path }}:{{ container_nb_infra_certs_volume }}
    networks:
      - cgw_multi_instances_network
+    healthcheck:
+      test: ["CMD-SHELL", "ps -aux | grep cgw_instance_{{ i }}"]
+      interval: 10s
+      timeout: 5s
+      retries: 3
+      start_period: 3s
 {% endfor %}
 
 networks: