diff --git a/Cargo.lock b/Cargo.lock
index 1b98329a3..32fffdbaf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2920,6 +2920,27 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "num_enum"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179"
+dependencies = [
+ "num_enum_derive",
+]
+
+[[package]]
+name = "num_enum_derive"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56"
+dependencies = [
+ "proc-macro-crate",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.79",
+]
+
 [[package]]
 name = "num_threads"
 version = "0.1.7"
@@ -3147,6 +3168,7 @@ dependencies = [
  "prost",
  "prost-build",
  "rand",
+ "rdkafka",
  "regex",
  "relative-path",
  "reqwest 0.11.27",
@@ -3617,6 +3639,35 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "rdkafka"
+version = "0.36.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1beea247b9a7600a81d4cc33f659ce1a77e1988323d7d2809c7ed1c21f4c316d"
+dependencies = [
+ "futures-channel",
+ "futures-util",
+ "libc",
+ "log",
+ "rdkafka-sys",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "slab",
+ "tokio",
+]
+
+[[package]]
+name = "rdkafka-sys"
+version = "4.8.0+2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be"
+dependencies = [
+ "libc",
+ "num_enum",
+ "pkg-config",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.4.1"
diff --git a/Cargo.toml b/Cargo.toml
index 7186e4589..21938aadd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ num_cpus = "1.15"
 once_cell = "1.17.1"
 prometheus = { version = "0.13", features = ["process"] }
 rand = "0.8.5"
+rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
 regex = "1.7.3"
 relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default-features = false, features = [
diff --git a/src/cli.rs b/src/cli.rs
index 193da9632..e5c16342a 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -22,6 +22,7 @@ use std::path::PathBuf;
 use url::Url;
 
 use crate::{
+    kafka::SslProtocol,
     oidc::{self, OpenidConfig},
     option::{validation, Compression, Mode},
 };
@@ -107,6 +108,14 @@ pub struct Cli {
     pub trino_auth: Option<String>,
     pub trino_schema: Option<String>,
     pub trino_catalog: Option<String>,
+
+    // Kafka specific env vars
+    pub kafka_topics: Option<String>,
+    pub kafka_host: Option<String>,
+    pub kafka_group: Option<String>,
+    pub kafka_client_id: Option<String>,
+    pub kafka_security_protocol: Option<SslProtocol>,
+    pub kafka_partitions: Option<String>,
 }
 
 impl Cli {
@@ -148,6 +157,14 @@ impl Cli {
     pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
     pub const TRINO_SCHEMA: &'static str = "p-trino-schema";
 
+    // Kafka specific env vars
+    pub const KAFKA_TOPICS: &'static str = "kafka-topics";
+    pub const KAFKA_HOST: &'static str = "kafka-host";
+    pub const KAFKA_GROUP: &'static str = "kafka-group";
+    pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id";
+    pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
+    pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";
+
     pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
         self.local_staging_path.join(stream_name)
     }
@@ -161,6 +178,48 @@ impl Cli {
 
     pub fn create_cli_command_with_clap(name: &'static str) -> Command {
         Command::new(name).next_line_help(false)
+            .arg(
+                Arg::new(Self::KAFKA_TOPICS)
+                    .long(Self::KAFKA_TOPICS)
+                    .env("P_KAFKA_TOPICS")
+                    .value_name("STRING")
+                    .help("Kafka topics to subscribe to"),
+            )
+            .arg(
+                Arg::new(Self::KAFKA_HOST)
+                    .long(Self::KAFKA_HOST)
+                    .env("P_KAFKA_HOST")
+                    .value_name("STRING")
+                    .help("Address and port for Kafka server"),
+            )
+            .arg(
+                Arg::new(Self::KAFKA_GROUP)
+                    .long(Self::KAFKA_GROUP)
+                    .env("P_KAFKA_GROUP")
+                    .value_name("STRING")
+                    .help("Kafka group"),
+            )
+            .arg(
+                Arg::new(Self::KAFKA_CLIENT_ID)
+                    .long(Self::KAFKA_CLIENT_ID)
+                    .env("P_KAFKA_CLIENT_ID")
+                    .value_name("STRING")
+                    .help("Kafka client id"),
+            )
+            .arg(
+                Arg::new(Self::KAFKA_SECURITY_PROTOCOL)
+                    .long(Self::KAFKA_SECURITY_PROTOCOL)
+                    .env("P_KAFKA_SECURITY_PROTOCOL")
+                    .value_name("STRING")
+                    .help("Kafka security protocol"),
+            )
+            .arg(
+                Arg::new(Self::KAFKA_PARTITIONS)
+                    .long(Self::KAFKA_PARTITIONS)
+                    .env("P_KAFKA_PARTITIONS")
+                    .value_name("STRING")
+                    .help("Kafka partitions"),
+            )
             .arg(
                 Arg::new(Self::TRINO_ENDPOINT)
                     .long(Self::TRINO_ENDPOINT)
@@ -466,6 +525,13 @@ impl FromArgMatches for Cli {
         self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
         self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();
 
+        self.kafka_topics = m.get_one::<String>(Self::KAFKA_TOPICS).cloned();
+        self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
+        self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
+        self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
+        self.kafka_security_protocol = m.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
+        self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
+
         self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
         self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
         self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();
diff --git a/src/kafka.rs b/src/kafka.rs
new file mode 100644
index 000000000..9322720c6
--- /dev/null
+++ b/src/kafka.rs
@@ -0,0 +1,275 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use arrow_schema::Field;
+use chrono::Utc;
+use futures_util::StreamExt;
+use itertools::Itertools;
+use rdkafka::config::ClientConfig;
+use rdkafka::consumer::stream_consumer::StreamConsumer;
+use rdkafka::consumer::Consumer;
+use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError};
+use rdkafka::message::BorrowedMessage;
+use rdkafka::util::Timeout;
+use rdkafka::{Message, TopicPartitionList};
+use serde::{Deserialize, Serialize};
+use std::num::ParseIntError;
+use std::sync::Arc;
+use std::{collections::HashMap, fmt::Debug};
+use tracing::{debug, error, info, warn};
+
+use crate::option::CONFIG;
+use crate::{
+    event::{
+        self,
+        error::EventError,
+        format::{self, EventFormat},
+    },
+    handlers::http::ingest::{create_stream_if_not_exists, PostError},
+    metadata::{error::stream_info::MetadataError, STREAM_INFO},
+    storage::StreamType,
+};
+
+#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
+pub enum SslProtocol {
+    Plaintext,
+    Ssl,
+    SaslPlaintext,
+    SaslSsl,
+}
+
+#[allow(dead_code)]
+#[derive(Debug, thiserror::Error)]
+pub enum KafkaError {
+    #[error("Please set env var {0} (To use Kafka integration env vars P_KAFKA_TOPICS, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)")]
+    NoVarError(&'static str),
+
+    #[error("Kafka error {0}")]
+    NativeError(#[from] NativeKafkaError),
+    #[error("RDKafka error {0}")]
+    RDKError(#[from] RDKafkaError),
+
+    #[error("Error parsing int {1} for environment variable {0}")]
+    ParseIntError(&'static str, ParseIntError),
+    #[error("Error parsing duration int {1} for environment variable {0}")]
+    ParseDurationError(&'static str, ParseIntError),
+
+    #[error("Stream not found: #{0}")]
+    StreamNotFound(String),
+    #[error("Post error: #{0}")]
+    PostError(#[from] PostError),
+    #[error("Metadata error: #{0}")]
+    MetadataError(#[from] MetadataError),
+    #[error("Event error: #{0}")]
+    EventError(#[from] EventError),
+    #[error("JSON error: #{0}")]
+    JsonError(#[from] serde_json::Error),
+    #[error("Invalid group offset storage: #{0}")]
+    InvalidGroupOffsetStorage(String),
+
+    #[error("Invalid SSL protocol: #{0}")]
+    InvalidSslProtocolError(String),
+    #[error("Invalid unicode for environment variable {0}")]
+    EnvNotUnicode(&'static str),
+    #[error("")]
+    DoNotPrintError,
+}
+
+// // Commented out functions
+// // Might come in handy later
+// fn parse_auto_env<T>(key: &'static str) -> Result<Option<T>, <T as FromStr>::Err>
+// where
+//     T: FromStr,
+// {
+//     Ok(if let Ok(val) = env::var(key) {
+//         Some(val.parse::<T>()?)
+//     } else {
+//         None
+//     })
+// }
+
+// fn handle_duration_env_prefix(key: &'static str) -> Result<Option<Duration>, ParseIntError> {
+//     if let Ok(raw_secs) = env::var(format!("{key}_S")) {
+//         Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)?)))
+//     } else if let Ok(raw_secs) = env::var(format!("{key}_M")) {
+//         Ok(Some(Duration::from_secs(u64::from_str(&raw_secs)? * 60)))
+//     } else {
+//         Ok(None)
+//     }
+// }
+
+// fn parse_i32_env(key: &'static str) -> Result<Option<i32>, KafkaError> {
+//     parse_auto_env::<i32>(key).map_err(|raw| KafkaError::ParseIntError(key, raw))
+// }
+
+// fn parse_duration_env_prefixed(key_prefix: &'static str) -> Result<Option<Duration>, KafkaError> {
+//     handle_duration_env_prefix(key_prefix)
+//         .map_err(|raw| KafkaError::ParseDurationError(key_prefix, raw))
+// }
+
+fn setup_consumer() -> Result<(StreamConsumer, Vec<String>), KafkaError> {
+    if let Some(topics) = &CONFIG.parseable.kafka_topics {
+        // topics can be a comma separated list of topics to subscribe to
+        let topics = topics.split(",").map(|v| v.to_owned()).collect_vec();
+
+        let host = if CONFIG.parseable.kafka_host.is_some() {
+            CONFIG.parseable.kafka_host.as_ref()
+        } else {
+            return Err(KafkaError::NoVarError("P_KAFKA_HOST"));
+        };
+
+        let group = if CONFIG.parseable.kafka_group.is_some() {
+            CONFIG.parseable.kafka_group.as_ref()
+        } else {
+            return Err(KafkaError::NoVarError("P_KAFKA_GROUP"));
+        };
+
+        let mut conf = ClientConfig::new();
+        conf.set("bootstrap.servers", host.unwrap());
+        conf.set("group.id", group.unwrap());
+
+        if let Some(val) = CONFIG.parseable.kafka_client_id.as_ref() {
+            conf.set("client.id", val);
+        }
+
+        // if let Some(val) = get_flag_env_val("a")? {
+        //     conf.set("api.version.request", val.to_string());
+        // }
+
+        if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() {
+            conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?);
+        }
+
+        let consumer: StreamConsumer = conf.create()?;
+        consumer.subscribe(&topics.iter().map(|v| v.as_str()).collect_vec())?;
+
+        if let Some(vals_raw) = CONFIG.parseable.kafka_partitions.as_ref() {
+            // partitions is a comma separated pairs of topic:partitions
+            let mut topic_partition_pairs = Vec::new();
+            let mut set = true;
+            for vals in vals_raw.split(",") {
+                let intermediate = vals.split(":").collect_vec();
+                if intermediate.len() != 2 {
+                    warn!(
+                        "Value for P_KAFKA_PARTITIONS is incorrect! Skipping setting partitions!"
+                    );
+                    set = false;
+                    break;
+                }
+                topic_partition_pairs.push(intermediate);
+            }
+
+            if set {
+                let mut parts = TopicPartitionList::new();
+                for pair in topic_partition_pairs {
+                    let topic = pair[0];
+                    match pair[1].parse::<i32>() {
+                        Ok(partition) => {
+                            parts.add_partition(topic, partition);
+                        }
+                        Err(_) => warn!("Skipping setting partition for topic- {topic}"),
+                    }
+                }
+                consumer.seek_partitions(parts, Timeout::Never)?;
+            }
+        }
+        Ok((consumer, topics.clone()))
+    } else {
+        // if the user hasn't even set KAFKA_TOPICS
+        // then they probably don't want to use the integration
+        // send back the DoNotPrint error
+        Err(KafkaError::DoNotPrintError)
+    }
+}
+
+fn resolve_schema(stream_name: &str) -> Result<HashMap<String, Arc<Field>>, KafkaError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let raw = hash_map
+        .get(stream_name)
+        .ok_or_else(|| KafkaError::StreamNotFound(stream_name.to_owned()))?;
+    Ok(raw.schema.clone())
+}
+
+async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
+    let Some(payload) = msg.payload() else {
+        debug!("No payload received");
+        return Ok(());
+    };
+
+    let msg = msg.detach();
+    let stream_name = msg.topic();
+
+    // stream should get created only if there is an incoming event, not before that
+    create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
+
+    let schema = resolve_schema(stream_name)?;
+    let event = format::json::Event {
+        data: serde_json::from_slice(payload)?,
+        tags: String::default(),
+        metadata: String::default(),
+    };
+
+    let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
+    let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
+
+    let (rb, is_first) = event
+        .into_recordbatch(schema, static_schema_flag, time_partition)
+        .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?;
+
+    event::Event {
+        rb,
+        stream_name: stream_name.to_string(),
+        origin_format: "json",
+        origin_size: payload.len() as u64,
+        is_first_event: is_first,
+        parsed_timestamp: Utc::now().naive_utc(),
+        time_partition: None,
+        custom_partition_values: HashMap::new(),
+        stream_type: StreamType::UserDefined,
+    }
+    .process()
+    .await?;
+
+    Ok(())
+}
+
+pub async fn setup_integration() {
+    let (consumer, stream_names) = match setup_consumer() {
+        Ok(c) => c,
+        Err(err) => {
+            match err {
+                KafkaError::DoNotPrintError => {
+                    debug!("P_KAFKA_TOPICS not set, skipping kafka integration");
+                }
+                _ => {
+                    error!("{err}");
+                }
+            }
+            return;
+        }
+    };
+
+    info!("Setup kafka integration for {stream_names:?}");
+    let mut stream = consumer.stream();
+
+    while let Ok(curr) = stream.next().await.unwrap() {
+        if let Err(err) = ingest_message(curr).await {
+            error!("Unable to ingest incoming kafka message- {err}")
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index ddb7f8244..7a85f54e7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -25,6 +25,7 @@ mod cli;
 mod event;
 pub mod handlers;
 pub mod hottier;
+pub mod kafka;
 mod livetail;
 mod metadata;
 pub mod metrics;
diff --git a/src/main.rs b/src/main.rs
index f9f2e5993..d1663d539 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -17,7 +17,7 @@
  */
 
 use parseable::{
-    banner,
+    banner, kafka,
     option::{Mode, CONFIG},
     rbac, storage, IngestServer, ParseableServer, QueryServer, Server,
 };
@@ -46,6 +46,11 @@ async fn main() -> anyhow::Result<()> {
     // keep metadata info in mem
     metadata.set_global();
 
+    // load kafka server
+    if CONFIG.parseable.mode != Mode::Query {
+        tokio::task::spawn(kafka::setup_integration());
+    }
+
     server.init().await?;
 
     Ok(())
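
For reviewers who want to exercise the new consumer path end to end, here is a minimal, illustrative sketch (not part of the change set). The broker address `localhost:9092` and the topic name `demo-logs` are assumptions; Parseable itself would be started with matching settings, e.g. `P_KAFKA_TOPICS=demo-logs`, `P_KAFKA_HOST=localhost:9092`, and `P_KAFKA_GROUP=parseable` (the three variables `setup_consumer` treats as mandatory). The snippet publishes one JSON payload with rdkafka's `FutureProducer`; since `ingest_message` uses the topic name as the stream name, the event should land in a `demo-logs` stream.

```rust
// Illustrative producer sketch, assuming a broker at localhost:9092 and a
// topic named "demo-logs". Requires the rdkafka and tokio crates.
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // assumed broker address
        .create()
        .expect("producer creation failed");

    // The consumer added in src/kafka.rs names the Parseable stream after the
    // topic, so this JSON event would be ingested into a "demo-logs" stream.
    let payload = r#"{"level":"info","message":"hello from kafka"}"#;
    let delivery = producer
        .send(
            FutureRecord::<(), _>::to("demo-logs").payload(payload),
            Duration::from_secs(5),
        )
        .await;
    println!("delivery result: {delivery:?}");
}
```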