Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(phoenix): Trigger consensus / validation node alerts only when all are out of sync #27

Merged
merged 6 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,418 changes: 742 additions & 676 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions src/censorship/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ pub struct AppConfig {
pub chain_data_interval: Duration,
#[serde(deserialize_with = "deserialize_duration_minutes")]
pub chain_data_batch_size: Duration,
#[serde(deserialize_with = "deserialize_duration_minutes")]
pub block_production_interval: Duration,
}

lazy_static! {
Expand Down
1 change: 0 additions & 1 deletion src/censorship/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub struct DeliveredPayload {
pub block_hash: String,
pub builder_pubkey: String,
pub proposer_pubkey: String,
pub proposer_fee_recipient: String,
pub value: String,
}

Expand Down
3 changes: 0 additions & 3 deletions src/censorship/relay/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ struct DeliveredPayloadResponse {
block_hash: String,
builder_pubkey: String,
proposer_pubkey: String,
proposer_fee_recipient: String,
value: String,
}

Expand Down Expand Up @@ -49,7 +48,6 @@ impl RelayApi for RelayId {
block_hash,
builder_pubkey,
proposer_pubkey,
proposer_fee_recipient,
value,
}| DeliveredPayload {
relay_id: self.clone(),
Expand All @@ -58,7 +56,6 @@ impl RelayApi for RelayId {
block_hash,
builder_pubkey,
proposer_pubkey,
proposer_fee_recipient,
value,
},
)
Expand Down
25 changes: 20 additions & 5 deletions src/phoenix/consensus_node.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tracing::{error, info};
use tracing::{error, info, warn};

use super::{env::APP_CONFIG, PhoenixMonitor};
use super::{
alerts::{
telegram::{TelegramAlerts, TelegramSafeAlert},
SendAlert,
},
env::APP_CONFIG,
PhoenixMonitor,
};
use crate::beacon_api::BeaconApi;

pub struct ConsensusNodeMonitor {
beacon_api: BeaconApi,
telegram_alerts: TelegramAlerts,
}

impl ConsensusNodeMonitor {
pub fn new() -> Self {
Self {
beacon_api: BeaconApi::new(&APP_CONFIG.consensus_nodes),
telegram_alerts: TelegramAlerts::new(),
}
}

Expand All @@ -35,11 +44,17 @@ impl ConsensusNodeMonitor {
let synced: Vec<&bool> = results.iter().filter(|is_synced| **is_synced).collect();

info!("{}/{} consensus nodes synced", synced.len(), results.len());
let num_out_of_sync = results.len() - synced.len();

if synced.len() == APP_CONFIG.consensus_nodes.len() {
Ok(Utc::now())
if num_out_of_sync > 1 {
Err(anyhow!("all consensus nodes out of sync"))
ckoopmann marked this conversation as resolved.
Show resolved Hide resolved
} else {
Err(anyhow!("one or more consensus nodes out of sync"))
if num_out_of_sync == 1 {
warn!("one consensus node is out of sync");
let message = TelegramSafeAlert::new("one consensus node is out of sync");
self.telegram_alerts.send_warning(message).await;
}
Ok(Utc::now())
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/phoenix/demotion_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, Utc};
use indoc::formatdoc;
use itertools::Itertools;
use sqlx::{PgPool, Row};
Expand All @@ -21,7 +21,6 @@ use super::{

#[derive(Debug, Clone)]
pub struct BuilderDemotion {
pub inserted_at: DateTime<Utc>,
pub builder_pubkey: String,
pub builder_id: Option<String>,
pub slot: i64,
Expand All @@ -36,7 +35,6 @@ pub async fn get_builder_demotions(
let query = format!(
"
SELECT
bd.inserted_at,
bd.builder_pubkey,
bb.builder_id,
bd.slot,
Expand All @@ -59,7 +57,6 @@ pub async fn get_builder_demotions(
.map(|rows| {
rows.iter()
.map(|row| BuilderDemotion {
inserted_at: Utc.from_utc_datetime(&row.get("inserted_at")),
builder_pubkey: row.get("builder_pubkey"),
builder_id: row.try_get("builder_id").ok(),
slot: row.get("slot"),
Expand Down
9 changes: 7 additions & 2 deletions src/phoenix/inclusion_monitor/loki_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,13 @@ impl LokiClient {
pub async fn error_messages(&self, slot: i64) -> anyhow::Result<Vec<String>> {
let query = format!(r#"{{app="payload-api",level="error"}} |= `"slot":"{slot}"`"#);
let slot = Slot(slot as i32);
let start = slot.date_time().timestamp_nanos();
let end = (slot.date_time() + chrono::Duration::seconds(12)).timestamp_nanos();
let start = slot
.date_time()
.timestamp_nanos_opt()
.ok_or_else(|| anyhow::anyhow!("Start Time out of range for nanosecond timestamp"))?;
let end = (slot.date_time() + chrono::Duration::seconds(12))
.timestamp_nanos_opt()
.ok_or_else(|| anyhow::anyhow!("End Time out of range for nanosecond timestamp"))?;

let url = format!("{}/loki/api/v1/query_range", self.server_url);
let url_with_params = Url::parse_with_params(
Expand Down
14 changes: 0 additions & 14 deletions src/phoenix/promotion_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,14 @@ mod tests {
use super::*;
#[test]
fn test_get_eligible_builders_all_eligible() {
let inserted_at = Utc::now();
let demotions = vec![
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey1".to_string(),
sim_error: "json error: request timeout hit before processing".to_string(),
slot: 1,
builder_id: Some("builder1".to_string()),
},
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey2".to_string(),
sim_error: "simulation failed: unknown ancestor".to_string(),
slot: 2,
Expand All @@ -194,17 +191,14 @@ mod tests {

#[test]
fn test_get_eligible_builders_none_eligible() {
let inserted_at = Utc::now();
let demotions = vec![
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey1".to_string(),
sim_error: "invalid error".to_string(),
slot: 1,
builder_id: Some("builder1".to_string()),
},
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey2".to_string(),
sim_error: "simulation failed: unknown ancestor".to_string(),
slot: 2,
Expand All @@ -220,24 +214,20 @@ mod tests {

#[test]
fn test_get_eligible_builders_some_eligible() {
let inserted_at = Utc::now();
let demotions = vec![
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey1".to_string(),
sim_error: "json error: request timeout hit before processing".to_string(),
slot: 1,
builder_id: Some("builder1".to_string()),
},
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey2".to_string(),
sim_error: "invalid error".to_string(),
slot: 2,
builder_id: Some("builder2".to_string()),
},
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey2".to_string(),
sim_error: "simulation failed: unknown ancestor".to_string(),
slot: 3,
Expand All @@ -253,24 +243,20 @@ mod tests {

#[test]
fn test_same_slot_both_valid_and_invalid() {
let inserted_at = Utc::now();
let demotions = vec![
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey2".to_string(),
sim_error: "invalid error".to_string(),
slot: 2,
builder_id: Some("builder2".to_string()),
},
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey1".to_string(),
sim_error: "json error: request timeout hit before processing".to_string(),
slot: 1,
builder_id: Some("builder1".to_string()),
},
BuilderDemotion {
inserted_at,
builder_pubkey: "pubkey2".to_string(),
sim_error: "simulation failed: unknown ancestor".to_string(),
slot: 2,
Expand Down
25 changes: 20 additions & 5 deletions src/phoenix/validation_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,16 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::Deserialize;
use serde_json::json;
use tracing::{error, info};
use tracing::{error, info, warn};

use super::{env::APP_CONFIG, PhoenixMonitor};
use super::{
alerts::{
telegram::{TelegramAlerts, TelegramSafeAlert},
SendAlert,
},
env::APP_CONFIG,
PhoenixMonitor,
};

#[derive(Deserialize)]
struct SyncResponse {
Expand All @@ -25,12 +32,14 @@ async fn get_sync_status(client: &reqwest::Client, url: String) -> reqwest::Resu

pub struct ValidationNodeMonitor {
client: reqwest::Client,
telegram_alerts: TelegramAlerts,
}

impl ValidationNodeMonitor {
pub fn new() -> Self {
Self {
client: reqwest::Client::new(),
telegram_alerts: TelegramAlerts::new(),
}
}

Expand All @@ -52,11 +61,17 @@ impl ValidationNodeMonitor {
let synced: Vec<&bool> = results.iter().filter(|is_synced| **is_synced).collect();

info!("{}/{} validation nodes synced", synced.len(), results.len());
let num_out_of_sync = results.len() - synced.len();

if synced.len() == APP_CONFIG.validation_nodes.len() {
Ok(Utc::now())
if num_out_of_sync > 1 {
Err(anyhow!("all validation nodes out of sync"))
} else {
Err(anyhow!("one or more validation nodes out of sync"))
if num_out_of_sync == 1 {
warn!("one validation node is out of sync");
let message = TelegramSafeAlert::new("one validation node is out of sync");
self.telegram_alerts.send_warning(message).await;
}
Ok(Utc::now())
}
}
}
Expand Down
Loading