Skip to content

Commit

Permalink
chore: multiple improvements
Browse files Browse the repository at this point in the history
It's an overall negative PR, meaning there are more lines deleted than
added. Following changes have been made:

- remove a few `unwrap()`, partly covers #933
- replace code with idiomatic version of it
- rewrite `time_from_path` to make it simpler + robust
- rewrite `flatten_objects_for_count` reducing loops and simplify the
  accumulator inside it
- make the Regex in `Message::extract_column_names` static
- correct a few panic messages
  • Loading branch information
ByteBaker committed Sep 23, 2024
1 parent a5468b0 commit 08cd2be
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 194 deletions.
21 changes: 9 additions & 12 deletions server/src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,18 @@ pub struct Message {
}

impl Message {
// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
/// checks if message (with a column name) is valid (i.e. the column name is present in the schema)
pub fn valid(&self, schema: &Schema, column: &str) -> bool {
return get_field(&schema.fields, column).is_some();
}

pub fn extract_column_names(&self) -> Vec<&str> {
lazy_static::lazy_static! {
static ref REGEX: Regex = Regex::new(r"\{(.*?)\}").unwrap();
}

// the message can have either no column name ({column_name} not present) or any number of {column_name} present
Regex::new(r"\{(.*?)\}")
.unwrap()
REGEX
.captures_iter(self.message.as_str())
.map(|cap| cap.get(1).unwrap().as_str())
.collect()
Expand All @@ -156,8 +159,7 @@ impl Message {
let arr = cast(value, &DataType::Utf8).unwrap();
let value = as_string_array(&arr).value(0);

replace_message =
replace_message.replace(&format!("{{{column}}}"), value.to_string().as_str());
replace_message = replace_message.replace(&format!("{{{column}}}"), value);
}
}
replace_message
Expand Down Expand Up @@ -255,20 +257,15 @@ impl DeploymentInfo {
}
}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
pub enum AlertState {
#[default]
Listening,
SetToFiring,
Firing,
Resolved,
}

impl Default for AlertState {
fn default() -> Self {
Self::Listening
}
}

impl fmt::Display for AlertState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Expand Down
7 changes: 3 additions & 4 deletions server/src/alerts/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,6 @@ impl ConsecutiveStringRule {
}
}

fn one() -> u32 {
1
}

#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum CompositeRule {
Expand Down Expand Up @@ -334,6 +330,9 @@ impl fmt::Display for CompositeRule {
}
}

const fn one() -> u32 {
1
}
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ConsecutiveRepeatState {
#[serde(default = "one")]
Expand Down
2 changes: 1 addition & 1 deletion server/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl TypedStatistics {
max: max(this.max, other.max),
})
}
_ => panic!("Cannot update wrong types"),
_ => panic!("Cannot update incompatible types"),
}
}

Expand Down
10 changes: 5 additions & 5 deletions server/src/catalog/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ impl Default for Manifest {

impl Manifest {
pub fn apply_change(&mut self, change: File) {
if let Some(pos) = self
if let Some(matched) = self
.files
.iter()
.position(|file| file.file_path == change.file_path)
.iter_mut()
.find(|file| file.file_path == change.file_path)
{
self.files[pos] = change
*matched = change;
} else {
self.files.push(change)
self.files.push(change);
}
}
}
Expand Down
77 changes: 29 additions & 48 deletions server/src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*
*/

#![allow(deprecated)]

use anyhow::anyhow;
use arrow_array::RecordBatch;
use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
Expand Down Expand Up @@ -127,8 +125,9 @@ impl EventFormat for Event {
}
}

// Returns arrow schema with the fields that are present in the request body
// This schema is an input to convert the request body to arrow record batch
/// Returns arrow schema with the fields that are present in the request body
///
/// This schema is an input to convert the request body to arrow record batch
fn derive_arrow_schema(
schema: &HashMap<String, Arc<Field>>,
fields: Vec<&str>,
Expand Down Expand Up @@ -162,18 +161,13 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
}

fn fields_mismatch(schema: &[Arc<Field>], body: &Value) -> bool {
for (name, val) in body.as_object().expect("body is of object variant") {
if val.is_null() {
continue;
}
let Some(field) = get_field(schema, name) else {
return true;
};
if !valid_type(field.data_type(), val) {
return true;
}
}
false
body.as_object()
.expect("body is not an object")
.iter()
.filter(|(_, v)| !v.is_null())
.any(|(name, val)| {
get_field(schema, name).map_or(true, |field| !valid_type(field.data_type(), val))
})
}

fn valid_type(data_type: &DataType, value: &Value) -> bool {
Expand All @@ -185,40 +179,27 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
DataType::Utf8 => value.is_string(),
DataType::List(field) => {
let data_type = field.data_type();
if let Value::Array(arr) = value {
for elem in arr {
if elem.is_null() {
continue;
}
if !valid_type(data_type, elem) {
return false;
}
}
}
true
value.as_array().map_or(true, |arr| {
arr.iter()
.filter(|v| !v.is_null())
.all(|v| valid_type(data_type, v))
})
}
DataType::Struct(fields) => {
if let Value::Object(val) = value {
for (key, value) in val {
let field = (0..fields.len())
.find(|idx| fields[*idx].name() == key)
.map(|idx| &fields[idx]);

if let Some(field) = field {
if value.is_null() {
continue;
}
if !valid_type(field.data_type(), value) {
return false;
}
} else {
return false;
}
}
true
} else {
false
}
let Value::Object(val) = value else {
return false;
};
let fields_map = fields
.iter()
.map(|field| (field.name(), field))
.collect::<HashMap<_, _>>();

val.iter().filter(|(_, v)| !v.is_null()).all(|(key, val)| {
fields_map
.get(key)
.map(|field| valid_type(field.data_type(), val))
.unwrap_or_default()
})
}
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
_ => {
Expand Down
47 changes: 26 additions & 21 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,13 @@ use std::sync::Arc;
// ingests events by extracting stream name from header
// creates if stream does not exist
pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
if let Some((_, stream_name)) = req
if let Some(stream_name) = req
.headers()
.iter()
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
.find(|(key, _)| (*key == STREAM_NAME_HEADER_KEY))
.and_then(|(_, value)| value.to_str().ok())
.map(ToString::to_string)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
let internal_stream_names = STREAM_INFO.list_internal_streams();
if internal_stream_names.contains(&stream_name) {
return Err(PostError::Invalid(anyhow::anyhow!(
Expand Down Expand Up @@ -148,10 +149,14 @@ async fn flatten_and_push_logs(
stream_name: String,
) -> Result<(), PostError> {
//flatten logs
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
if let Some(log_source) = req
.headers()
.get(LOG_SOURCE_KEY)
.and_then(|v| v.to_str().ok())
{
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
let log_source: String = log_source.to_str().unwrap().to_owned();
match log_source.as_str() {

match log_source {
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
LOG_SOURCE_OTEL => {
json = otel::flatten_otel_logs(&body);
Expand Down Expand Up @@ -506,23 +511,23 @@ pub enum PostError {
impl actix_web::ResponseError for PostError {
fn status_code(&self) -> http::StatusCode {
match self {
PostError::SerdeError(_) => StatusCode::BAD_REQUEST,
PostError::Header(_) => StatusCode::BAD_REQUEST,
PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::Invalid(_) => StatusCode::BAD_REQUEST,
PostError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
Self::SerdeError(_)
| Self::Header(_)
| Self::Invalid(_)
| Self::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
StatusCode::BAD_REQUEST
}
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::MetadataStreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
PostError::CustomError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::NetworkError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::CacheError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::Event(_)
| Self::CreateStream(_)
| Self::MetadataStreamError(_)
| Self::StreamNotFound(_)
| Self::CustomError(_)
| Self::NetworkError(_)
| Self::ObjectStorageError(_)
| Self::DashboardError(_)
| Self::FiltersError(_)
| Self::CacheError(_)
| Self::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
Loading

0 comments on commit 08cd2be

Please sign in to comment.