diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 6d1cf3419..0cba267bb 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -52,8 +52,17 @@ impl EventFormat for Event { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &str, ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(self.data, None, None, None, schema_version, false)?; + let data = flatten_json_body( + self.data, + None, + None, + None, + schema_version, + false, + log_source, + )?; let stream_schema = schema; // incoming event may be a single json or a json array diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index b3cb8e4dd..0c65d1402 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -54,6 +54,7 @@ pub trait EventFormat: Sized { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &str, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>; @@ -64,12 +65,14 @@ pub trait EventFormat: Sized { static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &str, ) -> Result<(RecordBatch, bool), AnyError> { let (data, mut schema, is_first, tags, metadata) = self.to_data( storage_schema, static_schema_flag, time_partition, schema_version, + log_source, )?; // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index e5a03277f..0961f6b6d 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -95,7 +95,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< metadata: String::default(), }; // For internal streams, use old schema - event.into_recordbatch(&schema, None, None, 
SchemaVersion::V0)? + event.into_recordbatch(&schema, None, None, SchemaVersion::V0, "")? }; event::Event { rb, @@ -127,7 +127,8 @@ pub async fn handle_otel_logs_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; - if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_LOGS { + let log_source = log_source.to_str().unwrap(); + if log_source != LOG_SOURCE_OTEL_LOGS { return Err(PostError::Invalid(anyhow::anyhow!( "Please use x-p-log-source: otel-logs for ingesting otel logs" ))); @@ -141,7 +142,7 @@ pub async fn handle_otel_logs_ingestion( let mut json = flatten_otel_logs(&logs); for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &req, &body).await?; + push_logs(&stream_name, &req, &body, log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -160,7 +161,8 @@ pub async fn handle_otel_metrics_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; - if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_METRICS { + let log_source = log_source.to_str().unwrap(); + if log_source != LOG_SOURCE_OTEL_METRICS { return Err(PostError::Invalid(anyhow::anyhow!( "Please use x-p-log-source: otel-metrics for ingesting otel metrics" ))); @@ -173,7 +175,7 @@ pub async fn handle_otel_metrics_ingestion( let mut json = flatten_otel_metrics(metrics); for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &req, &body).await?; + push_logs(&stream_name, &req, &body, log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -193,7 +195,8 @@ pub async fn handle_otel_traces_ingestion( let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingLogSource)); }; - if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_TRACES { + let 
log_source = log_source.to_str().unwrap(); + if log_source != LOG_SOURCE_OTEL_TRACES { return Err(PostError::Invalid(anyhow::anyhow!( "Please use x-p-log-source: otel-traces for ingesting otel traces" ))); @@ -206,7 +209,7 @@ pub async fn handle_otel_traces_ingestion( let mut json = flatten_otel_traces(&traces); for record in json.iter_mut() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(&stream_name, &req, &body).await?; + push_logs(&stream_name, &req, &body, log_source).await?; } Ok(HttpResponse::Ok().finish()) @@ -417,6 +420,7 @@ mod tests { None, None, SchemaVersion::V0, + "", ) .unwrap(); @@ -467,6 +471,7 @@ mod tests { None, None, SchemaVersion::V0, + "", ) .unwrap(); @@ -500,7 +505,8 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); + let (rb, _) = + into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); @@ -532,7 +538,7 @@ mod tests { let req = TestRequest::default().to_http_request(); - assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err()); + assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err()); } #[test] @@ -550,7 +556,8 @@ mod tests { let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); + let (rb, _) = + into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -568,7 +575,8 @@ mod tests { HashMap::default(), None, None, - SchemaVersion::V0 + SchemaVersion::V0, + "" ) .is_err()) } @@ -600,6 +608,7 @@ mod tests { None, None, SchemaVersion::V0, + "", ) .unwrap(); @@ -656,6 +665,7 @@ mod tests { None, None, SchemaVersion::V0, + "", ) .unwrap(); @@ -705,7 +715,8 @@ mod tests { ); 
let req = TestRequest::default().to_http_request(); - let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap(); + let (rb, _) = + into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); @@ -754,7 +765,7 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err()); + assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err()); } #[test] @@ -789,6 +800,7 @@ mod tests { None, None, SchemaVersion::V0, + "", ) .unwrap(); @@ -869,6 +881,7 @@ mod tests { None, None, SchemaVersion::V1, + "", ) .unwrap(); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index f790e74f4..3bafca463 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -45,7 +45,7 @@ pub async fn flatten_and_push_logs( stream_name: &str, ) -> Result<(), PostError> { let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else { - push_logs(stream_name, &req, &body).await?; + push_logs(stream_name, &req, &body, "").await?; return Ok(()); }; let log_source = log_source.to_str().unwrap(); @@ -53,7 +53,7 @@ pub async fn flatten_and_push_logs( let json = kinesis::flatten_kinesis_logs(&body); for record in json.iter() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name, &req, &body).await?; + push_logs(stream_name, &req, &body, "").await?; } } else if log_source.contains("otel") { return Err(PostError::Invalid(anyhow!( @@ -61,7 +61,7 @@ pub async fn flatten_and_push_logs( ))); } else { tracing::warn!("Unknown log source: {}", log_source); - push_logs(stream_name, &req, &body).await?; + push_logs(stream_name, &req, &body, "").await?; } Ok(()) @@ -71,6 +71,7 @@ pub async fn push_logs( stream_name: &str, req: &HttpRequest, body: &Bytes, + log_source: 
&str, ) -> Result<(), PostError> { let time_partition = STREAM_INFO.get_time_partition(stream_name)?; let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?; @@ -84,6 +85,7 @@ pub async fn push_logs( time_partition_limit, custom_partition.as_ref(), schema_version, + log_source, )?; for value in data { @@ -113,6 +115,7 @@ pub async fn push_logs( static_schema_flag.as_ref(), time_partition.as_ref(), schema_version, + log_source, )?; Event { @@ -140,6 +143,7 @@ pub fn into_event_batch( static_schema_flag: Option<&String>, time_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &str, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?; let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?; @@ -148,8 +152,13 @@ pub fn into_event_batch( tags, metadata, }; - let (rb, is_first) = - event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?; + let (rb, is_first) = event.into_recordbatch( + &schema, + static_schema_flag, + time_partition, + schema_version, + log_source, + )?; Ok((rb, is_first)) } diff --git a/src/kafka.rs b/src/kafka.rs index 678b6ed2a..bb35cc863 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -198,6 +198,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> { static_schema_flag.as_ref(), time_partition.as_ref(), schema_version, + "", ) .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?; diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 1ef31c804..46f81a193 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -32,8 +32,9 @@ pub fn flatten_json_body( custom_partition: Option<&String>, schema_version: SchemaVersion, validation_required: bool, + log_source: &str, ) -> Result<Value, anyhow::Error> { - let mut nested_value = if schema_version == SchemaVersion::V1 && 
!log_source.contains("otel") { flatten::generic_flattening(body)? } else { body } ... @@ -57,6 +58,7 @@ pub fn convert_array_to_object( time_partition_limit: Option<NonZeroU32>, custom_partition: Option<&String>, schema_version: SchemaVersion, + log_source: &str, ) -> Result<Vec<Value>, anyhow::Error> { let data = flatten_json_body( body, @@ -65,6 +67,7 @@ custom_partition, schema_version, true, + log_source, )?; let value_arr = match data { Value::Array(arr) => arr,