Create/update kafka topics on CGW init
SviatoslavBoichuk committed Jan 16, 2025
1 parent b1f840d commit 429d199
Showing 5 changed files with 257 additions and 23 deletions.
3 changes: 3 additions & 0 deletions src/cgw_errors.rs
@@ -32,6 +32,8 @@ pub enum Error {

Runtime(String),

KafkaInit(String),

// -- Externals
#[from]
Io(std::io::Error),
@@ -89,6 +91,7 @@ impl std::fmt::Display for Error {
| Error::Tls(message)
| Error::ConnectionServer(message)
| Error::Runtime(message)
| Error::KafkaInit(message)
| Error::Redis(message)
| Error::Tcp(message)
| Error::UCentralMessagesQueue(message)
234 changes: 234 additions & 0 deletions src/cgw_kafka_init.rs
@@ -0,0 +1,234 @@
use crate::cgw_app_args::{CGWKafkaArgs, CGWRedisArgs};
use crate::cgw_errors::{Error, Result};
use crate::cgw_remote_discovery::cgw_create_redis_client;

use rdkafka::admin::{AdminClient, AdminOptions, NewPartitions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

use std::time::Duration;

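// Kafka topics that CGW expects to exist before it starts serving traffic.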
const CGW_KAFKA_TOPICS_LIST: [&str; 6] = [
"CnC",
"CnC_Res",
"Connection",
"Infra_Realtime",
"State",
"Topology",
];

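// Count the active CGW shards by listing `shard_id_*` keys in Redis.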
async fn cgw_get_active_cgw_number(redis_args: &CGWRedisArgs) -> Result<usize> {
let redis_client = match cgw_create_redis_client(redis_args).await {
Ok(client) => client,
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to create redis client! Error: {e}"
)));
}
};

let mut redis_connection = match redis_client
.get_multiplexed_tokio_connection_with_response_timeouts(
Duration::from_secs(1),
Duration::from_secs(5),
)
.await
{
Ok(conn) => conn,
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to get redis connection! Error: {e}"
)));
}
};

let redis_keys: Vec<String> = match redis::cmd("KEYS")
.arg("shard_id_*".to_string())
.query_async(&mut redis_connection)
.await
{
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to get shard_id list from REDIS! Error: {e}"
)));
}
Ok(keys) => keys,
};

Ok(redis_keys.len())
}

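// Build an rdkafka admin client pointed at the configured Kafka bootstrap server.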
fn cgw_create_kafka_admin(kafka_args: &CGWKafkaArgs) -> Result<AdminClient<DefaultClientContext>> {
let admin_client: AdminClient<DefaultClientContext> = match ClientConfig::new()
.set(
"bootstrap.servers",
format!("{}:{}", kafka_args.kafka_host, kafka_args.kafka_port),
)
.create()
{
Ok(client) => client,
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to create kafka admin client! Error: {e}"
)));
}
};

Ok(admin_client)
}

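// Fetch broker metadata and return every existing topic together with its partition count.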
fn cgw_get_kafka_topics(
admin_client: &AdminClient<DefaultClientContext>,
) -> Result<Vec<(String, usize)>> {
let metadata = match admin_client
.inner()
.fetch_metadata(None, Duration::from_millis(2000))
{
Ok(data) => data,
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to get kafka topics metadata! Error: {e}"
)));
}
};

let existing_topics: Vec<(String, usize)> = metadata
.topics()
.iter()
.map(|t| (t.name().to_string(), t.partitions().len()))
.collect();

Ok(existing_topics)
}

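// Create the default CGW topic set: one partition for CnC, two partitions for
// every other topic, with a replication factor of 1.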
async fn cgw_create_kafka_topics(admin_client: &AdminClient<DefaultClientContext>) -> Result<()> {
let mut new_topics: Vec<NewTopic<'_>> = Vec::new();
let default_replication: i32 = 1;
let default_topic_partitions_num: i32 = 2;
let default_cnc_topic_partitions_num: i32 = 1;

for topic_name in CGW_KAFKA_TOPICS_LIST {
new_topics.push(NewTopic::new(
topic_name,
if topic_name == "CnC" {
default_cnc_topic_partitions_num
} else {
default_topic_partitions_num
},
TopicReplication::Fixed(default_replication),
));
}

match admin_client
.create_topics(&new_topics, &AdminOptions::new())
.await
{
Ok(results) => {
for result in results {
match result {
Ok(topic) => info!("Successfully created topic: {}", topic),
Err((topic, err)) => {
return Err(Error::KafkaInit(format!(
"Failed to create topic {topic}!, Error: {err}"
)));
}
}
}
}
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to create kafka topics! Error: {e}"
)));
}
}

Ok(())
}

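// Grow an existing topic to the requested total number of partitions.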
async fn cgw_update_kafka_topics_partitions(
admin_client: &AdminClient<DefaultClientContext>,
topic_name: &str,
partitions_num: usize,
) -> Result<()> {
match admin_client
.create_partitions(
&[NewPartitions::new(topic_name, partitions_num)],
&AdminOptions::new(),
)
.await
{
Ok(results) => {
for result in results {
match result {
Ok(topic) => {
info!("Successfully increased partitions for topic: {}", topic)
}
Err((topic, e)) => {
return Err(Error::KafkaInit(format!(
"Failed to update partitions num for {topic} topic! Error: {e}"
)));
}
}
}
}
Err(e) => {
return Err(Error::KafkaInit(format!(
"Failed to update topic partitions num for! Error: {e}"
)));
}
}

Ok(())
}

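// Startup entry point: create the topic set on an empty broker; otherwise verify
// that all expected topics exist and that the CnC topic has at least one partition
// per active CGW instance, growing it if necessary.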
pub async fn cgw_init_kafka_topics(
kafka_args: &CGWKafkaArgs,
redis_args: &CGWRedisArgs,
) -> Result<()> {
// Kafka topic creation runs early at CGW startup, before this instance has
// created its own shard info record in Redis, so add 1 to the number of
// active CGW instances read from Redis to account for this instance.
let active_cgw_number = cgw_get_active_cgw_number(redis_args).await? + 1;
let admin_client = cgw_create_kafka_admin(kafka_args)?;
let existing_topics: Vec<(String, usize)> = cgw_get_kafka_topics(&admin_client)?;

if existing_topics.is_empty() {
error!("Creating kafka topics");
cgw_create_kafka_topics(&admin_client).await?;
} else {
// Find missing topics
let missing_topics: Vec<&str> = CGW_KAFKA_TOPICS_LIST
.iter()
.filter(|topic| !existing_topics.iter().any(|(name, _)| name == *topic))
.copied()
.collect();

if !missing_topics.is_empty() {
return Err(Error::KafkaInit(format!(
"Failed to init kafka topics! Missed kafka topics: {}",
missing_topics.join(", ")
)));
}

match existing_topics.iter().find(|(key, _)| key == "CnC") {
Some((topic_name, partitions_num)) => {
if active_cgw_number > *partitions_num {
error!("Updating number of partitions for CnC topic!");
cgw_update_kafka_topics_partitions(
&admin_client,
topic_name,
active_cgw_number,
)
.await?;
}
}
None => {
return Err(Error::KafkaInit(
"Failed to find CnC topic in existing topics list!".to_string(),
));
}
}
}

Ok(())
}
2 changes: 1 addition & 1 deletion src/cgw_remote_discovery.rs
@@ -157,7 +157,7 @@ pub struct CGWRemoteDiscovery {
local_shard_id: i32,
}

async fn cgw_create_redis_client(redis_args: &CGWRedisArgs) -> Result<Client> {
pub async fn cgw_create_redis_client(redis_args: &CGWRedisArgs) -> Result<Client> {
let redis_client_info = ConnectionInfo {
addr: match redis_args.redis_tls {
true => redis::ConnectionAddr::TcpTls {
8 changes: 8 additions & 0 deletions src/main.rs
@@ -6,6 +6,7 @@ mod cgw_db_accessor;
mod cgw_device;
mod cgw_devices_cache;
mod cgw_errors;
mod cgw_kafka_init;
mod cgw_metrics;
mod cgw_nb_api_listener;
mod cgw_remote_client;
@@ -26,6 +27,7 @@ extern crate log;
extern crate lazy_static;

use cgw_app_args::AppArgs;
use cgw_kafka_init::cgw_init_kafka_topics;
use cgw_runtime::cgw_initialize_runtimes;

use nix::sys::socket::{setsockopt, sockopt};
@@ -356,6 +358,12 @@ async fn main() -> Result<()> {
// Configure logger
setup_logger(args.log_level);

// Initialize Kafka topics
if let Err(e) = cgw_init_kafka_topics(&args.kafka_args, &args.redis_args).await {
error!("Failed to initialize kafka topics! Error: {e}");
return Err(e);
}

// Initialize runtimes
if let Err(e) = cgw_initialize_runtimes(args.wss_args.wss_t_num) {
error!("Failed to initialize CGW runtimes! Error: {e}");
33 changes: 11 additions & 22 deletions utils/docker/docker-compose-template.yml.j2
@@ -15,7 +15,7 @@ services:
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@docker-broker-1:9093
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_NODE_ID=1
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
- BITNAMI_DEBUG=yes
- KAFKA_CFG_NUM_PARTITIONS=2
healthcheck:
@@ -54,27 +54,6 @@ services:
- ALLOW_EMPTY_PASSWORD=yes
networks:
- cgw_multi_instances_network
init-broker-container:
image: docker.io/bitnami/kafka:latest
depends_on:
- broker
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# rather than giving sleep 15 use this
# to block init container to wait for Kafka broker to be ready
kafka-topics --bootstrap-server broker:9092 --list

# create CnC, CnC_Res, Connection, State, Infra_Realtime and Topology topics
kafka-topics.sh --create --bootstrap-server broker:9092 --partitions {{ cgw_instances_num }} --topic CnC
kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic CnC_Res
kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic Connection
kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic State
kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic Infra_Realtime
kafka-topics.sh --create --bootstrap-server broker:9092 --partitions 2 --topic Topology
"
networks:
- cgw_multi_instances_network

{% for i in range(0, cgw_instances_num) %}
cgw_instance_{{ i }}:
@@ -124,11 +103,21 @@ services:
depends_on:
broker:
condition: service_healthy
{% if i != 0 %}
cgw_instance_{{ i - 1 }}:
condition: service_healthy
{% endif %}
volumes:
- {{ default_certs_path }}:{{ container_certs_volume }}
- {{ default_certs_path }}:{{ container_nb_infra_certs_volume }}
networks:
- cgw_multi_instances_network
healthcheck:
test: ["CMD-SHELL", "ps -aux | grep cgw_instance_{{ i }}"]
interval: 10s
timeout: 5s
retries: 3
start_period: 3s
{% endfor %}

networks:
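Note: with KAFKA_AUTO_CREATE_TOPICS_ENABLE=false and the init-broker-container removed, topic creation is now owned by CGW itself. A minimal manual check (a suggested sketch, not part of this commit; it assumes the bitnami image keeps kafka-topics.sh on PATH, as the removed init container did) is to list and describe the topics from the broker service once the first CGW instance is up:

docker compose exec broker kafka-topics.sh --bootstrap-server broker:9092 --list
docker compose exec broker kafka-topics.sh --bootstrap-server broker:9092 --describe --topic CnC

The reported CnC partition count should be at least the number of CGW instances brought up by the template, since each instance grows the topic on startup when needed.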
