Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Custom Flattening for OTEL logs, metrics and traces #1043

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
use super::users::dashboards::DashboardError;
use super::users::filters::FiltersError;
use super::{otel_logs, otel_metrics, otel_traces};
use crate::event::{
self,
error::EventError,
Expand Down Expand Up @@ -105,7 +106,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
// Handler for POST /v1/logs to ingest OTEL logs.
// Ingests events after extracting the stream name from the request header;
// creates the stream if it does not already exist.
pub async fn handle_otel_ingestion(
pub async fn handle_otel_logs_ingestion(
req: HttpRequest,
body: Bytes,
) -> Result<HttpResponse, PostError> {
Expand All @@ -116,7 +117,67 @@ pub async fn handle_otel_ingestion(
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
push_logs(stream_name.to_string(), req.clone(), body).await?;

//custom flattening required for otel logs
let mut json = otel_logs::flatten_otel_logs(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
Ok(HttpResponse::Ok().finish())
}

/// Handler for POST /v1/metrics to ingest OTEL metrics.
///
/// Reads the stream name from the `STREAM_NAME_HEADER_KEY` request header,
/// creates the stream if it does not exist yet, flattens the OTEL metrics
/// payload and pushes every flattened record to the stream.
///
/// Returns `PostError::Header(ParseHeaderError::MissingStreamName)` when the
/// stream-name header is absent; errors from stream creation or from pushing
/// a record are propagated to the caller.
pub async fn handle_otel_metrics_ingestion(
    req: HttpRequest,
    body: Bytes,
) -> Result<HttpResponse, PostError> {
    // Guard clause: without a stream-name header there is nothing to ingest into.
    let stream_name = match req
        .headers()
        .iter()
        .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
    {
        Some((_, header_value)) => header_value.to_str().unwrap().to_owned(),
        None => return Err(PostError::Header(ParseHeaderError::MissingStreamName)),
    };

    create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

    // OTEL metrics need custom flattening; each flattened record is
    // re-serialized and pushed as its own event.
    let mut flattened_records = otel_metrics::flatten_otel_metrics(&body);
    for record in flattened_records.iter_mut() {
        let payload: Bytes = serde_json::to_vec(record).unwrap().into();
        push_logs(stream_name.to_string(), req.clone(), payload).await?;
    }

    Ok(HttpResponse::Ok().finish())
}

// Handler for POST /v1/traces to ingest OTEL traces.
// Ingests events after extracting the stream name from the request header;
// creates the stream if it does not already exist.
pub async fn handle_otel_traces_ingestion(
req: HttpRequest,
body: Bytes,
) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;

//custom flattening required for otel traces
let mut json = otel_traces::flatten_otel_traces(&body);
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
Expand Down
4 changes: 4 additions & 0 deletions src/handlers/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ pub mod logstream;
pub mod middleware;
pub mod modal;
pub mod oidc;
pub mod otel;
pub mod otel_logs;
pub mod otel_metrics;
pub mod otel_traces;
pub mod query;
pub mod rbac;
pub mod role;
Expand Down
6 changes: 3 additions & 3 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl Server {
web::resource("/logs")
.route(
web::post()
.to(ingest::handle_otel_ingestion)
.to(ingest::handle_otel_logs_ingestion)
.authorize_for_stream(Action::Ingest),
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
Expand All @@ -407,7 +407,7 @@ impl Server {
web::resource("/metrics")
.route(
web::post()
.to(ingest::handle_otel_ingestion)
.to(ingest::handle_otel_metrics_ingestion)
.authorize_for_stream(Action::Ingest),
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
Expand All @@ -416,7 +416,7 @@ impl Server {
web::resource("/traces")
.route(
web::post()
.to(ingest::handle_otel_ingestion)
.to(ingest::handle_otel_traces_ingestion)
.authorize_for_stream(Action::Ingest),
)
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
Expand Down
49 changes: 36 additions & 13 deletions src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
*
*/

use std::{collections::HashMap, sync::Arc};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
};

use actix_web::HttpRequest;
use arrow_schema::Field;
Expand All @@ -30,8 +33,9 @@ use crate::{
format::{self, EventFormat},
},
handlers::{
http::{ingest::PostError, kinesis},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
http::{ingest::PostError, kinesis, otel_logs, otel_metrics, otel_traces},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS,
LOG_SOURCE_OTEL_TRACES, PREFIX_META, PREFIX_TAGS, SEPARATOR,
},
metadata::STREAM_INFO,
storage::StreamType,
Expand All @@ -43,19 +47,38 @@ pub async fn flatten_and_push_logs(
body: Bytes,
stream_name: String,
) -> Result<(), PostError> {
let log_source = req
.headers()
.get(LOG_SOURCE_KEY)
.map(|header| header.to_str().unwrap_or_default())
.unwrap_or_default();
if log_source == LOG_SOURCE_KINESIS {
let json = kinesis::flatten_kinesis_logs(&body);
for record in json.iter() {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),

//custom flattening required for otel logs
LOG_SOURCE_OTEL_LOGS => {
json = otel_logs::flatten_otel_logs(&body);
}

//custom flattening required for otel metrics
LOG_SOURCE_OTEL_METRICS => {
json = otel_metrics::flatten_otel_metrics(&body);
}

//custom flattening required for otel traces
LOG_SOURCE_OTEL_TRACES => {
json = otel_traces::flatten_otel_traces(&body);
}
_ => {
tracing::warn!("Unknown log source: {}", log_source);
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
}
for record in json.iter_mut() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
push_logs(stream_name.to_string(), req.clone(), body).await?;
}
} else {
push_logs(stream_name, req, body).await?;
push_logs(stream_name.to_string(), req, body).await?;
}
Ok(())
}
Expand Down
137 changes: 137 additions & 0 deletions src/handlers/http/otel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
pub mod proto;
use proto::common::v1::KeyValue;
use serde_json::Value;
use std::collections::BTreeMap;
/// Flattens a single OTEL value into a map of JSON values.
///
/// The value can be one of the types String, Bool, Int, Double, ArrayValue,
/// KeyValueList or Bytes:
///
/// * string, bool, int, double and bytes payloads are stored under `key`
///   (int and bytes payloads are kept as JSON strings);
/// * every element of an array is flattened recursively and stored under
///   `{key}_{element value}`;
/// * every entry of a key-value list is flattened recursively and stored
///   under `{key}_{entry key}_{entry value}`.
///
/// A double that is NaN or infinite cannot be represented as a JSON number
/// and is skipped instead of causing a panic.
pub fn collect_json_from_any_value(
    key: &String,
    value: super::otel::proto::common::v1::Value,
) -> BTreeMap<String, Value> {
    let mut value_json: BTreeMap<String, Value> = BTreeMap::new();

    if let Some(str_val) = value.str_val {
        value_json.insert(key.to_string(), Value::String(str_val));
    }
    if let Some(bool_val) = value.bool_val {
        value_json.insert(key.to_string(), Value::Bool(bool_val));
    }
    if let Some(int_val) = value.int_val {
        value_json.insert(key.to_string(), Value::String(int_val));
    }
    if let Some(double_val) = value.double_val {
        // `from_f64` yields `None` for NaN / infinity; skip those rather than panic.
        if let Some(number) = serde_json::Number::from_f64(double_val) {
            value_json.insert(key.to_string(), Value::Number(number));
        }
    }

    // ArrayValue is a vector of values:
    // traverse it by recursively calling this function on every element.
    if let Some(array_val) = value.array_val {
        for element in array_val.values {
            for (element_key, element_value) in collect_json_from_any_value(key, element) {
                let suffix = value_to_string(element_value.clone());
                value_json.insert(format!("{}_{}", element_key, suffix), element_value);
            }
        }
    }

    // KeyValueList is a vector of KeyValue:
    // traverse each entry and flatten its value, if it has one.
    if let Some(kv_list_val) = value.kv_list_val {
        for key_value in kv_list_val.values {
            if let Some(entry_value) = key_value.value {
                for (entry_key, flattened) in collect_json_from_any_value(key, entry_value) {
                    let suffix = value_to_string(flattened.clone());
                    value_json.insert(
                        format!("{}_{}_{}", entry_key, key_value.key, suffix),
                        flattened,
                    );
                }
            }
        }
    }

    if let Some(bytes_val) = value.bytes_val {
        value_json.insert(key.to_string(), Value::String(bytes_val));
    }

    value_json
}

/// Flattens an optional OTEL value by delegating to
/// `collect_json_from_any_value`.
///
/// Returns an empty map when `values` is `None`.
pub fn collect_json_from_values(
    values: &Option<super::otel::proto::common::v1::Value>,
    key: &String,
) -> BTreeMap<String, Value> {
    match values {
        Some(inner) => collect_json_from_any_value(key, inner.clone()),
        None => BTreeMap::new(),
    }
}

/// Renders a JSON value as a plain string.
///
/// Strings are returned as-is (without surrounding quotes), numbers and
/// booleans use their JSON text form, and every other kind of value yields
/// an empty string.
pub fn value_to_string(value: serde_json::Value) -> String {
    match value {
        Value::String(text) => text,
        scalar @ (Value::Number(_) | Value::Bool(_)) => scalar.to_string(),
        _ => String::new(),
    }
}

/// Flattens a list of OTEL attributes into a single JSON map.
///
/// Each attribute's value is flattened with `collect_json_from_values` using
/// the attribute's key; the resulting entries are merged into one map, with
/// later attributes overwriting earlier entries that share the same key.
pub fn flatten_attributes(attributes: &Vec<KeyValue>) -> BTreeMap<String, Value> {
    attributes
        .iter()
        .fold(BTreeMap::new(), |mut merged, attribute| {
            let attribute_key = attribute.key.to_string();
            merged.extend(collect_json_from_values(&attribute.value, &attribute_key));
            merged
        })
}
Loading
Loading