Commit 9b21a13

refactor: readability

parmesant committed Dec 16, 2024
1 parent 5f9ec0c commit 9b21a13
Showing 2 changed files with 54 additions and 54 deletions.
104 changes: 52 additions & 52 deletions src/kafka.rs
@@ -167,67 +167,67 @@ fn resolve_schema(stream_name: &str) -> Result<HashMap<String, Arc<Field>>, KafkaError> {
 }
 
 async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Result<(), KafkaError> {
-    if let Some(payload) = msg.payload() {
-        // stream should get created only if there is an incoming event, not before that
-        create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
-
-        let schema = resolve_schema(stream_name)?;
-        let event = format::json::Event {
-            data: serde_json::from_slice(payload)?,
-            tags: String::default(),
-            metadata: String::default(),
-        };
-
-        let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
-        let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
-
-        let (rb, is_first) = event
-            .into_recordbatch(schema, static_schema_flag, time_partition)
-            .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?;
-
-        event::Event {
-            rb,
-            stream_name: stream_name.to_string(),
-            origin_format: "json",
-            origin_size: payload.len() as u64,
-            is_first_event: is_first,
-            parsed_timestamp: Utc::now().naive_utc(),
-            time_partition: None,
-            custom_partition_values: HashMap::new(),
-            stream_type: StreamType::UserDefined,
-        }
-        .process()
-        .await?;
-    } else {
+    let Some(payload) = msg.payload() else {
         debug!("{} No payload for stream", stream_name);
-    }
+        return Ok(());
+    };
+
+    // stream should get created only if there is an incoming event, not before that
+    create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
+
+    let schema = resolve_schema(stream_name)?;
+    let event = format::json::Event {
+        data: serde_json::from_slice(payload)?,
+        tags: String::default(),
+        metadata: String::default(),
+    };
+
+    let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
+    let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
+
+    let (rb, is_first) = event
+        .into_recordbatch(schema, static_schema_flag, time_partition)
+        .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?;
+
+    event::Event {
+        rb,
+        stream_name: stream_name.to_string(),
+        origin_format: "json",
+        origin_size: payload.len() as u64,
+        is_first_event: is_first,
+        parsed_timestamp: Utc::now().naive_utc(),
+        time_partition: None,
+        custom_partition_values: HashMap::new(),
+        stream_type: StreamType::UserDefined,
+    }
+    .process()
+    .await?;
 
     Ok(())
 }

 pub async fn setup_integration() {
-    tokio::task::spawn(async move {
-        let (consumer, stream_name) = match setup_consumer() {
-            Ok(c) => c,
-            Err(err) => {
-                match err {
-                    KafkaError::DoNotPrintError => {
-                        debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
-                    }
-                    _ => {
-                        error!("{err}");
-                    }
-                }
-                return;
-            }
-        };
+    let (consumer, stream_name) = match setup_consumer() {
+        Ok(c) => c,
+        Err(err) => {
+            match err {
+                KafkaError::DoNotPrintError => {
+                    debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
+                }
+                _ => {
+                    error!("{err}");
+                }
+            }
+            return;
+        }
+    };
 
-        info!("Setup kafka integration for {stream_name}");
-        let mut stream = consumer.stream();
+    info!("Setup kafka integration for {stream_name}");
+    let mut stream = consumer.stream();
 
-        while let Ok(curr) = stream.next().await.unwrap() {
-            if let Err(err) = ingest_message(&stream_name, curr).await {
-                error!("Unable to ingest incoming kafka message- {err}");
-            }
-        }
-    });
+    while let Ok(curr) = stream.next().await.unwrap() {
+        if let Err(err) = ingest_message(&stream_name, curr).await {
+            error!("Unable to ingest incoming kafka message- {err}");
+        }
+    }
 }
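
Note: the ingest_message change swaps an if let/else for Rust's let-else syntax (stable since Rust 1.65), so the missing-payload case exits early and the happy path loses a level of indentation. The setup_integration change drops the inner tokio::task::spawn(async move { ... }) and leaves spawning to the caller (see the matching change in src/main.rs below). A minimal standalone sketch of the let-else pattern follows; the handle function and its inputs are hypothetical, for illustration only:

fn handle(msg: Option<&[u8]>) -> Result<(), String> {
    // Bail out early when there is no payload; afterwards `payload`
    // remains bound for the rest of the function body.
    let Some(payload) = msg else {
        println!("no payload, skipping");
        return Ok(());
    };

    println!("processing {} bytes", payload.len());
    Ok(())
}

fn main() {
    handle(None).unwrap();                      // prints "no payload, skipping"
    handle(Some(b"hello".as_slice())).unwrap(); // prints "processing 5 bytes"
}
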
4 changes: 2 additions & 2 deletions src/main.rs
@@ -47,8 +47,8 @@ async fn main() -> anyhow::Result<()> {
     metadata.set_global();
 
     // load kafka server
-    if CONFIG.parseable.mode.ne(&Mode::Query) {
-        kafka::setup_integration().await;
+    if CONFIG.parseable.mode != Mode::Query {
+        tokio::task::spawn(kafka::setup_integration());
     }
 
     server.init().await?;
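
Note: two things change here. CONFIG.parseable.mode.ne(&Mode::Query) becomes the more idiomatic != operator (the operator desugars to the same PartialEq::ne call), and setup_integration is now spawned as a background task instead of awaited, so startup no longer blocks on the Kafka consumer loop. A minimal sketch of that difference, assuming a tokio runtime and a stand-in setup_integration:

use std::time::Duration;

// Stand-in for the real consumer loop, which runs indefinitely.
async fn setup_integration() {
    tokio::time::sleep(Duration::from_secs(3600)).await;
}

#[tokio::main]
async fn main() {
    // Old shape: `setup_integration().await;` would block here.
    // New shape: spawn it and continue immediately.
    tokio::task::spawn(setup_integration());

    println!("server init can proceed while the consumer runs");
}
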