
Commit

Merge branch 'main' into kafka-integration
nitisht authored Dec 19, 2024
2 parents d46c4e3 + 702dbc8 commit 757c598
Showing 6 changed files with 36 additions and 91 deletions.
8 changes: 0 additions & 8 deletions src/catalog/column.rs
@@ -66,14 +66,6 @@ impl TypedStatistics {
                 max: max(this.max, other.max),
             })
         }
-
-        // Ints are casted to Float if self is Float and other in Int
-        (TypedStatistics::Float(this), TypedStatistics::Int(other)) => {
-            TypedStatistics::Float(Float64Type {
-                min: this.min.min(other.min as f64),
-                max: this.max.max(other.max as f64),
-            })
-        }
         (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
             TypedStatistics::Float(Float64Type {
                 min: this.min.min(other.min),
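For context, this removed arm was the statistics-side counterpart of the Float64 coercion dropped throughout this merge: Float stats silently absorbed Int stats by casting. A self-contained sketch of the post-change merge behavior — simplified stand-in types and a hypothetical `Option` return for the mixed case, not Parseable's actual implementation:

```rust
use std::cmp::{max, min};

struct Int64Type { min: i64, max: i64 }
struct Float64Type { min: f64, max: f64 }

enum TypedStatistics {
    Int(Int64Type),
    Float(Float64Type),
}

impl TypedStatistics {
    fn update(self, other: Self) -> Option<Self> {
        match (self, other) {
            // Like-typed statistics still merge by widening min/max...
            (TypedStatistics::Int(this), TypedStatistics::Int(other)) => {
                Some(TypedStatistics::Int(Int64Type {
                    min: min(this.min, other.min),
                    max: max(this.max, other.max),
                }))
            }
            (TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
                Some(TypedStatistics::Float(Float64Type {
                    min: this.min.min(other.min),
                    max: this.max.max(other.max),
                }))
            }
            // ...but the Float-absorbs-Int cast arm is gone, so mixed
            // variants no longer combine.
            _ => None,
        }
    }
}

fn main() {
    let merged = TypedStatistics::Int(Int64Type { min: 3, max: 7 })
        .update(TypedStatistics::Int(Int64Type { min: 1, max: 9 }));
    assert!(matches!(
        merged,
        Some(TypedStatistics::Int(Int64Type { min: 1, max: 9 }))
    ));
    // A Float/Int merge now yields None in this sketch:
    let mixed = TypedStatistics::Float(Float64Type { min: 0.5, max: 2.5 })
        .update(TypedStatistics::Int(Int64Type { min: 1, max: 9 }));
    assert!(mixed.is_none());
}
```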
4 changes: 1 addition & 3 deletions src/event/format/json.rs
@@ -185,9 +185,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
         DataType::Boolean => value.is_boolean(),
         DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
         DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
-        DataType::Float16 | DataType::Float32 => value.is_f64(),
-        // NOTE: All numbers can be ingested as Float64
-        DataType::Float64 => value.is_number(),
+        DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
         DataType::Utf8 => value.is_string(),
         DataType::List(field) => {
             let data_type = field.data_type();
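The pivot in `valid_type` is serde_json's number tagging: `is_f64()` is true only for values that actually parsed as floats, while the removed `is_number()` arm on the Float64 branch also accepted integers. A standalone check against plain serde_json, independent of Parseable:

```rust
use serde_json::json;

fn main() {
    // serde_json remembers how each number parsed:
    assert!(json!(1).is_i64());   // integer literals are i64/u64...
    assert!(!json!(1).is_f64());  // ...and do not satisfy is_f64()
    assert!(json!(1.5).is_f64());

    // The old Float64 arm used is_number(), which accepts both kinds,
    // so JSON integers used to validate against Float64 columns:
    assert!(json!(1).is_number());
    assert!(json!(1.5).is_number());
}
```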
22 changes: 0 additions & 22 deletions src/event/format/mod.rs
@@ -204,24 +204,6 @@ pub fn override_timestamp_fields(
     Arc::new(Schema::new(updated_fields))
 }
 
-/// All number fields from inferred schema are forced into Float64
-pub fn override_num_fields_from_schema(schema: Vec<Arc<Field>>) -> Vec<Arc<Field>> {
-    schema
-        .iter()
-        .map(|field| {
-            if field.data_type().is_numeric() {
-                Arc::new(Field::new(
-                    field.name(),
-                    DataType::Float64,
-                    field.is_nullable(),
-                ))
-            } else {
-                field.clone()
-            }
-        })
-        .collect::<Vec<Arc<Field>>>()
-}
-
 pub fn update_field_type_in_schema(
     inferred_schema: Arc<Schema>,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
@@ -230,10 +212,6 @@ pub fn update_field_type_in_schema(
 ) -> Arc<Schema> {
     let mut updated_schema = inferred_schema.clone();
 
-    // All number fields from inferred schema are forced into Float64
-    updated_schema = Arc::new(Schema::new(override_num_fields_from_schema(
-        updated_schema.fields().to_vec(),
-    )));
     if let Some(existing_schema) = existing_schema {
         let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
         let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
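With `override_num_fields_from_schema` gone, whatever type schema inference assigns is what persists. A minimal illustration via arrow-json's inference entry point — calling it directly here is an assumption for brevity, but the revised tests below expect exactly this Int64/Float64 split:

```rust
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::{ArrowError, DataType};
use serde_json::json;

fn main() -> Result<(), ArrowError> {
    // Integer values infer as Int64, floats as Float64; nothing
    // rewrites the integer column to Float64 afterwards anymore.
    let rows = [json!({"a": 1, "c": 1.5})];
    let schema = infer_json_schema_from_iterator(rows.iter().cloned().map(Ok))?;
    assert_eq!(schema.field_with_name("a")?.data_type(), &DataType::Int64);
    assert_eq!(schema.field_with_name("c")?.data_type(), &DataType::Float64);
    Ok(())
}
```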
61 changes: 33 additions & 28 deletions src/handlers/http/ingest.rs
@@ -273,7 +273,7 @@ mod tests {
     use std::{collections::HashMap, sync::Arc};
 
     use actix_web::test::TestRequest;
-    use arrow_array::{ArrayRef, Float64Array, StringArray};
+    use arrow_array::{ArrayRef, Float64Array, Int64Array, StringArray};
     use arrow_schema::{DataType, Field};
     use serde_json::json;
 
@@ -283,11 +283,16 @@ mod tests {
     };
 
     trait TestExt {
+        fn as_int64_arr(&self) -> &Int64Array;
         fn as_float64_arr(&self) -> &Float64Array;
         fn as_utf8_arr(&self) -> &StringArray;
     }
 
     impl TestExt for ArrayRef {
+        fn as_int64_arr(&self) -> &Int64Array {
+            self.as_any().downcast_ref().unwrap()
+        }
+
         fn as_float64_arr(&self) -> &Float64Array {
             self.as_any().downcast_ref().unwrap()
         }
@@ -319,8 +324,8 @@ mod tests {
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from_iter([1.0])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from_iter([1])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -359,8 +364,8 @@
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from_iter([1.0])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from_iter([1])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -377,7 +382,7 @@
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Float64, true),
+                Field::new("a", DataType::Int64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -391,8 +396,8 @@
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from_iter([1.0])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from_iter([1])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -409,7 +414,7 @@
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Float64, true),
+                Field::new("a", DataType::Int64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -479,21 +484,21 @@
         let schema = rb.schema();
         let fields = &schema.fields;
 
-        assert_eq!(&*fields[1], &Field::new("a", DataType::Float64, true));
+        assert_eq!(&*fields[1], &Field::new("a", DataType::Int64, true));
         assert_eq!(&*fields[2], &Field::new("b", DataType::Utf8, true));
-        assert_eq!(&*fields[3], &Field::new("c", DataType::Float64, true));
+        assert_eq!(&*fields[3], &Field::new("c", DataType::Int64, true));
 
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![None, Some(1), Some(1)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
             &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
         );
         assert_eq!(
-            rb.column_by_name("c").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![None, Some(1.0), None])
+            rb.column_by_name("c").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![None, Some(1), None])
         );
     }
 
@@ -524,8 +529,8 @@
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![None, Some(1), Some(1)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -559,7 +564,7 @@
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Float64, true),
+                Field::new("a", DataType::Int64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -572,8 +577,8 @@
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![None, Some(1.0), Some(1.0)])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![None, Some(1), Some(1)])
        );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -599,7 +604,7 @@
                 "c": 1
             },
             {
-                "a": "1",
+                "a": 1,
                 "b": "hello",
                 "c": null
             },
@@ -609,7 +614,7 @@
 
         let schema = fields_to_map(
             [
-                Field::new("a", DataType::Float64, true),
+                Field::new("a", DataType::Int64, true),
                 Field::new("b", DataType::Utf8, true),
                 Field::new("c", DataType::Float64, true),
             ]
@@ -649,8 +654,8 @@
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 7);
         assert_eq!(
-            rb.column_by_name("a").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
+            rb.column_by_name("a").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr(),
@@ -663,13 +668,13 @@ mod tests {
         );
 
         assert_eq!(
-            rb.column_by_name("c_a").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
+            rb.column_by_name("c_a").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![None, None, Some(1), Some(1)])
         );
 
         assert_eq!(
-            rb.column_by_name("c_b").unwrap().as_float64_arr(),
-            &Float64Array::from(vec![None, None, None, Some(2.0)])
+            rb.column_by_name("c_b").unwrap().as_int64_arr(),
+            &Int64Array::from(vec![None, None, None, Some(2)])
         );
     }
 }
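A side note on the `TestExt` helper above: arrow-array also ships an `AsArray` convenience trait that performs the same downcast, so a hypothetical alternative (not what these tests use) would be:

```rust
use std::sync::Arc;

use arrow_array::{cast::AsArray, types::Int64Type, Array, ArrayRef, Int64Array};

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None, Some(3)]));
    // Equivalent to TestExt::as_int64_arr's downcast_ref::<Int64Array>():
    let ints = col.as_primitive::<Int64Type>();
    assert_eq!(ints.value(0), 1);
    assert!(ints.is_null(1));
}
```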
14 changes: 0 additions & 14 deletions src/metadata.rs
@@ -164,20 +164,6 @@ impl StreamInfo {
         Ok(Arc::new(schema))
     }
 
-    /// update the schema in the metadata
-    pub fn set_schema(
-        &self,
-        stream_name: &str,
-        schema: HashMap<String, Arc<Field>>,
-    ) -> Result<(), MetadataError> {
-        let mut map = self.write().expect(LOCK_EXPECT);
-        map.get_mut(stream_name)
-            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
-            .map(|metadata| {
-                metadata.schema = schema;
-            })
-    }
-
     pub fn set_alert(&self, stream_name: &str, alerts: Alerts) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
18 changes: 2 additions & 16 deletions src/storage/object_storage.rs
@@ -25,7 +25,6 @@ use super::{
     SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
 };
 
-use crate::event::format::override_num_fields_from_schema;
 use crate::handlers::http::modal::ingest_server::INGESTOR_META;
 use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR};
 use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE};
@@ -40,7 +39,7 @@ use crate::{
 };
 
 use actix_web_prometheus::PrometheusMetrics;
-use arrow_schema::{Field, Schema};
+use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::Local;
@@ -633,21 +632,8 @@ pub async fn commit_schema_to_storage(
     schema: Schema,
 ) -> Result<(), ObjectStorageError> {
     let storage = CONFIG.storage().get_object_store();
-    let mut stream_schema = storage.get_schema(stream_name).await?;
-    // override the data type of all numeric fields to Float64
-    //if data type is not Float64 already
-    stream_schema = Schema::new(override_num_fields_from_schema(
-        stream_schema.fields().iter().cloned().collect(),
-    ));
+    let stream_schema = storage.get_schema(stream_name).await?;
     let new_schema = Schema::try_merge(vec![schema, stream_schema]).unwrap();
-
-    //update the merged schema in the metadata and storage
-    let schema_map: HashMap<String, Arc<Field>> = new_schema
-        .fields()
-        .iter()
-        .map(|field| (field.name().clone(), Arc::clone(field)))
-        .collect();
-    let _ = STREAM_INFO.set_schema(stream_name, schema_map);
     storage.put_schema(stream_name, &new_schema).await
 }

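`commit_schema_to_storage` now merges the incoming schema with the stored one as-is, so a genuine type conflict surfaces as a `Schema::try_merge` error (and, given the `.unwrap()`, a panic) instead of being papered over by a Float64 rewrite. A sketch of both outcomes using bare arrow-schema types:

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Disjoint fields union cleanly:
    let a = Schema::new(vec![Field::new("x", DataType::Int64, true)]);
    let b = Schema::new(vec![Field::new("y", DataType::Utf8, true)]);
    assert!(Schema::try_merge(vec![a, b]).is_ok());

    // The same field arriving with conflicting types is an error now
    // that numeric columns are no longer coerced to Float64 up front:
    let c = Schema::new(vec![Field::new("x", DataType::Int64, true)]);
    let d = Schema::new(vec![Field::new("x", DataType::Float64, true)]);
    assert!(Schema::try_merge(vec![c, d]).is_err());
}
```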