Skip to content

Commit

Permalink
feat(alerts): use safer alerts
Browse files Browse the repository at this point in the history
Adds in retries.
Forces safe messages through indirection.
  • Loading branch information
alextes committed Mar 21, 2024
1 parent 69c082e commit 42b9283
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 213 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ tracing-subscriber = { version = "0.3", features = [
"json",
"std",
] }
trait-variant = "0.1.2"
3 changes: 2 additions & 1 deletion src/bin/phoenix-service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ use anyhow::Result;

#[tokio::main]
pub async fn main() -> Result<()> {
relay_backend::monitor_critical_services().await
relay_backend::monitor_critical_services().await?;
Ok(())
}
42 changes: 42 additions & 0 deletions src/phoenix/alerts/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
mod opsgenie;
pub mod telegram;

use tracing::{debug, error};

use crate::env::Env;

pub use self::telegram::TelegramSafeAlert;

use super::env::APP_CONFIG;

/// Ability to communicate an alert to a dev.
///
/// The `trait_variant::make` attribute below is given `SendAlert: Send` as its argument; the
/// rest of this crate implements and calls the trait under the name `SendAlert`.
/// Both methods return nothing, so an implementation cannot report a delivery failure to its
/// caller through the return value.
#[trait_variant::make(SendAlert: Send)]
pub trait LocalSendAlert {
/// Sends `message` as a warning.
async fn send_warning(&self, message: TelegramSafeAlert);
/// Sends `message` as an alert.
async fn send_alert(&self, message: TelegramSafeAlert);
}

/// Sends `message` as a Telegram alert and, when running in the production environment, also
/// raises it as an OpsGenie alert.
///
/// The Telegram alert is always sent first. If the OpsGenie request fails, the failure is logged
/// and a second Telegram alert describing the OpsGenie error is sent. Nothing is returned to the
/// caller in either case.
pub async fn send_opsgenie_telegram_alert(message: &str) {
    let notifier = telegram::TelegramAlerts::new();
    let primary = TelegramSafeAlert::new(message);
    SendAlert::send_alert(&notifier, primary).await;

    // OpsGenie is only contacted in production.
    if APP_CONFIG.env != Env::Prod {
        return;
    }

    let Err(err) = opsgenie::send_opsgenie_alert(message).await else {
        debug!(message, "sent OpsGenie alert");
        return;
    };

    error!(?err, "failed to send OpsGenie alert");

    // Only the error text needs escaping; the fixed prefix contains no reserved characters.
    let failure_notice = TelegramSafeAlert::from_escaped_string(format!(
        "failed to send OpsGenie alert: {}",
        telegram::escape_str(&err.to_string())
    ));
    SendAlert::send_alert(&notifier, failure_notice).await;
}
43 changes: 20 additions & 23 deletions src/phoenix/alert.rs → src/phoenix/alerts/opsgenie.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use anyhow::Result;
use anyhow::bail;
use axum::http::{HeaderMap, HeaderValue};
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::json;

use crate::{env::Env, phoenix::env::APP_CONFIG};

use super::telegram::{self};
use crate::phoenix::env::APP_CONFIG;

/// JSON body parsed from an OpsGenie response whose status is not the expected success status.
/// Only the `message` field is read.
#[derive(Deserialize)]
struct OpsGenieError {
/// Error description reported by OpsGenie; included in the error returned to the caller.
message: String,
}

async fn send_opsgenie_alert(message: &str) -> Result<()> {
pub async fn send_opsgenie_alert(message: &str) -> anyhow::Result<()> {
let mut headers = HeaderMap::new();
let auth_header = format!("GenieKey {}", &APP_CONFIG.opsgenie_api_key);

Expand All @@ -28,28 +27,26 @@ async fn send_opsgenie_alert(message: &str) -> Result<()> {
.send()
.await?;

if res.status() != 202 {
match res.json::<OpsGenieError>().await {
Err(_) => {
panic!("failed to create alarm with OpsGenie")
match res.status() {
StatusCode::ACCEPTED => {
tracing::debug!(message, "sent opsgenie alert");
Ok(())
}
status => match res.json::<OpsGenieError>().await {
Err(err) => {
bail!(
"failed to create alarm with OpsGenie, status: {}, err: {}",
status,
err
)
}
Ok(body) => {
panic!(
"failed to create alarm with OpsGenie, message: {}",
bail!(
"failed to create alarm with OpsGenie, status: {:?}, message: {}",
status,
body.message
)
}
}
} else {
Ok(())
},
}
}

pub async fn send_opsgenie_telegram_alert(message: &str) -> Result<()> {
if APP_CONFIG.env == Env::Prod {
send_opsgenie_alert(message).await?;
}
telegram::send_telegram_alert(message).await?;

Ok(())
}
215 changes: 215 additions & 0 deletions src/phoenix/alerts/telegram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use std::fmt;

use anyhow::{anyhow, Result};
use reqwest::StatusCode;

use crate::phoenix::env::APP_CONFIG;

use super::SendAlert;

/// Escapes `input` so that Telegram renders it as literal text under the `MarkdownV2` parse mode.
///
/// Every character reserved by MarkdownV2 is prefixed with a backslash. The backslash itself is
/// escaped too: Telegram treats `\` as "take the next character literally", so an unescaped
/// backslash in the input would be swallowed, or would cancel the escape of a reserved character
/// that follows it and make the message unparsable.
///
/// https://core.telegram.org/bots/api#markdownv2-style
pub fn escape_str(input: &str) -> String {
    // The output is at least as long as the input; reserve that much up front.
    let mut output = String::with_capacity(input.len());
    for c in input.chars() {
        let needs_escape = matches!(
            c,
            '\\' | '_'
                | '*'
                | '['
                | ']'
                | '('
                | ')'
                | '~'
                | '`'
                | '>'
                | '#'
                | '+'
                | '-'
                | '='
                | '|'
                | '{'
                | '}'
                | '.'
                | '!'
        );
        if needs_escape {
            output.push('\\');
        }
        output.push(c);
    }
    output
}

/// Escapes text destined for the inside of a markdown code block: each backtick and backslash
/// is prefixed with a backslash, every other character is copied unchanged.
///
/// https://core.telegram.org/bots/api#markdownv2-style
pub fn escape_code_block(input: &str) -> String {
    input.chars().fold(String::new(), |mut escaped, ch| {
        if matches!(ch, '`' | '\\') {
            escaped.push('\\');
        }
        escaped.push(ch);
        escaped
    })
}

/// Formats a message to be compatible with the Telegram bot API.
/// Respect escaping as described in: https://core.telegram.org/bots/api#markdownv2-style
/// Respect character limit of 4096.
#[derive(Clone, Debug, PartialEq)]
pub struct TelegramSafeAlert(String);

const TELEGRAM_MAX_MESSAGE_LENGTH: usize = 4096;
// Leave a little room for the escape characters and unknowns.
const TELEGRAM_SAFE_MESSAGE_LENGTH: usize = TELEGRAM_MAX_MESSAGE_LENGTH - 1024;

impl TelegramSafeAlert {
    /// Builds an alert from unescaped text: the input is escaped with `escape_str` and then
    /// limited to the safe message length.
    pub fn new(input: &str) -> Self {
        let escaped = escape_str(input);
        Self::from_escaped_string(escaped)
    }

    /// Keeps at most `TELEGRAM_SAFE_MESSAGE_LENGTH` characters of the message.
    ///
    /// If characters were actually dropped and the kept text ends in an odd number of
    /// backslashes, the final backslash was escaping a character that has been cut off. It is
    /// removed so the message does not end in a dangling escape.
    fn slice_to_limit(self) -> Self {
        let mut truncated: String = self
            .0
            .chars()
            .take(TELEGRAM_SAFE_MESSAGE_LENGTH)
            .collect();

        let was_cut = truncated.len() < self.0.len();
        if was_cut {
            let trailing_backslashes = truncated
                .chars()
                .rev()
                .take_while(|&c| c == '\\')
                .count();
            if trailing_backslashes % 2 == 1 {
                truncated.pop();
            }
        }

        Self(truncated)
    }

    /// Builds an alert from text the caller has already escaped, truncating it when it is longer
    /// than `TELEGRAM_SAFE_MESSAGE_LENGTH` characters.
    ///
    /// The length is measured in characters, the same unit the truncation uses, so the warning
    /// below is only logged when text is really going to be dropped.
    pub fn from_escaped_string(input: String) -> Self {
        // A character at index LIMIT exists exactly when the text has more than LIMIT characters.
        let exceeds_limit = input.chars().nth(TELEGRAM_SAFE_MESSAGE_LENGTH).is_some();
        if exceeds_limit {
            tracing::warn!(
                "telegram alert too long, truncating to {} characters",
                TELEGRAM_SAFE_MESSAGE_LENGTH
            );
            Self(input).slice_to_limit()
        } else {
            Self(input)
        }
    }
}

impl fmt::Display for TelegramSafeAlert {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

/// Kind of notification being sent; selects the destination channel and is used as a log label.
enum NotificationType {
    Warning,
    Alert,
}

/// Displays the notification kind as a lowercase label.
impl fmt::Display for NotificationType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Warning => "warning",
            Self::Alert => "alert",
        };
        f.write_str(label)
    }
}

/// Sends alerts and warnings to Telegram over HTTP.
#[derive(Clone)]
pub struct TelegramAlerts {
// HTTP client used for the Telegram API requests made by this type.
client: reqwest::Client,
}

// `Default` simply delegates to `TelegramAlerts::new`.
impl Default for TelegramAlerts {
fn default() -> Self {
Self::new()
}
}

impl TelegramAlerts {
/// Creates a sender backed by a newly constructed `reqwest::Client`.
pub fn new() -> Self {
    let client = reqwest::Client::new();
    Self { client }
}

async fn send_telegram_message(
&self,
notification_type: NotificationType,
message: &str,
) -> Result<()> {
let channel_id = match notification_type {
NotificationType::Warning => APP_CONFIG.telegram_warnings_channel_id.as_str(),
NotificationType::Alert => APP_CONFIG.telegram_alerts_channel_id.as_str(),
};

let url = format!(
"https://api.telegram.org/bot{}/sendMessage",
APP_CONFIG.telegram_api_key
);

let response = self
.client
.get(&url)
.query(&[
("chat_id", channel_id),
("text", message),
("parse_mode", "MarkdownV2"),
("disable_web_page_preview", "true"),
])
.send()
.await?;

match response.status() {
StatusCode::OK => {
tracing::debug!(%notification_type, message, "sent telegram message");
Ok(())
}
StatusCode::BAD_REQUEST => {
let body = response.text().await?;
Err(anyhow!("failed to send telegram message: {}", body))
}
_ => Err(anyhow!(
"failed to send telegram message, status: {:?}",
response.status()
)),
}
}

async fn send_telegram_warning(&self, message: &TelegramSafeAlert) -> anyhow::Result<()> {
self.send_telegram_message(NotificationType::Warning, &message.0)
.await
}

async fn send_telegram_alert(&self, message: &TelegramSafeAlert) -> anyhow::Result<()> {
self.send_telegram_message(NotificationType::Alert, &message.0)
.await
}

/// Sends a telegram alert with up to three attempts. The first two attempts send the given
/// message; the third sends a fixed, simple fallback text instead. Telegram has very sensitive
/// rules about escaping, and we may also be rate limited at times, so a message that cannot be
/// delivered is replaced by one that is known to be safe. Every failure is logged; nothing is
/// returned to the caller.
async fn send_telegram_alert_with_fallback(&self, message: TelegramSafeAlert) {
    for attempt in 0..3 {
        // We may be timing out, so every attempt after the first waits a bit beforehand.
        if attempt > 0 {
            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        }

        let message = match attempt {
            // Last attempt. This message intentionally does not contain *any* special
            // characters as many require escaping, and is within the character limit.
            2 => TelegramSafeAlert::new("failed to send telegram alert please check logs"),
            _ => message.clone(),
        };

        if let Err(err) = self.send_telegram_alert(&message).await {
            tracing::error!(
                attempt,
                %message,
                %err,
                "failed to send telegram alert"
            );
            continue;
        }

        tracing::debug!(%message, "sent telegram alert");
        return;
    }
}
}

/// Telegram-backed implementation of `SendAlert`.
impl SendAlert for TelegramAlerts {
    /// Alerts go through the retrying sender, which falls back to a fixed text on the last try.
    async fn send_alert(&self, message: TelegramSafeAlert) {
        self.send_telegram_alert_with_fallback(message).await
    }

    /// Warnings are sent once; a failure is logged and otherwise ignored.
    async fn send_warning(&self, message: TelegramSafeAlert) {
        if let Err(err) = self.send_telegram_warning(&message).await {
            tracing::error!(?err, "failed to send telegram warning");
        }
    }
}
Loading

0 comments on commit 42b9283

Please sign in to comment.