restrict generic flattening for otel data
nikhilsinhaparseable committed Jan 3, 2025
1 parent 8a7dafd commit 1ef9619
Showing 6 changed files with 58 additions and 20 deletions.
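The crux of the change is in flatten_json_body (src/utils/json/mod.rs): it gains a log_source parameter, every caller threads one through (the otel handlers pass the validated x-p-log-source header value; all other call sites pass an empty string), and generic flattening now runs only for SchemaVersion::V1 data that did not come from an otel source. A minimal sketch of the new guard, with SchemaVersion stubbed out and a hypothetical helper name (the real code inlines the condition):

// Sketch of the guard added in src/utils/json/mod.rs. SchemaVersion is
// stubbed for illustration; the crate defines the real enum elsewhere.
#[derive(PartialEq)]
enum SchemaVersion {
    V0,
    V1,
}

// Generic flattening applies only to V1 events whose source is not an
// otel pipeline ("otel-logs", "otel-metrics", "otel-traces").
fn should_generic_flatten(schema_version: &SchemaVersion, log_source: &str) -> bool {
    *schema_version == SchemaVersion::V1 && !log_source.contains("otel")
}

fn main() {
    assert!(should_generic_flatten(&SchemaVersion::V1, ""));
    assert!(!should_generic_flatten(&SchemaVersion::V1, "otel-logs"));
    assert!(!should_generic_flatten(&SchemaVersion::V0, ""));
}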
11 changes: 10 additions & 1 deletion src/event/format/json.rs
@@ -52,8 +52,17 @@ impl EventFormat for Event {
         static_schema_flag: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &str,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
-        let data = flatten_json_body(self.data, None, None, None, schema_version, false)?;
+        let data = flatten_json_body(
+            self.data,
+            None,
+            None,
+            None,
+            schema_version,
+            false,
+            log_source,
+        )?;
         let stream_schema = schema;

         // incoming event may be a single json or a json array
3 changes: 3 additions & 0 deletions src/event/format/mod.rs
@@ -54,6 +54,7 @@ pub trait EventFormat: Sized {
         static_schema_flag: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &str,
     ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -64,12 +65,14 @@ pub trait EventFormat: Sized {
         static_schema_flag: Option<&String>,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &str,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let (data, mut schema, is_first, tags, metadata) = self.to_data(
             storage_schema,
             static_schema_flag,
             time_partition,
             schema_version,
+            log_source,
         )?;

         // DEFAULT_TAGS_KEY, DEFAULT_METADATA_KEY and DEFAULT_TIMESTAMP_KEY are reserved field names
39 changes: 26 additions & 13 deletions src/handlers/http/ingest.rs
@@ -95,7 +95,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             metadata: String::default(),
         };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, None, None, SchemaVersion::V0, "")?
     };
     event::Event {
         rb,
@@ -127,7 +127,8 @@ pub async fn handle_otel_logs_ingestion(
     let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingLogSource));
     };
-    if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_LOGS {
+    let log_source = log_source.to_str().unwrap();
+    if log_source != LOG_SOURCE_OTEL_LOGS {
         return Err(PostError::Invalid(anyhow::anyhow!(
             "Please use x-p-log-source: otel-logs for ingesting otel logs"
         )));
@@ -141,7 +142,7 @@ pub async fn handle_otel_logs_ingestion(
     let mut json = flatten_otel_logs(&logs);
     for record in json.iter_mut() {
         let body: Bytes = serde_json::to_vec(record).unwrap().into();
-        push_logs(&stream_name, &req, &body).await?;
+        push_logs(&stream_name, &req, &body, log_source).await?;
     }

     Ok(HttpResponse::Ok().finish())
@@ -160,7 +161,8 @@ pub async fn handle_otel_metrics_ingestion(
     let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingLogSource));
     };
-    if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_METRICS {
+    let log_source = log_source.to_str().unwrap();
+    if log_source != LOG_SOURCE_OTEL_METRICS {
         return Err(PostError::Invalid(anyhow::anyhow!(
             "Please use x-p-log-source: otel-metrics for ingesting otel metrics"
         )));
@@ -173,7 +175,7 @@ pub async fn handle_otel_metrics_ingestion(
     let mut json = flatten_otel_metrics(metrics);
     for record in json.iter_mut() {
         let body: Bytes = serde_json::to_vec(record).unwrap().into();
-        push_logs(&stream_name, &req, &body).await?;
+        push_logs(&stream_name, &req, &body, log_source).await?;
     }

     Ok(HttpResponse::Ok().finish())
@@ -193,7 +195,8 @@ pub async fn handle_otel_traces_ingestion(
     let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingLogSource));
     };
-    if log_source.to_str().unwrap() != LOG_SOURCE_OTEL_TRACES {
+    let log_source = log_source.to_str().unwrap();
+    if log_source != LOG_SOURCE_OTEL_TRACES {
         return Err(PostError::Invalid(anyhow::anyhow!(
             "Please use x-p-log-source: otel-traces for ingesting otel traces"
         )));
@@ -206,7 +209,7 @@ pub async fn handle_otel_traces_ingestion(
     let mut json = flatten_otel_traces(&traces);
     for record in json.iter_mut() {
         let body: Bytes = serde_json::to_vec(record).unwrap().into();
-        push_logs(&stream_name, &req, &body).await?;
+        push_logs(&stream_name, &req, &body, log_source).await?;
     }

     Ok(HttpResponse::Ok().finish())
@@ -417,6 +420,7 @@ mod tests {
             None,
             None,
             SchemaVersion::V0,
+            "",
         )
         .unwrap();

@@ -467,6 +471,7 @@ mod tests {
             None,
             None,
             SchemaVersion::V0,
+            "",
         )
         .unwrap();

@@ -500,7 +505,8 @@ mod tests {

         let req = TestRequest::default().to_http_request();

-        let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -532,7 +538,7 @@ mod tests {

         let req = TestRequest::default().to_http_request();

-        assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err());
+        assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
     }

     #[test]
@@ -550,7 +556,8 @@ mod tests {

         let req = TestRequest::default().to_http_request();

-        let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -568,7 +575,8 @@ mod tests {
             HashMap::default(),
             None,
             None,
-            SchemaVersion::V0
+            SchemaVersion::V0,
+            ""
         )
         .is_err())
     }
@@ -600,6 +608,7 @@ mod tests {
             None,
             None,
             SchemaVersion::V0,
+            "",
         )
         .unwrap();

@@ -656,6 +665,7 @@ mod tests {
             None,
             None,
             SchemaVersion::V0,
+            "",
         )
         .unwrap();

@@ -705,7 +715,8 @@ mod tests {
         );
         let req = TestRequest::default().to_http_request();

-        let (rb, _) = into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) =
+            into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -754,7 +765,7 @@ mod tests {
             .into_iter(),
         );

-        assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0).is_err());
+        assert!(into_event_batch(&req, &json, schema, None, None, SchemaVersion::V0, "").is_err());
     }

     #[test]
@@ -789,6 +800,7 @@ mod tests {
             None,
             None,
             SchemaVersion::V0,
+            "",
         )
         .unwrap();

@@ -869,6 +881,7 @@ mod tests {
             None,
             None,
             SchemaVersion::V1,
+            "",
         )
         .unwrap();

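All three otel handlers above now share one shape: read x-p-log-source once, convert it to a &str, validate it against the expected constant, then forward that same value to every push_logs call instead of re-reading the header per record. A condensed, self-contained sketch of that pattern, with a hypothetical validate_log_source helper and a plain String error standing in for PostError:

// Condensed sketch of the header flow shared by the otel handlers.
const LOG_SOURCE_OTEL_LOGS: &str = "otel-logs";

fn validate_log_source(header_value: &str) -> Result<&str, String> {
    if header_value != LOG_SOURCE_OTEL_LOGS {
        return Err("Please use x-p-log-source: otel-logs for ingesting otel logs".to_string());
    }
    // The validated value is reused for every subsequent push_logs call.
    Ok(header_value)
}

fn main() {
    assert_eq!(validate_log_source("otel-logs"), Ok("otel-logs"));
    assert!(validate_log_source("kinesis").is_err());
}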
19 changes: 14 additions & 5 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -45,23 +45,23 @@ pub async fn flatten_and_push_logs(
     stream_name: &str,
 ) -> Result<(), PostError> {
     let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
-        push_logs(stream_name, &req, &body).await?;
+        push_logs(stream_name, &req, &body, "").await?;
         return Ok(());
     };
     let log_source = log_source.to_str().unwrap();
     if log_source == LOG_SOURCE_KINESIS {
         let json = kinesis::flatten_kinesis_logs(&body);
         for record in json.iter() {
             let body: Bytes = serde_json::to_vec(record).unwrap().into();
-            push_logs(stream_name, &req, &body).await?;
+            push_logs(stream_name, &req, &body, "").await?;
         }
     } else if log_source.contains("otel") {
         return Err(PostError::Invalid(anyhow!(
             "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
         )));
     } else {
         tracing::warn!("Unknown log source: {}", log_source);
-        push_logs(stream_name, &req, &body).await?;
+        push_logs(stream_name, &req, &body, "").await?;
     }

     Ok(())
@@ -71,6 +71,7 @@ pub async fn push_logs(
     stream_name: &str,
     req: &HttpRequest,
     body: &Bytes,
+    log_source: &str,
 ) -> Result<(), PostError> {
     let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
     let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?;
@@ -84,6 +85,7 @@ pub async fn push_logs(
         time_partition_limit,
         custom_partition.as_ref(),
         schema_version,
+        log_source,
     )?;

     for value in data {
@@ -113,6 +115,7 @@ pub async fn push_logs(
             static_schema_flag.as_ref(),
             time_partition.as_ref(),
             schema_version,
+            log_source,
         )?;

         Event {
@@ -140,6 +143,7 @@ pub fn into_event_batch(
     static_schema_flag: Option<&String>,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
+    log_source: &str,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?;
@@ -148,8 +152,13 @@ pub fn into_event_batch(
         tags,
         metadata,
     };
-    let (rb, is_first) =
-        event.into_recordbatch(&schema, static_schema_flag, time_partition, schema_version)?;
+    let (rb, is_first) = event.into_recordbatch(
+        &schema,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+        log_source,
+    )?;
     Ok((rb, is_first))
 }

1 change: 1 addition & 0 deletions src/kafka.rs
@@ -198,6 +198,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
             static_schema_flag.as_ref(),
             time_partition.as_ref(),
             schema_version,
+            "",
         )
         .map_err(|err| KafkaError::PostError(PostError::CustomError(err.to_string())))?;

5 changes: 4 additions & 1 deletion src/utils/json/mod.rs
@@ -32,8 +32,9 @@ pub fn flatten_json_body(
     custom_partition: Option<&String>,
     schema_version: SchemaVersion,
     validation_required: bool,
+    log_source: &str,
 ) -> Result<Value, anyhow::Error> {
-    let mut nested_value = if schema_version == SchemaVersion::V1 {
+    let mut nested_value = if schema_version == SchemaVersion::V1 && !log_source.contains("otel") {
         flatten::generic_flattening(body)?
     } else {
         body
@@ -57,6 +58,7 @@ pub fn convert_array_to_object(
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: Option<&String>,
     schema_version: SchemaVersion,
+    log_source: &str,
 ) -> Result<Vec<Value>, anyhow::Error> {
     let data = flatten_json_body(
         body,
@@ -65,6 +67,7 @@ pub fn convert_array_to_object(
         custom_partition,
         schema_version,
         true,
+        log_source,
     )?;
     let value_arr = match data {
         Value::Array(arr) => arr,
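Net effect: otel payloads, already shaped by the dedicated flatten_otel_* functions, are no longer re-expanded by generic flattening, while kinesis, kafka, and plain HTTP ingestion (which pass an empty log_source) behave as before. A toy illustration, assuming for simplicity that generic flattening splits an array body into one value per element (the real flatten::generic_flattening is more involved); flatten_body is a hypothetical stand-in for the guarded branch:

use serde_json::{json, Value};

// Toy stand-in for flatten::generic_flattening: split an array body into
// one value per element. Assumption for illustration only.
fn toy_generic_flattening(body: Value) -> Vec<Value> {
    match body {
        Value::Array(arr) => arr,
        v => vec![v],
    }
}

// Mirrors the new guard: otel sources keep their body as-is, so the
// dedicated otel flatteners remain the single source of structure.
fn flatten_body(body: Value, log_source: &str) -> Vec<Value> {
    if !log_source.contains("otel") {
        toy_generic_flattening(body)
    } else {
        vec![body]
    }
}

fn main() {
    let body = json!([{ "a": 1 }, { "a": 2 }]);
    assert_eq!(flatten_body(body.clone(), "").len(), 2); // generic flattening applied
    assert_eq!(flatten_body(body, "otel-logs").len(), 1); // otel bypasses it
}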
