Skip to content

Commit

Permalink
implement kafka integration
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Dec 10, 2024
1 parent acb26b9 commit 4432ee2
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 1 deletion.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Expand Down
65 changes: 65 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ pub struct Cli {
pub trino_auth: Option<String>,
pub trino_schema: Option<String>,
pub trino_catalog: Option<String>,

// Kafka specific env vars
pub kafka_topic: Option<String>,
pub kafka_host: Option<String>,
pub kafka_group: Option<String>,
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<String>,
pub kafka_partitions: Option<String>,
}

impl Cli {
Expand Down Expand Up @@ -164,6 +172,14 @@ impl Cli {
pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
pub const TRINO_SCHEMA: &'static str = "p-trino-schema";

// Kafka specific env vars
pub const KAFKA_TOPIC: &'static str = "kafka-topic";
pub const KAFKA_HOST: &'static str = "kafka-host";
pub const KAFKA_GROUP: &'static str = "kafka-group";
pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id";
pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
}
Expand All @@ -177,6 +193,48 @@ impl Cli {

pub fn create_cli_command_with_clap(name: &'static str) -> Command {
Command::new(name).next_line_help(false)
.arg(
Arg::new(Self::KAFKA_TOPIC)
.long(Self::KAFKA_TOPIC)
.env("P_KAFKA_TOPIC")
.value_name("STRING")
.help("Kafka topic to subscribe to"),
)
.arg(
Arg::new(Self::KAFKA_HOST)
.long(Self::KAFKA_HOST)
.env("P_KAFKA_HOST")
.value_name("STRING")
.help("Address and port for Kafka server"),
)
.arg(
Arg::new(Self::KAFKA_GROUP)
.long(Self::KAFKA_GROUP)
.env("P_KAFKA_GROUP")
.value_name("STRING")
.help("Kafka group"),
)
.arg(
Arg::new(Self::KAFKA_CLIENT_ID)
.long(Self::KAFKA_CLIENT_ID)
.env("P_KAFKA_CLIENT_ID")
.value_name("STRING")
.help("Kafka client id"),
)
.arg(
Arg::new(Self::KAFKA_SECURITY_PROTOCOL)
.long(Self::KAFKA_SECURITY_PROTOCOL)
.env("P_KAFKA_SECURITY_PROTOCOL")
.value_name("STRING")
.help("Kafka security protocol (ssl)"),
)
.arg(
Arg::new(Self::KAFKA_PARTITIONS)
.long(Self::KAFKA_PARTITIONS)
.env("P_KAFKA_PARTITIONS")
.value_name("STRING")
.help("Kafka partitions"),
)
.arg(
Arg::new(Self::TRINO_ENDPOINT)
.long(Self::TRINO_ENDPOINT)
Expand Down Expand Up @@ -520,6 +578,13 @@ impl FromArgMatches for Cli {
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();

self.kafka_topic = m.get_one::<String>(Self::KAFKA_TOPIC).cloned();
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m.get_one::<String>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
Expand Down
Loading

0 comments on commit 4432ee2

Please sign in to comment.