From 6d3632a7f5bc9e29e35ae8ad1e165c3a0797195e Mon Sep 17 00:00:00 2001
From: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com>
Date: Wed, 11 Dec 2024 13:25:28 -0500
Subject: [PATCH] fix: convert all number data types to float (#1027)

Server checks if the event has any number fields (all ints and floats),
then updates the data type of all numbers to Float64.

This is useful to allow users to dynamically switch between an int and a
float in their events.

---------

Signed-off-by: Nitish Tiwari
Co-authored-by: Nitish Tiwari
Co-authored-by: Devdutt Shenoi
---
 src/catalog/column.rs         |  8 +++++
 src/event/format/json.rs      |  4 ++-
 src/event/format/mod.rs       | 22 +++++++++++++
 src/handlers/http/ingest.rs   | 61 ++++++++++++++++-------------------
 src/metadata.rs               | 14 ++++++++
 src/storage/object_storage.rs | 18 +++++++++--
 6 files changed, 91 insertions(+), 36 deletions(-)

diff --git a/src/catalog/column.rs b/src/catalog/column.rs
index d5db2950d..ef4b5858b 100644
--- a/src/catalog/column.rs
+++ b/src/catalog/column.rs
@@ -66,6 +66,14 @@ impl TypedStatistics {
                 max: max(this.max, other.max),
             })
         }
+
+        // Ints are cast to Float if self is Float and other is Int
+        (TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
+            TypedStatistics::Float(Float64Type {
+                min: this.min.min(other.min as f64),
+                max: this.max.max(other.max as f64),
+            })
+        }
         (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
             TypedStatistics::Float(Float64Type {
                 min: this.min.min(other.min),
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 487cb58a6..0a137fdb2 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -185,7 +185,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
         DataType::Boolean => value.is_boolean(),
         DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
         DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
-        DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
+        DataType::Float16 | DataType::Float32 => value.is_f64(),
+        // NOTE: All numbers can be ingested as Float64
+        DataType::Float64 => value.is_number(),
         DataType::Utf8 => value.is_string(),
         DataType::List(field) => {
             let data_type = field.data_type();
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index e637eb4e6..e0bb00daf 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -204,6 +204,24 @@ pub fn override_timestamp_fields(
     Arc::new(Schema::new(updated_fields))
 }
 
+/// All number fields from inferred schema are forced into Float64
+pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
+    schema
+        .iter()
+        .map(|field| {
+            if field.data_type().is_numeric() {
+                Arc::new(Field::new(
+                    field.name(),
+                    DataType::Float64,
+                    field.is_nullable(),
+                ))
+            } else {
+                field.clone()
+            }
+        })
+        .collect::<Vec<Arc<Field>>>()
+}
+
 pub fn update_field_type_in_schema(
     inferred_schema: Arc<Schema>,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
@@ -212,6 +230,10 @@ pub fn update_field_type_in_schema(
 ) -> Arc<Schema> {
     let mut updated_schema = inferred_schema.clone();
 
+    // All number fields from inferred schema are forced into Float64
+    updated_schema = Arc::new(Schema::new(override_num_fields_from_schema(
+        updated_schema.fields().to_vec(),
+    )));
     if let Some(existing_schema) = existing_schema {
         let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
         let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 305bfecb0..303f591c3 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -277,7 +277,7 @@ mod tests {
     use std::{collections::HashMap, sync::Arc};
 
     use actix_web::test::TestRequest;
-    use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
+    use arrow_array::{ArrayRef, Float64Array, StringArray};
     use arrow_schema::{DataType, Field};
     use serde_json::json;
 
@@ -287,16 +287,11 @@ mod tests {
     };
 
     trait TestExt {
-        fn as_int64_arr(&self) -> &Int64Array;
         fn as_float64_arr(&self) -> &Float64Array;
         fn as_utf8_arr(&self) -> &StringArray;
     }
 
     impl TestExt for ArrayRef {
-        fn as_int64_arr(&self) -> &Int64Array {
-            self.as_any().downcast_ref().unwrap()
-        }
-
         fn as_float64_arr(&self) -> &Float64Array {
             self.as_any().downcast_ref().unwrap()
         }
@@ -328,8 +323,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from_iter([1])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from_iter([1.0])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -368,8 +363,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from_iter([1])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from_iter([1.0])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -386,7 +381,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -400,8 +395,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from_iter([1])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from_iter([1.0])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -418,7 +413,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -488,21 +483,21 @@ mod tests {
         let schema = rb.schema();
         let fields = &schema.fields;
 
-        assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true));
+        assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true));
         assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true));
-        assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true));
+        assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true));
 
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
             &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
         );
         assert_eq!(
-            rb.column_by_name("c").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), None])
+            rb.column_by_name("c").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), None])
         );
     }
 
@@ -533,8 +528,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -568,7 +563,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -581,8 +576,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
        assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -608,7 +603,7 @@ mod tests {
                 "c": 1
             },
             {
-                "a": 1,
+                "a": "1",
                 "b": "hello",
                 "c": null
             },
@@ -618,7 +613,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -658,8 +653,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 7);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -672,13 +667,13 @@ mod tests {
         );
 
         assert_eq!(
-            rb.column_by_name("c_a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, None, Some(1), Some(1)])
+            rb.column_by_name("c_a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
         );
 
         assert_eq!(
-            rb.column_by_name("c_b").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, None, None, Some(2)])
+            rb.column_by_name("c_b").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, None, None, Some(2.0)])
        );
     }
 }
diff --git a/src/metadata.rs b/src/metadata.rs
index f768a4e88..5447ea796 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -164,6 +164,20 @@ impl StreamInfo {
         Ok(Arc::new(schema))
     }
 
+    /// update the schema in the metadata
+    pub fn set_schema(
+        &self,
+        stream_name: &str,
+        schema: HashMap<String, Arc<Field>>,
+    ) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        map.get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.schema = schema;
+            })
+    }
+
     pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index e70c326bd..e9ee32f18 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -25,6 +25,7 @@ use super::{
     SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
+use crate::event::format::override_num_fields_from_schema;
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
@@ -40,7 +41,7 @@ use crate::{
 };
 
 use actix_web_prometheus::PrometheusMetrics;
-use arrow_schema::Schema;
+use arrow_schema::{Field, Schema};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Local;
@@ -667,8 +668,21 @@ pub async fn commit_schema_to_storage(
     schema: Schema,
 ) -> Result<(), ObjectStorageError> {
     let storage = CONFIG.storage().get_object_store();
-    let stream_schema = storage.get_schema(stream_name).await?;
+    let mut stream_schema = storage.get_schema(stream_name).await?;
+
+    // override the data type of all numeric fields to Float64
+    // if the data type is not Float64 already
+    stream_schema = Schema::new(override_num_fields_from_schema(
+        stream_schema.fields().iter().cloned().collect(),
+    ));
     let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
+
+    // update the merged schema in the metadata and storage
+    let schema_map: HashMap<String, Arc<Field>> = new_schema
+        .fields()
+        .iter()
+        .map(|field| (field.name().clone(), Arc::clone(field)))
+        .collect();
+    let _ = STREAM_INFO.set_schema(stream_name, schema_map);
     storage.put_schema(stream_name, &new_schema).await
 }
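
Note (not part of the patch): the standalone Rust sketch below illustrates why numeric fields are coerced to Float64 before the Schema::try_merge call in commit_schema_to_storage. It uses only the arrow-schema crate; the "latency" field name and the coerce_numbers_to_float64 helper are hypothetical stand-ins for override_num_fields_from_schema, not code from this repository.

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

// Rewrite every numeric field as Float64, keeping name and nullability,
// mirroring the approach of override_num_fields_from_schema above.
fn coerce_numbers_to_float64(fields: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
    fields
        .into_iter()
        .map(|field| {
            if field.data_type().is_numeric() {
                Arc::new(Field::new(field.name(), DataType::Float64, field.is_nullable()))
            } else {
                field
            }
        })
        .collect()
}

fn main() {
    // One event carried `latency` as an integer, a later one as a float.
    let first = Schema::new(vec![Field::new("latency", DataType::Int64, true)]);
    let second = Schema::new(vec![Field::new("latency", DataType::Float64, true)]);

    // Without coercion, merging the two schemas fails on the Int64/Float64 conflict.
    assert!(Schema::try_merge(vec![first.clone(), second.clone()]).is_err());

    // After coercing both sides, the merge succeeds and `latency` is Float64.
    let merged = Schema::try_merge(vec![
        Schema::new(coerce_numbers_to_float64(first.fields().to_vec())),
        Schema::new(coerce_numbers_to_float64(second.fields().to_vec())),
    ])
    .unwrap();
    assert_eq!(
        merged.field_with_name("latency").unwrap().data_type(),
        &DataType::Float64
    );
}

In short, normalizing every numeric field to Float64 on both the inferred and the stored schema is what lets a stream accept an integer value today and a float tomorrow without a merge error.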