diff --git a/server/src/alerts/mod.rs b/server/src/alerts/mod.rs
index 587bad773..62a40d496 100644
--- a/server/src/alerts/mod.rs
+++ b/server/src/alerts/mod.rs
@@ -134,15 +134,18 @@ pub struct Message {
 }
 
 impl Message {
-    // checks if message (with a column name) is valid (i.e. the column name is present in the schema)
+    /// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
     pub fn valid(&self, schema: &Schema, column: &str) -> bool {
         return get_field(&schema.fields, column).is_some();
     }
 
     pub fn extract_column_names(&self) -> Vec<&str> {
+        lazy_static::lazy_static! {
+            static ref REGEX: Regex = Regex::new(r"\{(.*?)\}").unwrap();
+        }
+
         // the message can have either no column name ({column_name} not present) or any number of {column_name} present
-        Regex::new(r"\{(.*?)\}")
-            .unwrap()
+        REGEX
            .captures_iter(self.message.as_str())
            .map(|cap| cap.get(1).unwrap().as_str())
            .collect()
@@ -156,8 +159,7 @@ impl Message {
             let arr = cast(value, &DataType::Utf8).unwrap();
             let value = as_string_array(&arr).value(0);
 
-            replace_message =
-                replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str());
+            replace_message = replace_message.replace(&format!("{{{column}}}"), value);
         }
     }
     replace_message
@@ -255,20 +257,15 @@ impl DeploymentInfo {
     }
 }
 
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
+#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
 pub enum AlertState {
+    #[default]
     Listening,
     SetToFiring,
     Firing,
     Resolved,
 }
 
-impl Default for AlertState {
-    fn default() -> Self {
-        Self::Listening
-    }
-}
-
 impl fmt::Display for AlertState {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         match *self {
diff --git a/server/src/alerts/rule.rs b/server/src/alerts/rule.rs
index dc9c19964..22c40bd2e 100644
--- a/server/src/alerts/rule.rs
+++ b/server/src/alerts/rule.rs
@@ -219,10 +219,6 @@ impl ConsecutiveStringRule {
     }
 }
 
-fn one() -> u32 {
-    1
-}
-
 #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
 #[serde(rename_all = "camelCase")]
 pub enum CompositeRule {
@@ -334,6 +330,9 @@ impl fmt::Display for CompositeRule {
     }
 }
 
+const fn one() -> u32 {
+    1
+}
 #[derive(Debug, serde::Serialize, serde::Deserialize)]
 pub struct ConsecutiveRepeatState {
     #[serde(default = "one")]
diff --git a/server/src/catalog/column.rs b/server/src/catalog/column.rs
index e689bd868..e6bba717d 100644
--- a/server/src/catalog/column.rs
+++ b/server/src/catalog/column.rs
@@ -84,7 +84,7 @@ impl TypedStatistics {
                 max: max(this.max, other.max),
             })
         }
-            _ => panic!("Cannot update wrong types"),
+            _ => panic!("Cannot update incompatible types"),
         }
     }
diff --git a/server/src/catalog/manifest.rs b/server/src/catalog/manifest.rs
index ad5b32422..6cde703bb 100644
--- a/server/src/catalog/manifest.rs
+++ b/server/src/catalog/manifest.rs
@@ -76,14 +76,14 @@ impl Default for Manifest {
 
 impl Manifest {
     pub fn apply_change(&mut self, change: File) {
-        if let Some(pos) = self
+        if let Some(matched) = self
             .files
-            .iter()
-            .position(|file| file.file_path == change.file_path)
+            .iter_mut()
+            .find(|file| file.file_path == change.file_path)
         {
-            self.files[pos] = change
+            *matched = change;
         } else {
-            self.files.push(change)
+            self.files.push(change);
         }
     }
 }
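
// A minimal standalone sketch (hypothetical `Entry` type, not from this patch)
// of the update-or-insert idiom `Manifest::apply_change` switches to above:
// `iter_mut().find()` hands back a mutable reference on a hit, so the entry is
// overwritten in place, where the old `position()` version needed a second
// indexed access into the vector.
#[derive(Debug)]
struct Entry {
    path: String,
    size: u64,
}

fn upsert(entries: &mut Vec<Entry>, change: Entry) {
    if let Some(matched) = entries.iter_mut().find(|e| e.path == change.path) {
        *matched = change; // overwrite through the mutable reference
    } else {
        entries.push(change); // no match: append as a new entry
    }
}

fn main() {
    let mut entries = vec![Entry { path: "a.parquet".into(), size: 1 }];
    upsert(&mut entries, Entry { path: "a.parquet".into(), size: 2 });
    upsert(&mut entries, Entry { path: "b.parquet".into(), size: 3 });
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0].size, 2);
}
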
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index 82cd9e3aa..9b4247486 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -17,8 +17,6 @@
  *
  */
 
-#![allow(deprecated)]
-
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
@@ -127,8 +125,9 @@ impl EventFormat for Event {
     }
 }
 
-// Returns arrow schema with the fields that are present in the request body
-// This schema is an input to convert the request body to arrow record batch
+/// Returns arrow schema with the fields that are present in the request body
+///
+/// This schema is an input to convert the request body to arrow record batch
 fn derive_arrow_schema(
     schema: &HashMap<String, Arc<Field>>,
     fields: Vec<&str>,
 ) -> Result<Vec<Arc<Field>>, ()> {
@@ -162,18 +161,13 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a
 }
 
 fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
-    for (name, val) in body.as_object().expect("body is of object variant") {
-        if val.is_null() {
-            continue;
-        }
-        let Some(field) = get_field(schema, name) else {
-            return true;
-        };
-        if !valid_type(field.data_type(), val) {
-            return true;
-        }
-    }
-    false
+    body.as_object()
+        .expect("body is not an object")
+        .iter()
+        .filter(|(_, v)| !v.is_null())
+        .any(|(name, val)| {
+            get_field(schema, name).map_or(true, |field| !valid_type(field.data_type(), val))
+        })
 }
 
 fn valid_type(data_type: &DataType, value: &Value) -> bool {
@@ -185,40 +179,27 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
         DataType::Utf8 => value.is_string(),
         DataType::List(field) => {
             let data_type = field.data_type();
-            if let Value::Array(arr) = value {
-                for elem in arr {
-                    if elem.is_null() {
-                        continue;
-                    }
-                    if !valid_type(data_type, elem) {
-                        return false;
-                    }
-                }
-            }
-            true
+            value.as_array().map_or(true, |arr| {
+                arr.iter()
+                    .filter(|v| !v.is_null())
+                    .all(|v| valid_type(data_type, v))
+            })
         }
         DataType::Struct(fields) => {
-            if let Value::Object(val) = value {
-                for (key, value) in val {
-                    let field = (0..fields.len())
-                        .find(|idx| fields[*idx].name() == key)
-                        .map(|idx| &fields[idx]);
-
-                    if let Some(field) = field {
-                        if value.is_null() {
-                            continue;
-                        }
-                        if !valid_type(field.data_type(), value) {
-                            return false;
-                        }
-                    } else {
-                        return false;
-                    }
-                }
-                true
-            } else {
-                false
-            }
+            let Value::Object(val) = value else {
+                return false;
+            };
+            let fields_map = fields
+                .iter()
+                .map(|field| (field.name(), field))
+                .collect::<HashMap<_, _>>();
+
+            val.iter().filter(|(_, v)| !v.is_null()).all(|(key, val)| {
+                fields_map
+                    .get(key)
+                    .map(|field| valid_type(field.data_type(), val))
+                    .unwrap_or_default()
+            })
         }
         DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
         _ => {
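
// A standalone sketch (hypothetical `known_type`/`valid` helpers standing in
// for the real schema lookup) of the rewritten `fields_mismatch` idiom above:
// instead of an early-return `for` loop, filter out null values and ask whether
// *any* remaining field is unknown or has a mismatched type; `map_or(true, ...)`
// encodes "missing from the schema counts as a mismatch".
use serde_json::{json, Value};

fn known_type(name: &str) -> Option<&'static str> {
    match name {
        "id" => Some("number"),
        "msg" => Some("string"),
        _ => None, // not in the schema
    }
}

fn valid(expected: &str, v: &Value) -> bool {
    match expected {
        "number" => v.is_number(),
        "string" => v.is_string(),
        _ => false,
    }
}

fn mismatch(body: &Value) -> bool {
    body.as_object()
        .expect("body is not an object")
        .iter()
        .filter(|(_, v)| !v.is_null()) // nulls never count as mismatches
        .any(|(name, val)| known_type(name).map_or(true, |t| !valid(t, val)))
}

fn main() {
    assert!(!mismatch(&json!({"id": 1, "msg": "ok", "extra": null})));
    assert!(mismatch(&json!({"id": "not-a-number"})));
    assert!(mismatch(&json!({"unknown": 1})));
}
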
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index 9e6203018..e017ec3a2 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -50,12 +50,13 @@ use std::sync::Arc;
 
 // ingests events by extracting stream name from header
 // creates if stream does not exist
 pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
-    if let Some((_, stream_name)) = req
+    if let Some(stream_name) = req
         .headers()
         .iter()
-        .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
+        .find(|(key, _)| (*key == STREAM_NAME_HEADER_KEY))
+        .and_then(|(_, value)| value.to_str().ok())
+        .map(ToString::to_string)
     {
-        let stream_name = stream_name.to_str().unwrap().to_owned();
         let internal_stream_names = STREAM_INFO.list_internal_streams();
         if internal_stream_names.contains(&stream_name) {
             return Err(PostError::Invalid(anyhow::anyhow!(
@@ -148,10 +149,14 @@ async fn flatten_and_push_logs(
     stream_name: String,
 ) -> Result<(), PostError> {
     //flatten logs
-    if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
+    if let Some(log_source) = req
+        .headers()
+        .get(LOG_SOURCE_KEY)
+        .and_then(|v| v.to_str().ok())
+    {
         let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
-        let log_source: String = log_source.to_str().unwrap().to_owned();
-        match log_source.as_str() {
+
+        match log_source {
             LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
             LOG_SOURCE_OTEL => {
                 json = otel::flatten_otel_logs(&body);
@@ -506,23 +511,23 @@ pub enum PostError {
 
 impl actix_web::ResponseError for PostError {
     fn status_code(&self) -> http::StatusCode {
         match self {
-            PostError::SerdeError(_) => StatusCode::BAD_REQUEST,
-            PostError::Header(_) => StatusCode::BAD_REQUEST,
-            PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::Invalid(_) => StatusCode::BAD_REQUEST,
-            PostError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
+            Self::SerdeError(_)
+            | Self::Header(_)
+            | Self::Invalid(_)
+            | Self::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
                 StatusCode::BAD_REQUEST
             }
-            PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::MetadataStreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
-            PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
-            PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            Self::StreamNotFound(_) => StatusCode::NOT_FOUND,
+            Self::Event(_)
+            | Self::CreateStream(_)
+            | Self::MetadataStreamError(_)
+            | Self::CustomError(_)
+            | Self::NetworkError(_)
+            | Self::ObjectStorageError(_)
+            | Self::DashboardError(_)
+            | Self::FiltersError(_)
+            | Self::CacheError(_)
+            | Self::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
         }
     }
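
// A standalone sketch (using the `http` crate's HeaderMap; the header name is
// an assumed value for illustration, the real code uses the
// STREAM_NAME_HEADER_KEY constant) of the safer header lookup `ingest` and
// `flatten_and_push_logs` switch to above: a keyed `get` instead of a linear
// `find`, and `and_then(|v| v.to_str().ok())` so a non-UTF-8 value yields
// `None` instead of panicking through `unwrap`.
use http::HeaderMap;

const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; // assumed for illustration

fn stream_name(headers: &HeaderMap) -> Option<String> {
    headers
        .get(STREAM_NAME_HEADER_KEY) // direct keyed lookup, no iteration
        .and_then(|v| v.to_str().ok()) // None on non-UTF-8 bytes, no panic
        .map(ToString::to_string)
}

fn main() {
    let mut headers = HeaderMap::new();
    headers.insert(STREAM_NAME_HEADER_KEY, "app-logs".parse().unwrap());
    assert_eq!(stream_name(&headers), Some("app-logs".to_string()));
    assert_eq!(stream_name(&HeaderMap::new()), None);
}
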
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index 21b5e100c..8e398a561 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -273,37 +273,16 @@ impl StreamInfo {
     ) {
         let mut map = self.write().expect(LOCK_EXPECT);
         let metadata = LogStreamMetadata {
-            created_at: if created_at.is_empty() {
-                Local::now().to_rfc3339()
-            } else {
-                created_at
-            },
-            time_partition: if time_partition.is_empty() {
-                None
-            } else {
-                Some(time_partition)
-            },
-            time_partition_limit: if time_partition_limit.is_empty() {
-                None
-            } else {
-                Some(time_partition_limit)
-            },
-            custom_partition: if custom_partition.is_empty() {
-                None
-            } else {
-                Some(custom_partition)
-            },
-            static_schema_flag: if static_schema_flag != "true" {
-                None
-            } else {
-                Some(static_schema_flag)
-            },
-            schema: if static_schema.is_empty() {
-                HashMap::new()
-            } else {
-                static_schema
-            },
-            stream_type: Some(stream_type.to_string()),
+            created_at: (!created_at.is_empty())
+                .then_some(created_at)
+                .unwrap_or_else(|| Local::now().to_rfc3339()),
+            time_partition: (!time_partition.is_empty()).then_some(time_partition),
+            time_partition_limit: (!time_partition_limit.is_empty())
+                .then_some(time_partition_limit),
+            custom_partition: (!custom_partition.is_empty()).then_some(custom_partition),
+            static_schema_flag: static_schema_flag.eq("true").then_some(static_schema_flag),
+            schema: static_schema,
+            stream_type: (!stream_type.is_empty()).then(|| stream_type.to_string()),
             ..Default::default()
         };
         map.insert(stream_name, metadata);
@@ -354,19 +333,17 @@
     }
 
     pub fn list_streams(&self) -> Vec<String> {
-        self.read()
-            .expect(LOCK_EXPECT)
-            .keys()
-            .map(String::clone)
-            .collect()
+        self.read().expect(LOCK_EXPECT).keys().cloned().collect()
     }
 
     pub fn list_internal_streams(&self) -> Vec<String> {
         self.read()
             .expect(LOCK_EXPECT)
             .iter()
-            .filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
-            .map(|(k, _)| k.clone())
+            .filter_map(|(k, v)| {
+                (v.stream_type.as_deref().unwrap() == StreamType::Internal.to_string()).then_some(k)
+            })
+            .cloned()
             .collect()
     }
@@ -425,30 +402,32 @@ fn update_schema_from_staging(stream_name: &str, current_schema: Schema) -> Sche
 
 pub async fn update_data_type_time_partition(
     storage: &(impl ObjectStorage + ?Sized),
     stream_name: &str,
-    schema: Schema,
+    mut schema: Schema,
     meta: ObjectStoreFormat,
 ) -> anyhow::Result<Schema> {
-    let mut schema = schema.clone();
-    if meta.time_partition.is_some() {
-        let time_partition = meta.time_partition.unwrap();
-        if let Ok(time_partition_field) = schema.field_with_name(&time_partition) {
-            if time_partition_field.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None)
-            {
-                let mut fields = schema
-                    .fields()
-                    .iter()
-                    .filter(|field| *field.name() != time_partition)
-                    .cloned()
-                    .collect::<Vec<Arc<Field>>>();
-                let time_partition_field = Arc::new(Field::new(
-                    time_partition,
-                    DataType::Timestamp(TimeUnit::Millisecond, None),
-                    true,
-                ));
-                fields.push(time_partition_field);
-                schema = Schema::new(fields);
-                storage.put_schema(stream_name, &schema).await?;
-            }
+    if let Some(time_partition) = meta.time_partition {
+        if schema
+            .field_with_name(&time_partition)
+            .ok()
+            .filter(|f| f.data_type() != &DataType::Timestamp(TimeUnit::Millisecond, None))
+            .is_some()
+        {
+            let mut fields = schema
+                .fields()
+                .iter()
+                .filter(|field| *field.name() != time_partition)
+                .cloned()
+                .collect::<Vec<Arc<Field>>>();
+
+            let time_partition_field = Arc::new(Field::new(
+                time_partition,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ));
+
+            fields.push(time_partition_field);
+            schema = Schema::new(fields);
+            storage.put_schema(stream_name, &schema).await?;
         }
     }
     Ok(schema)
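
// A short standalone sketch of the `bool::then_some` idiom the `add_stream`
// rewrite above leans on: `cond.then_some(v)` is `Some(v)` when `cond` holds
// and `None` otherwise, collapsing the repeated `if x.is_empty() { None }
// else { Some(x) }` blocks. Note the condition must assert that the value
// should be *kept* (hence `eq("true")`, not `ne`), and that `then_some` moves
// its argument eagerly; use `then(|| ...)` when building the value is costly.
fn main() {
    let time_partition = String::from("timestamp");
    let custom_partition = String::new();

    // Some("timestamp"): the string is non-empty.
    let tp: Option<String> = (!time_partition.is_empty()).then_some(time_partition);
    // None: an empty string means "not configured".
    let cp: Option<String> = (!custom_partition.is_empty()).then_some(custom_partition);
    assert_eq!(tp.as_deref(), Some("timestamp"));
    assert_eq!(cp, None);

    // Lazily computed fallback, mirroring the `created_at` default above.
    let created_at = String::new();
    let created = (!created_at.is_empty())
        .then_some(created_at)
        .unwrap_or_else(|| String::from("2024-01-01T00:00:00+00:00"));
    assert_eq!(created, "2024-01-01T00:00:00+00:00");
}
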
diff --git a/server/src/query.rs b/server/src/query.rs
index 4e345148a..ae25fe5c0 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -20,8 +20,7 @@ mod filter_optimizer;
 mod listing_table_builder;
 pub mod stream_schema_provider;
 
-use chrono::{DateTime, Utc};
-use chrono::{NaiveDateTime, TimeZone};
+use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
@@ -318,27 +317,19 @@ fn path_intersects_query(path: &Path, starttime: DateTime<Utc>, endtime: DateTim
 }
 
 fn time_from_path(path: &Path) -> DateTime<Utc> {
-    let prefix = path
+    let path_str = path
         .file_name()
-        .expect("all given path are file")
+        .expect("all given paths are files")
         .to_str()
         .expect("filename is valid");
 
-    // Next three in order will be date, hour and minute
-    let mut components = prefix.splitn(3, '.');
+    const DATETIME_FORMAT: &str = "date=%Y-%m-%d.hour=%H.minute=%M";
 
-    let date = components.next().expect("date=xxxx-xx-xx");
-    let hour = components.next().expect("hour=xx");
-    let minute = components.next().expect("minute=xx");
-
-    let year = date[5..9].parse().unwrap();
-    let month = date[10..12].parse().unwrap();
-    let day = date[13..15].parse().unwrap();
-    let hour = hour[5..7].parse().unwrap();
-    let minute = minute[7..9].parse().unwrap();
-
-    Utc.with_ymd_and_hms(year, month, day, hour, minute, 0)
-        .unwrap()
+    // Parse the date-time using the format
+    NaiveDateTime::parse_and_remainder(path_str, DATETIME_FORMAT)
+        .expect("valid date-time")
+        .0
+        .and_utc()
 }
 
 /// unused for now might need it later
@@ -348,39 +339,32 @@ pub fn flatten_objects_for_count(objects: Vec<Value>) -> Vec<Value> {
         return objects;
     }
 
+    let first_object_first_key = objects
+        .first()
+        .and_then(Value::as_object)
+        .and_then(|obj| obj.keys().next())
+        .unwrap();
+
     // check if all the keys start with "COUNT"
     let flag = objects.iter().all(|obj| {
         obj.as_object()
-            .unwrap()
-            .keys()
-            .all(|key| key.starts_with("COUNT"))
-    }) && objects.iter().all(|obj| {
-        obj.as_object()
-            .unwrap()
-            .keys()
-            .all(|key| key == objects[0].as_object().unwrap().keys().next().unwrap())
+            .map(|obj| {
+                obj.keys()
+                    .all(|key| key.starts_with("COUNT") && key == first_object_first_key)
+            })
+            .unwrap_or_default()
     });
 
     if flag {
-        let mut accum = 0u64;
-        let key = objects[0]
-            .as_object()
-            .unwrap()
-            .keys()
-            .next()
-            .unwrap()
-            .clone();
-
-        for obj in objects {
-            let count = obj.as_object().unwrap().keys().fold(0, |acc, key| {
-                let value = obj.as_object().unwrap().get(key).unwrap().as_u64().unwrap();
-                acc + value
-            });
-            accum += count;
-        }
+        let accum = objects
+            .iter()
+            .filter_map(Value::as_object)
+            .flat_map(|obj| obj.values())
+            .filter_map(Value::as_u64)
+            .sum::<u64>();
 
         vec![json!({
-            key: accum
+            first_object_first_key: accum
         })]
     } else {
        objects
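
// A standalone sketch of the chrono call that replaces the manual
// slice-and-parse in `time_from_path` above. `NaiveDateTime::parse_and_remainder`
// parses a leading timestamp out of a string and returns whatever trails it
// (here the file extension), so one format string covers the old
// date/hour/minute slicing; seconds are absent from the format and default to
// zero. Assumes a recent chrono (`parse_and_remainder` and `and_utc` landed in
// the 0.4.3x releases).
use chrono::{DateTime, NaiveDateTime, Utc};

fn main() {
    const DATETIME_FORMAT: &str = "date=%Y-%m-%d.hour=%H.minute=%M";
    let file_name = "date=2024-05-01.hour=13.minute=07.parquet";

    let (naive, rest) =
        NaiveDateTime::parse_and_remainder(file_name, DATETIME_FORMAT).expect("valid date-time");
    let ts: DateTime<Utc> = naive.and_utc();

    assert_eq!(rest, ".parquet");
    assert_eq!(ts.to_rfc3339(), "2024-05-01T13:07:00+00:00");
}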