Skip to content

Commit

Permalink
Merge pull request #115 from samply/merge_main_into_develop_because_c…
Browse files Browse the repository at this point in the history
…annot_push_to_develop

merge main into develop
  • Loading branch information
enola-dkfz authored Feb 14, 2024
2 parents fb693e5 + 6108d3e commit 69217ab
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 5 deletions.
19 changes: 14 additions & 5 deletions src/beam.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString};
use http::StatusCode;
use once_cell::sync::Lazy;
use serde::Serialize;
use tracing::{debug, warn};
Expand Down Expand Up @@ -85,20 +86,28 @@ pub async fn retrieve_tasks() -> Result<Vec<TaskRequest<String>>, FocusError> {
.map_err(FocusError::UnableToRetrieveTasksHttp)
}

pub async fn answer_task<T: Serialize + 'static>(result: &TaskResult<T>) -> Result<bool, FocusError> {
pub async fn answer_task<T: Serialize + 'static>(result: &TaskResult<T>) -> Result<(), FocusError> {
debug!("Answer task with id: {}", result.task);
BEAM_CLIENT.put_result(result, &result.task)
.await
.map_err(FocusError::UnableToAnswerTask)
.await
.map(|_| ())
.or_else(|e| match e {
beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()),
other => Err(FocusError::UnableToAnswerTask(other))
})
}

pub async fn fail_task<T>(task: &TaskRequest<T>, body: impl Into<String>) -> Result<bool, FocusError> {
pub async fn fail_task<T>(task: &TaskRequest<T>, body: impl Into<String>) -> Result<(), FocusError> {
let body = body.into();
warn!("Reporting failed task with id {}: {}", task.id, body);
let result = beam_result::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, body);
BEAM_CLIENT.put_result(&result, &task.id)
.await
.map_err(FocusError::UnableToAnswerTask)
.map(|_| ())
.or_else(|e| match e {
beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()),
other => Err(FocusError::UnableToAnswerTask(other))
})
}

pub async fn claim_task<T>(task: &TaskRequest<T>) -> Result<bool, FocusError> {
Expand Down
9 changes: 9 additions & 0 deletions src/task_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ async fn process_task(
execute: true,
});

if metadata.project == "focus-healthcheck" {
return Ok(beam::beam_result::succeeded(
CONFIG.beam_app_id_long.clone(),
vec![task.from.clone()],
task.id,
"healthy".into()
));
}

if metadata.project == "exporter" {
let body = &task.body;
return Ok(run_exporter_query(task, body, metadata.execute).await)?;
Expand Down

0 comments on commit 69217ab

Please sign in to comment.