diff --git a/src/catalog/column.rs b/src/catalog/column.rs index ef4b5858b..d5db2950d 100644 --- a/src/catalog/column.rs +++ b/src/catalog/column.rs @@ -66,14 +66,6 @@ impl TypedStatistics { max: max(this.max, other.max), }) } - - // Ints are casted to Float if self is Float and other in Int - (TypedStatistics::Float(this), TypedStatistics::Int(other)) => { - TypedStatistics::Float(Float64Type { - min: this.min.min(other.min as f64), - max: this.max.max(other.max as f64), - }) - } (TypedStatistics::Float(this), TypedStatistics::Float(other)) => { TypedStatistics::Float(Float64Type { min: this.min.min(other.min), diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 0a137fdb2..487cb58a6 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -185,9 +185,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool { DataType::Boolean => value.is_boolean(), DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(), DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(), - DataType::Float16 | DataType::Float32 => value.is_f64(), - // NOTE: All numbers can be ingested as Float64 - DataType::Float64 => value.is_number(), + DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(), DataType::Utf8 => value.is_string(), DataType::List(field) => { let data_type = field.data_type(); diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index e0bb00daf..e637eb4e6 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -204,24 +204,6 @@ pub fn override_timestamp_fields( Arc::new(Schema::new(updated_fields)) } -/// All number fields from inferred schema are forced into Float64 -pub fn override_num_fields_from_schema(schema: Vec>) -> Vec> { - schema - .iter() - .map(|field| { - if field.data_type().is_numeric() { - Arc::new(Field::new( - field.name(), - DataType::Float64, - field.is_nullable(), - )) - } else { - field.clone() - } - }) - .collect::>>() -} - pub fn update_field_type_in_schema( inferred_schema: Arc, existing_schema: Option<&HashMap>>, @@ -230,10 +212,6 @@ pub fn update_field_type_in_schema( ) -> Arc { let mut updated_schema = inferred_schema.clone(); - // All number fields from inferred schema are forced into Float64 - updated_schema = Arc::new(Schema::new(override_num_fields_from_schema( - updated_schema.fields().to_vec(), - ))); if let Some(existing_schema) = existing_schema { let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema)); let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema); diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 790d37ba9..50c7b5079 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -273,7 +273,7 @@ mod tests { use std::{collections::HashMap, sync::Arc}; use actix_web::test::TestRequest; - use arrow_array::{ArrayRef, Float64Array, StringArray}; + use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray}; use arrow_schema::{DataType, Field}; use serde_json::json; @@ -283,11 +283,16 @@ mod tests { }; trait TestExt { + fn as_int64_arr(&self) -> &Int64Array; fn as_float64_arr(&self) -> &Float64Array; fn as_utf8_arr(&self) -> &StringArray; } impl TestExt for ArrayRef { + fn as_int64_arr(&self) -> &Int64Array { + self.as_any().downcast_ref().unwrap() + } + fn as_float64_arr(&self) -> &Float64Array { self.as_any().downcast_ref().unwrap() } @@ -319,8 +324,8 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from_iter([1.0]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from_iter([1]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -359,8 +364,8 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from_iter([1.0]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from_iter([1]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -377,7 +382,7 @@ mod tests { let schema = fields_to_map( [ - Field::new("a", DataType::Float64, true), + Field::new("a", DataType::Int64, true), Field::new("b", DataType::Utf8, true), Field::new("c", DataType::Float64, true), ] @@ -391,8 +396,8 @@ mod tests { assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 5); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from_iter([1.0]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from_iter([1]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -409,7 +414,7 @@ mod tests { let schema = fields_to_map( [ - Field::new("a", DataType::Float64, true), + Field::new("a", DataType::Int64, true), Field::new("b", DataType::Utf8, true), Field::new("c", DataType::Float64, true), ] @@ -479,21 +484,21 @@ mod tests { let schema = rb.schema(); let fields = &schema.fields; - assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true)); + assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true)); assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true)); - assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true)); + assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true)); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from(vec![None, Some(1.0), Some(1.0)]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) ); assert_eq!( - rb.column_by_name("c").unwrap().as_float64_arr(), - &Float64Array::from(vec![None, Some(1.0), None]) + rb.column_by_name("c").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, Some(1), None]) ); } @@ -524,8 +529,8 @@ mod tests { assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from(vec![None, Some(1.0), Some(1.0)]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -559,7 +564,7 @@ mod tests { let schema = fields_to_map( [ - Field::new("a", DataType::Float64, true), + Field::new("a", DataType::Int64, true), Field::new("b", DataType::Utf8, true), Field::new("c", DataType::Float64, true), ] @@ -572,8 +577,8 @@ mod tests { assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 6); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from(vec![None, Some(1.0), Some(1.0)]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, Some(1), Some(1)]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -599,7 +604,7 @@ mod tests { "c": 1 }, { - "a": "1", + "a": 1, "b": "hello", "c": null }, @@ -609,7 +614,7 @@ mod tests { let schema = fields_to_map( [ - Field::new("a", DataType::Float64, true), + Field::new("a", DataType::Int64, true), Field::new("b", DataType::Utf8, true), Field::new("c", DataType::Float64, true), ] @@ -649,8 +654,8 @@ mod tests { assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 7); assert_eq!( - rb.column_by_name("a").unwrap().as_float64_arr(), - &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)]) + rb.column_by_name("a").unwrap().as_int64_arr(), + &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)]) ); assert_eq!( rb.column_by_name("b").unwrap().as_utf8_arr(), @@ -663,13 +668,13 @@ mod tests { ); assert_eq!( - rb.column_by_name("c_a").unwrap().as_float64_arr(), - &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)]) + rb.column_by_name("c_a").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, None, Some(1), Some(1)]) ); assert_eq!( - rb.column_by_name("c_b").unwrap().as_float64_arr(), - &Float64Array::from(vec![None, None, None, Some(2.0)]) + rb.column_by_name("c_b").unwrap().as_int64_arr(), + &Int64Array::from(vec![None, None, None, Some(2)]) ); } } diff --git a/src/metadata.rs b/src/metadata.rs index 132ce540f..1fe01034c 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -164,20 +164,6 @@ impl StreamInfo { Ok(Arc::new(schema)) } - /// update the schema in the metadata - pub fn set_schema( - &self, - stream_name: &str, - schema: HashMap>, - ) -> Result<(), MetadataError> { - let mut map = self.write().expect(LOCK_EXPECT); - map.get_mut(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| { - metadata.schema = schema; - }) - } - pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index ca70b3bd7..c824ae8ba 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -25,7 +25,6 @@ use super::{ SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; -use crate::event::format::override_num_fields_from_schema; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; @@ -40,7 +39,7 @@ use crate::{ }; use actix_web_prometheus::PrometheusMetrics; -use arrow_schema::{Field, Schema}; +use arrow_schema::Schema; use async_trait::async_trait; use bytes::Bytes; use chrono::Local; @@ -633,21 +632,8 @@ pub async fn commit_schema_to_storage( schema: Schema, ) -> Result<(), ObjectStorageError> { let storage = CONFIG.storage().get_object_store(); - let mut stream_schema = storage.get_schema(stream_name).await?; - // override the data type of all numeric fields to Float64 - //if data type is not Float64 already - stream_schema = Schema::new(override_num_fields_from_schema( - stream_schema.fields().iter().cloned().collect(), - )); + let stream_schema = storage.get_schema(stream_name).await?; let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap(); - - //update the merged schema in the metadata and storage - let schema_map: HashMap> = new_schema - .fields() - .iter() - .map(|field| (field.name().clone(), Arc::clone(field))) - .collect(); - let _ = STREAM_INFO.set_schema(stream_name, schema_map); storage.put_schema(stream_name, &new_schema).await }