mend
parmesant committed Dec 4, 2024
1 parent ad59452 commit 5997e31
Showing 1 changed file with 29 additions and 33 deletions.
server/src/kafka.rs (62 changes: 29 additions & 33 deletions)
@@ -12,7 +12,6 @@ use std::fmt::Display;
 use std::num::ParseIntError;
 use std::sync::Arc;
 use std::{collections::HashMap, fmt::Debug, str::FromStr};
-use tokio::task::{self};
 
 use crate::option::CONFIG;
 use crate::{
@@ -89,7 +88,7 @@ pub enum KafkaError {
     #[error("Invalid unicode for environment variable {0}")]
     EnvNotUnicode(&'static str),
     #[error("")]
-    DoNotPrintError
+    DoNotPrintError,
 }
 
 // // Commented out functions
@@ -126,13 +125,13 @@ pub enum KafkaError {
 
 fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> {
     if let Some(topic) = &CONFIG.parseable.kafka_topic {
-        let host = if let Some(_) = &CONFIG.parseable.kafka_host {
+        let host = if CONFIG.parseable.kafka_host.is_some() {
             CONFIG.parseable.kafka_host.as_ref()
         } else {
             return Err(KafkaError::NoVarError("Please set P_KAKFA_HOST env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)"));
         };
 
-        let group = if let Some(_) = &CONFIG.parseable.kafka_group {
+        let group = if CONFIG.parseable.kafka_group.is_some() {
             CONFIG.parseable.kafka_group.as_ref()
         } else {
             return Err(KafkaError::NoVarError("Please set P_KAKFA_GROUP env var (To use Kafka integration env vars P_KAFKA_TOPIC, P_KAFKA_HOST, and P_KAFKA_GROUP are mandatory)"));
@@ -141,42 +140,42 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> {
         let mut conf = ClientConfig::new();
         conf.set("bootstrap.servers", host.unwrap());
         conf.set("group.id", group.unwrap());
 
         if let Some(val) = CONFIG.parseable.kafka_client_id.as_ref() {
             conf.set("client.id", val);
         }
 
         // if let Some(val) = get_flag_env_val("a")? {
         //     conf.set("api.version.request", val.to_string());
         // }
 
         if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() {
             let mapped: SslProtocol = val.parse()?;
-            conf.set("security.protocol", &mapped.to_string());
+            conf.set("security.protocol", mapped.to_string());
         }
 
         let consumer: StreamConsumer = conf.create()?;
         consumer.subscribe(&[topic.as_str()])?;
 
         if let Some(vals_raw) = CONFIG.parseable.kafka_partitions.as_ref() {
             let vals = vals_raw
                 .split(',')
                 .map(i32::from_str)
                 .collect::<Result<Vec<i32>, ParseIntError>>()
                 .map_err(|raw| KafkaError::ParseIntError("P_KAFKA_PARTITIONS", raw))?;
 
             let mut parts = TopicPartitionList::new();
             for val in vals {
-                parts.add_partition(&topic, val);
+                parts.add_partition(topic, val);
             }
             consumer.seek_partitions(parts, Timeout::Never)?;
         }
-        Ok((consumer,topic.clone()))
+        Ok((consumer, topic.clone()))
     } else {
         // if the user hasn't even set KAFKA_TOPIC
         // then they probably don't want to use the integration
         // send back the DoNotPrint error
-        return Err(KafkaError::DoNotPrintError)
+        Err(KafkaError::DoNotPrintError)
     }
 }
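The partition handling above collects an iterator of `Result`s, so a single bad entry in `P_KAFKA_PARTITIONS` aborts the whole parse. A self-contained sketch of just that idiom (editor's illustration, not from the commit):

use std::num::ParseIntError;
use std::str::FromStr;

// `collect` into Result<Vec<_>, _> short-circuits on the first failing entry.
fn parse_partitions(raw: &str) -> Result<Vec<i32>, ParseIntError> {
    raw.split(',').map(i32::from_str).collect()
}

fn main() {
    assert_eq!(parse_partitions("0,1,2").unwrap(), vec![0, 1, 2]);
    assert!(parse_partitions("0,x,2").is_err());
}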

@@ -189,10 +188,9 @@ fn resolve_schema(stream_name: &str) -> Result<HashMap<String, Arc<Field>>, KafkaError> {
 }
 
 async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> {
-
     if let Some(payload) = msg.payload() {
         // stream should get created only if there is an incoming event, not before that
-        create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
+        create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
 
         let schema = resolve_schema(stream_name)?;
         let event = format::json::Event {
@@ -201,13 +199,12 @@ async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> {
             metadata: String::default(),
         };
 
-        let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
-        let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
+        let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
+        let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
 
-        let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)
-            .map_err(|err|{
-                KafkaError::PostError(PostError::CustomError(err.to_string()))
-            })?;
+        let (rb, is_first) = event
+            .into_recordbatch(schema, static_schema_flag, time_partition)
+            .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?;
 
         event::Event {
             rb,
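The reshaped `map_err` above wraps a lower-level conversion error into the module's own error type so `?` can propagate it. A stripped-down, self-contained sketch of that pattern (editor's illustration; the enums mirror, but do not reproduce, the ones in this file):

#[derive(Debug)]
enum PostError {
    CustomError(String),
}

#[derive(Debug)]
enum KafkaError {
    PostError(PostError),
}

// The inner error is stringified and wrapped, so the function's error
// type stays KafkaError throughout.
fn convert(input: &str) -> Result<i32, KafkaError> {
    input
        .parse::<i32>()
        .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))
}

fn main() {
    assert!(convert("42").is_ok());
    assert!(convert("nope").is_err());
}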
@@ -229,39 +226,38 @@ async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> {
 }
 
 pub async fn setup_integration() {
-    task::spawn(async move {
+    tokio::task::spawn(async move {
         // check if this is standalone or ingest, should not work on query
         match CONFIG.parseable.mode {
-            crate::option::Mode::Ingest|
-            crate::option::Mode::All => {},
+            crate::option::Mode::Ingest | crate::option::Mode::All => {}
             _ => {
                 log::error!("Kafka integration is only allowed on modes `ingest` or `all`");
-                return
-            },
+                return;
+            }
         }
 
         // if this is error then print and error message and return
-        let (consumer,stream_name) = match setup_consumer() {
+        let (consumer, stream_name) = match setup_consumer() {
             Ok(c) => c,
             Err(err) => {
                 match err {
                     KafkaError::DoNotPrintError => {
                         log::debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
-                    },
+                    }
                     _ => {
                         log::error!("{err}");
                     }
                 }
-                return
-            },
+                return;
+            }
         };
 
         log::info!("Setup kafka integration for {stream_name}");
         let mut stream = consumer.stream();
 
         while let Ok(curr) = stream.next().await.unwrap() {
             match ingest_message(&stream_name, curr).await {
-                Ok(_) => {},
+                Ok(_) => {}
                 Err(err) => log::error!("Unable to ingest incoming kafka message- {err}"),
             };
         }
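The consumption loop at the end is compact but subtle: with rdkafka, `stream.next().await` yields an `Option<Result<BorrowedMessage, _>>`, so the `unwrap()` panics if the stream ever yields `None` and the `while let Ok(..)` pattern silently exits on the first consumer error. A self-contained analogue with a plain stream standing in for the Kafka message stream (editor's sketch, assuming the `futures` and `tokio` crates):

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Stand-in for the Kafka message stream: two good items, then an error.
    let mut s = stream::iter(vec![Ok::<_, String>(1), Ok(2), Err("broker down".to_string())]);

    // Mirrors `while let Ok(curr) = stream.next().await.unwrap()`:
    // the loop exits on the Err; `unwrap()` would panic on None if the
    // stream were exhausted first.
    while let Ok(item) = s.next().await.unwrap() {
        println!("ingest {item}");
    }
}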
