Skip to content

Commit

Permalink
fix: abandon failed Webhooks when status is not included (#717)
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 authored Nov 10, 2024
1 parent 1f8a2e1 commit 54223af
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 22 deletions.
4 changes: 3 additions & 1 deletion boltzr/src/grpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,9 @@ where
.call_webhook(&hook, &params.status)
.await
{
Ok(ok) => Ok(Response::new(SendWebHookResponse { ok })),
Ok(res) => Ok(Response::new(SendWebHookResponse {
ok: res != crate::webhook::caller::CallResult::Failed,
})),
Err(err) => Err(Status::new(Code::Internal, err.to_string())),
}
}
Expand Down
96 changes: 75 additions & 21 deletions boltzr/src/webhook/caller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ const DEFAULT_RETRY_INTERVAL: u64 = 60;

const MAX_URL_LENGTH: usize = 250;

#[derive(Debug, Clone, PartialEq)]
pub enum CallResult {
Success,
Failed,
NotIncluded,
}

#[derive(Debug, Clone)]
pub enum UrlError {
MoreThanMaxLen,
Expand Down Expand Up @@ -121,16 +128,16 @@ impl Caller {
}
}

#[instrument(skip(self, hook, status))]
#[instrument(name = "Caller::call_webhook", skip(self, hook, status))]
pub async fn call_webhook(
&self,
hook: &WebHook,
status: &String,
) -> Result<bool, Box<dyn Error>> {
) -> Result<CallResult, Box<dyn Error>> {
if let Some(status_include) = &hook.status {
if !status_include.contains(status) {
debug!("Not calling WebHook for swap {} because status update {} is not in include list", hook.id, status);
return Ok(true);
return Ok(CallResult::NotIncluded);
}
}

Expand Down Expand Up @@ -178,7 +185,7 @@ impl Caller {
self.retry_count.remove(&hook.id);
self.web_hook_helper.set_state(&hook.id, WebHookState::Ok)?;

Ok(true)
Ok(CallResult::Success)
}
Some(err) => {
warn!("Request for swap {} failed: {}", hook.id, err);
Expand All @@ -190,12 +197,12 @@ impl Caller {
self.web_hook_helper
.set_state(&hook.id, WebHookState::Failed)?;

Ok(false)
Ok(CallResult::Failed)
}
}
}

#[instrument(skip(self))]
#[instrument(name = "Caller::retry_calls", skip(self))]
async fn retry_calls(&self) -> Result<(), Box<dyn Error>> {
let to_retry = self.web_hook_helper.get_by_state(WebHookState::Failed)?;

Expand Down Expand Up @@ -232,7 +239,7 @@ impl Caller {
Ok(())
}

#[instrument(skip(self))]
#[instrument(name = "Caller::retry_call", skip(self))]
async fn retry_call(&self, hook: &WebHook) -> Result<(), Box<dyn Error>> {
let status = self.web_hook_helper.get_swap_status(&hook.id)?;

Expand All @@ -243,9 +250,9 @@ impl Caller {
status,
hook.url
);
let ok = self.call_webhook(hook, &status.to_string()).await?;
let res = self.call_webhook(hook, &status.to_string()).await?;

if ok {
if res == CallResult::Success {
self.retry_count.remove(&hook.id);
return Ok(());
}
Expand All @@ -261,7 +268,7 @@ impl Caller {
failed_count, self.max_retries, hook.id
);

if failed_count >= self.max_retries {
if res == CallResult::NotIncluded || failed_count >= self.max_retries {
info!(
"Abandoning WebHook call for swap {} with status {}",
hook.id, status,
Expand Down Expand Up @@ -319,7 +326,7 @@ mod caller_test {
use crate::db::helpers::web_hook::WebHookHelper;
use crate::db::helpers::QueryResponse;
use crate::db::models::{WebHook, WebHookState};
use crate::webhook::caller::{Caller, Config, UrlError, MAX_URL_LENGTH};
use crate::webhook::caller::{CallResult, Caller, Config, UrlError, MAX_URL_LENGTH};
use crate::webhook::types::{WebHookCallData, WebHookCallParams, WebHookEvent};
use axum::http::StatusCode;
use axum::response::IntoResponse;
Expand Down Expand Up @@ -373,7 +380,7 @@ mod caller_test {
let status = "some.update";

caller.retry_count.insert(id.to_string(), 21);
let ok = caller
let res = caller
.call_webhook(
&WebHook {
id: id.to_string(),
Expand All @@ -386,7 +393,7 @@ mod caller_test {
)
.await
.unwrap();
assert!(ok);
assert_eq!(res, CallResult::Success);

assert!(caller.retry_count.get(&id.to_string()).is_none());

Expand Down Expand Up @@ -427,7 +434,7 @@ mod caller_test {

let status = "some.update";
let url = format!("http://127.0.0.1:{}", 10002);
let ok = caller
let res = caller
.call_webhook(
&WebHook {
id: id.to_string(),
Expand All @@ -440,7 +447,7 @@ mod caller_test {
)
.await
.unwrap();
assert!(!ok);
assert_eq!(res, CallResult::Failed);
}

#[tokio::test]
Expand Down Expand Up @@ -468,7 +475,7 @@ mod caller_test {

let status = "some.update";
let url = format!("http://127.0.0.1:{}/fail", port);
let ok = caller
let res = caller
.call_webhook(
&WebHook {
id: id.to_string(),
Expand All @@ -481,7 +488,7 @@ mod caller_test {
)
.await
.unwrap();
assert!(!ok);
assert_eq!(res, CallResult::Failed);

assert_eq!(received_calls.lock().unwrap().len(), 1);
assert_eq!(
Expand Down Expand Up @@ -523,7 +530,7 @@ mod caller_test {

let status = "some.update";
let url = format!("http://127.0.0.1:{}", port);
let ok = caller
let res = caller
.call_webhook(
&WebHook {
id: id.to_string(),
Expand All @@ -536,10 +543,10 @@ mod caller_test {
)
.await
.unwrap();
assert!(ok);
assert_eq!(res, CallResult::NotIncluded);
assert_eq!(received_calls.lock().unwrap().len(), 0);

let ok = caller
let res = caller
.call_webhook(
&WebHook {
id: id.to_string(),
Expand All @@ -552,7 +559,7 @@ mod caller_test {
)
.await
.unwrap();
assert!(ok);
assert_eq!(res, CallResult::Success);
assert_eq!(received_calls.lock().unwrap().len(), 1);

cancel_token.cancel();
Expand Down Expand Up @@ -775,6 +782,53 @@ mod caller_test {
assert!(caller.retry_count.get(&id.to_string()).is_none());
}

#[tokio::test]
async fn test_retry_calls_not_included() {
let mut web_hook_helper = make_mock_hook_helper();

let id = "included";
let status = "not.included";
let url = format!("http://127.0.0.1:{}", 1234);

web_hook_helper
.expect_get_by_state()
.with(predicate::eq(WebHookState::Failed))
.returning(move |_| {
Ok(vec![WebHook {
url: url.clone(),
id: id.to_string(),
hash_swap_id: false,
status: Some(vec!["invoice.set".to_string()]),
state: WebHookState::Failed.as_ref().to_string(),
}])
});

web_hook_helper
.expect_get_swap_status()
.returning(move |_| Ok(Some(status.to_string())));

web_hook_helper
.expect_set_state()
.with(predicate::eq(id), predicate::eq(WebHookState::Abandoned))
.returning(move |_, _| Ok(1));

let caller_cancel = CancellationToken::new();
let max_retries = 2;
let caller = Caller::new(
caller_cancel.clone(),
Config {
max_retries: Some(max_retries),
retry_interval: Some(5),
request_timeout: Some(5),
},
Box::new(web_hook_helper),
);

assert!(caller.retry_count.get(&id.to_string()).is_none());
caller.retry_calls().await.unwrap();
assert!(caller.retry_count.get(&id.to_string()).is_none());
}

#[test]
fn test_validate_url_valid() {
assert!(Caller::validate_url("https://bol.tz").is_none());
Expand Down

0 comments on commit 54223af

Please sign in to comment.