diff --git a/Cargo.lock b/Cargo.lock
index 1b98329a3..32fffdbaf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2920,6 +2920,27 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "num_enum"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179"
+dependencies = [
+ "num_enum_derive",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
+dependencies = [
+ "proc-macro-crate",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.79",
+]
+
 [[package]]
 name = "num_threads"
 version = "0.1.7"
@@ -3147,6 +3168,7 @@ dependencies = [
  "prost",
  "prost-build",
  "rand",
+ "rdkafka",
  "regex",
  "relative-path",
  "reqwest 0.11.27",
@@ -3617,6 +3639,35 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "rdkafka"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.8.0+2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be"
+dependencies = [
+ "libc",
+ "num_enum",
+ "pkg-config",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
diff --git a/Cargo.toml b/Cargo.toml
index 455321afe..47a42caeb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ num_cpus = "1.15"
 once_cell = "1.17.1"
 prometheus = { version = "0.13", features = ["process"] }
 rand = "0.8.5"
+rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
 regex = "1.7.3"
 relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default-features = false, features = [
diff --git a/src/cli.rs b/src/cli.rs
index 982a2a765..4a159308a 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -119,6 +119,14 @@ pub struct Cli {
     pub trino_auth: Option<String>,
     pub trino_schema: Option<String>,
     pub trino_catalog: Option<String>,
+
+    // Kafka specific env vars
+    pub kafka_topic: Option<String>,
+    pub kafka_host: Option<String>,
+    pub kafka_group: Option<String>,
+    pub kafka_client_id: Option<String>,
+    pub kafka_security_protocol: Option<String>,
+    pub kafka_partitions: Option<String>,
 }
 
 impl Cli {
@@ -164,6 +172,14 @@ impl Cli {
     pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
     pub const TRINO_SCHEMA: &'static str = "p-trino-schema";
 
+    // Kafka specific env vars
+    pub const KAFKA_TOPIC: &'static str = "kafka-topic";
+    pub const KAFKA_HOST: &'static str = "kafka-host";
+    pub const KAFKA_GROUP: &'static str = "kafka-group";
+    pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id";
+    pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
+    pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";
+
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_staging_path.join(stream_name)
     }
@@ -177,6 +193,48 @@ impl Cli {
 
     pub fn create_cli_command_with_clap(name: &'static str) -> Command {
         Command::new(name).next_line_help(false)
+            .arg(
+                Arg::new(Self::KAFKA_TOPIC)
+                    .long(Self::KAFKA_TOPIC)
+                    .env("P_KAFKA_TOPIC")
+                    .value_name("STRING")
+                    .help("Kafka topic to subscribe to"),
+            )
+            .arg(
+                Arg::new(Self::KAFKA_HOST)
+                    .long(Self::KAFKA_HOST)
.env("P_KAFKA_HOST") + .value_name("STRING") + .help("Address and port for Kafka server"), + ) + .arg( + Arg::new(Self::KAFKA_GROUP) + .long(Self::KAFKA_GROUP) + .env("P_KAFKA_GROUP") + .value_name("STRING") + .help("Kafka group"), + ) + .arg( + Arg::new(Self::KAFKA_CLIENT_ID) + .long(Self::KAFKA_CLIENT_ID) + .env("P_KAFKA_CLIENT_ID") + .value_name("STRING") + .help("Kafka client id"), + ) + .arg( + Arg::new(Self::KAFKA_SECURITY_PROTOCOL) + .long(Self::KAFKA_SECURITY_PROTOCOL) + .env("P_KAFKA_SECURITY_PROTOCOL") + .value_name("STRING") + .help("Kafka security protocol (ssl)"), + ) + .arg( + Arg::new(Self::KAFKA_PARTITIONS) + .long(Self::KAFKA_PARTITIONS) + .env("P_KAFKA_PARTITIONS") + .value_name("STRING") + .help("Kafka partitions"), + ) .arg( Arg::new(Self::TRINO_ENDPOINT) .long(Self::TRINO_ENDPOINT) @@ -520,6 +578,13 @@ impl FromArgMatches for Cli { self.trino_schema = m.get_one::(Self::TRINO_SCHEMA).cloned(); self.trino_username = m.get_one::(Self::TRINO_USER_NAME).cloned(); + self.kafka_topic = m.get_one::(Self::KAFKA_TOPIC).cloned(); + self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); + self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); + self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); + self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned(); + self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); + self.local_cache_path = m.get_one::(Self::CACHE).cloned(); self.query_cache_path = m.get_one::(Self::QUERY_CACHE).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); diff --git a/src/kafka.rs b/src/kafka.rs new file mode 100644 index 000000000..eae86d851 --- /dev/null +++ b/src/kafka.rs @@ -0,0 +1,256 @@ +use arrow_schema::Field; +use chrono::Utc; +use futures_util::StreamExt; +use rdkafka::config::ClientConfig; +use rdkafka::consumer::stream_consumer::StreamConsumer; +use rdkafka::consumer::Consumer; +use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError}; +use rdkafka::message::BorrowedMessage; +use rdkafka::util::Timeout; +use rdkafka::{Message, TopicPartitionList}; +use std::fmt::Display; +use std::num::ParseIntError; +use std::sync::Arc; +use std::{collections::HashMap, fmt::Debug, str::FromStr}; +use tracing::{debug, error, info}; + +use crate::option::CONFIG; +use crate::{ + event::{ + self, + error::EventError, + format::{self, EventFormat}, + }, + handlers::http::ingest::{create_stream_if_not_exists, PostError}, + metadata::{error::stream_info::MetadataError, STREAM_INFO}, + storage::StreamType, +}; + +enum SslProtocol { + Plaintext, + Ssl, + SaslPlaintext, + SaslSsl, +} +impl Display for SslProtocol { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + SslProtocol::Plaintext => "plaintext", + SslProtocol::Ssl => "ssl", + SslProtocol::SaslPlaintext => "sasl_plaintext", + SslProtocol::SaslSsl => "sasl_ssl", + }) + } +} +impl FromStr for SslProtocol { + type Err = KafkaError; + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "plaintext" => Ok(SslProtocol::Plaintext), + "ssl" => Ok(SslProtocol::Ssl), + "sasl_plaintext" => Ok(SslProtocol::SaslPlaintext), + "sasl_ssl" => Ok(SslProtocol::SaslSsl), + _ => Err(KafkaError::InvalidSslProtocolError(s.to_string())), + } + } +} + +#[allow(dead_code)] +#[derive(Debug, thiserror::Error)] +pub enum KafkaError { + #[error("Error loading environment variable {0}")] + NoVarError(&'static str), + + #[error("Kafka error {0}")] + NativeError(#[from] NativeKafkaError), 
+ #[error("RDKafka error {0}")] + RDKError(#[from] RDKafkaError), + + #[error("Error parsing int {1} for environment variable {0}")] + ParseIntError(&'static str, ParseIntError), + #[error("Error parsing duration int {1} for environment variable {0}")] + ParseDurationError(&'static str, ParseIntError), + + #[error("Stream not found: #{0}")] + StreamNotFound(String), + #[error("Post error: #{0}")] + PostError(#[from] PostError), + #[error("Metadata error: #{0}")] + MetadataError(#[from] MetadataError), + #[error("Event error: #{0}")] + EventError(#[from] EventError), + #[error("JSON error: #{0}")] + JsonError(#[from] serde_json::Error), + #[error("Invalid group offset storage: #{0}")] + InvalidGroupOffsetStorage(String), + + #[error("Invalid SSL protocol: #{0}")] + InvalidSslProtocolError(String), + #[error("Invalid unicode for environment variable {0}")] + EnvNotUnicode(&'static str), + #[error("")] + DoNotPrintError, +} + +// // Commented out functions +// // Might come in handy later +// fn parse_auto_env(key: &'static str) -> Result, ::Err> +// where +// T: FromStr, +// { +// Ok(if let Ok(val) = env::var(key) { +// Some(val.parse::()?) +// } else { +// None +// }) +// } + +// fn handle_duration_env_prefix(key: &'static str) -> Result, ParseIntError> { +// if let Ok(raw_secs) = env::var(format!("{key}_S")) { +// Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)?))) +// } else if let Ok(raw_secs) = env::var(format!("{key}_M")) { +// Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)? * 60))) +// } else { +// Ok(None) +// } +// } + +// fn parse_i32_env(key: &'static str) -> Result, KafkaError> { +// parse_auto_env::(key).map_err(|raw| KafkaError::ParseIntError(key, raw)) +// } + +// fn parse_duration_env_prefixed(key_prefix: &'static str) -> Result, KafkaError> { +// handle_duration_env_prefix(key_prefix) +// .map_err(|raw| KafkaError::ParseDurationError(key_prefix, raw)) +// } + +fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { + if let Some(topic) = &CONFIG.parseable.kafka_topic { + let host = if CONFIG.parseable.kafka_host.is_some() { + CONFIG.parseable.kafka_host.as_ref() + } else { + return Err(KafkaError::NoVarError("Please set P_KAKFA_HOST env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")); + }; + + let group = if CONFIG.parseable.kafka_group.is_some() { + CONFIG.parseable.kafka_group.as_ref() + } else { + return Err(KafkaError::NoVarError("Please set P_KAKFA_GROUP env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")); + }; + + let mut conf = ClientConfig::new(); + conf.set("bootstrap.servers", host.unwrap()); + conf.set("group.id", group.unwrap()); + + if let Some(val) = CONFIG.parseable.kafka_client_id.as_ref() { + conf.set("client.id", val); + } + + // if let Some(val) = get_flag_env_val("a")? 
{ + // conf.set("api.version.request", val.to_string()); + // } + + if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() { + let mapped: SslProtocol = val.parse()?; + conf.set("security.protocol", mapped.to_string()); + } + + let consumer: StreamConsumer = conf.create()?; + consumer.subscribe(&[topic.as_str()])?; + + if let Some(vals_raw) = CONFIG.parseable.kafka_partitions.as_ref() { + let vals = vals_raw + .split(',') + .map(i32::from_str) + .collect::, ParseIntError>>() + .map_err(|raw| KafkaError::ParseIntError("P_KAFKA_PARTITIONS", raw))?; + + let mut parts = TopicPartitionList::new(); + for val in vals { + parts.add_partition(topic, val); + } + consumer.seek_partitions(parts, Timeout::Never)?; + } + Ok((consumer, topic.clone())) + } else { + // if the user hasn't even set KAFKA_TOPIC + // then they probably don't want to use the integration + // send back the DoNotPrint error + Err(KafkaError::DoNotPrintError) + } +} + +fn resolve_schema(stream_name: &str) -> Result>, KafkaError> { + let hash_map = STREAM_INFO.read().unwrap(); + let raw = hash_map + .get(stream_name) + .ok_or_else(|| KafkaError::StreamNotFound(stream_name.to_owned()))?; + Ok(raw.schema.clone()) +} + +async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> { + if let Some(payload) = msg.payload() { + // stream should get created only if there is an incoming event, not before that + create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; + + let schema = resolve_schema(stream_name)?; + let event = format::json::Event { + data: serde_json::from_slice(payload)?, + tags: String::default(), + metadata: String::default(), + }; + + let time_partition = STREAM_INFO.get_time_partition(stream_name)?; + let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; + + let (rb, is_first) = event + .into_recordbatch(schema, static_schema_flag, time_partition) + .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; + + event::Event { + rb, + stream_name: stream_name.to_string(), + origin_format: "json", + origin_size: payload.len() as u64, + is_first_event: is_first, + parsed_timestamp: Utc::now().naive_utc(), + time_partition: None, + custom_partition_values: HashMap::new(), + stream_type: StreamType::UserDefined, + } + .process() + .await?; + } else { + debug!("{} No payload for stream", stream_name); + } + Ok(()) +} + +pub async fn setup_integration() { + tokio::task::spawn(async move { + let (consumer, stream_name) = match setup_consumer() { + Ok(c) => c, + Err(err) => { + match err { + KafkaError::DoNotPrintError => { + debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); + } + _ => { + error!("{err}"); + } + } + return; + } + }; + + info!("Setup kafka integration for {stream_name}"); + let mut stream = consumer.stream(); + + while let Ok(curr) = stream.next().await.unwrap() { + match ingest_message(&stream_name, curr).await { + Ok(_) => {} + Err(err) => error!("Unable to ingest incoming kafka message- {err}"), + }; + } + }); +} diff --git a/src/lib.rs b/src/lib.rs index 140c32dcc..9186adba4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,7 @@ mod cli; mod event; pub mod handlers; pub mod hottier; +pub mod kafka; mod livetail; pub mod localcache; mod metadata; diff --git a/src/main.rs b/src/main.rs index f9f2e5993..641a2d7c8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ */ use parseable::{ - banner, + banner, kafka, option::{Mode, CONFIG}, rbac, storage, IngestServer, 
     rbac, storage, IngestServer, ParseableServer, QueryServer, Server,
 };
@@ -46,6 +46,11 @@ async fn main() -> anyhow::Result<()> {
     // keep metadata info in mem
     metadata.set_global();
 
+    // start the kafka consumer task (skipped in query mode)
+    if CONFIG.parseable.mode.ne(&Mode::Query) {
+        kafka::setup_integration().await;
+    }
+
     server.init().await?;
 
     Ok(())
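
For a quick end-to-end check of the new path, one option is to publish a JSON event to the configured topic with rdkafka's `FutureProducer` and watch it show up as a Parseable stream. The sketch below is illustrative only: it assumes a scratch binary with `tokio` (macros and runtime features) and `rdkafka` as dependencies, and the broker address, topic name, key, and payload are placeholders that should match whatever `P_KAFKA_HOST` and `P_KAFKA_TOPIC` are set to.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

#[tokio::main]
async fn main() {
    // Placeholder broker address; use the same value as P_KAFKA_HOST.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("message.timeout.ms", "5000")
        .create()
        .expect("producer creation failed");

    // Any JSON object works; the consumer deserializes the payload with serde_json.
    let payload = r#"{"level":"info","message":"hello from kafka"}"#;

    // Placeholder topic name and key; the topic must match P_KAFKA_TOPIC.
    let record = FutureRecord::to("app-logs").key("demo").payload(payload);

    match producer.send(record, Duration::from_secs(1)).await {
        Ok((partition, offset)) => {
            println!("delivered to partition {partition} at offset {offset}")
        }
        Err((err, _msg)) => eprintln!("delivery failed: {err}"),
    }
}
```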