From 4432ee22bb11edf7e69f436bf86843f93bc75b6e Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 5 Dec 2024 09:24:15 +0530 Subject: [PATCH 1/7] implement kafka integration --- Cargo.lock | 51 ++++++++++ Cargo.toml | 1 + src/cli.rs | 65 +++++++++++++ src/kafka.rs | 256 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 7 +- 6 files changed, 380 insertions(+), 1 deletion(-) create mode 100644 src/kafka.rs diff --git a/Cargo.lock b/Cargo.lock index 1b98329a3..32fffdbaf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2920,6 +2920,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.79", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -3147,6 +3168,7 @@ dependencies = [ "prost", "prost-build", "rand", + "rdkafka", "regex", "relative-path", "reqwest 0.11.27", @@ -3617,6 +3639,35 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.8.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be" +dependencies = [ + "libc", + "num_enum", + "pkg-config", +] + [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 455321afe..47a42caeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ num_cpus = "1.15" once_cell = "1.17.1" prometheus = { version = "0.13", features = ["process"] } rand = "0.8.5" +rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]} regex = "1.7.3" relative-path = { version = "1.7", features = ["serde"] } reqwest = { version = "0.11.27", default-features = false, features = [ diff --git a/src/cli.rs b/src/cli.rs index 982a2a765..4a159308a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -119,6 +119,14 @@ pub struct Cli { pub trino_auth: Option, pub trino_schema: Option, pub trino_catalog: Option, + + // Kafka specific env vars + pub kafka_topic: Option, + pub kafka_host: Option, + pub kafka_group: Option, + pub kafka_client_id: Option, + pub kafka_security_protocol: Option, + pub kafka_partitions: Option, } impl Cli { @@ -164,6 +172,14 @@ impl Cli { pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization"; pub const TRINO_SCHEMA: &'static str = "p-trino-schema"; + // Kafka specific env vars + pub const KAFKA_TOPIC: &'static str = "kafka-topic"; + pub const KAFKA_HOST: &'static str = "kafka-host"; + pub const KAFKA_GROUP: &'static str = "kafka-group"; + pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id"; + pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol"; + pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions"; + pub fn local_stream_data_path(&self, 
stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -177,6 +193,48 @@ impl Cli { pub fn create_cli_command_with_clap(name: &'static str) -> Command { Command::new(name).next_line_help(false) + .arg( + Arg::new(Self::KAFKA_TOPIC) + .long(Self::KAFKA_TOPIC) + .env("P_KAFKA_TOPIC") + .value_name("STRING") + .help("Kafka topic to subscribe to"), + ) + .arg( + Arg::new(Self::KAFKA_HOST) + .long(Self::KAFKA_HOST) + .env("P_KAFKA_HOST") + .value_name("STRING") + .help("Address and port for Kafka server"), + ) + .arg( + Arg::new(Self::KAFKA_GROUP) + .long(Self::KAFKA_GROUP) + .env("P_KAFKA_GROUP") + .value_name("STRING") + .help("Kafka group"), + ) + .arg( + Arg::new(Self::KAFKA_CLIENT_ID) + .long(Self::KAFKA_CLIENT_ID) + .env("P_KAFKA_CLIENT_ID") + .value_name("STRING") + .help("Kafka client id"), + ) + .arg( + Arg::new(Self::KAFKA_SECURITY_PROTOCOL) + .long(Self::KAFKA_SECURITY_PROTOCOL) + .env("P_KAFKA_SECURITY_PROTOCOL") + .value_name("STRING") + .help("Kafka security protocol (ssl)"), + ) + .arg( + Arg::new(Self::KAFKA_PARTITIONS) + .long(Self::KAFKA_PARTITIONS) + .env("P_KAFKA_PARTITIONS") + .value_name("STRING") + .help("Kafka partitions"), + ) .arg( Arg::new(Self::TRINO_ENDPOINT) .long(Self::TRINO_ENDPOINT) @@ -520,6 +578,13 @@ impl FromArgMatches for Cli { self.trino_schema = m.get_one::(Self::TRINO_SCHEMA).cloned(); self.trino_username = m.get_one::(Self::TRINO_USER_NAME).cloned(); + self.kafka_topic = m.get_one::(Self::KAFKA_TOPIC).cloned(); + self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); + self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); + self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); + self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned(); + self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); + self.local_cache_path = m.get_one::(Self::CACHE).cloned(); self.query_cache_path = m.get_one::(Self::QUERY_CACHE).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); diff --git a/src/kafka.rs b/src/kafka.rs new file mode 100644 index 000000000..eae86d851 --- /dev/null +++ b/src/kafka.rs @@ -0,0 +1,256 @@ +use arrow_schema::Field; +use chrono::Utc; +use futures_util::StreamExt; +use rdkafka::config::ClientConfig; +use rdkafka::consumer::stream_consumer::StreamConsumer; +use rdkafka::consumer::Consumer; +use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError}; +use rdkafka::message::BorrowedMessage; +use rdkafka::util::Timeout; +use rdkafka::{Message, TopicPartitionList}; +use std::fmt::Display; +use std::num::ParseIntError; +use std::sync::Arc; +use std::{collections::HashMap, fmt::Debug, str::FromStr}; +use tracing::{debug, error, info}; + +use crate::option::CONFIG; +use crate::{ + event::{ + self, + error::EventError, + format::{self, EventFormat}, + }, + handlers::http::ingest::{create_stream_if_not_exists, PostError}, + metadata::{error::stream_info::MetadataError, STREAM_INFO}, + storage::StreamType, +}; + +enum SslProtocol { + Plaintext, + Ssl, + SaslPlaintext, + SaslSsl, +} +impl Display for SslProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + SslProtocol::Plaintext => "plaintext", + SslProtocol::Ssl => "ssl", + SslProtocol::SaslPlaintext => "sasl_plaintext", + SslProtocol::SaslSsl => "sasl_ssl", + }) + } +} +impl FromStr for SslProtocol { + type Err = KafkaError; + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "plaintext" => Ok(SslProtocol::Plaintext), 
+ "ssl" => Ok(SslProtocol::Ssl), + "sasl_plaintext" => Ok(SslProtocol::SaslPlaintext), + "sasl_ssl" => Ok(SslProtocol::SaslSsl), + _ => Err(KafkaError::InvalidSslProtocolError(s.to_string())), + } + } +} + +#[allow(dead_code)] +#[derive(Debug, thiserror::Error)] +pub enum KafkaError { + #[error("Error loading environment variable {0}")] + NoVarError(&'static str), + + #[error("Kafka error {0}")] + NativeError(#[from] NativeKafkaError), + #[error("RDKafka error {0}")] + RDKError(#[from] RDKafkaError), + + #[error("Error parsing int {1} for environment variable {0}")] + ParseIntError(&'static str, ParseIntError), + #[error("Error parsing duration int {1} for environment variable {0}")] + ParseDurationError(&'static str, ParseIntError), + + #[error("Stream not found: #{0}")] + StreamNotFound(String), + #[error("Post error: #{0}")] + PostError(#[from] PostError), + #[error("Metadata error: #{0}")] + MetadataError(#[from] MetadataError), + #[error("Event error: #{0}")] + EventError(#[from] EventError), + #[error("JSON error: #{0}")] + JsonError(#[from] serde_json::Error), + #[error("Invalid group offset storage: #{0}")] + InvalidGroupOffsetStorage(String), + + #[error("Invalid SSL protocol: #{0}")] + InvalidSslProtocolError(String), + #[error("Invalid unicode for environment variable {0}")] + EnvNotUnicode(&'static str), + #[error("")] + DoNotPrintError, +} + +// // Commented out functions +// // Might come in handy later +// fn parse_auto_env(key: &'static str) -> Result, ::Err> +// where +// T: FromStr, +// { +// Ok(if let Ok(val) = env::var(key) { +// Some(val.parse::()?) +// } else { +// None +// }) +// } + +// fn handle_duration_env_prefix(key: &'static str) -> Result, ParseIntError> { +// if let Ok(raw_secs) = env::var(format!("{key}_S")) { +// Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)?))) +// } else if let Ok(raw_secs) = env::var(format!("{key}_M")) { +// Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)? * 60))) +// } else { +// Ok(None) +// } +// } + +// fn parse_i32_env(key: &'static str) -> Result, KafkaError> { +// parse_auto_env::(key).map_err(|raw| KafkaError::ParseIntError(key, raw)) +// } + +// fn parse_duration_env_prefixed(key_prefix: &'static str) -> Result, KafkaError> { +// handle_duration_env_prefix(key_prefix) +// .map_err(|raw| KafkaError::ParseDurationError(key_prefix, raw)) +// } + +fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { + if let Some(topic) = &CONFIG.parseable.kafka_topic { + let host = if CONFIG.parseable.kafka_host.is_some() { + CONFIG.parseable.kafka_host.as_ref() + } else { + return Err(KafkaError::NoVarError("Please set P_KAKFA_HOST env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")); + }; + + let group = if CONFIG.parseable.kafka_group.is_some() { + CONFIG.parseable.kafka_group.as_ref() + } else { + return Err(KafkaError::NoVarError("Please set P_KAKFA_GROUP env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")); + }; + + let mut conf = ClientConfig::new(); + conf.set("bootstrap.servers", host.unwrap()); + conf.set("group.id", group.unwrap()); + + if let Some(val) = CONFIG.parseable.kafka_client_id.as_ref() { + conf.set("client.id", val); + } + + // if let Some(val) = get_flag_env_val("a")? 
{ + // conf.set("api.version.request", val.to_string()); + // } + + if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() { + let mapped: SslProtocol = val.parse()?; + conf.set("security.protocol", mapped.to_string()); + } + + let consumer: StreamConsumer = conf.create()?; + consumer.subscribe(&[topic.as_str()])?; + + if let Some(vals_raw) = CONFIG.parseable.kafka_partitions.as_ref() { + let vals = vals_raw + .split(',') + .map(i32::from_str) + .collect::, ParseIntError>>() + .map_err(|raw| KafkaError::ParseIntError("P_KAFKA_PARTITIONS", raw))?; + + let mut parts = TopicPartitionList::new(); + for val in vals { + parts.add_partition(topic, val); + } + consumer.seek_partitions(parts, Timeout::Never)?; + } + Ok((consumer, topic.clone())) + } else { + // if the user hasn't even set KAFKA_TOPIC + // then they probably don't want to use the integration + // send back the DoNotPrint error + Err(KafkaError::DoNotPrintError) + } +} + +fn resolve_schema(stream_name: &str) -> Result>, KafkaError> { + let hash_map = STREAM_INFO.read().unwrap(); + let raw = hash_map + .get(stream_name) + .ok_or_else(|| KafkaError::StreamNotFound(stream_name.to_owned()))?; + Ok(raw.schema.clone()) +} + +async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> { + if let Some(payload) = msg.payload() { + // stream should get created only if there is an incoming event, not before that + create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; + + let schema = resolve_schema(stream_name)?; + let event = format::json::Event { + data: serde_json::from_slice(payload)?, + tags: String::default(), + metadata: String::default(), + }; + + let time_partition = STREAM_INFO.get_time_partition(stream_name)?; + let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; + + let (rb, is_first) = event + .into_recordbatch(schema, static_schema_flag, time_partition) + .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; + + event::Event { + rb, + stream_name: stream_name.to_string(), + origin_format: "json", + origin_size: payload.len() as u64, + is_first_event: is_first, + parsed_timestamp: Utc::now().naive_utc(), + time_partition: None, + custom_partition_values: HashMap::new(), + stream_type: StreamType::UserDefined, + } + .process() + .await?; + } else { + debug!("{} No payload for stream", stream_name); + } + Ok(()) +} + +pub async fn setup_integration() { + tokio::task::spawn(async move { + let (consumer, stream_name) = match setup_consumer() { + Ok(c) => c, + Err(err) => { + match err { + KafkaError::DoNotPrintError => { + debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); + } + _ => { + error!("{err}"); + } + } + return; + } + }; + + info!("Setup kafka integration for {stream_name}"); + let mut stream = consumer.stream(); + + while let Ok(curr) = stream.next().await.unwrap() { + match ingest_message(&stream_name, curr).await { + Ok(_) => {} + Err(err) => error!("Unable to ingest incoming kafka message- {err}"), + }; + } + }); +} diff --git a/src/lib.rs b/src/lib.rs index 140c32dcc..9186adba4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ mod cli; mod event; pub mod handlers; pub mod hottier; +pub mod kafka; mod livetail; pub mod localcache; mod metadata; diff --git a/src/main.rs b/src/main.rs index f9f2e5993..641a2d7c8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ */ use parseable::{ - banner, + banner, kafka, option::{Mode, CONFIG}, rbac, storage, IngestServer, 
ParseableServer, QueryServer, Server, }; @@ -46,6 +46,11 @@ async fn main() -> anyhow::Result<()> { // keep metadata info in mem metadata.set_global(); + // load kafka server + if CONFIG.parseable.mode.ne(&Mode::Query) { + kafka::setup_integration().await; + } + server.init().await?; Ok(()) From bc3019dcd2f6ebb3449c4ee0190b81e470981b94 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 16 Dec 2024 11:34:24 +0530 Subject: [PATCH 2/7] refactored: used serde --- src/cli.rs | 7 +++++-- src/kafka.rs | 32 +++++--------------------------- 2 files changed, 10 insertions(+), 29 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 4a159308a..a5e03f3b0 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -22,6 +22,7 @@ use std::path::PathBuf; use url::Url; use crate::{ + kafka::SslProtocol, oidc::{self, OpenidConfig}, option::{validation, Compression, Mode}, }; @@ -125,7 +126,7 @@ pub struct Cli { pub kafka_host: Option, pub kafka_group: Option, pub kafka_client_id: Option, - pub kafka_security_protocol: Option, + pub kafka_security_protocol: Option, pub kafka_partitions: Option, } @@ -582,7 +583,9 @@ impl FromArgMatches for Cli { self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); - self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned(); + self.kafka_security_protocol = m + .get_one::(Self::KAFKA_SECURITY_PROTOCOL) + .cloned(); self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); self.local_cache_path = m.get_one::(Self::CACHE).cloned(); diff --git a/src/kafka.rs b/src/kafka.rs index eae86d851..65a4987c9 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -8,7 +8,7 @@ use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError}; use rdkafka::message::BorrowedMessage; use rdkafka::util::Timeout; use rdkafka::{Message, TopicPartitionList}; -use std::fmt::Display; +use serde::{Deserialize, Serialize}; use std::num::ParseIntError; use std::sync::Arc; use std::{collections::HashMap, fmt::Debug, str::FromStr}; @@ -26,34 +26,13 @@ use crate::{ storage::StreamType, }; -enum SslProtocol { +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +pub enum SslProtocol { Plaintext, Ssl, SaslPlaintext, SaslSsl, } -impl Display for SslProtocol { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(match self { - SslProtocol::Plaintext => "plaintext", - SslProtocol::Ssl => "ssl", - SslProtocol::SaslPlaintext => "sasl_plaintext", - SslProtocol::SaslSsl => "sasl_ssl", - }) - } -} -impl FromStr for SslProtocol { - type Err = KafkaError; - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "plaintext" => Ok(SslProtocol::Plaintext), - "ssl" => Ok(SslProtocol::Ssl), - "sasl_plaintext" => Ok(SslProtocol::SaslPlaintext), - "sasl_ssl" => Ok(SslProtocol::SaslSsl), - _ => Err(KafkaError::InvalidSslProtocolError(s.to_string())), - } - } -} #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -150,9 +129,8 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { // conf.set("api.version.request", val.to_string()); // } - if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() { - let mapped: SslProtocol = val.parse()?; - conf.set("security.protocol", mapped.to_string()); + if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() { + conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?); } let consumer: StreamConsumer = conf.create()?; 
From 83a319bf941895888353015ef4c7711a356ab026 Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 16 Dec 2024 11:39:21 +0530 Subject: [PATCH 3/7] refactor: edited error msg --- src/kafka.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kafka.rs b/src/kafka.rs index 65a4987c9..2c2aff987 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -37,7 +37,7 @@ pub enum SslProtocol { #[allow(dead_code)] #[derive(Debug, thiserror::Error)] pub enum KafkaError { - #[error("Error loading environment variable {0}")] + #[error("Please set env var {0} (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")] NoVarError(&'static str), #[error("Kafka error {0}")] @@ -108,13 +108,13 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { let host = if CONFIG.parseable.kafka_host.is_some() { CONFIG.parseable.kafka_host.as_ref() } else { - return Err(KafkaError::NoVarError("Please set P_KAKFA_HOST env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")); + return Err(KafkaError::NoVarError("P_KAKFA_HOST")); }; let group = if CONFIG.parseable.kafka_group.is_some() { CONFIG.parseable.kafka_group.as_ref() } else { - return Err(KafkaError::NoVarError("Please set P_KAKFA_GROUP env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")); + return Err(KafkaError::NoVarError("P_KAKFA_GROUP")); }; let mut conf = ClientConfig::new(); From 5f9ec0c6b7896e080f716c6141235cc4849a0501 Mon Sep 17 00:00:00 2001 From: parmesant Date: Mon, 16 Dec 2024 11:40:44 +0530 Subject: [PATCH 4/7] refactor: pattern match Co-authored-by: Devdutt Shenoi Signed-off-by: parmesant --- src/kafka.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/kafka.rs b/src/kafka.rs index 2c2aff987..07e7c9bf8 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -225,10 +225,9 @@ pub async fn setup_integration() { let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { - match ingest_message(&stream_name, curr).await { - Ok(_) => {} - Err(err) => error!("Unable to ingest incoming kafka message- {err}"), - }; + if let Err(err) = ingest_message(&stream_name, curr).await { + error!("Unable to ingest incoming kafka message- {err}"), + } } }); } From 9b21a13e8900855964d20e0ee9a22d1b89a23e9c Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 16 Dec 2024 11:45:47 +0530 Subject: [PATCH 5/7] refactor: readability --- src/kafka.rs | 104 +++++++++++++++++++++++++-------------------------- src/main.rs | 4 +- 2 files changed, 54 insertions(+), 54 deletions(-) diff --git a/src/kafka.rs b/src/kafka.rs index 07e7c9bf8..7f129938c 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -167,67 +167,67 @@ fn resolve_schema(stream_name: &str) -> Result>, Kafk } async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> { - if let Some(payload) = msg.payload() { - // stream should get created only if there is an incoming event, not before that - create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; - - let schema = resolve_schema(stream_name)?; - let event = format::json::Event { - data: serde_json::from_slice(payload)?, - tags: String::default(), - metadata: String::default(), - }; - - let time_partition = STREAM_INFO.get_time_partition(stream_name)?; - let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; - - let (rb, is_first) = event - .into_recordbatch(schema, 
static_schema_flag, time_partition) - .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; - - event::Event { - rb, - stream_name: stream_name.to_string(), - origin_format: "json", - origin_size: payload.len() as u64, - is_first_event: is_first, - parsed_timestamp: Utc::now().naive_utc(), - time_partition: None, - custom_partition_values: HashMap::new(), - stream_type: StreamType::UserDefined, - } - .process() - .await?; - } else { + let Some(payload) = msg.payload() else { debug!("{} No payload for stream", stream_name); + return Ok(()); + }; + + // stream should get created only if there is an incoming event, not before that + create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; + + let schema = resolve_schema(stream_name)?; + let event = format::json::Event { + data: serde_json::from_slice(payload)?, + tags: String::default(), + metadata: String::default(), + }; + + let time_partition = STREAM_INFO.get_time_partition(stream_name)?; + let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; + + let (rb, is_first) = event + .into_recordbatch(schema, static_schema_flag, time_partition) + .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; + + event::Event { + rb, + stream_name: stream_name.to_string(), + origin_format: "json", + origin_size: payload.len() as u64, + is_first_event: is_first, + parsed_timestamp: Utc::now().naive_utc(), + time_partition: None, + custom_partition_values: HashMap::new(), + stream_type: StreamType::UserDefined, } + .process() + .await?; + Ok(()) } pub async fn setup_integration() { - tokio::task::spawn(async move { - let (consumer, stream_name) = match setup_consumer() { - Ok(c) => c, - Err(err) => { - match err { - KafkaError::DoNotPrintError => { - debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); - } - _ => { - error!("{err}"); - } + let (consumer, stream_name) = match setup_consumer() { + Ok(c) => c, + Err(err) => { + match err { + KafkaError::DoNotPrintError => { + debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); + } + _ => { + error!("{err}"); } - return; } - }; + return; + } + }; - info!("Setup kafka integration for {stream_name}"); - let mut stream = consumer.stream(); + info!("Setup kafka integration for {stream_name}"); + let mut stream = consumer.stream(); - while let Ok(curr) = stream.next().await.unwrap() { - if let Err(err) = ingest_message(&stream_name, curr).await { - error!("Unable to ingest incoming kafka message- {err}"), - } + while let Ok(curr) = stream.next().await.unwrap() { + if let Err(err) = ingest_message(&stream_name, curr).await { + error!("Unable to ingest incoming kafka message- {err}") } - }); + } } diff --git a/src/main.rs b/src/main.rs index 641a2d7c8..d1663d539 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,8 +47,8 @@ async fn main() -> anyhow::Result<()> { metadata.set_global(); // load kafka server - if CONFIG.parseable.mode.ne(&Mode::Query) { - kafka::setup_integration().await; + if CONFIG.parseable.mode != Mode::Query { + tokio::task::spawn(kafka::setup_integration()); } server.init().await?; From 266a5a6ef5691dd175a8bcf51ce77fe4ec799b29 Mon Sep 17 00:00:00 2001 From: parmesant Date: Tue, 17 Dec 2024 09:24:30 +0530 Subject: [PATCH 6/7] added license Signed-off-by: parmesant --- src/kafka.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/kafka.rs b/src/kafka.rs index 7f129938c..a330dae5e 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -1,3 +1,21 @@ +/* + * Parseable 
Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use arrow_schema::Field; use chrono::Utc; use futures_util::StreamExt; From 7c2189f922c196633871d03961b45fdd4a6601ba Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 18 Dec 2024 12:24:42 +0530 Subject: [PATCH 7/7] update: sub to multiple topics --- src/cli.rs | 12 ++++----- src/kafka.rs | 72 ++++++++++++++++++++++++++++++++++------------------ 2 files changed, 54 insertions(+), 30 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index a5e03f3b0..7b1f9a60d 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -122,7 +122,7 @@ pub struct Cli { pub trino_catalog: Option, // Kafka specific env vars - pub kafka_topic: Option, + pub kafka_topics: Option, pub kafka_host: Option, pub kafka_group: Option, pub kafka_client_id: Option, @@ -174,7 +174,7 @@ impl Cli { pub const TRINO_SCHEMA: &'static str = "p-trino-schema"; // Kafka specific env vars - pub const KAFKA_TOPIC: &'static str = "kafka-topic"; + pub const KAFKA_TOPICS: &'static str = "kafka-topics"; pub const KAFKA_HOST: &'static str = "kafka-host"; pub const KAFKA_GROUP: &'static str = "kafka-group"; pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id"; @@ -195,9 +195,9 @@ impl Cli { pub fn create_cli_command_with_clap(name: &'static str) -> Command { Command::new(name).next_line_help(false) .arg( - Arg::new(Self::KAFKA_TOPIC) - .long(Self::KAFKA_TOPIC) - .env("P_KAFKA_TOPIC") + Arg::new(Self::KAFKA_TOPICS) + .long(Self::KAFKA_TOPICS) + .env("P_KAFKA_TOPICS") .value_name("STRING") .help("Kafka topic to subscribe to"), ) @@ -579,7 +579,7 @@ impl FromArgMatches for Cli { self.trino_schema = m.get_one::(Self::TRINO_SCHEMA).cloned(); self.trino_username = m.get_one::(Self::TRINO_USER_NAME).cloned(); - self.kafka_topic = m.get_one::(Self::KAFKA_TOPIC).cloned(); + self.kafka_topics = m.get_one::(Self::KAFKA_TOPICS).cloned(); self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); diff --git a/src/kafka.rs b/src/kafka.rs index a330dae5e..9322720c6 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -19,6 +19,7 @@ use arrow_schema::Field; use chrono::Utc; use futures_util::StreamExt; +use itertools::Itertools; use rdkafka::config::ClientConfig; use rdkafka::consumer::stream_consumer::StreamConsumer; use rdkafka::consumer::Consumer; @@ -29,8 +30,8 @@ use rdkafka::{Message, TopicPartitionList}; use serde::{Deserialize, Serialize}; use std::num::ParseIntError; use std::sync::Arc; -use std::{collections::HashMap, fmt::Debug, str::FromStr}; -use tracing::{debug, error, info}; +use std::{collections::HashMap, fmt::Debug}; +use tracing::{debug, error, info, warn}; use crate::option::CONFIG; use crate::{ @@ -55,7 +56,7 @@ pub enum SslProtocol { #[allow(dead_code)] #[derive(Debug, thiserror::Error)] pub enum KafkaError { - #[error("Please set env 
var {0} (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")] + #[error("Please set env var {0} (To use Kafka integration env vars P_KAFKA_TOPICS, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")] NoVarError(&'static str), #[error("Kafka error {0}")] @@ -121,8 +122,11 @@ pub enum KafkaError { // .map_err(|raw| KafkaError::ParseDurationError(key_prefix, raw)) // } -fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { - if let Some(topic) = &CONFIG.parseable.kafka_topic { +fn setup_consumer() -> Result<(StreamConsumer, Vec), KafkaError> { + if let Some(topics) = &CONFIG.parseable.kafka_topics { + // topics can be a comma separated list of topics to subscribe to + let topics = topics.split(",").map(|v| v.to_owned()).collect_vec(); + let host = if CONFIG.parseable.kafka_host.is_some() { CONFIG.parseable.kafka_host.as_ref() } else { @@ -152,24 +156,41 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { } let consumer: StreamConsumer = conf.create()?; - consumer.subscribe(&[topic.as_str()])?; + consumer.subscribe(&topics.iter().map(|v| v.as_str()).collect_vec())?; if let Some(vals_raw) = CONFIG.parseable.kafka_partitions.as_ref() { - let vals = vals_raw - .split(',') - .map(i32::from_str) - .collect::, ParseIntError>>() - .map_err(|raw| KafkaError::ParseIntError("P_KAFKA_PARTITIONS", raw))?; - - let mut parts = TopicPartitionList::new(); - for val in vals { - parts.add_partition(topic, val); + // partitions is a comma separated pairs of topic:partitions + let mut topic_partition_pairs = Vec::new(); + let mut set = true; + for vals in vals_raw.split(",") { + let intermediate = vals.split(":").collect_vec(); + if intermediate.len() != 2 { + warn!( + "Value for P_KAFKA_PARTITIONS is incorrect! Skipping setting partitions!" 
+ ); + set = false; + break; + } + topic_partition_pairs.push(intermediate); + } + + if set { + let mut parts = TopicPartitionList::new(); + for pair in topic_partition_pairs { + let topic = pair[0]; + match pair[1].parse::() { + Ok(partition) => { + parts.add_partition(topic, partition); + } + Err(_) => warn!("Skipping setting partition for topic- {topic}"), + } + } + consumer.seek_partitions(parts, Timeout::Never)?; } - consumer.seek_partitions(parts, Timeout::Never)?; } - Ok((consumer, topic.clone())) + Ok((consumer, topics.clone())) } else { - // if the user hasn't even set KAFKA_TOPIC + // if the user hasn't even set KAFKA_TOPICS // then they probably don't want to use the integration // send back the DoNotPrint error Err(KafkaError::DoNotPrintError) @@ -184,12 +205,15 @@ fn resolve_schema(stream_name: &str) -> Result>, Kafk Ok(raw.schema.clone()) } -async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> { +async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { let Some(payload) = msg.payload() else { - debug!("{} No payload for stream", stream_name); + debug!("No payload received"); return Ok(()); }; + let msg = msg.detach(); + let stream_name = msg.topic(); + // stream should get created only if there is an incoming event, not before that create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; @@ -225,12 +249,12 @@ async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Resu } pub async fn setup_integration() { - let (consumer, stream_name) = match setup_consumer() { + let (consumer, stream_names) = match setup_consumer() { Ok(c) => c, Err(err) => { match err { KafkaError::DoNotPrintError => { - debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); + debug!("P_KAFKA_TOPICS not set, skipping kafka integration"); } _ => { error!("{err}"); @@ -240,11 +264,11 @@ pub async fn setup_integration() { } }; - info!("Setup kafka integration for {stream_name}"); + info!("Setup kafka integration for {stream_names:?}"); let mut stream = consumer.stream(); while let Ok(curr) = stream.next().await.unwrap() { - if let Err(err) = ingest_message(&stream_name, curr).await { + if let Err(err) = ingest_message(curr).await { error!("Unable to ingest incoming kafka message- {err}") } }
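
The final patch settles on P_KAFKA_PARTITIONS as a comma-separated list of topic:partition pairs, warning and skipping partition seeking rather than failing startup when the spec is malformed. A minimal standalone sketch of that parsing convention, under the assumption of the same warn-and-skip behaviour; the name parse_partition_spec is illustrative only and eprintln! stands in for the tracing warn! macro used in the patch:

    // Illustrative sketch only: standalone parsing of the P_KAFKA_PARTITIONS
    // format adopted in the last patch ("topic:partition" pairs, comma separated).

    /// Returns None when an entry has no ":" separator, mirroring the patch's
    /// "skip setting partitions" behaviour; a pair whose partition is not an
    /// integer is skipped individually with a warning.
    fn parse_partition_spec(raw: &str) -> Option<Vec<(String, i32)>> {
        let mut pairs = Vec::new();
        for entry in raw.split(',') {
            let mut halves = entry.splitn(2, ':');
            let (Some(topic), Some(partition)) = (halves.next(), halves.next()) else {
                eprintln!("Value for P_KAFKA_PARTITIONS is incorrect! Skipping setting partitions!");
                return None;
            };
            match partition.trim().parse::<i32>() {
                Ok(p) => pairs.push((topic.trim().to_owned(), p)),
                Err(_) => eprintln!("Skipping setting partition for topic- {topic}"),
            }
        }
        Some(pairs)
    }

    fn main() {
        // "logs" gets partitions 0 and 1, "metrics" gets 3; an entry without a
        // colon would disable partition seeking entirely.
        let parsed = parse_partition_spec("logs:0,logs:1,metrics:3");
        println!("{parsed:?}");
    }

In the patch itself the parsed pairs feed a TopicPartitionList handed to consumer.seek_partitions, so a bad spec downgrades to a warning and the consumer simply starts from the subscribed topics' default offsets.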