Skip to content

Commit

Permalink
refactored: used serde
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Dec 16, 2024
1 parent 05b9965 commit bc3019d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 29 deletions.
7 changes: 5 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::path::PathBuf;
use url::Url;

use crate::{
kafka::SslProtocol,
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
};
Expand Down Expand Up @@ -125,7 +126,7 @@ pub struct Cli {
pub kafka_host: Option<String>,
pub kafka_group: Option<String>,
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<String>,
pub kafka_security_protocol: Option<SslProtocol>,
pub kafka_partitions: Option<String>,
}

Expand Down Expand Up @@ -582,7 +583,9 @@ impl FromArgMatches for Cli {
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m.get_one::<String>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
self.kafka_security_protocol = m
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
Expand Down
32 changes: 5 additions & 27 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError};
use rdkafka::message::BorrowedMessage;
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use std::fmt::Display;
use serde::{Deserialize, Serialize};
use std::num::ParseIntError;
use std::sync::Arc;
use std::{collections::HashMap, fmt::Debug, str::FromStr};
Expand All @@ -26,34 +26,13 @@ use crate::{
storage::StreamType,
};

enum SslProtocol {
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub enum SslProtocol {
Plaintext,
Ssl,
SaslPlaintext,
SaslSsl,
}
impl Display for SslProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
SslProtocol::Plaintext => "plaintext",
SslProtocol::Ssl => "ssl",
SslProtocol::SaslPlaintext => "sasl_plaintext",
SslProtocol::SaslSsl => "sasl_ssl",
})
}
}
impl FromStr for SslProtocol {
type Err = KafkaError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"plaintext" => Ok(SslProtocol::Plaintext),
"ssl" => Ok(SslProtocol::Ssl),
"sasl_plaintext" => Ok(SslProtocol::SaslPlaintext),
"sasl_ssl" => Ok(SslProtocol::SaslSsl),
_ => Err(KafkaError::InvalidSslProtocolError(s.to_string())),
}
}
}

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -150,9 +129,8 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> {
// conf.set("api.version.request", val.to_string());
// }

if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() {
let mapped: SslProtocol = val.parse()?;
conf.set("security.protocol", mapped.to_string());
if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() {
conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?);
}

let consumer: StreamConsumer = conf.create()?;
Expand Down

0 comments on commit bc3019d

Please sign in to comment.