Skip to content

Commit

Permalink
feat(inclusion_monitor): handle no logs available
Browse files Browse the repository at this point in the history
Able to format a message even if no payload logs are found for a given
slot.
  • Loading branch information
alextes committed Oct 10, 2023
1 parent 5d1d7a3 commit eeda8be
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 89 deletions.
81 changes: 53 additions & 28 deletions src/phoenix/inclusion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,57 @@ async fn get_missed_slot_count(
.map_err(Into::into)
}

fn format_delivered_not_found_message(log_stats: Option<PayloadLogStats>, slot: &i64) -> String {
let explorer_url = APP_CONFIG.env.to_beacon_explorer_url();

match log_stats {
Some(stats) => {
let PayloadLogStats {
decoded_at_slot_age_ms,
pre_publish_duration_ms,
publish_duration_ms,
request_download_duration_ms,
} = stats;

let publish_took_too_long = publish_duration_ms > 1000;
let request_arrived_too_late = request_download_duration_ms > 1000;
let safe_to_ignore = request_arrived_too_late && !publish_took_too_long;

formatdoc!(
"
delivered block not found for slot
[beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot})
```
decoded_at_slot_age_ms: {decoded_at_slot_age_ms}
pre_publish_duration_ms: {pre_publish_duration_ms}
publish_duration_ms: {publish_duration_ms}
request_download_duration_ms: {request_download_duration_ms}
safe_to_ignore: {safe_to_ignore}
slot: {slot}
```
",
)
}
None => {
error!("no payload log stats found for slot {}", slot);

formatdoc!(
"
delivered block not found for slot
[beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot})
```
log based stats not found, check logs for details
```
",
)
}
}
}

pub async fn run_inclusion_monitor(
relay_pool: &PgPool,
mev_pool: &PgPool,
Expand Down Expand Up @@ -163,34 +214,8 @@ pub async fn run_inclusion_monitor(

insert_missed_slot(mev_pool, &payload.slot, &payload.block_hash, None).await?;

let PayloadLogStats {
decoded_at_slot_age_ms,
pre_publish_duration_ms,
publish_duration_ms,
request_download_duration_ms,
} = log_client.payload_logs(&(payload.slot as i32)).await?;

let publish_took_too_long = publish_duration_ms > 1000;
let request_arrived_too_late = request_download_duration_ms > 1000;
let safe_to_ignore = request_arrived_too_late && !publish_took_too_long;

let msg = formatdoc!(
"
delivered block not found for slot
[beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot})
```
decoded_at_slot_age_ms: {decoded_at_slot_age_ms}
pre_publish_duration_ms: {pre_publish_duration_ms}
publish_duration_ms: {publish_duration_ms}
request_download_duration_ms: {request_download_duration_ms}
safe_to_ignore: {safe_to_ignore}
slot: {slot}
```
",
slot = payload.slot
);
let log_stats = log_client.payload_logs(&(payload.slot as i32)).await?;
let msg = format_delivered_not_found_message(log_stats, &payload.slot);

alert::send_telegram_alert(&msg).await?;
} else {
Expand Down
121 changes: 60 additions & 61 deletions src/phoenix/inclusion_monitor/loki_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::str::FromStr;

use anyhow::Context;
use chrono::{DateTime, TimeZone, Utc};
use reqwest::Url;
Expand All @@ -26,63 +24,64 @@ fn date_time_from_timestamp(
.with_context(|| format!("failed to parse {key} as timestamp from payload log"))
}

impl FromStr for PayloadLogStats {
type Err = anyhow::Error;

fn from_str(text: &str) -> Result<Self, Self::Err> {
let request_finished_log: serde_json::Value = {
let log_data: serde_json::Value = serde_json::from_str(text)
.context("failed to parse payload log request body as JSON")?;

// This is the array of parsed log lines and their raw values.
let results = log_data["data"]["result"]
.as_array()
.context("expected at least one log line in payload logs response")?;

results
.iter()
.find(|result| {
let stream = &result["stream"];
let msg = stream["msg"].as_str().unwrap_or("");
msg.contains("request finished")
})
.map(|result| &result["stream"])
.cloned()
.context("no proposer-api log lines with msg field found")?
};

let received_at = date_time_from_timestamp(&request_finished_log, "timestampRequestStart")?;
let decoded_at = date_time_from_timestamp(&request_finished_log, "timestampAfterDecode")?;
let pre_publish_at =
date_time_from_timestamp(&request_finished_log, "timestampBeforePublishing")?;
let post_publish_at =
date_time_from_timestamp(&request_finished_log, "timestampAfterPublishing")?;
let decoded_at_slot_age_ms = request_finished_log["msIntoSlot"]
.as_str()
.and_then(|s| s.parse::<i64>().ok())
.context("failed to parse msIntoSlot as i64")?;

let pre_publish_duration_ms = pre_publish_at
.signed_duration_since(received_at)
.num_milliseconds();

let publish_duration_ms = post_publish_at
.signed_duration_since(pre_publish_at)
.num_milliseconds();

let request_download_duration_ms = decoded_at
.signed_duration_since(received_at)
.num_milliseconds();

let payload_log_stats = PayloadLogStats {
decoded_at_slot_age_ms,
pre_publish_duration_ms,
publish_duration_ms,
request_download_duration_ms,
};

Ok(payload_log_stats)
}
fn parse_log_response(text: &str) -> anyhow::Result<Option<PayloadLogStats>> {
let request_finished_log: serde_json::Value = {
let log_data: serde_json::Value = serde_json::from_str(text)
.context("failed to parse payload log request body as JSON")?;

// This is the array of parsed log lines and their raw values.
let results = log_data["data"]["result"]
.as_array()
.context("expected at least one log line in payload logs response")?;

let log_data = results
.iter()
.find(|result| {
let stream = &result["stream"];
let msg = stream["msg"].as_str().unwrap_or("");
msg.contains("request finished")
})
.map(|result| &result["stream"])
.cloned();

match log_data {
Some(log_data) => log_data,
// If there are no logs, we stop here.
None => return Ok(None),
}
};

let received_at = date_time_from_timestamp(&request_finished_log, "timestampRequestStart")?;
let decoded_at = date_time_from_timestamp(&request_finished_log, "timestampAfterDecode")?;
let pre_publish_at =
date_time_from_timestamp(&request_finished_log, "timestampBeforePublishing")?;
let post_publish_at =
date_time_from_timestamp(&request_finished_log, "timestampAfterPublishing")?;
let decoded_at_slot_age_ms = request_finished_log["msIntoSlot"]
.as_str()
.and_then(|s| s.parse::<i64>().ok())
.context("failed to parse msIntoSlot as i64")?;

let pre_publish_duration_ms = pre_publish_at
.signed_duration_since(received_at)
.num_milliseconds();

let publish_duration_ms = post_publish_at
.signed_duration_since(pre_publish_at)
.num_milliseconds();

let request_download_duration_ms = decoded_at
.signed_duration_since(received_at)
.num_milliseconds();

let payload_log_stats = PayloadLogStats {
decoded_at_slot_age_ms,
pre_publish_duration_ms,
publish_duration_ms,
request_download_duration_ms,
};

Ok(Some(payload_log_stats))
}

pub struct LokiClient {
Expand All @@ -98,7 +97,7 @@ impl LokiClient {
}
}

pub async fn payload_logs(&self, slot: &i32) -> anyhow::Result<PayloadLogStats> {
pub async fn payload_logs(&self, slot: &i32) -> anyhow::Result<Option<PayloadLogStats>> {
let query = format!(r#"{{app="proposer-api"}} |= `"slot":{slot}` | json"#);
let since = "24h";

Expand All @@ -109,7 +108,7 @@ impl LokiClient {
let response = self.client.get(url_with_params).send().await?;
let body = response.text().await?;

body.parse::<PayloadLogStats>()
parse_log_response(&body)
}
}

Expand Down

0 comments on commit eeda8be

Please sign in to comment.