From 7db6d27bc690ab64f65c99e5dacebe813b0c697c Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 7 Dec 2024 20:52:52 -0500 Subject: [PATCH] validate log source header key and value for otel ingestion --- src/handlers/http/ingest.rs | 18 +++++++++++++++--- src/metadata.rs | 1 - 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 28c3586f0..073f404ab 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -26,7 +26,7 @@ use crate::event::{ format::{self, EventFormat}, }; use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage; -use crate::handlers::STREAM_NAME_HEADER_KEY; +use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY}; use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::STREAM_INFO; @@ -116,9 +116,21 @@ pub async fn handle_otel_ingestion( .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) { let stream_name = stream_name.to_str().unwrap().to_owned(); - create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string(), "otel") + if req + .headers() + .iter() + .any(|(key, value)| key == LOG_SOURCE_KEY && value == LOG_SOURCE_OTEL) + { + create_stream_if_not_exists( + &stream_name, + &StreamType::UserDefined.to_string(), + LOG_SOURCE_OTEL, + ) .await?; - push_logs(&stream_name, req.clone(), body).await?; + push_logs(&stream_name, req.clone(), body).await?; + } else { + return Err(PostError::CustomError("Unknown log source".to_string())); + } } else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); } diff --git a/src/metadata.rs b/src/metadata.rs index b71e479b2..dbb2e8fc0 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -208,7 +208,6 @@ impl StreamInfo { }) } - #[allow(dead_code)] pub fn set_schema_type( &self, stream_name: &str,