Skip to content

Commit

Permalink
feat(loki_client): handle absent logs gracefully
Browse files Browse the repository at this point in the history
We have formatting code for it, but didn't use our Option=None path.
  • Loading branch information
alextes committed Oct 11, 2023
1 parent eea0e9c commit dfaffd8
Showing 1 changed file with 38 additions and 32 deletions.
70 changes: 38 additions & 32 deletions src/phoenix/inclusion_monitor/loki_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ fn extract_date_time(json_map: &serde_json::Value, key: &str) -> anyhow::Result<
/// Takes the payload logs response and extracts the first log line. We rely on the query being
/// crafted in such a way that the oldest matching result is the one we're looking for.
fn parse_payload_published_log(text: &str) -> anyhow::Result<Option<PayloadLogStats>> {
let payload_published_log: serde_json::Value = {
let payload_published_log: Option<serde_json::Value> = {
let log_response_json: serde_json::Value = serde_json::from_str(text)
.context("failed to parse payload log request body as JSON")?;

Expand All @@ -45,39 +45,45 @@ fn parse_payload_published_log(text: &str) -> anyhow::Result<Option<PayloadLogSt
// Each of these lines is expected to contain parsed JSON values in the `stream` field and
// raw values in the `values` field.
.and_then(|line| line.get("stream"))
.context("failed to get stream from payload published log")
.cloned()?
.cloned()
};

let received_at = extract_date_time(&payload_published_log, "timestampRequestStart")?;
let decoded_at = extract_date_time(&payload_published_log, "timestampAfterDecode")?;
let pre_publish_at = extract_date_time(&payload_published_log, "timestampBeforePublishing")?;
let post_publish_at = extract_date_time(&payload_published_log, "timestampAfterPublishing")?;
let decoded_at_slot_age_ms = payload_published_log["msIntoSlot"]
.as_str()
.and_then(|s| s.parse::<i64>().ok())
.context("failed to parse msIntoSlot as i64")?;

let pre_publish_duration_ms = pre_publish_at
.signed_duration_since(received_at)
.num_milliseconds();

let publish_duration_ms = post_publish_at
.signed_duration_since(pre_publish_at)
.num_milliseconds();

let request_download_duration_ms = decoded_at
.signed_duration_since(received_at)
.num_milliseconds();

let payload_log_stats = PayloadLogStats {
decoded_at_slot_age_ms,
pre_publish_duration_ms,
publish_duration_ms,
request_download_duration_ms,
};

Ok(Some(payload_log_stats))
match payload_published_log {
None => Ok(None),
Some(payload_published_log) => {
let received_at = extract_date_time(&payload_published_log, "timestampRequestStart")?;
let decoded_at = extract_date_time(&payload_published_log, "timestampAfterDecode")?;
let pre_publish_at =
extract_date_time(&payload_published_log, "timestampBeforePublishing")?;
let post_publish_at =
extract_date_time(&payload_published_log, "timestampAfterPublishing")?;
let decoded_at_slot_age_ms = payload_published_log["msIntoSlot"]
.as_str()
.and_then(|s| s.parse::<i64>().ok())
.context("failed to parse msIntoSlot as i64")?;

let pre_publish_duration_ms = pre_publish_at
.signed_duration_since(received_at)
.num_milliseconds();

let publish_duration_ms = post_publish_at
.signed_duration_since(pre_publish_at)
.num_milliseconds();

let request_download_duration_ms = decoded_at
.signed_duration_since(received_at)
.num_milliseconds();

let payload_log_stats = PayloadLogStats {
decoded_at_slot_age_ms,
pre_publish_duration_ms,
publish_duration_ms,
request_download_duration_ms,
};

Ok(Some(payload_log_stats))
}
}
}

pub struct LokiClient {
Expand Down

0 comments on commit dfaffd8

Please sign in to comment.