Skip to content

Commit

Permalink
feat(inclusion_monitor): add payload metadata
Browse files Browse the repository at this point in the history
Adds details of the missed payload to the alert using our logs.
  • Loading branch information
alextes committed Oct 9, 2023
1 parent 905b2df commit 3c935b5
Show file tree
Hide file tree
Showing 7 changed files with 554 additions and 28 deletions.
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ flate2 = "1.0.25"
futures = "0.3.25"
gcp-bigquery-client = { git = "https://github.com/blombern/gcp-bigquery-client.git" }
hex = "0.4.3"
indoc = "2.0.4"
itertools = "0.10.5"
lazy_static = "1"
rand = "0.8.5"
Expand Down
6 changes: 4 additions & 2 deletions src/phoenix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use crate::phoenix::{
};

use self::{
demotion_monitor::run_demotion_monitor, inclusion_monitor::run_inclusion_monitor,
demotion_monitor::run_demotion_monitor,
inclusion_monitor::{run_inclusion_monitor, LokiClient},
promotion_monitor::run_promotion_monitor,
};

Expand Down Expand Up @@ -220,11 +221,12 @@ async fn run_ops_monitors() -> Result<()> {
&max_retry_duration,
)
.await?;
let loki_client = LokiClient::new(APP_CONFIG.loki_url.clone());

loop {
let canonical_horizon = Utc::now() - Duration::minutes(APP_CONFIG.canonical_wait_minutes);
run_demotion_monitor(&relay_pool, &mev_pool).await?;
run_inclusion_monitor(&relay_pool, &mev_pool, &canonical_horizon).await?;
run_inclusion_monitor(&relay_pool, &mev_pool, &canonical_horizon, &loki_client).await?;
run_promotion_monitor(&relay_pool, &mev_pool, &canonical_horizon).await?;
tokio::time::sleep(Duration::minutes(1).to_std().unwrap()).await;
}
Expand Down
29 changes: 15 additions & 14 deletions src/phoenix/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,26 @@ use crate::env::{deserialize_urls, get_app_config, Env};

#[derive(Deserialize)]
pub struct AppConfig {
pub env: Env,
pub port: u16,
pub database_url: String,
pub relay_database_url: String,
#[serde(deserialize_with = "deserialize_urls")]
pub consensus_nodes: Vec<Url>,
#[serde(deserialize_with = "deserialize_urls")]
pub validation_nodes: Vec<Url>,
pub opsgenie_api_key: String,
pub telegram_api_key: String,
pub telegram_channel_id: String,
#[serde(default = "default_wait")]
pub canonical_wait_minutes: i64,
/// Slot range to check for counting missed slots
#[serde(default = "default_missed_slots_range")]
pub missed_slots_check_range: i64,
#[serde(deserialize_with = "deserialize_urls")]
pub consensus_nodes: Vec<Url>,
pub database_url: String,
pub env: Env,
pub loki_url: String,
/// Minimum number of missed slots per check interval to trigger an alert
#[serde(default = "default_missed_slots_alert_threshold")]
pub missed_slots_alert_threshold: i64,
/// Slot range to check for counting missed slots
#[serde(default = "default_missed_slots_range")]
pub missed_slots_check_range: i64,
pub opsgenie_api_key: String,
pub port: u16,
pub relay_database_url: String,
pub telegram_api_key: String,
pub telegram_channel_id: String,
#[serde(deserialize_with = "deserialize_urls")]
pub validation_nodes: Vec<Url>,
}

fn default_wait() -> i64 {
Expand Down
51 changes: 41 additions & 10 deletions src/phoenix/inclusion_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
use anyhow::Result;
mod loki_client;

pub use loki_client::LokiClient;

use chrono::{DateTime, TimeZone, Utc};
use indoc::formatdoc;
use reqwest::StatusCode;
use sqlx::{PgPool, Row};
use tracing::{error, info, warn};

use loki_client::PayloadLogStats;

use crate::{
beacon_api::BeaconApi,
env::{ToBeaconExplorerUrl, ToNetwork},
Expand All @@ -27,7 +33,7 @@ async fn get_delivered_payloads(
relay_pool: &PgPool,
start: &DateTime<Utc>,
end: &DateTime<Utc>,
) -> Result<Vec<DeliveredPayload>> {
) -> anyhow::Result<Vec<DeliveredPayload>> {
let query = format!(
"
SELECT
Expand Down Expand Up @@ -64,7 +70,7 @@ async fn insert_missed_slot(
slot_number: &i64,
relayed: &String,
canonical: Option<&String>,
) -> Result<()> {
) -> anyhow::Result<()> {
sqlx::query!(
r#"
INSERT INTO missed_slots (slot_number, relayed_block_hash, canonical_block_hash)
Expand Down Expand Up @@ -96,7 +102,8 @@ pub async fn run_inclusion_monitor(
relay_pool: &PgPool,
mev_pool: &PgPool,
canonical_horizon: &DateTime<Utc>,
) -> Result<()> {
log_client: &LokiClient,
) -> anyhow::Result<()> {
let beacon_api = BeaconApi::new(&APP_CONFIG.consensus_nodes);

let checkpoint = match checkpoint::get_checkpoint(mev_pool, CheckpointId::Inclusion).await? {
Expand Down Expand Up @@ -152,12 +159,36 @@ pub async fn run_inclusion_monitor(

insert_missed_slot(mev_pool, &payload.slot, &payload.block_hash, None).await?;

alert::send_telegram_alert(&format!(
"delivered block not found for slot [{slot}]({url}/slot/{slot})",
slot = payload.slot,
url = explorer_url,
))
.await?;
let PayloadLogStats {
pre_publish_duration_ms,
publish_duration_ms,
received_at_slot_age_ms,
request_download_duration_ms,
} = log_client.payload_logs(&(payload.slot as i32)).await?;

let publish_took_too_long = publish_duration_ms > 1000;
let request_arrived_too_late = request_download_duration_ms > 1000;
let safe_to_ignore = request_arrived_too_late && !publish_took_too_long;

let msg = formatdoc!(
"
delivered block not found for slot
[beaconcha\\.in/slot/{slot}]({explorer_url}/slot/{slot})
```
pre_publish_duration_ms: {pre_publish_duration_ms}
publish_duration_ms: {publish_duration_ms}
received_at_slot_age_ms: {received_at_slot_age_ms}
request_download_duration_ms: {request_download_duration_ms}
safe_to_ignore: {safe_to_ignore}
slot: {slot}
```
",
slot = payload.slot
);

alert::send_telegram_alert(&msg).await?;
} else {
error!(
"error getting block hash for slot {}: {}",
Expand Down
134 changes: 134 additions & 0 deletions src/phoenix/inclusion_monitor/loki_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::str::FromStr;

use anyhow::Context;
use chrono::{DateTime, TimeZone, Utc};
use reqwest::Url;

/// Statistics on payloads requested. Used to determine if a payload which failed to make it
/// on-chain should concern us.
///
/// All durations are derived from timestamps found in the proposer-api
/// "request finished" log line (see the `FromStr` impl below).
#[derive(Debug)]
pub struct PayloadLogStats {
    /// Milliseconds from receiving the request until just before publishing.
    pub pre_publish_duration_ms: i64,
    /// The time it took to call our consensus node and have it publish the block.
    pub publish_duration_ms: i64,
    /// How far into the slot the request arrived, taken from the `msIntoSlot` log field.
    pub received_at_slot_age_ms: i64,
    /// Milliseconds from receiving the request until the payload was decoded.
    pub request_download_duration_ms: i64,
}

/// Reads `key` from a JSON log line and interprets it as a Unix timestamp in
/// milliseconds encoded as a string (e.g. `"1696860000000"`).
fn date_time_from_timestamp(
    request_finished_log: &serde_json::Value,
    key: &str,
) -> anyhow::Result<DateTime<Utc>> {
    let parsed = request_finished_log[key]
        .as_str()
        .and_then(|raw| raw.parse::<i64>().ok())
        .and_then(|millis| Utc.timestamp_millis_opt(millis).single());
    parsed.with_context(|| format!("failed to parse {key} as timestamp from payload log"))
}

impl FromStr for PayloadLogStats {
    type Err = anyhow::Error;

    /// Parses a Loki `query_range` JSON response body into payload timing
    /// stats.
    ///
    /// Scans the returned log lines for the one whose `msg` contains
    /// "request finished", then derives durations from its timestamp fields.
    ///
    /// # Errors
    /// Fails when the body is not JSON, when `data.result` is missing or not
    /// an array, when no "request finished" line is present, or when any of
    /// the expected timestamp fields cannot be parsed.
    fn from_str(text: &str) -> Result<Self, Self::Err> {
        let request_finished_log: serde_json::Value = {
            // `text` is already a `&str`; no need to re-borrow it.
            let log_data: serde_json::Value = serde_json::from_str(text)
                .context("failed to parse payload log request body as JSON")?;

            // This is the array of parsed log lines and their raw values.
            let results = log_data["data"]["result"]
                .as_array()
                .context("expected at least one log line in payload logs response")?;

            results
                .iter()
                .find(|result| {
                    let stream = &result["stream"];
                    let msg = stream["msg"].as_str().unwrap_or("");
                    msg.contains("request finished")
                })
                .map(|result| &result["stream"])
                .cloned()
                // Static message: plain `.context` avoids a needless `format!`.
                .context("no proposer-api log lines with msg field found")?
        };

        // Timestamps of the request lifecycle, in order of occurrence.
        let received_at = date_time_from_timestamp(&request_finished_log, "timestampRequestStart")?;
        let decoded_at = date_time_from_timestamp(&request_finished_log, "timestampAfterDecode")?;
        let pre_publish_at =
            date_time_from_timestamp(&request_finished_log, "timestampBeforePublishing")?;
        let post_publish_at =
            date_time_from_timestamp(&request_finished_log, "timestampAfterPublishing")?;
        // `msIntoSlot` is a string-encoded integer in the log line.
        let received_at_slot_age_ms = request_finished_log["msIntoSlot"]
            .as_str()
            .and_then(|s| s.parse::<i64>().ok())
            .context("failed to parse msIntoSlot as i64")?;

        let pre_publish_duration_ms = pre_publish_at
            .signed_duration_since(received_at)
            .num_milliseconds();

        let publish_duration_ms = post_publish_at
            .signed_duration_since(pre_publish_at)
            .num_milliseconds();

        let request_download_duration_ms = decoded_at
            .signed_duration_since(received_at)
            .num_milliseconds();

        let payload_log_stats = PayloadLogStats {
            pre_publish_duration_ms,
            publish_duration_ms,
            received_at_slot_age_ms,
            request_download_duration_ms,
        };

        Ok(payload_log_stats)
    }
}

/// Thin HTTP client for querying a Loki log server.
pub struct LokiClient {
    // Reused across requests so reqwest's connection pool is shared.
    client: reqwest::Client,
    // Base URL of the Loki server; the API path is appended per request.
    server_url: String,
}

impl LokiClient {
    /// Creates a client for the Loki server at `server_url`
    /// (base URL, without the `/loki/api/...` path).
    pub fn new(server_url: String) -> Self {
        Self {
            client: reqwest::Client::new(),
            server_url,
        }
    }

    /// Fetches the proposer-api logs for `slot` from the last 15 minutes and
    /// parses them into [`PayloadLogStats`].
    ///
    /// # Errors
    /// Fails when the URL cannot be built, the request fails or returns a
    /// non-success status, or the response body lacks a parsable
    /// "request finished" log line.
    pub async fn payload_logs(&self, slot: &i32) -> anyhow::Result<PayloadLogStats> {
        // LogQL: proposer-api log lines mentioning this slot, parsed as JSON.
        let query = format!(r#"{{app="proposer-api"}} |= `"slot":{slot}` | json"#);
        let since = "15m";

        let url = format!("{}/loki/api/v1/query_range", self.server_url);
        let url_with_params =
            Url::parse_with_params(&url, &[("query", query.as_str()), ("since", since)])?;

        let response = self.client.get(url_with_params).send().await?;
        // Surface HTTP errors directly instead of failing later with a
        // confusing JSON-parse error on an error-page body.
        let body = response.error_for_status()?.text().await?;

        body.parse::<PayloadLogStats>()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a captured Loki query_range response (slot 7496729) to guard
    /// the `FromStr` implementation against regressions.
    #[test]
    fn parse_log_response() {
        // `fs::read_to_string` replaces the manual File::open + read_to_string
        // dance; also avoids a local named `str` shadowing the primitive type.
        let body = std::fs::read_to_string(
            "src/phoenix/inclusion_monitor/test_data/payload_logs_7496729.json",
        )
        .unwrap();

        body.parse::<PayloadLogStats>().unwrap();
    }
}
Loading

0 comments on commit 3c935b5

Please sign in to comment.