From 3b42f592e3650dd1ee6880e6c191a54f797458cc Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Thu, 8 Feb 2024 13:14:24 +0100 Subject: [PATCH 1/5] Lowered number of retries 3600 (2 hours) was too much --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 4e2d41d..5b2f893 100644 --- a/src/main.rs +++ b/src/main.rs @@ -238,7 +238,7 @@ async fn process_tasks( } } - const MAX_TRIES: u32 = 3600; + const MAX_TRIES: u32 = 150; for attempt in 0..MAX_TRIES { let comm_result = if let Some(ref err_msg) = error_msg { beam::fail_task(&task, err_msg).await From af69ce7f0cf68d92bd0ed8d6e5e516ddf639eaed Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 8 Feb 2024 13:07:17 +0000 Subject: [PATCH 2/5] fix: Dont retry expired task --- src/beam.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 900ac1d..1d29c64 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -1,6 +1,7 @@ use std::time::Duration; use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString}; +use http::StatusCode; use once_cell::sync::Lazy; use serde::Serialize; use tracing::{debug, warn}; @@ -85,20 +86,28 @@ pub async fn retrieve_tasks() -> Result>, FocusError> { .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result { +pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result<(), FocusError> { debug!("Answer task with id: {task_id}"); BEAM_CLIENT.put_result(result, &task_id) .await - .map_err(FocusError::UnableToAnswerTask) + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } -pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result { +pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result<(), FocusError> { let body = body.into(); warn!("Reporting failed task with id {}: {}", task.id, body); let result = beam_result::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, body); BEAM_CLIENT.put_result(&result, &task.id) .await - .map_err(FocusError::UnableToAnswerTask) + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } pub async fn claim_task(task: &TaskRequest) -> Result { From 0ceda2ec5d05fc1b7f703bb429549c92b58a071f Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 14 Feb 2024 08:57:35 +0100 Subject: [PATCH 3/5] focus healthcheck --- src/main.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/main.rs b/src/main.rs index 5b2f893..4f93d7f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -148,6 +148,15 @@ async fn process_task( execute: true, }); + if metadata.project == "focus-healthcheck" { + return Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + "healthy".into() + )); + } + if metadata.project == "exporter" { let body = &task.body; return Ok(run_exporter_query(task, body, metadata.execute).await)?; From 5e0033f1c20e3da965ad4e7d7af07132591181cd Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 26 Feb 2024 20:59:05 +0100 Subject: [PATCH 4/5] updated readme --- README.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 24bbd2a..6b7ea89 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Focus -Focus is a Samply component ran on the sites, which distributes tasks from Beam.Proxy to the applications on the site and re-transmits the results through Samply.Beam. Currenly, only Samply.Blaze is supported as a target application, but Focus is easily extensible. +Focus is a Samply component ran on the sites, which distributes tasks from Beam.Proxy to the applications on the site and re-transmits the results through Samply.Beam. -It is possible to specify the queries whose results are to be cached to speed up retrieval. The cached results expire after 24 hours. +It is possible to specify Blaze queries whose results are to be cached to speed up retrieval. The cached results expire after 24 hours. ## Installation @@ -59,7 +59,13 @@ Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable t ## Usage -Creating a sample task using CURL: +Creating a sample focus healthcheck task using CURL (body can be any string and is ignored): + +```bash +curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"focus-healthcheck"},"body":"wie geht es"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks +``` + +Creating a sample task containing a Blaze query using CURL: ```bash curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-fefbffffeeff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"ewoJImxhbmciOiAiY3FsIiwKCSJsaWIiOiB7CgkJImNvbnRlbnQiOiBbCgkJCXsKCQkJCSJjb250ZW50VHlwZSI6ICJ0ZXh0L2NxbCIsCgkJCQkiZGF0YSI6ICJiR2xpY21GeWVTQlNaWFJ5YVdWMlpRcDFjMmx1WnlCR1NFbFNJSFpsY25OcGIyNGdKelF1TUM0d0p3cHBibU5zZFdSbElFWklTVkpJWld4d1pYSnpJSFpsY25OcGIyNGdKelF1TUM0d0p3b0tZMjlrWlhONWMzUmxiU0JzYjJsdVl6b2dKMmgwZEhBNkx5OXNiMmx1WXk1dmNtY25DbU52WkdWemVYTjBaVzBnYVdOa01UQTZJQ2RvZEhSd09pOHZhR3czTG05eVp5OW1hR2x5TDNOcFpDOXBZMlF0TVRBbkNtTnZaR1Z6ZVhOMFpXMGdVMkZ0Y0d4bFRXRjBaWEpwWVd4VWVYQmxPaUFuYUhSMGNITTZMeTltYUdseUxtSmliWEpwTG1SbEwwTnZaR1ZUZVhOMFpXMHZVMkZ0Y0d4bFRXRjBaWEpwWVd4VWVYQmxKd29LQ21OdmJuUmxlSFFnVUdGMGFXVnVkQW9LUWtKTlVrbGZVMVJTUVZSZlIwVk9SRVZTWDFOVVVrRlVTVVpKUlZJS0NrSkNUVkpKWDFOVVVrRlVYMFJGUmw5VFVFVkRTVTFGVGdwcFppQkpia2x1YVhScFlXeFFiM0IxYkdGMGFXOXVJSFJvWlc0Z1cxTndaV05wYldWdVhTQmxiSE5sSUh0OUlHRnpJRXhwYzNROFUzQmxZMmx0Wlc0K0NncENRazFTU1Y5VFZGSkJWRjlUUVUxUVRFVmZWRmxRUlY5VFZGSkJWRWxHU1VWU0NncENRazFTU1Y5VFZGSkJWRjlEVlZOVVQwUkpRVTVmVTFSU1FWUkpSa2xGVWdvS1FrSk5Va2xmVTFSU1FWUmZSRWxCUjA1UFUwbFRYMU5VVWtGVVNVWkpSVklLQ2tKQ1RWSkpYMU5VVWtGVVgwRkhSVjlUVkZKQlZFbEdTVVZTQ2dwQ1FrMVNTVjlUVkZKQlZGOUVSVVpmU1U1ZlNVNUpWRWxCVEY5UVQxQlZURUZVU1U5T0NuUnlkV1U9IgoJCQl9CgkJXSwKCQkicmVzb3VyY2VUeXBlIjogIkxpYnJhcnkiLAoJCSJzdGF0dXMiOiAiYWN0aXZlIiwKCQkidHlwZSI6IHsKCQkJImNvZGluZyI6IFsKCQkJCXsKCQkJCQkiY29kZSI6ICJsb2dpYy1saWJyYXJ5IiwKCQkJCQkic3lzdGVtIjogImh0dHA6Ly90ZXJtaW5vbG9neS5obDcub3JnL0NvZGVTeXN0ZW0vbGlicmFyeS10eXBlIgoJCQkJfQoJCQldCgkJfSwKCQkidXJsIjogInVybjp1dWlkOjdmZjUzMmFkLTY5ZTQtNDhlZC1hMmQzLTllZmFmYjYwOWY2MiIKCX0sCgkibWVhc3VyZSI6IHsKCQkiZ3JvdXAiOiBbCgkJCXsKCQkJCSJjb2RlIjogewoJCQkJCSJ0ZXh0IjogInBhdGllbnRzIgoJCQkJfSwKCQkJCSJwb3B1bGF0aW9uIjogWwoJCQkJCXsKCQkJCQkJImNvZGUiOiB7CgkJCQkJCQkiY29kaW5nIjogWwoJCQkJCQkJCXsKCQkJCQkJCQkJImNvZGUiOiAiaW5pdGlhbC1wb3B1bGF0aW9uIiwKCQkJCQkJCQkJInN5c3RlbSI6ICJodHRwOi8vdGVybWlub2xvZ3kuaGw3Lm9yZy9Db2RlU3lzdGVtL21lYXN1cmUtcG9wdWxhdGlvbiIKCQkJCQkJCQl9CgkJCQkJCQldCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIkluSW5pdGlhbFBvcHVsYXRpb24iLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsLWlkZW50aWZpZXIiCgkJCQkJCX0KCQkJCQl9CgkJCQldLAoJCQkJInN0cmF0aWZpZXIiOiBbCgkJCQkJewoJCQkJCQkiY29kZSI6IHsKCQkJCQkJCSJ0ZXh0IjogIkdlbmRlciIKCQkJCQkJfSwKCQkJCQkJImNyaXRlcmlhIjogewoJCQkJCQkJImV4cHJlc3Npb24iOiAiR2VuZGVyIiwKCQkJCQkJCSJsYW5ndWFnZSI6ICJ0ZXh0L2NxbCIKCQkJCQkJfQoJCQkJCX0sCgkJCQkJewoJCQkJCQkiY29kZSI6IHsKCQkJCQkJCSJ0ZXh0IjogIkFnZSIKCQkJCQkJfSwKCQkJCQkJImNyaXRlcmlhIjogewoJCQkJCQkJImV4cHJlc3Npb24iOiAiQWdlQ2xhc3MiLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfSwKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAiQ3VzdG9kaWFuIgoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJDdXN0b2RpYW4iLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9LAoJCQl7CgkJCQkiY29kZSI6IHsKCQkJCQkidGV4dCI6ICJkaWFnbm9zaXMiCgkJCQl9LAoJCQkJImV4dGVuc2lvbiI6IFsKCQkJCQl7CgkJCQkJCSJ1cmwiOiAiaHR0cDovL2hsNy5vcmcvZmhpci91cy9jcWZtZWFzdXJlcy9TdHJ1Y3R1cmVEZWZpbml0aW9uL2NxZm0tcG9wdWxhdGlvbkJhc2lzIiwKCQkJCQkJInZhbHVlQ29kZSI6ICJDb25kaXRpb24iCgkJCQkJfQoJCQkJXSwKCQkJCSJwb3B1bGF0aW9uIjogWwoJCQkJCXsKCQkJCQkJImNvZGUiOiB7CgkJCQkJCQkiY29kaW5nIjogWwoJCQkJCQkJCXsKCQkJCQkJCQkJImNvZGUiOiAiaW5pdGlhbC1wb3B1bGF0aW9uIiwKCQkJCQkJCQkJInN5c3RlbSI6ICJodHRwOi8vdGVybWlub2xvZ3kuaGw3Lm9yZy9Db2RlU3lzdGVtL21lYXN1cmUtcG9wdWxhdGlvbiIKCQkJCQkJCQl9CgkJCQkJCQldCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIkRpYWdub3NpcyIsCgkJCQkJCQkibGFuZ3VhZ2UiOiAidGV4dC9jcWwtaWRlbnRpZmllciIKCQkJCQkJfQoJCQkJCX0KCQkJCV0sCgkJCQkic3RyYXRpZmllciI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAiZGlhZ25vc2lzIgoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJEaWFnbm9zaXNDb2RlIiwKCQkJCQkJCSJsYW5ndWFnZSI6ICJ0ZXh0L2NxbC1pZGVudGlmaWVyIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9LAoJCQl7CgkJCQkiY29kZSI6IHsKCQkJCQkidGV4dCI6ICJzcGVjaW1lbiIKCQkJCX0sCgkJCQkiZXh0ZW5zaW9uIjogWwoJCQkJCXsKCQkJCQkJInVybCI6ICJodHRwOi8vaGw3Lm9yZy9maGlyL3VzL2NxZm1lYXN1cmVzL1N0cnVjdHVyZURlZmluaXRpb24vY3FmbS1wb3B1bGF0aW9uQmFzaXMiLAoJCQkJCQkidmFsdWVDb2RlIjogIlNwZWNpbWVuIgoJCQkJCX0KCQkJCV0sCgkJCQkicG9wdWxhdGlvbiI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJImNvZGluZyI6IFsKCQkJCQkJCQl7CgkJCQkJCQkJCSJjb2RlIjogImluaXRpYWwtcG9wdWxhdGlvbiIsCgkJCQkJCQkJCSJzeXN0ZW0iOiAiaHR0cDovL3Rlcm1pbm9sb2d5LmhsNy5vcmcvQ29kZVN5c3RlbS9tZWFzdXJlLXBvcHVsYXRpb24iCgkJCQkJCQkJfQoJCQkJCQkJXQoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJTcGVjaW1lbiIsCgkJCQkJCQkibGFuZ3VhZ2UiOiAidGV4dC9jcWwtaWRlbnRpZmllciIKCQkJCQkJfQoJCQkJCX0KCQkJCV0sCgkJCQkic3RyYXRpZmllciI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAic2FtcGxlX2tpbmQiCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIlNhbXBsZVR5cGUiLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9CgkJXSwKCQkibGlicmFyeSI6ICJ1cm46dXVpZDo3ZmY1MzJhZC02OWU0LTQ4ZWQtYTJkMy05ZWZhZmI2MDlmNjIiLAoJCSJyZXNvdXJjZVR5cGUiOiAiTWVhc3VyZSIsCgkJInNjb3JpbmciOiB7CgkJCSJjb2RpbmciOiBbCgkJCQl7CgkJCQkJImNvZGUiOiAiY29ob3J0IiwKCQkJCQkic3lzdGVtIjogImh0dHA6Ly90ZXJtaW5vbG9neS5obDcub3JnL0NvZGVTeXN0ZW0vbWVhc3VyZS1zY29yaW5nIgoJCQkJfQoJCQldCgkJfSwKCQkic3RhdHVzIjogImFjdGl2ZSIsCgkJInN1YmplY3RDb2RlYWJsZUNvbmNlcHQiOiB7CgkJCSJjb2RpbmciOiBbCgkJCQl7CgkJCQkJImNvZGUiOiAiUGF0aWVudCIsCgkJCQkJInN5c3RlbSI6ICJodHRwOi8vaGw3Lm9yZy9maGlyL3Jlc291cmNlLXR5cGVzIgoJCQkJfQoJCQldCgkJfSwKCQkidXJsIjogInVybjp1dWlkOjVlZThkZTczLTM0N2UtNDdjYS1hMDE0LWYyZTcxNzY3YWRmYyIKCX0KfQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks From e1252a8fc61bb56738b8befdb885ea0c22cd13e6 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 26 Feb 2024 21:03:52 +0100 Subject: [PATCH 5/5] formatting --- src/main.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0b02a8f..64218b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,32 +54,31 @@ struct ReportCache { } impl ReportCache { - pub fn new() -> Self { let mut cache = HashMap::new(); - + if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() { let lines = util::read_lines(filename.clone().to_string()); match lines { Ok(ok_lines) => { - for line in ok_lines { - let Ok(ok_line) = line else { - warn!("A line in the file {} is not readable", filename); - continue; - }; - cache.insert((ok_line.clone(), false), ("".into(), UNIX_EPOCH)); - cache.insert((ok_line, true), ("".into(), UNIX_EPOCH)); - } - }, + for line in ok_lines { + let Ok(ok_line) = line else { + warn!("A line in the file {} is not readable", filename); + continue; + }; + cache.insert((ok_line.clone(), false), ("".into(), UNIX_EPOCH)); + cache.insert((ok_line, true), ("".into(), UNIX_EPOCH)); + } + } Err(_) => { - error!("The file {} cannot be opened", filename); //This shouldn't stop focus from running, it's just going to go to blaze every time, but that's not too slow + error!("The file {} cannot be opened", filename); //This shouldn't stop focus from running, it's just going to go to blaze every time, but that's not too slow } } } - - Self {cache} + + Self { cache } } - } +} const REPORTCACHE_TTL: Duration = Duration::from_secs(86400); //24h @@ -133,7 +132,7 @@ async fn main_loop() -> ExitCode { if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await { warn!("Encountered the following error, while processing tasks: {e}"); - failures += 1; + failures += 1; } else { failures = 0; } @@ -152,10 +151,8 @@ async fn process_tasks( debug!("Start processing tasks..."); let tasks = beam::retrieve_tasks().await?; for task in tasks { - if seen.contains(&task.id) { continue; - } seen.insert(task.id); task_queue @@ -183,7 +180,8 @@ async fn run_cql_query( let mut key_exists = false; - let obfuscate = CONFIG.obfuscate == config::Obfuscate::Yes && !CONFIG.unobfuscated.contains(&project); + let obfuscate = + CONFIG.obfuscate == config::Obfuscate::Yes && !CONFIG.unobfuscated.contains(&project); let report_from_cache = match report_cache .lock()