Skip to content

Commit

Permalink
feat: don't clone on events
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Dec 18, 2024
1 parent fcd6988 commit 8dee91a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
10 changes: 5 additions & 5 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ impl EventFormat for Event {
// also extract the arrow schema, tags and metadata from the incoming json
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data, None, None, None, false)?;
let stream_schema = schema;
Expand All @@ -66,13 +66,13 @@ impl EventFormat for Event {
collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");

let mut is_first = false;
let schema = match derive_arrow_schema(&stream_schema, fields) {
let schema = match derive_arrow_schema(stream_schema, fields) {
Ok(schema) => schema,
Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
Ok(mut infer_schema) => {
let new_infer_schema = super::super::format::update_field_type_in_schema(
Arc::new(infer_schema),
Some(&stream_schema),
Some(stream_schema),
time_partition,
Some(&value_arr),
);
Expand Down
31 changes: 15 additions & 16 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,21 @@ pub trait EventFormat: Sized {

fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;

fn into_recordbatch(
self,
storage_schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
time_partition: Option<String>,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(
storage_schema.clone(),
static_schema_flag.clone(),
time_partition.clone(),
)?;
let (data, mut schema, is_first, tags, metadata) =
self.to_data(storage_schema, static_schema_flag, time_partition)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
Expand Down Expand Up @@ -120,8 +119,8 @@ pub trait EventFormat: Sized {

fn is_schema_matching(
new_schema: Arc<Schema>,
storage_schema: HashMap<String, Arc<Field>>,
static_schema_flag: Option<String>,
storage_schema: &HashMap<String, Arc<Field>>,
static_schema_flag: Option<&String>,
) -> bool {
if static_schema_flag.is_none() {
return true;
Expand Down Expand Up @@ -225,7 +224,7 @@ pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field
pub fn update_field_type_in_schema(
inferred_schema: Arc<Schema>,
existing_schema: Option<&HashMap<String, Arc<Field>>>,
time_partition: Option<String>,
time_partition: Option<&String>,
log_records: Option<&Vec<Value>>,
) -> Arc<Schema> {
let mut updated_schema = inferred_schema.clone();
Expand Down Expand Up @@ -258,12 +257,12 @@ pub fn update_field_type_in_schema(
if time_partition.is_none() {
return updated_schema;
}
let time_partition_field_name = time_partition.unwrap();

let new_schema: Vec<Field> = updated_schema
.fields()
.iter()
.map(|field| {
if *field.name() == time_partition_field_name {
if field.name() == time_partition.unwrap() {
if field.data_type() == &DataType::Utf8 {
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
Field::new(field.name().clone(), new_data_type, true)
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
tags: String::default(),
metadata: String::default(),
};
event.into_recordbatch(schema, None, None)?
event.into_recordbatch(&schema, None, None)?
};
event::Event {
rb,
Expand Down
6 changes: 5 additions & 1 deletion src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,11 @@ pub fn into_event_batch(
tags,
metadata,
};
let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?;
let (rb, is_first) = event.into_recordbatch(
&schema,
static_schema_flag.as_ref(),
time_partition.as_ref(),
)?;
Ok((rb, is_first))
}

Expand Down

0 comments on commit 8dee91a

Please sign in to comment.