diff --git a/src/phoenix/inclusion_monitor.rs b/src/phoenix/inclusion_monitor.rs index e780fd6..4599663 100644 --- a/src/phoenix/inclusion_monitor.rs +++ b/src/phoenix/inclusion_monitor.rs @@ -102,6 +102,57 @@ async fn get_missed_slot_count( .map_err(Into::into) } +fn format_delivered_not_found_message(log_stats: Option, slot: &i64) -> String { + let explorer_url = APP_CONFIG.env.to_beacon_explorer_url(); + + match log_stats { + Some(stats) => { + let PayloadLogStats { + decoded_at_slot_age_ms, + pre_publish_duration_ms, + publish_duration_ms, + request_download_duration_ms, + } = stats; + + let publish_took_too_long = publish_duration_ms > 1000; + let request_arrived_too_late = request_download_duration_ms > 1000; + let safe_to_ignore = request_arrived_too_late && !publish_took_too_long; + + formatdoc!( + " + delivered block not found for slot + + [beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot}) + + ``` + decoded_at_slot_age_ms: {decoded_at_slot_age_ms} + pre_publish_duration_ms: {pre_publish_duration_ms} + publish_duration_ms: {publish_duration_ms} + request_download_duration_ms: {request_download_duration_ms} + safe_to_ignore: {safe_to_ignore} + slot: {slot} + ``` + ", + ) + } + None => { + error!("no payload log stats found for slot {}", slot); + + formatdoc!( + " + delivered block not found for slot + + [beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot}) + + ``` + log based stats not found, check logs for details + ``` + ", + ) + } + } +} + pub async fn run_inclusion_monitor( relay_pool: &PgPool, mev_pool: &PgPool, @@ -163,34 +214,8 @@ pub async fn run_inclusion_monitor( insert_missed_slot(mev_pool, &payload.slot, &payload.block_hash, None).await?; - let PayloadLogStats { - decoded_at_slot_age_ms, - pre_publish_duration_ms, - publish_duration_ms, - request_download_duration_ms, - } = log_client.payload_logs(&(payload.slot as i32)).await?; - - let publish_took_too_long = publish_duration_ms > 1000; - let request_arrived_too_late = request_download_duration_ms > 1000; - let safe_to_ignore = request_arrived_too_late && !publish_took_too_long; - - let msg = formatdoc!( - " - delivered block not found for slot - - [beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot}) - - ``` - decoded_at_slot_age_ms: {decoded_at_slot_age_ms} - pre_publish_duration_ms: {pre_publish_duration_ms} - publish_duration_ms: {publish_duration_ms} - request_download_duration_ms: {request_download_duration_ms} - safe_to_ignore: {safe_to_ignore} - slot: {slot} - ``` - ", - slot = payload.slot - ); + let log_stats = log_client.payload_logs(&(payload.slot as i32)).await?; + let msg = format_delivered_not_found_message(log_stats, &payload.slot); alert::send_telegram_alert(&msg).await?; } else { diff --git a/src/phoenix/inclusion_monitor/loki_client.rs b/src/phoenix/inclusion_monitor/loki_client.rs index 9ff1925..b1f40a0 100644 --- a/src/phoenix/inclusion_monitor/loki_client.rs +++ b/src/phoenix/inclusion_monitor/loki_client.rs @@ -1,5 +1,3 @@ -use std::str::FromStr; - use anyhow::Context; use chrono::{DateTime, TimeZone, Utc}; use reqwest::Url; @@ -26,63 +24,64 @@ fn date_time_from_timestamp( .with_context(|| format!("failed to parse {key} as timestamp from payload log")) } -impl FromStr for PayloadLogStats { - type Err = anyhow::Error; - - fn from_str(text: &str) -> Result { - let request_finished_log: serde_json::Value = { - let log_data: serde_json::Value = serde_json::from_str(text) - .context("failed to parse payload log request body as JSON")?; - - // This is the array of parsed log lines and their raw values. - let results = log_data["data"]["result"] - .as_array() - .context("expected at least one log line in payload logs response")?; - - results - .iter() - .find(|result| { - let stream = &result["stream"]; - let msg = stream["msg"].as_str().unwrap_or(""); - msg.contains("request finished") - }) - .map(|result| &result["stream"]) - .cloned() - .context("no proposer-api log lines with msg field found")? - }; - - let received_at = date_time_from_timestamp(&request_finished_log, "timestampRequestStart")?; - let decoded_at = date_time_from_timestamp(&request_finished_log, "timestampAfterDecode")?; - let pre_publish_at = - date_time_from_timestamp(&request_finished_log, "timestampBeforePublishing")?; - let post_publish_at = - date_time_from_timestamp(&request_finished_log, "timestampAfterPublishing")?; - let decoded_at_slot_age_ms = request_finished_log["msIntoSlot"] - .as_str() - .and_then(|s| s.parse::().ok()) - .context("failed to parse msIntoSlot as i64")?; - - let pre_publish_duration_ms = pre_publish_at - .signed_duration_since(received_at) - .num_milliseconds(); - - let publish_duration_ms = post_publish_at - .signed_duration_since(pre_publish_at) - .num_milliseconds(); - - let request_download_duration_ms = decoded_at - .signed_duration_since(received_at) - .num_milliseconds(); - - let payload_log_stats = PayloadLogStats { - decoded_at_slot_age_ms, - pre_publish_duration_ms, - publish_duration_ms, - request_download_duration_ms, - }; - - Ok(payload_log_stats) - } +fn parse_log_response(text: &str) -> anyhow::Result> { + let request_finished_log: serde_json::Value = { + let log_data: serde_json::Value = serde_json::from_str(text) + .context("failed to parse payload log request body as JSON")?; + + // This is the array of parsed log lines and their raw values. + let results = log_data["data"]["result"] + .as_array() + .context("expected at least one log line in payload logs response")?; + + let log_data = results + .iter() + .find(|result| { + let stream = &result["stream"]; + let msg = stream["msg"].as_str().unwrap_or(""); + msg.contains("request finished") + }) + .map(|result| &result["stream"]) + .cloned(); + + match log_data { + Some(log_data) => log_data, + // If there are no logs, we stop here. + None => return Ok(None), + } + }; + + let received_at = date_time_from_timestamp(&request_finished_log, "timestampRequestStart")?; + let decoded_at = date_time_from_timestamp(&request_finished_log, "timestampAfterDecode")?; + let pre_publish_at = + date_time_from_timestamp(&request_finished_log, "timestampBeforePublishing")?; + let post_publish_at = + date_time_from_timestamp(&request_finished_log, "timestampAfterPublishing")?; + let decoded_at_slot_age_ms = request_finished_log["msIntoSlot"] + .as_str() + .and_then(|s| s.parse::().ok()) + .context("failed to parse msIntoSlot as i64")?; + + let pre_publish_duration_ms = pre_publish_at + .signed_duration_since(received_at) + .num_milliseconds(); + + let publish_duration_ms = post_publish_at + .signed_duration_since(pre_publish_at) + .num_milliseconds(); + + let request_download_duration_ms = decoded_at + .signed_duration_since(received_at) + .num_milliseconds(); + + let payload_log_stats = PayloadLogStats { + decoded_at_slot_age_ms, + pre_publish_duration_ms, + publish_duration_ms, + request_download_duration_ms, + }; + + Ok(Some(payload_log_stats)) } pub struct LokiClient { @@ -98,7 +97,7 @@ impl LokiClient { } } - pub async fn payload_logs(&self, slot: &i32) -> anyhow::Result { + pub async fn payload_logs(&self, slot: &i32) -> anyhow::Result> { let query = format!(r#"{{app="proposer-api"}} |= `"slot":{slot}` | json"#); let since = "24h"; @@ -109,7 +108,7 @@ impl LokiClient { let response = self.client.get(url_with_params).send().await?; let body = response.text().await?; - body.parse::() + parse_log_response(&body) } }