Skip to content

Commit

Permalink
Merge branch 'main' into time-parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Dec 9, 2024
2 parents 9913dec + acb26b9 commit 811096a
Show file tree
Hide file tree
Showing 36 changed files with 232 additions and 220 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ hostname = "0.4.0"
http = "0.2.7"
humantime-serde = "1.1"
itertools = "0.13.0"
log = "0.4"
num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
Expand Down Expand Up @@ -105,6 +104,7 @@ path-clean = "1.0.1"
prost = "0.13.3"
prometheus-parse = "0.2.5"
sha2 = "0.10.8"
tracing = "0.1.41"

[build-dependencies]
cargo_toml = "0.20.1"
Expand Down
7 changes: 4 additions & 3 deletions src/alerts/target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use chrono::Utc;
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
use humantime_serde::re::humantime;
use reqwest::ClientBuilder;
use tracing::error;

use crate::utils::json;

Expand Down Expand Up @@ -239,7 +240,7 @@ impl CallableTarget for SlackWebHook {
};

if let Err(e) = client.post(&self.endpoint).json(&alert).send().await {
log::error!("Couldn't make call to webhook, error: {}", e)
error!("Couldn't make call to webhook, error: {}", e)
}
}
}
Expand Down Expand Up @@ -277,7 +278,7 @@ impl CallableTarget for OtherWebHook {
.headers((&self.headers).try_into().expect("valid_headers"));

if let Err(e) = request.body(alert).send().await {
log::error!("Couldn't make call to webhook, error: {}", e)
error!("Couldn't make call to webhook, error: {}", e)
}
}
}
Expand Down Expand Up @@ -356,7 +357,7 @@ impl CallableTarget for AlertManager {
};

if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await {
log::error!("Couldn't make call to alertmanager, error: {}", e)
error!("Couldn't make call to alertmanager, error: {}", e)
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Duration;
use sysinfo::System;
use tracing::{error, info};
use ulid::Ulid;

const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
Expand Down Expand Up @@ -291,7 +292,7 @@ async fn build_metrics() -> HashMap<String, Value> {
}

pub fn init_analytics_scheduler() -> anyhow::Result<()> {
log::info!("Setting up schedular for anonymous user analytics");
info!("Setting up schedular for anonymous user analytics");

let mut scheduler = AsyncScheduler::new();
scheduler
Expand All @@ -302,7 +303,7 @@ pub fn init_analytics_scheduler() -> anyhow::Result<()> {
.unwrap_or_else(|err| {
// panicing because seperate thread
// TODO: a better way to handle this
log::error!("Error while sending analytics: {}", err.to_string());
error!("Error while sending analytics: {}", err.to_string());
panic!("{}", err.to_string());
})
.send()
Expand Down
7 changes: 4 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use bytes::Bytes;
use chrono::{DateTime, Local, NaiveTime, Utc};
use relative_path::RelativePathBuf;
use std::io::Error as IOError;
use tracing::{error, info};
pub mod column;
pub mod manifest;
pub mod snapshot;
Expand Down Expand Up @@ -280,7 +281,7 @@ async fn create_manifest(
};
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
log::error!(
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
Expand Down Expand Up @@ -360,7 +361,7 @@ pub async fn get_first_event(
let manifests = meta_clone.snapshot.manifest_list;
let time_partition = meta_clone.time_partition;
if manifests.is_empty() {
log::info!("No manifest found for stream {stream_name}");
info!("No manifest found for stream {stream_name}");
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
}
let manifest = &manifests[0];
Expand Down Expand Up @@ -400,7 +401,7 @@ pub async fn get_first_event(
handlers::http::cluster::get_ingestor_info()
.await
.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
error!("Fatal: failed to get ingestor info: {:?}", err);
ObjectStorageError::from(err)
})?;
let mut ingestors_first_event_at: Vec<String> = Vec::new();
Expand Down
3 changes: 2 additions & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
use itertools::Itertools;
use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, Metadata, Tags};
use crate::utils::{arrow::get_field, json::flatten_json_body};
Expand Down Expand Up @@ -225,7 +226,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
}
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
_ => {
log::error!("Unsupported datatype {:?}, value {:?}", data_type, value);
error!("Unsupported datatype {:?}, value {:?}", data_type, value);
unreachable!()
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow_array::RecordBatch;
use arrow_schema::{Field, Fields, Schema};
use itertools::Itertools;
use std::sync::Arc;
use tracing::error;

use self::error::EventError;
pub use self::writer::STREAM_WRITERS;
Expand Down Expand Up @@ -93,7 +94,7 @@ impl Event {
.check_alerts(&self.stream_name, &self.rb)
.await
{
log::error!("Error checking for alerts. {:?}", e);
error!("Error checking for alerts. {:?}", e);
}

Ok(())
Expand Down
7 changes: 4 additions & 3 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use serde_json::json;
use std::net::SocketAddr;
use std::time::Instant;
use tonic::codec::CompressionEncoding;
use tracing::{error, info};

use futures_util::{Future, TryFutureExt};

Expand Down Expand Up @@ -136,7 +137,7 @@ impl FlightService for AirServiceImpl {

let ticket = get_query_from_ticket(&req)?;

log::info!("query requested to airplane: {:?}", ticket);
info!("query requested to airplane: {:?}", ticket);

// get the query session_state
let session_state = QUERY_SESSION.state();
Expand All @@ -146,7 +147,7 @@ impl FlightService for AirServiceImpl {
.create_logical_plan(&ticket.query)
.await
.map_err(|err| {
log::error!("Datafusion Error: Failed to create logical plan: {}", err);
error!("Datafusion Error: Failed to create logical plan: {}", err);
Status::internal("Failed to create logical plan")
})?;

Expand Down Expand Up @@ -273,7 +274,7 @@ impl FlightService for AirServiceImpl {
)
.await
{
log::error!("{}", err);
error!("{}", err);
};

/*
Expand Down
Loading

0 comments on commit 811096a

Please sign in to comment.