diff --git a/Cargo.lock b/Cargo.lock index f10097210..1b98329a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3133,7 +3133,6 @@ dependencies = [ "humantime-serde", "itertools 0.13.0", "lazy_static", - "log", "maplit", "mime", "nom", @@ -3169,6 +3168,7 @@ dependencies = [ "tonic", "tonic-web", "tower-http 0.6.1", + "tracing", "tracing-subscriber", "ulid", "uptime_lib", @@ -4762,9 +4762,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", @@ -4774,9 +4774,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", @@ -4785,9 +4785,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", diff --git a/Cargo.toml b/Cargo.toml index 721b183b1..455321afe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,7 +62,6 @@ hostname = "0.4.0" http = "0.2.7" humantime-serde = "1.1" itertools = "0.13.0" -log = "0.4" num_cpus = "1.15" once_cell = "1.17.1" prometheus = { version = "0.13", features = ["process"] } @@ -105,6 +104,7 @@ path-clean = "1.0.1" prost = "0.13.3" prometheus-parse = "0.2.5" sha2 = "0.10.8" +tracing = "0.1.41" [build-dependencies] cargo_toml = "0.20.1" diff --git a/src/alerts/target.rs b/src/alerts/target.rs index f0ee1c54b..c7e2c7586 100644 --- a/src/alerts/target.rs +++ b/src/alerts/target.rs @@ -28,6 +28,7 @@ use chrono::Utc; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use humantime_serde::re::humantime; use reqwest::ClientBuilder; +use tracing::error; use crate::utils::json; @@ -239,7 +240,7 @@ impl CallableTarget for SlackWebHook { }; if let Err(e) = client.post(&self.endpoint).json(&alert).send().await { - log::error!("Couldn't make call to webhook, error: {}", e) + error!("Couldn't make call to webhook, error: {}", e) } } } @@ -277,7 +278,7 @@ impl CallableTarget for OtherWebHook { .headers((&self.headers).try_into().expect("valid_headers")); if let Err(e) = request.body(alert).send().await { - log::error!("Couldn't make call to webhook, error: {}", e) + error!("Couldn't make call to webhook, error: {}", e) } } } @@ -356,7 +357,7 @@ impl CallableTarget for AlertManager { }; if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await { - log::error!("Couldn't make call to alertmanager, error: {}", e) + error!("Couldn't make call to alertmanager, error: {}", e) } } } diff --git a/src/analytics.rs b/src/analytics.rs index 9e6fca098..fad0dca74 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -37,6 +37,7 @@ use std::collections::HashMap; use std::sync::Mutex; use std::time::Duration; use sysinfo::System; +use tracing::{error, info}; use ulid::Ulid; const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80"; @@ -291,7 +292,7 @@ async fn build_metrics() -> HashMap { } pub fn init_analytics_scheduler() -> anyhow::Result<()> { - log::info!("Setting up schedular for anonymous user analytics"); + info!("Setting up schedular for anonymous user analytics"); let mut scheduler = AsyncScheduler::new(); scheduler @@ -302,7 +303,7 @@ pub fn init_analytics_scheduler() -> anyhow::Result<()> { .unwrap_or_else(|err| { // panicing because seperate thread // TODO: a better way to handle this - log::error!("Error while sending analytics: {}", err.to_string()); + error!("Error while sending analytics: {}", err.to_string()); panic!("{}", err.to_string()); }) .send() diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 5c502ac89..8545841e0 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -37,6 +37,7 @@ use bytes::Bytes; use chrono::{DateTime, Local, NaiveTime, Utc}; use relative_path::RelativePathBuf; use std::io::Error as IOError; +use tracing::{error, info}; pub mod column; pub mod manifest; pub mod snapshot; @@ -280,7 +281,7 @@ async fn create_manifest( }; first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339()); if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) { - log::error!( + error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", stream_name ); @@ -360,7 +361,7 @@ pub async fn get_first_event( let manifests = meta_clone.snapshot.manifest_list; let time_partition = meta_clone.time_partition; if manifests.is_empty() { - log::info!("No manifest found for stream {stream_name}"); + info!("No manifest found for stream {stream_name}"); return Err(ObjectStorageError::Custom("No manifest found".to_string())); } let manifest = &manifests[0]; @@ -400,7 +401,7 @@ pub async fn get_first_event( handlers::http::cluster::get_ingestor_info() .await .map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); ObjectStorageError::from(err) })?; let mut ingestors_first_event_at: Vec = Vec::new(); diff --git a/src/event/format/json.rs b/src/event/format/json.rs index abb823c46..487cb58a6 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -27,6 +27,7 @@ use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use itertools::Itertools; use serde_json::Value; use std::{collections::HashMap, sync::Arc}; +use tracing::error; use super::{EventFormat, Metadata, Tags}; use crate::utils::{arrow::get_field, json::flatten_json_body}; @@ -225,7 +226,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool { } DataType::Timestamp(_, _) => value.is_string() || value.is_number(), _ => { - log::error!("Unsupported datatype {:?}, value {:?}", data_type, value); + error!("Unsupported datatype {:?}, value {:?}", data_type, value); unreachable!() } } diff --git a/src/event/mod.rs b/src/event/mod.rs index eda5dd889..42773ed12 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -24,6 +24,7 @@ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; use itertools::Itertools; use std::sync::Arc; +use tracing::error; use self::error::EventError; pub use self::writer::STREAM_WRITERS; @@ -93,7 +94,7 @@ impl Event { .check_alerts(&self.stream_name, &self.rb) .await { - log::error!("Error checking for alerts. {:?}", e); + error!("Error checking for alerts. {:?}", e); } Ok(()) diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 350de26d4..99b71edf0 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -26,6 +26,7 @@ use serde_json::json; use std::net::SocketAddr; use std::time::Instant; use tonic::codec::CompressionEncoding; +use tracing::{error, info}; use futures_util::{Future, TryFutureExt}; @@ -136,7 +137,7 @@ impl FlightService for AirServiceImpl { let ticket = get_query_from_ticket(&req)?; - log::info!("query requested to airplane: {:?}", ticket); + info!("query requested to airplane: {:?}", ticket); // get the query session_state let session_state = QUERY_SESSION.state(); @@ -146,7 +147,7 @@ impl FlightService for AirServiceImpl { .create_logical_plan(&ticket.query) .await .map_err(|err| { - log::error!("Datafusion Error: Failed to create logical plan: {}", err); + error!("Datafusion Error: Failed to create logical plan: {}", err); Status::internal("Failed to create logical plan") })?; @@ -273,7 +274,7 @@ impl FlightService for AirServiceImpl { ) .await { - log::error!("{}", err); + error!("{}", err); }; /* diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs index da5908dbc..ff520af9a 100644 --- a/src/handlers/http/cluster/mod.rs +++ b/src/handlers/http/cluster/mod.rs @@ -43,6 +43,7 @@ use relative_path::RelativePathBuf; use serde::de::Error; use serde_json::error::Error as SerdeError; use serde_json::{to_vec, Value as JsonValue}; +use tracing::{error, info, warn}; use url::Url; type IngestorMetadataArr = Vec; @@ -71,7 +72,7 @@ pub async fn sync_streams_with_ingestors( reqwest_headers.insert(key.clone(), value.clone()); } let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; @@ -79,7 +80,7 @@ pub async fn sync_streams_with_ingestors( for ingestor in ingestor_infos { if !utils::check_liveness(&ingestor.domain_name).await { - log::warn!("Ingestor {} is not live", ingestor.domain_name); + warn!("Ingestor {} is not live", ingestor.domain_name); continue; } let url = format!( @@ -96,16 +97,15 @@ pub async fn sync_streams_with_ingestors( .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); StreamError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, res.text().await @@ -122,18 +122,18 @@ pub async fn sync_users_with_roles_with_ingestors( role: &HashSet, ) -> Result<(), RBACError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; let client = reqwest::Client::new(); let role = to_vec(&role.clone()).map_err(|err| { - log::error!("Fatal: failed to serialize role: {:?}", err); + error!("Fatal: failed to serialize role: {:?}", err); RBACError::SerdeError(err) })?; for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { - log::warn!("Ingestor {} is not live", ingestor.domain_name); + warn!("Ingestor {} is not live", ingestor.domain_name); continue; } let url = format!( @@ -151,16 +151,15 @@ pub async fn sync_users_with_roles_with_ingestors( .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); RBACError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, res.text().await @@ -174,14 +173,14 @@ pub async fn sync_users_with_roles_with_ingestors( // forward the delete user request to all ingestors to keep them in sync pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; let client = reqwest::Client::new(); for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { - log::warn!("Ingestor {} is not live", ingestor.domain_name); + warn!("Ingestor {} is not live", ingestor.domain_name); continue; } let url = format!( @@ -197,16 +196,15 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); RBACError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, res.text().await @@ -223,7 +221,7 @@ pub async fn sync_user_creation_with_ingestors( role: &Option>, ) -> Result<(), RBACError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; @@ -236,13 +234,13 @@ pub async fn sync_user_creation_with_ingestors( let client = reqwest::Client::new(); let user = to_vec(&user).map_err(|err| { - log::error!("Fatal: failed to serialize user: {:?}", err); + error!("Fatal: failed to serialize user: {:?}", err); RBACError::SerdeError(err) })?; for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { - log::warn!("Ingestor {} is not live", ingestor.domain_name); + warn!("Ingestor {} is not live", ingestor.domain_name); continue; } let url = format!( @@ -260,16 +258,15 @@ pub async fn sync_user_creation_with_ingestors( .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); RBACError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, res.text().await @@ -283,14 +280,14 @@ pub async fn sync_user_creation_with_ingestors( // forward the password reset request to all ingestors to keep them in sync pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; let client = reqwest::Client::new(); for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { - log::warn!("Ingestor {} is not live", ingestor.domain_name); + warn!("Ingestor {} is not live", ingestor.domain_name); continue; } let url = format!( @@ -307,16 +304,15 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); RBACError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, res.text().await @@ -333,12 +329,12 @@ pub async fn sync_role_update_with_ingestors( body: Vec, ) -> Result<(), RoleError> { let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); RoleError::Anyhow(err) })?; let roles = to_vec(&body).map_err(|err| { - log::error!("Fatal: failed to serialize roles: {:?}", err); + error!("Fatal: failed to serialize roles: {:?}", err); RoleError::SerdeError(err) })?; let roles = Bytes::from(roles); @@ -346,7 +342,7 @@ pub async fn sync_role_update_with_ingestors( for ingestor in ingestor_infos.iter() { if !utils::check_liveness(&ingestor.domain_name).await { - log::warn!("Ingestor {} is not live", ingestor.domain_name); + warn!("Ingestor {} is not live", ingestor.domain_name); continue; } let url = format!( @@ -364,16 +360,15 @@ pub async fn sync_role_update_with_ingestors( .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to forward request to ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); RoleError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward request to ingestor: {}\nResponse Returned: {:?}", ingestor.domain_name, res.text().await @@ -393,7 +388,7 @@ pub async fn fetch_daily_stats_from_ingestors( let mut total_storage_size: u64 = 0; let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; for ingestor in ingestor_infos.iter() { @@ -526,10 +521,9 @@ pub async fn send_stream_delete_request( .await .map_err(|err| { // log the error and return a custom error - log::error!( + error!( "Fatal: failed to delete stream: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); StreamError::Network(err) })?; @@ -537,7 +531,7 @@ pub async fn send_stream_delete_request( // if the response is not successful, log the error and return a custom error // this could be a bit too much, but we need to be sure it covers all cases if !resp.status().is_success() { - log::error!( + error!( "failed to delete stream: {}\nResponse Returned: {:?}", ingestor.domain_name, resp.text().await @@ -567,10 +561,9 @@ pub async fn send_retention_cleanup_request( .await .map_err(|err| { // log the error and return a custom error - log::error!( + error!( "Fatal: failed to perform cleanup on retention: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); ObjectStorageError::Custom(err.to_string()) })?; @@ -578,7 +571,7 @@ pub async fn send_retention_cleanup_request( // if the response is not successful, log the error and return a custom error // this could be a bit too much, but we need to be sure it covers all cases if !resp.status().is_success() { - log::error!( + error!( "failed to perform cleanup on retention: {}\nResponse Returned: {:?}", ingestor.domain_name, resp.status() @@ -586,7 +579,7 @@ pub async fn send_retention_cleanup_request( } let resp_data = resp.bytes().await.map_err(|err| { - log::error!("Fatal: failed to parse response to bytes: {:?}", err); + error!("Fatal: failed to parse response to bytes: {:?}", err); ObjectStorageError::Custom(err.to_string()) })?; @@ -596,7 +589,7 @@ pub async fn send_retention_cleanup_request( pub async fn get_cluster_info() -> Result { let ingestor_infos = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; @@ -621,13 +614,13 @@ pub async fn get_cluster_info() -> Result { let status = Some(resp.status().to_string()); let resp_data = resp.bytes().await.map_err(|err| { - log::error!("Fatal: failed to parse ingestor info to bytes: {:?}", err); + error!("Fatal: failed to parse ingestor info to bytes: {:?}", err); StreamError::Network(err) })?; let sp = serde_json::from_slice::(&resp_data) .map_err(|err| { - log::error!("Fatal: failed to parse ingestor info: {:?}", err); + error!("Fatal: failed to parse ingestor info: {:?}", err); StreamError::SerdeError(err) })? .get("staging") @@ -665,7 +658,7 @@ pub async fn get_cluster_info() -> Result { pub async fn get_cluster_metrics() -> Result { let dresses = fetch_cluster_metrics().await.map_err(|err| { - log::error!("Fatal: failed to fetch cluster metrics: {:?}", err); + error!("Fatal: failed to fetch cluster metrics: {:?}", err); PostError::Invalid(err.into()) })?; @@ -737,13 +730,13 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result Result, PostError> { let ingestor_metadata = get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); PostError::Invalid(err) })?; @@ -777,12 +770,12 @@ async fn fetch_cluster_metrics() -> Result, PostError> { let ingestor_metrics = Metrics::from_prometheus_samples(sample, &ingestor) .await .map_err(|err| { - log::error!("Fatal: failed to get ingestor metrics: {:?}", err); + error!("Fatal: failed to get ingestor metrics: {:?}", err); PostError::Invalid(err.into()) })?; dresses.push(ingestor_metrics); } else { - log::warn!( + warn!( "Failed to fetch metrics from ingestor: {}\n", &ingestor.domain_name, ); @@ -792,7 +785,7 @@ async fn fetch_cluster_metrics() -> Result, PostError> { } pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { - log::info!("Setting up schedular for cluster metrics ingestion"); + info!("Setting up schedular for cluster metrics ingestion"); let mut scheduler = AsyncScheduler::new(); scheduler .every(CLUSTER_METRICS_INTERVAL_SECONDS) @@ -801,7 +794,7 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { let cluster_metrics = fetch_cluster_metrics().await; if let Ok(metrics) = cluster_metrics { if !metrics.is_empty() { - log::info!("Cluster metrics fetched successfully from all ingestors"); + info!("Cluster metrics fetched successfully from all ingestors"); if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) { if matches!( ingest_internal_stream( @@ -811,16 +804,12 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { .await, Ok(()) ) { - log::info!( - "Cluster metrics successfully ingested into internal stream" - ); + info!("Cluster metrics successfully ingested into internal stream"); } else { - log::error!( - "Failed to ingest cluster metrics into internal stream" - ); + error!("Failed to ingest cluster metrics into internal stream"); } } else { - log::error!("Failed to serialize cluster metrics"); + error!("Failed to serialize cluster metrics"); } } } @@ -829,7 +818,7 @@ pub fn init_cluster_metrics_schedular() -> Result<(), PostError> { .await; if let Err(err) = result { - log::error!("Error in cluster metrics scheduler: {:?}", err); + error!("Error in cluster metrics scheduler: {:?}", err); } }); diff --git a/src/handlers/http/cluster/utils.rs b/src/handlers/http/cluster/utils.rs index 6f41755f4..41f0e91eb 100644 --- a/src/handlers/http/cluster/utils.rs +++ b/src/handlers/http/cluster/utils.rs @@ -25,6 +25,7 @@ use http::StatusCode; use itertools::Itertools; use reqwest::Response; use serde::{Deserialize, Serialize}; +use tracing::error; use url::Url; #[derive(Debug, Default, Serialize, Deserialize)] @@ -229,7 +230,7 @@ pub async fn check_liveness(domain_name: &str) -> bool { )) { Ok(uri) => uri, Err(err) => { - log::error!("Node Indentifier Failed To Parse: {}", err); + error!("Node Indentifier Failed To Parse: {}", err); return false; } }; @@ -262,20 +263,18 @@ pub async fn send_stats_request( .send() .await .map_err(|err| { - log::error!( + error!( "Fatal: failed to fetch stats from ingestor: {}\n Error: {:?}", - ingestor.domain_name, - err + ingestor.domain_name, err ); StreamError::Network(err) })?; if !res.status().is_success() { - log::error!( + error!( "failed to forward create stream request to ingestor: {}\nResponse Returned: {:?}", - ingestor.domain_name, - res + ingestor.domain_name, res ); return Err(StreamError::Custom { msg: format!( diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index af7de7762..1b97df121 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -26,6 +26,7 @@ use actix_web::{Error, HttpResponse}; use lazy_static::lazy_static; use std::sync::Arc; use tokio::signal::ctrl_c; +use tracing::info; use tokio::sync::{oneshot, Mutex}; @@ -68,11 +69,11 @@ pub async fn handle_signals(shutdown_signal: Arc { - log::info!("Received SIGINT signal at Readiness Probe Handler"); + info!("Received SIGINT signal at Readiness Probe Handler"); shutdown(shutdown_signal).await; }, _ = sigterm.recv() => { - log::info!("Received SIGTERM signal at Readiness Probe Handler"); + info!("Received SIGTERM signal at Readiness Probe Handler"); shutdown(shutdown_signal).await; } } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 8ac283cb2..1db58dcd4 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -25,6 +25,7 @@ use super::modal::utils::logstream_utils::{ }; use super::query::update_schema_when_distributed; use crate::alerts::Alerts; +use crate::catalog::get_first_event; use crate::event::format::update_data_type_to_datetime; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; @@ -34,7 +35,7 @@ use crate::option::{Mode, CONFIG}; use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; use crate::storage::StreamType; use crate::storage::{retention::Retention, StorageDir, StreamInfo}; -use crate::{catalog, event, stats}; +use crate::{event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; @@ -50,6 +51,7 @@ use std::collections::HashMap; use std::fs; use std::str::FromStr; use std::sync::Arc; +use tracing::{error, warn}; pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); @@ -62,7 +64,7 @@ pub async fn delete(req: HttpRequest) -> Result { objectstore.delete_stream(&stream_name).await?; let stream_dir = StorageDir::new(&stream_name); if fs::remove_dir_all(&stream_dir.data_path).is_err() { - log::warn!( + warn!( "failed to delete local data for stream {}. Clean {} manually", stream_name, stream_dir.data_path.to_string_lossy() @@ -77,9 +79,8 @@ pub async fn delete(req: HttpRequest) -> Result { metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); - stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { - log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) - }); + stats::delete_stats(&stream_name, "json") + .unwrap_or_else(|e| warn!("failed to delete stats for stream {}: {:?}", stream_name, e)); Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } @@ -577,11 +578,11 @@ pub async fn get_stream_info(req: HttpRequest) -> Result = Vec::new(); - if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name, dates).await { + if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await { if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) { - log::error!( + error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", stream_name ); diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs index 16711ebbb..151e5d745 100644 --- a/src/handlers/http/modal/ingest/ingestor_logstream.rs +++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs @@ -20,6 +20,7 @@ use actix_web::{web, HttpRequest, Responder}; use bytes::Bytes; use http::StatusCode; use itertools::Itertools; +use tracing::{error, warn}; use crate::{ catalog::remove_manifest_from_snapshot, @@ -75,9 +76,8 @@ pub async fn delete(req: HttpRequest) -> Result { metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); - stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { - log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) - }); + stats::delete_stats(&stream_name, "json") + .unwrap_or_else(|e| warn!("failed to delete stats for stream {}: {:?}", stream_name, e)); Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } @@ -110,7 +110,7 @@ pub async fn put_enable_cache( .contains(&stream_name); if !check { - log::error!("Stream {} not found", stream_name.clone()); + error!("Stream {} not found", stream_name.clone()); return Err(StreamError::StreamNotFound(stream_name.clone())); } metadata::STREAM_INFO diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 0e1226b83..829b529de 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -54,6 +54,7 @@ use bytes::Bytes; use once_cell::sync::Lazy; use relative_path::RelativePathBuf; use serde_json::Value; +use tracing::error; /// ! have to use a guard before using it pub static INGESTOR_META: Lazy = @@ -133,10 +134,10 @@ impl ParseableServer for IngestServer { remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } return e }, @@ -148,7 +149,7 @@ impl ParseableServer for IngestServer { _ = &mut remote_sync_outbox => { // remote_sync failed, this is recoverable by just starting remote_sync thread again if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 57d5313f2..4a4cbb842 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -39,6 +39,7 @@ use serde::Deserialize; use serde::Serialize; use ssl_acceptor::get_ssl_acceptor; use tokio::sync::{oneshot, Mutex}; +use tracing::{error, info, warn}; use super::cross_origin_config; use super::API_BASE_PATH; @@ -140,15 +141,15 @@ pub trait ParseableServer { let _ = shutdown_rx.await; // Perform S3 sync and wait for completion - log::info!("Starting data sync to S3..."); + info!("Starting data sync to S3..."); if let Err(e) = CONFIG.storage().get_object_store().sync(true).await { - log::warn!("Failed to sync local data with object store. {:?}", e); + warn!("Failed to sync local data with object store. {:?}", e); } else { - log::info!("Successfully synced all data to S3."); + info!("Successfully synced all data to S3."); } // Initiate graceful shutdown - log::info!("Graceful shutdown of HTTP server triggered"); + info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; }); @@ -157,14 +158,14 @@ pub trait ParseableServer { // Await the signal handler to ensure proper cleanup if let Err(e) = signal_task.await { - log::error!("Error in signal handler: {:?}", e); + error!("Error in signal handler: {:?}", e); } // Wait for the sync task to complete before exiting if let Err(e) = sync_task.await { - log::error!("Error in sync task: {:?}", e); + error!("Error in sync task: {:?}", e); } else { - log::info!("Sync task completed successfully."); + info!("Sync task completed successfully."); } // Return the result of the server diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 86e887e3b..81d7ea2fb 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -24,6 +24,7 @@ use bytes::Bytes; use chrono::Utc; use http::StatusCode; use tokio::sync::Mutex; +use tracing::{error, warn}; static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); @@ -67,7 +68,7 @@ pub async fn delete(req: HttpRequest) -> Result { objectstore.delete_stream(&stream_name).await?; let stream_dir = StorageDir::new(&stream_name); if fs::remove_dir_all(&stream_dir.data_path).is_err() { - log::warn!( + warn!( "failed to delete local data for stream {}. Clean {} manually", stream_name, stream_dir.data_path.to_string_lossy() @@ -81,7 +82,7 @@ pub async fn delete(req: HttpRequest) -> Result { } let ingestor_metadata = cluster::get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::from(err) })?; @@ -99,9 +100,8 @@ pub async fn delete(req: HttpRequest) -> Result { metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); - stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { - log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) - }); + stats::delete_stats(&stream_name, "json") + .unwrap_or_else(|e| warn!("failed to delete stats for stream {}: {:?}", stream_name, e)); Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index ffde5fb11..0c7261be0 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -33,6 +33,7 @@ use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; use async_trait::async_trait; use bytes::Bytes; +use tracing::{error, info}; use crate::{option::CONFIG, ParseableServer}; @@ -105,7 +106,7 @@ impl ParseableServer for QueryServer { } if matches!(init_cluster_metrics_schedular(), Ok(())) { - log::info!("Cluster metrics scheduler started successfully"); + info!("Cluster metrics scheduler started successfully"); } if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.put_internal_stream_hot_tier().await?; @@ -127,10 +128,10 @@ impl ParseableServer for QueryServer { remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { - log::error!("Error joining localsync_handler: {:?}", e); + error!("Error joining localsync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } return e }, @@ -142,7 +143,7 @@ impl ParseableServer for QueryServer { _ = &mut remote_sync_outbox => { // remote_sync failed, this is recoverable by just starting remote_sync thread again if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index b7b1e354b..f8f7dc2f2 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -42,6 +42,7 @@ use actix_web::Scope; use actix_web_static_files::ResourceFiles; use async_trait::async_trait; use bytes::Bytes; +use tracing::error; use crate::{ handlers::http::{ @@ -140,10 +141,10 @@ impl ParseableServer for Server { remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } return e }, @@ -155,7 +156,7 @@ impl ParseableServer for Server { _ = &mut remote_sync_outbox => { // remote_sync failed, this is recoverable by just starting remote_sync thread again if let Err(e) = remote_sync_handler.await { - log::error!("Error joining remote_sync_handler: {:?}", e); + error!("Error joining remote_sync_handler: {:?}", e); } (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; } diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 85d432a8a..7ff5ee77b 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -30,6 +30,7 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; +use tracing::{error, info, warn}; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; @@ -157,7 +158,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result<(), QueryError> { match (cache_results, query_cache_manager) { (Some(_), None) => { - log::warn!( - "Instructed to cache query results but Query Caching is not Enabled in Server" - ); + warn!("Instructed to cache query results but Query Caching is not Enabled in Server"); Ok(()) } // do cache (Some(should_cache), Some(query_cache_manager)) => { if should_cache != "true" { - log::error!("value of cache results header is false"); + error!("value of cache results header is false"); return Err(QueryError::CacheError(CacheError::Other( "should not cache results", ))); @@ -243,7 +242,7 @@ pub async fn put_results_in_cache( // guard to stop multiple caching of the same content if let Some(path) = cache.get_file(&cache_key) { - log::info!("File already exists in cache, Removing old file"); + info!("File already exists in cache, Removing old file"); cache.delete(&cache_key, path).await?; } @@ -251,13 +250,13 @@ pub async fn put_results_in_cache( .create_parquet_cache(stream, records, user_id, start, end, query) .await { - log::error!("Error occured while caching query results: {:?}", err); + error!("Error occured while caching query results: {:?}", err); if query_cache_manager .clear_cache(stream, user_id) .await .is_err() { - log::error!("Error Clearing Unwanted files from cache dir"); + error!("Error Clearing Unwanted files from cache dir"); } } // fallthrough @@ -280,14 +279,12 @@ pub async fn get_results_from_cache( ) -> Result { match (show_cached, query_cache_manager) { (Some(_), None) => { - log::warn!( - "Instructed to show cached results but Query Caching is not Enabled on Server" - ); + warn!("Instructed to show cached results but Query Caching is not Enabled on Server"); None } (Some(should_show), Some(query_cache_manager)) => { if should_show != "true" { - log::error!("value of show cached header is false"); + error!("value of show cached header is false"); return Err(QueryError::CacheError(CacheError::Other( "should not return cached results", ))); diff --git a/src/handlers/http/trino.rs b/src/handlers/http/trino.rs index 893a8bc4c..f764842bb 100644 --- a/src/handlers/http/trino.rs +++ b/src/handlers/http/trino.rs @@ -24,6 +24,7 @@ use actix_web::{ }; use http::HeaderMap; use serde_json::Value; +use tracing::warn; use trino_response::QueryResponse; use crate::{ @@ -197,7 +198,7 @@ pub async fn trino_get( break; } _ => { - log::warn!("state '{state}' not covered"); + warn!("state '{state}' not covered"); break; } } @@ -258,6 +259,7 @@ mod trino_response { use actix_web::{web, Responder}; use itertools::Itertools; use serde_json::{json, Map, Value}; + use tracing::info; use crate::handlers::http::query::QueryError; @@ -269,7 +271,7 @@ mod trino_response { impl QueryResponse { pub fn to_http(&self) -> Result { - log::info!("{}", "Returning query results"); + info!("{}", "Returning query results"); let values = if let Some(trino_records) = self.trino_records.clone() { // trino_records = Vec let mut json_records: Vec> = Vec::new(); diff --git a/src/handlers/livetail.rs b/src/handlers/livetail.rs index 3de8426e7..646c982c5 100644 --- a/src/handlers/livetail.rs +++ b/src/handlers/livetail.rs @@ -37,6 +37,7 @@ use arrow_flight::{ }; use tonic_web::GrpcWebLayer; use tower_http::cors::CorsLayer; +use tracing::{info, warn}; use crate::livetail::{Message, LIVETAIL}; use crate::metadata::STREAM_INFO; @@ -102,7 +103,7 @@ impl FlightService for FlightServiceImpl { let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket) .map_err(|err| Status::internal(err.to_string()))?; let stream = extract_stream(&ticket)?; - log::info!("livetail requested for stream {}", stream); + info!("livetail requested for stream {}", stream); match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) { rbac::Response::Authorized => (), rbac::Response::UnAuthorized => { @@ -128,7 +129,7 @@ impl FlightService for FlightServiceImpl { let rx = rx.map(move |x| match x { Message::Record(t) => Ok(utils::arrow::adapt_batch(&adapter_schema, &t)), Message::Skipped(_) => { - log::warn!("livetail channel capacity is full."); + warn!("livetail channel capacity is full."); Ok(RecordBatch::new_empty(adapter_schema.clone())) } }); diff --git a/src/hottier.rs b/src/hottier.rs index b6f29f609..cc4e4bbb1 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -45,6 +45,7 @@ use sysinfo::{Disks, System}; use tokio::fs::{self, DirEntry}; use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReadDirStream; +use tracing::{error, warn}; pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB @@ -151,7 +152,7 @@ impl HotTierManager { - existing_hot_tier_used_size as f64); if stream_hot_tier_size as f64 > max_allowed_hot_tier_size { - log::error!("disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}", + error!("disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}", bytes_to_human_size(disk_threshold as u64), bytes_to_human_size(used_disk_space), bytes_to_human_size(total_hot_tier_used_size), bytes_to_human_size(existing_hot_tier_used_size), bytes_to_human_size(total_hot_tier_size)); return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( "{} is the total usable disk space for hot tier, cannot set a bigger value.", bytes_to_human_size(max_allowed_hot_tier_size as u64) @@ -222,7 +223,7 @@ impl HotTierManager { .plus(Interval::Seconds(5)) .run(move || async { if let Err(err) = self.sync_hot_tier().await { - log::error!("Error in hot tier scheduler: {:?}", err); + error!("Error in hot tier scheduler: {:?}", err); } }); @@ -249,7 +250,7 @@ impl HotTierManager { let res: Vec<_> = sync_hot_tier_tasks.collect().await; for res in res { if let Err(err) = res { - log::error!("Failed to run hot tier sync task {err:?}"); + error!("Failed to run hot tier sync task {err:?}"); return Err(err); } } @@ -331,7 +332,7 @@ impl HotTierManager { break; } } else { - log::warn!("Invalid date format: {}", str_date); + warn!("Invalid date format: {}", str_date); } } } diff --git a/src/localcache.rs b/src/localcache.rs index 07ee3eaaa..9ac6d5edd 100644 --- a/src/localcache.rs +++ b/src/localcache.rs @@ -27,6 +27,7 @@ use object_store::{local::LocalFileSystem, ObjectStore}; use once_cell::sync::OnceCell; use parquet::errors::ParquetError; use tokio::{fs, sync::Mutex}; +use tracing::{error, info, warn}; use crate::{metadata::error::stream_info::MetadataError, option::CONFIG}; @@ -121,10 +122,9 @@ impl LocalCacheManager { SpecificSize::new(meta.size_capacity as f64, Byte) .unwrap() .into(); - log::warn!( + warn!( "Cache size is updated from {} to {}", - current_size_human, - configured_size_human + current_size_human, configured_size_human ); meta.size_capacity = config_capacity; Some(meta) @@ -145,7 +145,7 @@ impl LocalCacheManager { .filesystem .put(&path, serde_json::to_vec(&updated_cache)?.into()) .await?; - log::info!("Cache meta file updated: {:?}", result); + info!("Cache meta file updated: {:?}", result); } Ok(()) @@ -170,7 +170,7 @@ impl LocalCacheManager { let path = cache_file_path(&self.cache_path, stream).unwrap(); let bytes = serde_json::to_vec(cache)?.into(); let result = self.filesystem.put(&path, bytes).await?; - log::info!("Cache file updated: {:?}", result); + info!("Cache file updated: {:?}", result); Ok(()) } @@ -192,10 +192,10 @@ impl LocalCacheManager { if let Some((_, file_for_removal)) = cache.files.pop_lru() { let lru_file_size = std::fs::metadata(&file_for_removal)?.len(); cache.current_size = cache.current_size.saturating_sub(lru_file_size); - log::info!("removing cache entry"); + info!("removing cache entry"); tokio::spawn(fs::remove_file(file_for_removal)); } else { - log::error!("Cache size too small"); + error!("Cache size too small"); break; } } diff --git a/src/metrics/prom_utils.rs b/src/metrics/prom_utils.rs index e97efe6d2..dab1aaff8 100644 --- a/src/metrics/prom_utils.rs +++ b/src/metrics/prom_utils.rs @@ -28,6 +28,8 @@ use prometheus_parse::Value as PromValue; use serde::Serialize; use serde_json::Error as JsonError; use serde_json::Value as JsonValue; +use tracing::error; +use tracing::warn; use url::Url; #[derive(Debug, Serialize, Clone)] @@ -207,7 +209,7 @@ impl Metrics { let (commit_id, staging, cache) = Self::from_about_api_response(ingestor_metadata.clone()) .await .map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); + error!("Fatal: failed to get ingestor info: {:?}", err); PostError::Invalid(err.into()) })?; @@ -257,7 +259,7 @@ impl Metrics { cache.to_string(), )) } else { - log::warn!( + warn!( "Failed to fetch about API response from ingestor: {}\n", &ingestor_metadata.domain_name, ); diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 53ab5a511..2f5534821 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -39,6 +39,7 @@ use itertools::Itertools; use relative_path::RelativePathBuf; use serde::Serialize; use serde_json::Value; +use tracing::error; /// Migrate the metdata from v1 or v2 to v3 /// This is a one time migration @@ -289,7 +290,7 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow:: load_stream_metadata_on_server_start(storage, stream, arrow_schema, stream_metadata_value) .await { - log::error!("could not populate local metadata. {:?}", err); + error!("could not populate local metadata. {:?}", err); return Err(err.into()); } diff --git a/src/querycache.rs b/src/querycache.rs index 4086de7b8..3169e8061 100644 --- a/src/querycache.rs +++ b/src/querycache.rs @@ -30,6 +30,7 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use tokio::fs as AsyncFs; use tokio::{fs, sync::Mutex}; +use tracing::{error, info, warn}; use crate::handlers::http::users::USERS_ROOT_DIR; use crate::metadata::STREAM_INFO; @@ -211,10 +212,9 @@ impl QueryCacheManager { SpecificSize::new(meta.size_capacity as f64, Byte) .unwrap() .into(); - log::warn!( + warn!( "Cache size is updated from {} to {}", - current_size_human, - configured_size_human + current_size_human, configured_size_human ); meta.size_capacity = config_capacity; Some(meta) @@ -235,7 +235,7 @@ impl QueryCacheManager { .filesystem .put(&path, serde_json::to_vec(&updated_cache)?.into()) .await?; - log::info!("Cache meta file updated: {:?}", result); + info!("Cache meta file updated: {:?}", result); } Ok(()) @@ -283,7 +283,7 @@ impl QueryCacheManager { let bytes = serde_json::to_vec(cache)?.into(); let result = self.filesystem.put(&path, bytes).await?; - log::info!("Cache file updated: {:?}", result); + info!("Cache file updated: {:?}", result); Ok(()) } @@ -302,10 +302,10 @@ impl QueryCacheManager { if let Some((_, file_for_removal)) = cache.files.pop_lru() { let lru_file_size = fs::metadata(&file_for_removal).await?.len(); cache.current_size = cache.current_size.saturating_sub(lru_file_size); - log::info!("removing cache entry"); + info!("removing cache entry"); tokio::spawn(fs::remove_file(file_for_removal)); } else { - log::error!("Cache size too small"); + error!("Cache size too small"); break; } } @@ -350,7 +350,7 @@ impl QueryCacheManager { for record in records { if let Err(e) = arrow_writer.write(record).await { - log::error!("Error While Writing to Query Cache: {}", e); + error!("Error While Writing to Query Cache: {}", e); } } diff --git a/src/response.rs b/src/response.rs index e2abfa2d2..a18827d71 100644 --- a/src/response.rs +++ b/src/response.rs @@ -28,6 +28,7 @@ use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; use tonic::{Response, Status}; +use tracing::info; pub struct QueryResponse { pub records: Vec, @@ -38,7 +39,7 @@ pub struct QueryResponse { impl QueryResponse { pub fn to_http(&self) -> Result { - log::info!("{}", "Returning query results"); + info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); let mut json_records = record_batches_to_json(&records)?; diff --git a/src/stats.rs b/src/stats.rs index 52ffc3b24..0016dbdbe 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -1,3 +1,5 @@ +use tracing::warn; + /* * Parseable Server (C) 2022 - 2024 Parseable, Inc. * @@ -153,7 +155,7 @@ pub async fn update_deleted_stats( let stats = get_current_stats(stream_name, "json"); if let Some(stats) = stats { if let Err(e) = storage.put_stats(stream_name, &stats).await { - log::warn!("Error updating stats to objectstore due to error [{}]", e); + warn!("Error updating stats to objectstore due to error [{}]", e); } } diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index d50e2d901..0f3099585 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use datafusion::datasource::listing::ListingTableUrl; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; +use tracing::{error, info}; /* * Parseable Server (C) 2022 - 2024 Parseable, Inc. * @@ -251,11 +252,11 @@ impl BlobStore { match x { Ok(obj) => { if (self.client.delete(&obj.location).await).is_err() { - log::error!("Failed to fetch object during delete stream"); + error!("Failed to fetch object during delete stream"); } } Err(_) => { - log::error!("Failed to fetch object during delete stream"); + error!("Failed to fetch object during delete stream"); } }; }) @@ -355,7 +356,7 @@ impl BlobStore { } else { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await?; - log::info!("Uploaded file to Azure Blob Storage: {:?}", result); + info!("Uploaded file to Azure Blob Storage: {:?}", result); Ok(()) }; @@ -378,7 +379,7 @@ impl BlobStore { // /* `abort_multipart()` has been removed */ // // let close_multipart = |err| async move { - // // log::error!("multipart upload failed. {:?}", err); + // // error!("multipart upload failed. {:?}", err); // // self.client // // .abort_multipart(&key.into(), &multipart_id) // // .await @@ -561,10 +562,10 @@ impl ObjectStorage for BlobStore { // if the object is not found, it is not an error // the given url path was incorrect if matches!(err, object_store::Error::NotFound { .. }) { - log::error!("Node does not exist"); + error!("Node does not exist"); Err(err.into()) } else { - log::error!("Error deleting ingestor meta file: {:?}", err); + error!("Error deleting ingestor meta file: {:?}", err); Err(err.into()) } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 78f51685d..e70c326bd 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -48,6 +48,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use itertools::Itertools; use relative_path::RelativePath; use relative_path::RelativePathBuf; +use tracing::error; use std::collections::BTreeMap; use std::{ @@ -306,7 +307,7 @@ pub trait ObjectStorage: Send + Sync + 'static { if let Ok(alerts) = serde_json::from_slice(&alerts) { Ok(alerts) } else { - log::error!("Incompatible alerts found for stream - {stream_name}. Refer https://www.parseable.io/docs/alerts for correct alert config."); + error!("Incompatible alerts found for stream - {stream_name}. Refer https://www.parseable.io/docs/alerts for correct alert config."); Ok(Alerts::default()) } } @@ -611,7 +612,7 @@ pub trait ObjectStorage: Send + Sync + 'static { // Try uploading the file, handle potential errors without breaking the loop if let Err(e) = self.upload_file(&stream_relative_path, &file).await { - log::error!("Failed to upload file {}: {:?}", filename, e); + error!("Failed to upload file {}: {:?}", filename, e); continue; // Skip to the next file } @@ -647,7 +648,7 @@ pub trait ObjectStorage: Send + Sync + 'static { .move_to_cache(&stream, storage_path, file.to_owned()) .await { - log::error!("Failed to move file to cache: {:?}", e); + error!("Failed to move file to cache: {:?}", e); } } } diff --git a/src/storage/retention.rs b/src/storage/retention.rs index c65c94c88..24ffbf199 100644 --- a/src/storage/retention.rs +++ b/src/storage/retention.rs @@ -27,6 +27,7 @@ use clokwerk::TimeUnits; use derive_more::Display; use once_cell::sync::Lazy; use tokio::task::JoinHandle; +use tracing::{info, warn}; use crate::metadata::STREAM_INFO; @@ -35,12 +36,12 @@ type SchedulerHandle = JoinHandle<()>; static SCHEDULER_HANDLER: Lazy>> = Lazy::new(|| Mutex::new(None)); pub fn load_retention_from_global() { - log::info!("loading retention for all streams"); + info!("loading retention for all streams"); init_scheduler(); } pub fn init_scheduler() { - log::info!("Setting up scheduler"); + info!("Setting up scheduler"); let mut scheduler = AsyncScheduler::new(); let func = move || async { //get retention every day at 12 am @@ -63,7 +64,7 @@ pub fn init_scheduler() { } } Err(err) => { - log::warn!("failed to load retention config for {stream} due to {err:?}") + warn!("failed to load retention config for {stream} due to {err:?}") } }; } @@ -84,7 +85,7 @@ pub fn init_scheduler() { }); *SCHEDULER_HANDLER.lock().unwrap() = Some(scheduler_handler); - log::info!("Scheduler is initialized") + info!("Scheduler is initialized") } #[derive(Debug, Clone, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)] @@ -176,9 +177,10 @@ mod action { use futures::{stream::FuturesUnordered, StreamExt}; use itertools::Itertools; use relative_path::RelativePathBuf; + use tracing::{error, info}; pub(super) async fn delete(stream_name: String, days: u32) { - log::info!("running retention task - delete for stream={stream_name}"); + info!("running retention task - delete for stream={stream_name}"); let store = CONFIG.storage().get_object_store(); let retain_until = get_retain_until(Utc::now().date_naive(), days as u64); @@ -212,7 +214,7 @@ mod action { for res in res { if let Err(err) = res { - log::error!("Failed to run delete task {err:?}"); + error!("Failed to run delete task {err:?}"); return; } } @@ -220,7 +222,7 @@ mod action { if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) { - log::error!( + error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", stream_name ); diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 89f5b361d..a501c6250 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -30,6 +30,7 @@ use object_store::limit::LimitStore; use object_store::path::Path as StorePath; use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig}; use relative_path::{RelativePath, RelativePathBuf}; +use tracing::{error, info}; use std::collections::BTreeMap; use std::fmt::Display; @@ -379,11 +380,11 @@ impl S3 { match x { Ok(obj) => { if (self.client.delete(&obj.location).await).is_err() { - log::error!("Failed to fetch object during delete stream"); + error!("Failed to fetch object during delete stream"); } } Err(_) => { - log::error!("Failed to fetch object during delete stream"); + error!("Failed to fetch object during delete stream"); } }; }) @@ -483,7 +484,7 @@ impl S3 { } else { let bytes = tokio::fs::read(path).await?; let result = self.client.put(&key.into(), bytes.into()).await?; - log::info!("Uploaded file to S3: {:?}", result); + info!("Uploaded file to S3: {:?}", result); Ok(()) }; @@ -506,7 +507,7 @@ impl S3 { // /* `abort_multipart()` has been removed */ // // let close_multipart = |err| async move { - // // log::error!("multipart upload failed. {:?}", err); + // // error!("multipart upload failed. {:?}", err); // // self.client // // .abort_multipart(&key.into(), &multipart_id) // // .await @@ -689,10 +690,10 @@ impl ObjectStorage for S3 { // if the object is not found, it is not an error // the given url path was incorrect if matches!(err, object_store::Error::NotFound { .. }) { - log::error!("Node does not exist"); + error!("Node does not exist"); Err(err.into()) } else { - log::error!("Error deleting ingestor meta file: {:?}", err); + error!("Error deleting ingestor meta file: {:?}", err); Err(err.into()) } } diff --git a/src/storage/staging.rs b/src/storage/staging.rs index 173fb5d5e..85df86996 100644 --- a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -50,6 +50,7 @@ use std::{ process, sync::Arc, }; +use tracing::{error, info}; const ARROW_FILE_EXTENSION: &str = "data.arrows"; // const PARQUET_FILE_EXTENSION: &str = "data.parquet"; @@ -175,10 +176,9 @@ impl StorageDir { rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); for arrow_file_path in arrow_files { if arrow_file_path.metadata().unwrap().len() == 0 { - log::error!( + error!( "Invalid arrow file {:?} detected for stream {}, removing it", - &arrow_file_path, - stream + &arrow_file_path, stream ); fs::remove_file(&arrow_file_path).unwrap(); } else { @@ -244,7 +244,7 @@ pub fn convert_disk_files_to_parquet( .set(0); } - // log::warn!("staging files-\n{staging_files:?}\n"); + // warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, files) in staging_files { metrics::STAGING_FILES .with_label_values(&[stream]) @@ -291,19 +291,18 @@ pub fn convert_disk_files_to_parquet( writer.close()?; if parquet_file.metadata().unwrap().len() < parquet::file::FOOTER_SIZE as u64 { - log::error!( + error!( "Invalid parquet file {:?} detected for stream {}, removing it", - &parquet_path, - stream + &parquet_path, stream ); fs::remove_file(parquet_path).unwrap(); } else { for file in files { - // log::warn!("file-\n{file:?}\n"); + // warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len(); let file_type = file.extension().unwrap().to_str().unwrap(); if fs::remove_file(file.clone()).is_err() { - log::error!("Failed to delete file. Unstable state"); + error!("Failed to delete file. Unstable state"); process::abort() } metrics::STORAGE_SIZE @@ -400,16 +399,15 @@ pub fn get_ingestor_info() -> anyhow::Result { // compare url endpoint and port if meta.domain_name != url { - log::info!( + info!( "Domain Name was Updated. Old: {} New: {}", - meta.domain_name, - url + meta.domain_name, url ); meta.domain_name = url; } if meta.port != port { - log::info!("Port was Updated. Old: {} New: {}", meta.port, port); + info!("Port was Updated. Old: {} New: {}", meta.port, port); meta.port = port; } @@ -422,10 +420,9 @@ pub fn get_ingestor_info() -> anyhow::Result { if meta.token != token { // TODO: Update the message to be more informative with username and password - log::info!( + info!( "Credentials were Updated. Old: {} New: {}", - meta.token, - token + meta.token, token ); meta.token = token; } diff --git a/src/sync.rs b/src/sync.rs index 2367f3de8..2a06d88aa 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -21,6 +21,7 @@ use std::panic::AssertUnwindSafe; use tokio::sync::oneshot; use tokio::task; use tokio::time::{interval, sleep, Duration}; +use tracing::{error, info, warn}; use crate::option::CONFIG; use crate::{storage, STORAGE_UPLOAD_INTERVAL}; @@ -41,7 +42,7 @@ pub async fn object_store_sync() -> ( .plus(5u32.seconds()) .run(|| async { if let Err(e) = CONFIG.storage().get_object_store().sync(false).await { - log::warn!("failed to sync local data with object store. {:?}", e); + warn!("failed to sync local data with object store. {:?}", e); } }); @@ -56,7 +57,7 @@ pub async fn object_store_sync() -> ( Ok(_) => break, Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - log::warn!("Inbox channel closed unexpectedly"); + warn!("Inbox channel closed unexpectedly"); break; } } @@ -68,12 +69,12 @@ pub async fn object_store_sync() -> ( future.await; } Err(panic_error) => { - log::error!("Panic in object store sync task: {:?}", panic_error); + error!("Panic in object store sync task: {:?}", panic_error); let _ = outbox_tx.send(()); } } - log::info!("Object store sync task ended"); + info!("Object store sync task ended"); }); (handle, outbox_rx, inbox_tx) @@ -88,7 +89,7 @@ pub async fn run_local_sync() -> ( let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); let handle = task::spawn(async move { - log::info!("Local sync task started"); + info!("Local sync task started"); let mut inbox_rx = inbox_rx; let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { @@ -111,7 +112,7 @@ pub async fn run_local_sync() -> ( Ok(_) => break, Err(tokio::sync::oneshot::error::TryRecvError::Empty) => continue, Err(tokio::sync::oneshot::error::TryRecvError::Closed) => { - log::warn!("Inbox channel closed unexpectedly"); + warn!("Inbox channel closed unexpectedly"); break; } } @@ -123,12 +124,12 @@ pub async fn run_local_sync() -> ( future.await; } Err(panic_error) => { - log::error!("Panic in local sync task: {:?}", panic_error); + error!("Panic in local sync task: {:?}", panic_error); } } let _ = outbox_tx.send(()); - log::info!("Local sync task ended"); + info!("Local sync task ended"); }); (handle, outbox_rx, inbox_tx) diff --git a/src/utils/arrow/merged_reader.rs b/src/utils/arrow/merged_reader.rs index e8f31e7ca..3248bd37d 100644 --- a/src/utils/arrow/merged_reader.rs +++ b/src/utils/arrow/merged_reader.rs @@ -27,6 +27,7 @@ use std::{ path::PathBuf, sync::Arc, }; +use tracing::error; use super::{ adapt_batch, @@ -46,13 +47,13 @@ impl MergedRecordReader { for file in files { //remove empty files before reading if file.metadata().unwrap().len() == 0 { - log::error!("Invalid file detected, removing it: {:?}", file); + error!("Invalid file detected, removing it: {:?}", file); fs::remove_file(file).unwrap(); } else { let Ok(reader) = StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) else { - log::error!("Invalid file detected, ignoring it: {:?}", file); + error!("Invalid file detected, ignoring it: {:?}", file); continue; }; @@ -85,7 +86,7 @@ impl MergedReverseRecordReader { let Ok(reader) = utils::arrow::reverse_reader::get_reverse_reader(File::open(file).unwrap()) else { - log::error!("Invalid file detected, ignoring it: {:?}", file); + error!("Invalid file detected, ignoring it: {:?}", file); continue; }; diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 2c8cbb87d..2db5567bf 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -34,6 +34,7 @@ use regex::Regex; use sha2::{Digest, Sha256}; use std::collections::HashMap; use std::env; +use tracing::debug; use url::Url; #[allow(dead_code)] pub fn hostname() -> Option { @@ -306,7 +307,7 @@ pub fn get_ingestor_id() -> String { hasher.update(now); let result = format!("{:x}", hasher.finalize()); let result = result.split_at(15).0.to_string(); - log::debug!("Ingestor ID: {}", &result); + debug!("Ingestor ID: {}", &result); result }