diff --git a/src/catalog/column.rs b/src/catalog/column.rs
index d5db2950d..ef4b5858b 100644
--- a/src/catalog/column.rs
+++ b/src/catalog/column.rs
@@ -66,6 +66,14 @@ impl TypedStatistics {
                 max: max(this.max, other.max),
             })
         }
+
+        // Ints are cast to Float if self is Float and other is Int
+        (TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
+            TypedStatistics::Float(Float64Type {
+                min: this.min.min(other.min as f64),
+                max: this.max.max(other.max as f64),
+            })
+        }
         (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
             TypedStatistics::Float(Float64Type {
                 min: this.min.min(other.min),
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 487cb58a6..0a137fdb2 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -185,7 +185,9 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
         DataType::Boolean => value.is_boolean(),
         DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
         DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
-        DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
+        DataType::Float16 | DataType::Float32 => value.is_f64(),
+        // NOTE: All numbers can be ingested as Float64
+        DataType::Float64 => value.is_number(),
         DataType::Utf8 => value.is_string(),
         DataType::List(field) => {
             let data_type = field.data_type();
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index e637eb4e6..e0bb00daf 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -204,6 +204,24 @@ pub fn override_timestamp_fields(
     Arc::new(Schema::new(updated_fields))
 }
 
+/// All number fields from the inferred schema are forced into Float64
+pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
+    schema
+        .iter()
+        .map(|field| {
+            if field.data_type().is_numeric() {
+                Arc::new(Field::new(
+                    field.name(),
+                    DataType::Float64,
+                    field.is_nullable(),
+                ))
+            } else {
+                field.clone()
+            }
+        })
+        .collect::<Vec<Arc<Field>>>()
+}
+
 pub fn update_field_type_in_schema(
     inferred_schema: Arc<Schema>,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
@@ -212,6 +230,10 @@ pub fn update_field_type_in_schema(
 ) -> Arc<Schema> {
     let mut updated_schema = inferred_schema.clone();
 
+    // All number fields from the inferred schema are forced into Float64
+    updated_schema = Arc::new(Schema::new(override_num_fields_from_schema(
+        updated_schema.fields().to_vec(),
+    )));
     if let Some(existing_schema) = existing_schema {
         let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
         let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
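
For reference, a minimal standalone sketch of what the new `override_num_fields_from_schema` coercion does. The helper body is mirrored from the patch so the snippet compiles on its own; the field names are illustrative:

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

/// Mirrors the helper from the patch: any numeric field is widened to Float64.
fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
    schema
        .iter()
        .map(|field| {
            if field.data_type().is_numeric() {
                Arc::new(Field::new(
                    field.name(),
                    DataType::Float64,
                    field.is_nullable(),
                ))
            } else {
                field.clone()
            }
        })
        .collect()
}

fn main() {
    let inferred = Schema::new(vec![
        Field::new("count", DataType::Int64, true),   // numeric -> Float64
        Field::new("ratio", DataType::Float32, true), // numeric -> Float64
        Field::new("name", DataType::Utf8, true),     // non-numeric, untouched
    ]);
    let coerced = Schema::new(override_num_fields_from_schema(inferred.fields().to_vec()));
    assert_eq!(coerced.field(0).data_type(), &DataType::Float64);
    assert_eq!(coerced.field(1).data_type(), &DataType::Float64);
    assert_eq!(coerced.field(2).data_type(), &DataType::Utf8);
}
```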
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 305bfecb0..303f591c3 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -277,7 +277,7 @@ mod tests {
     use std::{collections::HashMap, sync::Arc};
 
     use actix_web::test::TestRequest;
-    use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
+    use arrow_array::{ArrayRef, Float64Array, StringArray};
     use arrow_schema::{DataType, Field};
     use serde_json::json;
@@ -287,16 +287,11 @@ mod tests {
     };
 
     trait TestExt {
-        fn as_int64_arr(&self) -> &Int64Array;
         fn as_float64_arr(&self) -> &Float64Array;
         fn as_utf8_arr(&self) -> &StringArray;
     }
 
     impl TestExt for ArrayRef {
-        fn as_int64_arr(&self) -> &Int64Array {
-            self.as_any().downcast_ref().unwrap()
-        }
-
         fn as_float64_arr(&self) -> &Float64Array {
             self.as_any().downcast_ref().unwrap()
         }
@@ -328,8 +323,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from_iter([1])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from_iter([1.0])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -368,8 +363,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from_iter([1])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from_iter([1.0])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -386,7 +381,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -400,8 +395,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from_iter([1])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from_iter([1.0])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -418,7 +413,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -488,21 +483,21 @@ mod tests {
         let schema = rb.schema();
         let fields = &schema.fields;
 
-        assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true));
+        assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true));
         assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true));
-        assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true));
+        assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true));
 
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
             &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
         );
         assert_eq!(
-            rb.column_by_name("c").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), None])
+            rb.column_by_name("c").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), None])
         );
     }
 
@@ -533,8 +528,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -568,7 +563,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -581,8 +576,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -608,7 +603,7 @@ mod tests {
                 "c": 1
             },
             {
-                "a": 1,
+                "a": "1",
                 "b": "hello",
                 "c": null
             },
@@ -618,7 +613,7 @@ mod tests {
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Int64, true),
+                Field::new("a", DataType::Float64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -658,8 +653,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 7);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
+            rb.column_by_name("a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -672,13 +667,13 @@ mod tests {
         );
 
         assert_eq!(
-            rb.column_by_name("c_a").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, None, Some(1), Some(1)])
+            rb.column_by_name("c_a").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
         );
         assert_eq!(
-            rb.column_by_name("c_b").unwrap().as_int64_arr(),
-            &Int64Array::from(vec![None, None, None, Some(2)])
+            rb.column_by_name("c_b").unwrap().as_float64_arr(),
+            &Float64Array::from(vec![None, None, None, Some(2.0)])
         );
     }
 }
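
The test updates above all follow from the relaxed `Float64` arm of `valid_type` in json.rs: any JSON number is now valid for a `Float64` column, so integer literals such as `1` ingest as `1.0`. A small sketch of just that check, using only `serde_json` (the `float64_accepts` name is hypothetical, mirroring only that one match arm):

```rust
use serde_json::{json, Value};

// Mirrors the relaxed Float64 arm of `valid_type`: any JSON number passes.
fn float64_accepts(value: &Value) -> bool {
    value.is_number()
}

fn main() {
    assert!(float64_accepts(&json!(1)));    // integer literal; the old is_f64() check rejected this
    assert!(float64_accepts(&json!(1.5)));  // float literal; accepted before and after
    assert!(!float64_accepts(&json!("1"))); // strings are still not numbers
}
```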
Field::new("a", DataType::Int64, true), + Field::new("a", DataType::Float64, true), Field::new("b", DataType::Utf8, true), Field::new("c", DataType::Float64, true), ] @@ -658,8 +653,8 @@ mod tests { assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 7); assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), - &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) + rb.column_by_name("a").unwrap().as_float64_arr(), + &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -672,13 +667,13 @@ mod tests { ); assert_eq!( - rb.column_by_name("c_a").unwrap().as_int64_arr(), - &Int64Array::from(vec![None, None, Some(1), Some(1)]) + rb.column_by_name("c_a").unwrap().as_float64_arr(), + &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) ); assert_eq!( - rb.column_by_name("c_b").unwrap().as_int64_arr(), - &Int64Array::from(vec![None, None, None, Some(2)]) + rb.column_by_name("c_b").unwrap().as_float64_arr(), + &Float64Array::from(vec![None, None, None, Some(2.0)]) ); } } diff --git a/src/metadata.rs b/src/metadata.rs index f768a4e88..5447ea796 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -164,6 +164,20 @@ impl StreamInfo { Ok(Arc::new(schema)) } + /// update the schema in the metadata + pub fn set_schema( + &self, + stream_name: &str, + schema: HashMap>, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.schema = schema; + }) + } + pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index e70c326bd..e9ee32f18 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -25,6 +25,7 @@ use super::{ SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::event::format::override_num_fields_from_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; @@ -40,7 +41,7 @@ use crate::{ }; use actix_web_prometheus::PrometheusMetrics; -use arrow_schema::Schema; +use arrow_schema::{Field, Schema}; use async_trait::async_trait; use bytes::Bytes; use chrono::Local; @@ -667,8 +668,21 @@ pub async fn commit_schema_to_storage( schema: Schema, ) -> Result<(), ObjectStorageError> { let storage = CONFIG.storage().get_object_store(); - let stream_schema = storage.get_schema(stream_name).await?; + let mut stream_schema = storage.get_schema(stream_name).await?; + // override the data type of all numeric fields to Float64 + //if data type is not Float64 already + stream_schema = Schema::new(override_num_fields_from_schema( + stream_schema.fields().iter().cloned().collect(), + )); let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap(); + + //update the merged schema in the metadata and storage + let schema_map: HashMap> = new_schema + .fields() + .iter() + .map(|field| (field.name().clone(), Arc::clone(field))) + .collect(); + let _ = STREAM_INFO.set_schema(stream_name, schema_map); storage.put_schema(stream_name, &new_schema).await }