Skip to content

Commit

Permalink
fix: refactor ingestion (parseablehq#1073)
Browse files Browse the repository at this point in the history
update ingestion flow to flatten only once
improve readability

Fixes parseablehq#1064
  • Loading branch information
nikhilsinhaparseable authored Jan 7, 2025
1 parent 1abc8a9 commit e4e0ae5
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 284 deletions.
19 changes: 3 additions & 16 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, LogSource};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
};
use super::EventFormat;
use crate::{metadata::SchemaVersion, utils::arrow::get_field};

pub struct Event {
pub data: Value,
Expand All @@ -50,23 +47,13 @@ impl EventFormat for Event {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
let data = flatten_json_body(
self.data,
None,
None,
None,
schema_version,
false,
log_source,
)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
// but Data (type defined above) is a vector of json values
// hence we need to convert the incoming event to a vector of json values
let value_arr = match data {
let value_arr = match self.data {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => unreachable!("flatten would have failed beforehand"),
Expand Down
3 changes: 0 additions & 3 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
Expand All @@ -91,14 +90,12 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first) = self.to_data(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
log_source,
)?;

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
Expand Down
149 changes: 50 additions & 99 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::otel::metrics::flatten_otel_metrics;
use crate::otel::traces::flatten_otel_traces;
use crate::storage::{ObjectStorageError, StreamType};
use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::flatten::JsonFlattenError;
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_array::RecordBatch;
use arrow_schema::Schema;
Expand Down Expand Up @@ -89,13 +90,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
.clone();
let event = format::json::Event { data: body_val };
// For internal streams, use old schema
event.into_recordbatch(
&schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)?
event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
};
event::Event {
rb,
Expand Down Expand Up @@ -328,6 +323,8 @@ pub enum PostError {
DashboardError(#[from] DashboardError),
#[error("Error: {0}")]
StreamError(#[from] StreamError),
#[error("Error: {0}")]
JsonFlattenError(#[from] JsonFlattenError),
}

impl actix_web::ResponseError for PostError {
Expand All @@ -349,6 +346,7 @@ impl actix_web::ResponseError for PostError {
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand All @@ -369,8 +367,9 @@ mod tests {
use std::{collections::HashMap, sync::Arc};

use crate::{
event::format::LogSource, handlers::http::modal::utils::ingest_utils::into_event_batch,
handlers::http::modal::utils::ingest_utils::into_event_batch,
metadata::SchemaVersion,
utils::json::{convert_array_to_object, flatten::convert_to_array},
};

trait TestExt {
Expand Down Expand Up @@ -405,15 +404,8 @@ mod tests {
"b": "hello",
});

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 4);
Expand All @@ -439,15 +431,8 @@ mod tests {
"c": null
});

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
Expand Down Expand Up @@ -477,15 +462,7 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
Expand Down Expand Up @@ -515,15 +492,7 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default()
)
.is_err());
assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
}

#[test]
Expand All @@ -539,15 +508,7 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 1);
Expand All @@ -556,14 +517,13 @@ mod tests {
#[test]
fn non_object_arr_is_err() {
let json = json!([1]);

assert!(into_event_batch(
&json,
HashMap::default(),
assert!(convert_array_to_object(
json,
None,
None,
None,
SchemaVersion::V0,
&LogSource::default()
&crate::event::format::LogSource::default()
)
.is_err())
}
Expand All @@ -586,15 +546,8 @@ mod tests {
},
]);

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
Expand Down Expand Up @@ -640,15 +593,8 @@ mod tests {
},
]);

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
Expand Down Expand Up @@ -695,15 +641,7 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
Expand Down Expand Up @@ -750,15 +688,7 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default()
)
.is_err());
assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
}

#[test]
Expand All @@ -783,17 +713,27 @@ mod tests {
"c": [{"a": 1, "b": 2}]
},
]);
let flattened_json = convert_to_array(
convert_array_to_object(
json,
None,
None,
None,
SchemaVersion::V0,
&crate::event::format::LogSource::default(),
)
.unwrap(),
)
.unwrap();

let (rb, _) = into_event_batch(
&json,
&flattened_json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();

assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 5);
assert_eq!(
Expand Down Expand Up @@ -861,14 +801,25 @@ mod tests {
"c": [{"a": 1, "b": 2}]
},
]);
let flattened_json = convert_to_array(
convert_array_to_object(
json,
None,
None,
None,
SchemaVersion::V1,
&crate::event::format::LogSource::default(),
)
.unwrap(),
)
.unwrap();

let (rb, _) = into_event_batch(
&json,
&flattened_json,
HashMap::default(),
None,
None,
SchemaVersion::V1,
&LogSource::default(),
)
.unwrap();

Expand Down
Loading

0 comments on commit e4e0ae5

Please sign in to comment.