enhancement: add schema type to stream info #1026

Open · wants to merge 2 commits into base: main
src/handlers/http/ingest.rs: 27 changes (21 additions, 6 deletions)
@@ -26,7 +26,7 @@ use crate::event::{
format::{self, EventFormat},
};
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
use crate::handlers::STREAM_NAME_HEADER_KEY;
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
use crate::localcache::CacheError;
use crate::metadata::error::stream_info::MetadataError;
use crate::metadata::STREAM_INFO;
@@ -60,9 +60,9 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
stream_name
)));
}
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string(), "").await?;

flatten_and_push_logs(req, body, stream_name).await?;
flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
} else {
Err(PostError::Header(ParseHeaderError::MissingStreamName))
@@ -116,8 +116,21 @@ pub async fn handle_otel_ingestion(
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
push_logs(stream_name.to_string(), req.clone(), body).await?;
if req
.headers()
.iter()
.any(|(key, value)| key == LOG_SOURCE_KEY && value == LOG_SOURCE_OTEL)
{
create_stream_if_not_exists(
&stream_name,
&StreamType::UserDefined.to_string(),
LOG_SOURCE_OTEL,
)
.await?;
push_logs(&stream_name, req.clone(), body).await?;
} else {
return Err(PostError::CustomError("Unknown log source".to_string()));
}
} else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
}
@@ -150,7 +163,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
}
}

flatten_and_push_logs(req, body, stream_name).await?;
flatten_and_push_logs(req, body, &stream_name).await?;
Ok(HttpResponse::Ok().finish())
}

@@ -178,6 +191,7 @@ pub async fn push_logs_unchecked(
pub async fn create_stream_if_not_exists(
stream_name: &str,
stream_type: &str,
schema_type: &str,
) -> Result<bool, PostError> {
let mut stream_exists = false;
if STREAM_INFO.stream_exists(stream_name) {
@@ -202,6 +216,7 @@ create_stream_if_not_exists(
"",
Arc::new(Schema::empty()),
stream_type,
schema_type,
)
.await?;

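For reference, the gating that handle_otel_ingestion now applies: a request is accepted only when its log-source header says "otel", and the stream is then created with an "otel" schema type. A std-only sketch of that decision (the helper name and error shape below are illustrative, not part of this PR):

const LOG_SOURCE_OTEL: &str = "otel";

// Mirror of the header check added above: only an explicit "otel" log source
// is accepted; anything else is rejected before a stream is created.
fn schema_type_for_otel_ingest(log_source: Option<&str>) -> Result<&'static str, String> {
    match log_source {
        Some(LOG_SOURCE_OTEL) => Ok(LOG_SOURCE_OTEL),
        other => Err(format!("Unknown log source: {other:?}")),
    }
}

fn main() {
    assert_eq!(schema_type_for_otel_ingest(Some("otel")), Ok("otel"));
    assert!(schema_type_for_otel_ingest(Some("kinesis")).is_err());
    assert!(schema_type_for_otel_ingest(None).is_err());
}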
src/handlers/http/logstream.rs: 13 changes (11 additions, 2 deletions)
@@ -505,6 +505,7 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

#[allow(clippy::too_many_arguments)]
pub async fn create_stream(
stream_name: String,
time_partition: &str,
@@ -513,6 +514,7 @@ pub async fn create_stream(
static_schema_flag: &str,
schema: Arc<Schema>,
stream_type: &str,
schema_type: &str,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal.to_string() {
@@ -530,6 +532,7 @@ pub async fn create_stream(
static_schema_flag,
schema.clone(),
stream_type,
schema_type,
)
.await
{
@@ -553,6 +556,7 @@ pub async fn create_stream(
static_schema_flag.to_string(),
static_schema,
stream_type,
schema_type,
);
}
Err(err) => {
@@ -602,6 +606,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
custom_partition: stream_meta.custom_partition.clone(),
cache_enabled: stream_meta.cache_enabled,
static_schema_flag: stream_meta.static_schema_flag.clone(),
schema_type: stream_meta.schema_type.clone(),
};

// get the other info from
@@ -744,8 +749,12 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
}

pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
if let Ok(stream_exists) =
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await
if let Ok(stream_exists) = create_stream_if_not_exists(
INTERNAL_STREAM_NAME,
&StreamType::Internal.to_string(),
INTERNAL_STREAM_NAME,
)
.await
{
if stream_exists {
return Ok(());
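The user-visible effect in this file is that the stream info response now carries the stream's schema type next to the existing flags. A trimmed sketch of the serialized shape (the struct below is a stand-in for a slice of the real StreamInfo; only the new schema_type field and its Option<String> type come from this PR, and serde/serde_json with the derive feature are assumed to be available):

use serde::Serialize;

// Stand-in for a slice of the real StreamInfo response type.
#[derive(Serialize)]
struct StreamInfoSketch {
    static_schema_flag: Option<String>,
    schema_type: Option<String>, // newly surfaced by this PR
}

fn main() {
    let info = StreamInfoSketch {
        static_schema_flag: None,
        schema_type: Some("otel".to_string()),
    };
    // Prints: {"static_schema_flag":null,"schema_type":"otel"}
    println!("{}", serde_json::to_string(&info).unwrap());
}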
src/handlers/http/modal/utils/ingest_utils.rs: 58 changes (30 additions, 28 deletions)
@@ -31,7 +31,7 @@ use crate::{
},
handlers::{
http::{ingest::PostError, kinesis},
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
},
metadata::STREAM_INFO,
storage::StreamType,
@@ -41,42 +41,44 @@ use crate::{
pub async fn flatten_and_push_logs(
req: HttpRequest,
body: Bytes,
stream_name: String,
stream_name: &str,
) -> Result<(), PostError> {
let log_source = req
.headers()
.get(LOG_SOURCE_KEY)
.map(|header| header.to_str().unwrap_or_default())
.unwrap_or_default();
if log_source == LOG_SOURCE_KINESIS {
let json = kinesis::flatten_kinesis_logs(&body);
for record in json.iter() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name.clone(), req.clone(), body.clone()).await?;
match log_source {
LOG_SOURCE_KINESIS => {
let json = kinesis::flatten_kinesis_logs(&body);
for record in json.iter() {
let body: Bytes = serde_json::to_vec(record).unwrap().into();
push_logs(stream_name, req.clone(), body.clone()).await?;
}
return Ok(());
}
} else {
push_logs(stream_name, req, body).await?;
LOG_SOURCE_OTEL => {
STREAM_INFO.set_schema_type(stream_name, Some(LOG_SOURCE_OTEL.to_string()))?
}
_ => {}
}
push_logs(stream_name, req.clone(), body.clone()).await?;
Ok(())
}

pub async fn push_logs(
stream_name: String,
req: HttpRequest,
body: Bytes,
) -> Result<(), PostError> {
let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;
let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?;
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?;
let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?;
pub async fn push_logs(stream_name: &str, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
let time_partition = STREAM_INFO.get_time_partition(stream_name)?;
let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?;
let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
let body_val: Value = serde_json::from_slice(&body)?;
let size: usize = body.len();
let mut parsed_timestamp = Utc::now().naive_utc();
if time_partition.is_none() {
if custom_partition.is_none() {
let size = size as u64;
create_process_record_batch(
stream_name.clone(),
stream_name,
req.clone(),
body_val.clone(),
static_schema_flag.clone(),
@@ -98,7 +100,7 @@ pub async fn push_logs(

let size = value.to_string().into_bytes().len() as u64;
create_process_record_batch(
stream_name.clone(),
stream_name,
req.clone(),
value.clone(),
static_schema_flag.clone(),
@@ -121,7 +123,7 @@ pub async fn push_logs(
parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
let size = value.to_string().into_bytes().len() as u64;
create_process_record_batch(
stream_name.clone(),
stream_name,
req.clone(),
value.clone(),
static_schema_flag.clone(),
@@ -149,7 +151,7 @@ pub async fn push_logs(
parsed_timestamp = get_parsed_timestamp(&value, &time_partition);
let size = value.to_string().into_bytes().len() as u64;
create_process_record_batch(
stream_name.clone(),
stream_name,
req.clone(),
value.clone(),
static_schema_flag.clone(),
@@ -167,7 +169,7 @@ pub async fn push_logs(

#[allow(clippy::too_many_arguments)]
pub async fn create_process_record_batch(
stream_name: String,
stream_name: &str,
req: HttpRequest,
value: Value,
static_schema_flag: Option<String>,
@@ -177,15 +179,15 @@ pub async fn create_process_record_batch(
origin_size: u64,
) -> Result<(), PostError> {
let (rb, is_first_event) = get_stream_schema(
stream_name.clone(),
stream_name,
req.clone(),
value.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
event::Event {
rb,
stream_name: stream_name.clone(),
stream_name: stream_name.to_string(),
origin_format: "json",
origin_size,
is_first_event,
@@ -201,16 +203,16 @@ pub async fn create_process_record_batch(
}

pub fn get_stream_schema(
stream_name: String,
stream_name: &str,
req: HttpRequest,
body: Value,
static_schema_flag: Option<String>,
time_partition: Option<String>,
) -> Result<(arrow_array::RecordBatch, bool), PostError> {
let hash_map = STREAM_INFO.read().unwrap();
let schema = hash_map
.get(&stream_name)
.ok_or(PostError::StreamNotFound(stream_name))?
.get(stream_name)
.ok_or(PostError::StreamNotFound(stream_name.to_string()))?
.schema
.clone();
into_event_batch(req, body, schema, static_schema_flag, time_partition)
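STREAM_INFO.set_schema_type is called in flatten_and_push_logs above but defined outside this diff; the intended effect is simply to stamp the stream's in-memory metadata with the OTEL schema type. A std-only sketch of that behaviour, using a plain HashMap in place of the real lock-guarded stream map (all names below are illustrative):

use std::collections::HashMap;

#[derive(Default)]
struct StreamMeta {
    schema_type: Option<String>,
}

// Toy stand-in for STREAM_INFO.set_schema_type: record the schema type on an
// existing stream, or report that the stream is unknown.
fn set_schema_type(
    streams: &mut HashMap<String, StreamMeta>,
    stream_name: &str,
    schema_type: Option<String>,
) -> Result<(), String> {
    streams
        .get_mut(stream_name)
        .map(|meta| meta.schema_type = schema_type)
        .ok_or_else(|| format!("stream {stream_name} not found"))
}

fn main() {
    let mut streams = HashMap::from([("demo".to_string(), StreamMeta::default())]);
    set_schema_type(&mut streams, "demo", Some("otel".to_string())).unwrap();
    assert_eq!(streams["demo"].schema_type.as_deref(), Some("otel"));
}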
src/handlers/http/modal/utils/logstream_utils.rs: 20 changes (16 additions, 4 deletions)
@@ -26,8 +26,8 @@ use http::StatusCode;
use crate::{
handlers::{
http::logstream::error::{CreateStreamError, StreamError},
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY,
TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
CUSTOM_PARTITION_KEY, SCHEMA_TYPE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY,
},
metadata::{self, STREAM_INFO},
option::{Mode, CONFIG},
@@ -48,6 +48,7 @@ pub async fn create_update_stream(
static_schema_flag,
update_stream_flag,
stream_type,
schema_type,
) = fetch_headers_from_put_stream_request(req);

if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" {
@@ -113,6 +114,7 @@ pub async fn create_update_stream(
&static_schema_flag,
schema,
&stream_type,
&schema_type,
)
.await?;

@@ -167,13 +169,14 @@ async fn validate_and_update_custom_partition(

pub fn fetch_headers_from_put_stream_request(
req: &HttpRequest,
) -> (String, String, String, String, String, String) {
) -> (String, String, String, String, String, String, String) {
let mut time_partition = String::default();
let mut time_partition_limit = String::default();
let mut custom_partition = String::default();
let mut static_schema_flag = String::default();
let mut update_stream = String::default();
let mut stream_type = StreamType::UserDefined.to_string();
let mut schema_type = String::default();
req.headers().iter().for_each(|(key, value)| {
if key == TIME_PARTITION_KEY {
time_partition = value.to_str().unwrap().to_string();
@@ -193,6 +196,9 @@ pub fn fetch_headers_from_put_stream_request(
if key == STREAM_TYPE_KEY {
stream_type = value.to_str().unwrap().to_string();
}
if key == SCHEMA_TYPE_KEY {
schema_type = value.to_str().unwrap().to_string();
}
});

(
Expand All @@ -202,6 +208,7 @@ pub fn fetch_headers_from_put_stream_request(
static_schema_flag,
update_stream,
stream_type,
schema_type,
)
}

@@ -378,6 +385,7 @@ pub async fn update_custom_partition_in_stream(
Ok(())
}

#[allow(clippy::too_many_arguments)]
pub async fn create_stream(
stream_name: String,
time_partition: &str,
@@ -386,6 +394,7 @@ pub async fn create_stream(
static_schema_flag: &str,
schema: Arc<Schema>,
stream_type: &str,
schema_type: &str,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
if stream_type != StreamType::Internal.to_string() {
@@ -403,6 +412,7 @@ pub async fn create_stream(
static_schema_flag,
schema.clone(),
stream_type,
schema_type,
)
.await
{
@@ -426,6 +436,7 @@ pub async fn create_stream(
static_schema_flag.to_string(),
static_schema,
stream_type,
schema_type,
);
}
Err(err) => {
@@ -475,7 +486,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");

let schema_type = stream_metadata.schema_type.as_deref().unwrap_or("");
metadata::STREAM_INFO.add_stream(
stream_name.to_string(),
stream_metadata.created_at,
Expand All @@ -485,6 +496,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
static_schema_flag.to_string(),
static_schema,
stream_type,
schema_type,
);
} else {
return Ok(false);
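fetch_headers_from_put_stream_request now returns a seventh element carrying the value of the x-p-schema-type header. A std-only sketch of that extraction over plain (&str, &str) pairs (the actix HeaderMap and the other returned fields are omitted, and the default stream-type string is assumed to be "UserDefined"):

const STREAM_TYPE_KEY: &str = "x-p-stream-type";
const SCHEMA_TYPE_KEY: &str = "x-p-schema-type";

// Pull the stream type and the new schema type out of request headers,
// falling back to defaults when either header is absent.
fn extract_stream_and_schema_type(headers: &[(&str, &str)]) -> (String, String) {
    let mut stream_type = "UserDefined".to_string();
    let mut schema_type = String::default();
    for (key, value) in headers {
        if *key == STREAM_TYPE_KEY {
            stream_type = value.to_string();
        }
        if *key == SCHEMA_TYPE_KEY {
            schema_type = value.to_string();
        }
    }
    (stream_type, schema_type)
}

fn main() {
    let headers = [("x-p-schema-type", "otel")];
    assert_eq!(
        extract_stream_and_schema_type(&headers),
        ("UserDefined".to_string(), "otel".to_string())
    );
}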
src/handlers/mod.rs: 3 changes (2 additions, 1 deletion)
@@ -35,6 +35,7 @@ const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
const STREAM_TYPE_KEY: &str = "x-p-stream-type";
const SCHEMA_TYPE_KEY: &str = "x-p-schema-type";
const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
@@ -47,6 +48,6 @@ const TRINO_USER: &str = "x-trino-user";

// constants for log Source values for known sources and formats
const LOG_SOURCE_KINESIS: &str = "kinesis";

const LOG_SOURCE_OTEL: &str = "otel";
// AWS Kinesis constants
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";