Skip to content

Commit

Permalink
Add Kafka TLS support
Browse files Browse the repository at this point in the history
  • Loading branch information
SviatoslavBoichuk committed Jan 24, 2025
1 parent 5ecb7d1 commit ec2dd93
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 36 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ futures = { version = "0.3.0" }
rlimit = { version = "0.10.1" }
tonic = { version = "0.11.0" }
prost = { version = "0.12" }
rdkafka = { version = "0.36.2" }
rdkafka = { version = "0.36.2", features = ["ssl"] }
eui48 = { version = "1.1.0", features = ["serde"] }
uuid = { version = "1.6.1", features = ["serde"] }
redis = { version = "0.25.3", features = [
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ CGW_WSS_CERT - Web socket server certificate file name
CGW_WSS_KEY - Web socket server private key file name
CGW_KAFKA_HOST - IP or hostname of remote KAFKA server to connect to (NB API)
CGW_KAFKA_PORT - PORT of remote KAFKA server to connect to
CGW_KAFKA_TLS - Utilize TLS connection with Kafka broker
CGW_KAFKA_CERT - Client certificate to validate Kafka broker
CGW_DB_HOST - IP or hostname of remote database server to connect to
CGW_DB_PORT - PORT of remote database server to connect to
CGW_DB_USER - PSQL DB username (credentials) to use upon connect to DB
Expand Down Expand Up @@ -125,6 +127,8 @@ declare -x CGW_GRPC_PUBLIC_PORT="50051"
declare -x CGW_ID="0"
declare -x CGW_KAFKA_HOST="localhost"
declare -x CGW_KAFKA_PORT="9092"
declare -x CGW_KAFKA_TLS="no"
declare -x CGW_KAFKA_CERT="kafka.truststore.pem"
declare -x CGW_LOG_LEVEL="debug"
declare -x CGW_REDIS_HOST="localhost"
declare -x CGW_REDIS_PORT="6379"
Expand Down
16 changes: 14 additions & 2 deletions run_cgw.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ DEFAULT_KAFKA_HOST="docker-broker-1"
DEFAULT_KAFKA_PORT=9092
DEFAULT_KAFKA_CONSUME_TOPIC="CnC"
DEFAULT_KAFKA_PRODUCE_TOPIC="CnC_Res"
DEFAULT_KAFKA_TLS="no"
DEFAULT_KAFKA_CERT="kafka.truststore.pem"

DEFAULT_DB_HOST="docker-postgresql-1"
DEFAULT_DB_PORT=5432
Expand Down Expand Up @@ -71,6 +73,8 @@ export CGW_KAFKA_HOST="${CGW_KAFKA_HOST:-$DEFAULT_KAFKA_HOST}"
export CGW_KAFKA_PORT="${CGW_KAFKA_PORT:-$DEFAULT_KAFKA_PORT}"
export CGW_KAFKA_CONSUME_TOPIC="${CGW_KAFKA_CONSUME_TOPIC:-$DEFAULT_KAFKA_CONSUME_TOPIC}"
export CGW_KAFKA_PRODUCE_TOPIC="${CGW_KAFKA_PRODUCE_TOPIC:-$DEFAULT_KAFKA_PRODUCE_TOPIC}"
export CGW_KAFKA_TLS="${CGW_KAFKA_TLS:-$DEFAULT_KAFKA_TLS}"
export CGW_KAFKA_CERT="${CGW_KAFKA_CERT:-$DEFAULT_KAFKA_CERT}"
export CGW_DB_HOST="${CGW_DB_HOST:-$DEFAULT_DB_HOST}"
export CGW_DB_PORT="${CGW_DB_PORT:-$DEFAULT_DB_PORT}"
export CGW_DB_NAME="${CGW_DB_NAME:-$DEFAULT_DB_NAME}"
Expand Down Expand Up @@ -100,6 +104,7 @@ fi
if [ ! -f $CGW_CERTS_PATH/$CGW_WSS_CERT ] ||
[ ! -f $CGW_CERTS_PATH/$CGW_WSS_KEY ] ||
[ ! -f $CGW_CERTS_PATH/$CGW_WSS_CAS ] ||
[ ! -f $CGW_CERTS_PATH/$CGW_KAFKA_CERT] ||
[ ! -f $DEFAULT_CLIENT_CERTS_PATH/$DEFAULT_CLIENT_CERT ] ||
[ ! -f $DEFAULT_CLIENT_CERTS_PATH/$DEFAULT_CLIENT_KEY ]; then
echo "WARNING: at specified path $CGW_CERTS_PATH either CAS, CERT or KEY is missing!"
Expand All @@ -109,6 +114,7 @@ if [ ! -f $CGW_CERTS_PATH/$CGW_WSS_CERT ] ||
export CGW_WSS_CERT="$DEFAULT_WSS_CERT"
export CGW_WSS_KEY="$DEFAULT_WSS_KEY"
export CGW_NB_INFRA_CERTS_PATH="$DEFAULT_CERTS_PATH"
export CGW_KAFKA_CERT="$DEFAULT_KAFKA_CERT"

cd ./utils/cert_generator/ && \
rm ./certs/ca/*crt 2>&1 >/dev/null; \
Expand All @@ -121,10 +127,12 @@ if [ ! -f $CGW_CERTS_PATH/$CGW_WSS_CERT ] ||
./generate_certs.sh -s && \
./generate_certs.sh -c 1 -m 02:00:00:00:00:00 && \
cp ./certs/ca/ca.crt $DEFAULT_CERTS_PATH/$DEFAULT_WSS_CAS && \
cp ./certs/server/gw.crt $DEFAULT_CERTS_PATH/cert.pem && \
cp ./certs/server/gw.key $DEFAULT_CERTS_PATH/key.pem && \
cp ./certs/ca/ca.crt $DEFAULT_CERTS_PATH/$DEFAULT_KAFKA_CERT && \
cp ./certs/server/gw.crt $DEFAULT_CERTS_PATH/$DEFAULT_WSS_CERT && \
cp ./certs/server/gw.key $DEFAULT_CERTS_PATH/$DEFAULT_WSS_KEY && \
cp ./certs/client/*crt $DEFAULT_CLIENT_CERTS_PATH/$DEFAULT_CLIENT_CERT && \
cp ./certs/client/*key $DEFAULT_CLIENT_CERTS_PATH/$DEFAULT_CLIENT_KEY && \
chmod 644 $DEFAULT_CERTS_PATH/$DEFAULT_KAFKA_CERT && \
echo "Generating self-signed certificates done!"
fi

Expand All @@ -142,6 +150,8 @@ echo "CGW GRPC PUBLIC HOST/PORT : $CGW_GRPC_PUBLIC_HOST:$CGW_GRPC_PUBLIC
echo "CGW GRPC LISTENING IP/PORT : $CGW_GRPC_LISTENING_IP:$CGW_GRPC_LISTENING_PORT"
echo "CGW KAFKA HOST/PORT : $CGW_KAFKA_HOST:$CGW_KAFKA_PORT"
echo "CGW KAFKA TOPIC : $CGW_KAFKA_CONSUME_TOPIC:$CGW_KAFKA_PRODUCE_TOPIC"
echo "CGW KAFKA TLS : $CGW_KAFKA_TLS"
echo "CGW KAFKA CERT : $CGW_KAFKA_CERT"
echo "CGW DB NAME : $CGW_DB_NAME"
echo "CGW DB HOST/PORT : $CGW_DB_HOST:$CGW_DB_PORT"
echo "CGW DB TLS : $CGW_DB_TLS"
Expand Down Expand Up @@ -181,6 +191,8 @@ docker run \
-e CGW_KAFKA_PORT \
-e CGW_KAFKA_CONSUME_TOPIC \
-e CGW_KAFKA_PRODUCE_TOPIC \
-e CGW_KAFKA_TLS \
-e CGW_KAFKA_CERT \
-e CGW_DB_NAME \
-e CGW_DB_HOST \
-e CGW_DB_PORT \
Expand Down
17 changes: 16 additions & 1 deletion src/cgw_app_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const CGW_DEFAULT_GRPC_PUBLIC_HOST: &str = "localhost";
const CGW_DEFAULT_GRPC_PUBLIC_PORT: u16 = 50051;
const CGW_DEFAULT_KAFKA_HOST: &str = "localhost";
const CGW_DEFAULT_KAFKA_PORT: u16 = 9092;
const CGW_DEFAULT_KAFKA_TLS: &str = "no";
const CGW_DEFAULT_KAFKA_CERT: &str = "kafka.truststore.pem";
const CGW_DEFAULT_KAFKA_CONSUME_TOPIC: &str = "CnC";
const CGW_DEFAULT_KAFKA_PRODUCE_TOPIC: &str = "CnC_Res";
const CGW_DEFAULT_DB_HOST: &str = "localhost";
Expand Down Expand Up @@ -205,6 +207,10 @@ pub struct CGWKafkaArgs {
/// KAFKA topic where to produce messages
#[allow(unused)]
pub kafka_produce_topic: String,
/// Utilize TLS connection with Kafka broker
pub kafka_tls: bool,
/// Certificate name to validate Kafka broker
pub kafka_cert: String,
}

impl CGWKafkaArgs {
Expand Down Expand Up @@ -240,11 +246,19 @@ impl CGWKafkaArgs {
let kafka_produce_topic: String = env::var("CGW_KAFKA_PRODUCER_TOPIC")
.unwrap_or(CGW_DEFAULT_KAFKA_PRODUCE_TOPIC.to_string());

let kafka_tls_var: String =
env::var("CGW_KAFKA_TLS").unwrap_or(CGW_DEFAULT_KAFKA_TLS.to_string());
let kafka_tls = kafka_tls_var == "yes";
let kafka_cert: String =
env::var("CGW_KAFKA_CERT").unwrap_or(CGW_DEFAULT_KAFKA_CERT.to_string());

Ok(CGWKafkaArgs {
kafka_host,
kafka_port,
kafka_consume_topic,
kafka_produce_topic,
kafka_tls,
kafka_cert,
})
}
}
Expand Down Expand Up @@ -622,7 +636,7 @@ impl AppArgs {

let wss_args = CGWWSSArgs::parse()?;
let grpc_args = CGWGRPCArgs::parse()?;
let kafka_args = CGWKafkaArgs::parse()?;
let mut kafka_args = CGWKafkaArgs::parse()?;
let mut db_args = CGWDBArgs::parse()?;
let mut redis_args = CGWRedisArgs::parse()?;
let metrics_args = CGWMetricsArgs::parse()?;
Expand All @@ -631,6 +645,7 @@ impl AppArgs {
if nb_infra_tls {
redis_args.redis_tls = nb_infra_tls;
db_args.db_tls = nb_infra_tls;
kafka_args.kafka_tls = nb_infra_tls;
}

Ok(AppArgs {
Expand Down
26 changes: 18 additions & 8 deletions src/cgw_kafka_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use rdkafka::config::ClientConfig;

use std::time::Duration;

use crate::cgw_tls::CGW_TLS_NB_INFRA_CERTS_PATH;

const CGW_KAFKA_TOPICS_LIST: [&str; 6] = [
"CnC",
"CnC_Res",
Expand Down Expand Up @@ -59,13 +61,21 @@ async fn cgw_get_active_cgw_number(redis_args: &CGWRedisArgs) -> Result<usize> {
}

fn cgw_create_kafka_admin(kafka_args: &CGWKafkaArgs) -> Result<AdminClient<DefaultClientContext>> {
let admin_client: AdminClient<DefaultClientContext> = match ClientConfig::new()
.set(
"bootstrap.servers",
format!("{}:{}", kafka_args.kafka_host, kafka_args.kafka_port),
)
.create()
{
let mut admin_config: ClientConfig = ClientConfig::new();
admin_config.set(
"bootstrap.servers",
format!("{}:{}", kafka_args.kafka_host, kafka_args.kafka_port),
);

if kafka_args.kafka_tls {
let cert_path = format!("{CGW_TLS_NB_INFRA_CERTS_PATH}/{}", kafka_args.kafka_cert);
admin_config
.set("security.protocol", "SSL")
.set("ssl.ca.location", &cert_path)
.set("ssl.endpoint.identification.algorithm", "none");
}

let admin_client = match admin_config.create() {
Ok(client) => client,
Err(e) => {
return Err(Error::KafkaInit(format!(
Expand Down Expand Up @@ -193,7 +203,7 @@ pub async fn cgw_init_kafka_topics(
let existing_topics: Vec<(String, usize)> = cgw_get_kafka_topics(&admin_client)?;

if existing_topics.is_empty() {
error!("Creating kafka topics");
info!("Creating kafka topics");
cgw_create_kafka_topics(&admin_client).await?;
} else {
// Find missing topics
Expand Down
37 changes: 27 additions & 10 deletions src/cgw_nb_api_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::cgw_ucentral_parser::CGWDeviceChange;
use crate::cgw_connection_server::{CGWConnectionNBAPIReqMsg, CGWConnectionNBAPIReqMsgOrigin};
use crate::cgw_errors::{Error, Result};
use crate::cgw_metrics::{CGWMetrics, CGWMetricsHealthComponent, CGWMetricsHealthComponentStatus};
use crate::cgw_tls::CGW_TLS_NB_INFRA_CERTS_PATH;

use eui48::MacAddress;
use futures::stream::TryStreamExt;
Expand Down Expand Up @@ -817,7 +818,8 @@ impl CGWKafkaConsumer {
}),
};

let consumer: CGWKafkaConsumerType = match ClientConfig::new()
let mut consumer_config = ClientConfig::new();
consumer_config
.set("group.id", GROUP_ID)
.set("client.id", GROUP_ID.to_string() + &cgw_id.to_string())
.set("group.instance.id", cgw_id.to_string())
Expand All @@ -828,11 +830,17 @@ impl CGWKafkaConsumer {
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
//.set("statistics.interval.ms", "30000")
//.set("auto.offset.reset", "smallest")
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context(context)
{
.set_log_level(RDKafkaLogLevel::Debug);

if kafka_args.kafka_tls {
let cert_path = format!("{CGW_TLS_NB_INFRA_CERTS_PATH}/{}", kafka_args.kafka_cert);
consumer_config
.set("security.protocol", "SSL")
.set("ssl.ca.location", &cert_path)
.set("ssl.endpoint.identification.algorithm", "none");
}

let consumer: CGWKafkaConsumerType = match consumer_config.create_with_context(context) {
Ok(c) => c,
Err(e) => {
error!("Failed to create kafka consumer from config! Error: {e}");
Expand Down Expand Up @@ -878,14 +886,23 @@ impl CGWKafkaProducer {
}

fn create_producer(kafka_args: &CGWKafkaArgs) -> Result<CGWKafkaProducerType> {
let producer: FutureProducer = match ClientConfig::new()
let mut producer_config = ClientConfig::new();
producer_config
.set(
"bootstrap.servers",
kafka_args.kafka_host.clone() + ":" + &kafka_args.kafka_port.to_string(),
)
.set("message.timeout.ms", "5000")
.create()
{
.set("message.timeout.ms", "5000");

if kafka_args.kafka_tls {
let cert_path = format!("{CGW_TLS_NB_INFRA_CERTS_PATH}/{}", kafka_args.kafka_cert);
producer_config
.set("security.protocol", "SSL")
.set("ssl.ca.location", &cert_path)
.set("ssl.endpoint.identification.algorithm", "none");
}

let producer: FutureProducer = match producer_config.create() {
Ok(p) => p,
Err(e) => {
error!("Failed to create Kafka producer!");
Expand Down
2 changes: 1 addition & 1 deletion src/cgw_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio_rustls::{
use x509_parser::parse_x509_certificate;

const CGW_TLS_CERTIFICATES_PATH: &str = "/etc/cgw/certs";
const CGW_TLS_NB_INFRA_CERTS_PATH: &str = "/etc/cgw/nb_infra/certs";
pub const CGW_TLS_NB_INFRA_CERTS_PATH: &str = "/etc/cgw/nb_infra/certs";

async fn cgw_tls_read_file(file_path: &str) -> Result<Vec<u8>> {
let mut file = match File::open(file_path) {
Expand Down
Loading

0 comments on commit ec2dd93

Please sign in to comment.