From 8dee91ac4a2291275cc1e3b51f3f34593a80a7ea Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 19 Dec 2024 00:28:09 +0530 Subject: [PATCH] feat: don't clone on events --- src/event/format/json.rs | 10 +++--- src/event/format/mod.rs | 31 +++++++++---------- src/handlers/http/ingest.rs | 2 +- src/handlers/http/modal/utils/ingest_utils.rs | 6 +++- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 0a137fdb2..9e228c646 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -45,9 +45,9 @@ impl EventFormat for Event { // also extract the arrow schema, tags and metadata from the incoming json fn to_data( self, - schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + schema: &HashMap>, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { let data = flatten_json_body(self.data, None, None, None, false)?; let stream_schema = schema; @@ -66,13 +66,13 @@ impl EventFormat for Event { collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); let mut is_first = false; - let schema = match derive_arrow_schema(&stream_schema, fields) { + let schema = match derive_arrow_schema(stream_schema, fields) { Ok(schema) => schema, Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) { Ok(mut infer_schema) => { let new_infer_schema = super::super::format::update_field_type_in_schema( Arc::new(infer_schema), - Some(&stream_schema), + Some(stream_schema), time_partition, Some(&value_arr), ); diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index e0bb00daf..433b96cb0 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -42,22 +42,21 @@ pub trait EventFormat: Sized { fn to_data( self, - schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + schema: &HashMap>, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; + fn decode(data: Self::Data, schema: Arc) -> Result; + fn into_recordbatch( self, - storage_schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + storage_schema: &HashMap>, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first, tags, metadata) = self.to_data( - storage_schema.clone(), - static_schema_flag.clone(), - time_partition.clone(), - )?; + let (data, mut schema, is_first, tags, metadata) = + self.to_data(storage_schema, static_schema_flag, time_partition)?; if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)); @@ -120,8 +119,8 @@ pub trait EventFormat: Sized { fn is_schema_matching( new_schema: Arc, - storage_schema: HashMap>, - static_schema_flag: Option, + storage_schema: &HashMap>, + static_schema_flag: Option<&String>, ) -> bool { if static_schema_flag.is_none() { return true; @@ -225,7 +224,7 @@ pub fn override_num_fields_from_schema(schema: Vec>) -> Vec, existing_schema: Option<&HashMap>>, - time_partition: Option, + time_partition: Option<&String>, log_records: Option<&Vec>, ) -> Arc { let mut updated_schema = inferred_schema.clone(); @@ -258,12 +257,12 @@ pub fn update_field_type_in_schema( if time_partition.is_none() { return updated_schema; } - let time_partition_field_name = time_partition.unwrap(); + let new_schema: Vec = updated_schema .fields() .iter() .map(|field| { - if *field.name() == time_partition_field_name { + if field.name() == time_partition.unwrap() { if field.data_type() == &DataType::Utf8 { let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None); Field::new(field.name().clone(), new_data_type, true) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 790d37ba9..733c2f175 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -84,7 +84,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< tags: String::default(), metadata: String::default(), }; - event.into_recordbatch(schema, None, None)? + event.into_recordbatch(&schema, None, None)? }; event::Event { rb, diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index c21f22a10..8f4ddc744 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -230,7 +230,11 @@ pub fn into_event_batch( tags, metadata, }; - let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?; + let (rb, is_first) = event.into_recordbatch( + &schema, + static_schema_flag.as_ref(), + time_partition.as_ref(), + )?; Ok((rb, is_first)) }