From af69ce7f0cf68d92bd0ed8d6e5e516ddf639eaed Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 8 Feb 2024 13:07:17 +0000 Subject: [PATCH] fix: Dont retry expired task --- src/beam.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 900ac1d..1d29c64 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -1,6 +1,7 @@ use std::time::Duration; use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString}; +use http::StatusCode; use once_cell::sync::Lazy; use serde::Serialize; use tracing::{debug, warn}; @@ -85,20 +86,28 @@ pub async fn retrieve_tasks() -> Result>, FocusError> { .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result { +pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result<(), FocusError> { debug!("Answer task with id: {task_id}"); BEAM_CLIENT.put_result(result, &task_id) .await - .map_err(FocusError::UnableToAnswerTask) + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } -pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result { +pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result<(), FocusError> { let body = body.into(); warn!("Reporting failed task with id {}: {}", task.id, body); let result = beam_result::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, body); BEAM_CLIENT.put_result(&result, &task.id) .await - .map_err(FocusError::UnableToAnswerTask) + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } pub async fn claim_task(task: &TaskRequest) -> Result {