fix for daily stats for querier (#811)
Returns the querier's stats plus the sum of all ingestors' stats.
nikhilsinhaparseable authored Jun 7, 2024
1 parent 6129992 commit 3ffa9f1
Showing 3 changed files with 124 additions and 5 deletions.
62 changes: 62 additions & 0 deletions server/src/handlers/http/cluster/mod.rs
@@ -27,6 +27,7 @@ use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::option::CONFIG;

use crate::metrics::prom_utils::Metrics;
use crate::stats::Stats;
use crate::storage::object_storage::ingestor_metadata_path;
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
@@ -156,6 +157,67 @@ pub async fn sync_streams_with_ingestors(
Ok(())
}

pub async fn fetch_daily_stats_from_ingestors(
stream_name: &str,
date: &str,
) -> Result<Stats, StreamError> {
let mut total_events_ingested: u64 = 0;
let mut total_ingestion_size: u64 = 0;
let mut total_storage_size: u64 = 0;

let ingestor_infos = get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::Anyhow(err)
})?;
for ingestor in ingestor_infos.iter() {
let uri = Url::parse(&format!(
"{}{}/metrics",
&ingestor.domain_name,
base_path_without_preceding_slash()
))
.map_err(|err| {
StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
})?;

let res = reqwest::Client::new()
.get(uri)
.header(header::CONTENT_TYPE, "application/json")
.send()
.await;

if let Ok(res) = res {
let text = res
.text()
.await
.map_err(|err| StreamError::Anyhow(anyhow::anyhow!("Request failed: {}", err)))?;
let lines: Vec<Result<String, std::io::Error>> =
text.lines().map(|line| Ok(line.to_owned())).collect_vec();

let sample = prometheus_parse::Scrape::parse(lines.into_iter())
.map_err(|err| {
StreamError::Anyhow(anyhow::anyhow!(
"Invalid URL in Ingestor Metadata: {}",
err
))
})?
.samples;

let (events_ingested, ingestion_size, storage_size) =
Metrics::get_daily_stats_from_samples(sample, stream_name, date);
total_events_ingested += events_ingested;
total_ingestion_size += ingestion_size;
total_storage_size += storage_size;
}
}

let stats = Stats {
events: total_events_ingested,
ingestion: total_ingestion_size,
storage: total_storage_size,
};
Ok(stats)
}

/// get the cumulative stats from all ingestors
pub async fn fetch_stats_from_ingestors(
stream_name: &str,
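For context, a minimal self-contained sketch of the aggregation the new `fetch_daily_stats_from_ingestors` helper performs: per-ingestor daily stats are folded into one total before being returned. The `DailyStats` struct and the numbers below are illustrative stand-ins for the crate's `Stats` type, not the actual implementation.

```rust
// Illustrative sketch only: mirrors how fetch_daily_stats_from_ingestors
// accumulates a single total across all ingestors.
#[derive(Debug, Default, Clone, Copy)]
struct DailyStats {
    events: u64,
    ingestion: u64,
    storage: u64,
}

fn sum_ingestor_stats(per_ingestor: &[DailyStats]) -> DailyStats {
    per_ingestor.iter().fold(DailyStats::default(), |acc, s| DailyStats {
        events: acc.events + s.events,
        ingestion: acc.ingestion + s.ingestion,
        storage: acc.storage + s.storage,
    })
}

fn main() {
    // Two hypothetical ingestors reporting stats for the same stream and date.
    let per_ingestor = [
        DailyStats { events: 120, ingestion: 4096, storage: 1024 },
        DailyStats { events: 80, ingestion: 2048, storage: 512 },
    ];
    let total = sum_ingestor_stats(&per_ingestor);
    assert_eq!(total.events, 200);
    println!("{total:?}");
}
```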
26 changes: 21 additions & 5 deletions server/src/handlers/http/logstream.rs
@@ -19,7 +19,9 @@
use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
use super::cluster::{
fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, INTERNAL_STREAM_NAME,
};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
@@ -607,10 +609,24 @@ pub async fn get_stats(req: HttpRequest) -> Result<impl Responder, StreamError>
}

if !date_value.is_empty() {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;

return Ok((web::Json(stats), StatusCode::OK));
if CONFIG.parseable.mode == Mode::Query {
let querier_stats = get_stats_date(&stream_name, date_value).await?;
let ingestor_stats =
fetch_daily_stats_from_ingestors(&stream_name, date_value).await?;
let total_stats = Stats {
events: querier_stats.events + ingestor_stats.events,
ingestion: querier_stats.ingestion + ingestor_stats.ingestion,
storage: querier_stats.storage + ingestor_stats.storage,
};
let stats = serde_json::to_value(total_stats)?;

return Ok((web::Json(stats), StatusCode::OK));
} else {
let stats = get_stats_date(&stream_name, date_value).await?;
let stats = serde_json::to_value(stats)?;

return Ok((web::Json(stats), StatusCode::OK));
}
}
}

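In Query mode the handler now returns a single merged object: the querier's own daily stats plus the ingestor totals fetched above. A hedged sketch of the response shape follows, assuming `serde`/`serde_json` are available and that `Stats` serializes its three `u64` fields by the names used in the struct literal in the diff.

```rust
// Sketch of the merged daily-stats payload; field names mirror the
// Stats literal in the diff (events, ingestion, storage).
use serde::Serialize;

#[derive(Serialize)]
struct MergedDailyStats {
    events: u64,
    ingestion: u64,
    storage: u64,
}

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical numbers: querier-local counts plus the ingestor sum.
    let merged = MergedDailyStats {
        events: 150 + 200,
        ingestion: 1_024 + 6_144,
        storage: 256 + 1_536,
    };
    println!("{}", serde_json::to_string_pretty(&merged)?);
    // => { "events": 350, "ingestion": 7168, "storage": 1792 }
    Ok(())
}
```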
41 changes: 41 additions & 0 deletions server/src/metrics/prom_utils.rs
@@ -113,6 +113,47 @@ impl Metrics {
}

impl Metrics {
pub fn get_daily_stats_from_samples(
samples: Vec<PromSample>,
stream_name: &str,
date: &str,
) -> (u64, u64, u64) {
let mut events_ingested: u64 = 0;
let mut ingestion_size: u64 = 0;
let mut storage_size: u64 = 0;
for sample in samples {
if let PromValue::Gauge(val) = sample.value {
match sample.metric.as_str() {
"parseable_events_ingested_date" => {
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
events_ingested = val as u64;
}
}
"parseable_events_ingested_size_date" => {
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
ingestion_size = val as u64;
}
}
"parseable_events_storage_size_date" => {
if sample.labels.get("stream").expect("stream name is present")
== stream_name
&& sample.labels.get("date").expect("date is present") == date
{
storage_size = val as u64;
}
}
_ => {}
}
}
}
(events_ingested, ingestion_size, storage_size)
}
pub async fn from_prometheus_samples(
samples: Vec<PromSample>,
ingestor_metadata: &IngestorMetadata,
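`get_daily_stats_from_samples` reads the daily gauges out of a raw prometheus text scrape. Below is a small, self-contained sketch of that parsing step using the same `prometheus_parse` calls the diff relies on; the stream name and date are made up, while the metric and label names are the ones matched above.

```rust
// Minimal sketch: parse prometheus text format and pull one daily gauge,
// filtered by the stream and date labels, as get_daily_stats_from_samples does.
use prometheus_parse::{Scrape, Value};

fn main() {
    let text = r#"# TYPE parseable_events_ingested_date gauge
parseable_events_ingested_date{stream="demo",date="2024-06-07"} 42
"#;
    let lines: Vec<std::io::Result<String>> =
        text.lines().map(|line| Ok(line.to_owned())).collect();
    let scrape = Scrape::parse(lines.into_iter()).expect("valid prometheus text");

    for sample in scrape.samples {
        if sample.metric == "parseable_events_ingested_date"
            && sample.labels.get("stream").expect("stream label") == "demo"
            && sample.labels.get("date").expect("date label") == "2024-06-07"
        {
            if let Value::Gauge(val) = sample.value {
                println!("events ingested on 2024-06-07: {}", val as u64);
            }
        }
    }
}
```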
