From 153a176594d0bd9e3dd7a23f1b6dbdae165f1ee9 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 29 Jan 2024 17:20:28 +0100 Subject: [PATCH 01/19] gender strings not in code system -> other --- resources/cql/BBMRI_STRAT_GENDER_STRATIFIER | 4 +++- src/util.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER b/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER index 7223b2d..a7daf87 100644 --- a/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER +++ b/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER @@ -1,2 +1,4 @@ define Gender: - if (Patient.gender is null) then 'unknown' else Patient.gender + if (Patient.gender is null) then 'unknown' + else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other' + else Patient.gender \ No newline at end of file diff --git a/src/util.rs b/src/util.rs index 668fde7..b5c961e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -239,6 +239,7 @@ pub fn obfuscate_counts_mr( let measure_report_obfuscated = serde_json::to_string_pretty(&measure_report) .map_err(|e| FocusError::SerializationError(e.to_string()))?; + dbg!(&measure_report_obfuscated); Ok(measure_report_obfuscated) } From 4fce1aad1be04667440cefac5aaf04c780b7b81b Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 29 Jan 2024 17:20:28 +0100 Subject: [PATCH 02/19] gender strings not in code system -> other --- resources/cql/BBMRI_STRAT_GENDER_STRATIFIER | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER b/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER index 7223b2d..a7daf87 100644 --- a/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER +++ b/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER @@ -1,2 +1,4 @@ define Gender: - if (Patient.gender is null) then 'unknown' else Patient.gender + if (Patient.gender is null) then 'unknown' + else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other' + else Patient.gender \ No newline at end of file From bceb588725f132e37484d850c9a3f18233144177 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 29 Jan 2024 17:23:29 +0100 Subject: [PATCH 03/19] removed dbg --- src/util.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/util.rs b/src/util.rs index b5c961e..668fde7 100644 --- a/src/util.rs +++ b/src/util.rs @@ -239,7 +239,6 @@ pub fn obfuscate_counts_mr( let measure_report_obfuscated = serde_json::to_string_pretty(&measure_report) .map_err(|e| FocusError::SerializationError(e.to_string()))?; - dbg!(&measure_report_obfuscated); Ok(measure_report_obfuscated) } From 4dfa7bc0da30070891c1973ffe1a593018da922c Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Mon, 29 Jan 2024 17:26:20 +0100 Subject: [PATCH 04/19] removed dbg! --- src/util.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/util.rs b/src/util.rs index b5c961e..668fde7 100644 --- a/src/util.rs +++ b/src/util.rs @@ -239,7 +239,6 @@ pub fn obfuscate_counts_mr( let measure_report_obfuscated = serde_json::to_string_pretty(&measure_report) .map_err(|e| FocusError::SerializationError(e.to_string()))?; - dbg!(&measure_report_obfuscated); Ok(measure_report_obfuscated) } From b40c76b83ed75691f555e79be07683d358575347 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 29 Jan 2024 17:34:25 +0100 Subject: [PATCH 05/19] fixed tests --- resources/test/query_bbmri.cql | 5 +++-- src/util.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/resources/test/query_bbmri.cql b/resources/test/query_bbmri.cql index b080800..3493054 100644 --- a/resources/test/query_bbmri.cql +++ b/resources/test/query_bbmri.cql @@ -15,8 +15,9 @@ define AgeClass: define Gender: - if (Patient.gender is null) then 'unknown' else Patient.gender - + if (Patient.gender is null) then 'unknown' + else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other' + else Patient.gender define Custodian: First(from Specimen.extension E diff --git a/src/util.rs b/src/util.rs index 668fde7..2cd3da9 100644 --- a/src/util.rs +++ b/src/util.rs @@ -411,7 +411,7 @@ mod test { let decoded_library = "BBMRI_STRAT_GENDER_STRATIFIER"; - let expected_result = "define Gender:\n if (Patient.gender is null) then 'unknown' else Patient.gender\n"; + let expected_result = "define Gender:\n if (Patient.gender is null) then 'unknown'\n else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other'\n else Patient.gender"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "BBMRI_STRAT_CUSTODIAN_STRATIFIER"; From 7e026e6beb71bdec16b1afd9dc0e38878cda8599 Mon Sep 17 00:00:00 2001 From: Jan <59206115+Threated@users.noreply.github.com> Date: Tue, 30 Jan 2024 13:04:41 +0100 Subject: [PATCH 06/19] feat: Handle beam tasks asynchronously (#101) * feat: Handle beam tasks asynchronously * refactor: Use a limited amount of tasks * added obfuscate bool to report cache key (#102) * added obfuscate bool to report cache key * Update src/main.rs Co-authored-by: Jan <59206115+Threated@users.noreply.github.com> * renamed init to new --------- Co-authored-by: Enola Knezevic Co-authored-by: Jan <59206115+Threated@users.noreply.github.com> --------- Co-authored-by: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Co-authored-by: Enola Knezevic --- src/blaze.rs | 8 ++ src/config.rs | 4 +- src/main.rs | 289 ++++++++++++----------------------------- src/task_processing.rs | 155 ++++++++++++++++++++++ src/util.rs | 8 ++ 5 files changed, 258 insertions(+), 206 deletions(-) create mode 100644 src/task_processing.rs diff --git a/src/blaze.rs b/src/blaze.rs index 1ef9870..6e5f310 100644 --- a/src/blaze.rs +++ b/src/blaze.rs @@ -4,7 +4,9 @@ use serde::Serialize; use serde_json::Value; use tracing::{debug, warn}; +use crate::BeamTask; use crate::errors::FocusError; +use crate::util; use crate::util::get_json_field; use crate::config::CONFIG; @@ -120,3 +122,9 @@ pub async fn run_cql_query(library: &Value, measure: &Value) -> Result Result { + let decoded = util::base64_decode(&task.body)?; + serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string())) +} diff --git a/src/config.rs b/src/config.rs index f5910d3..5dc7808 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,13 +11,13 @@ use tracing::{debug, info, warn}; use crate::errors::FocusError; -#[derive(clap::ValueEnum, Clone, Debug)] +#[derive(clap::ValueEnum, Clone, PartialEq, Debug)] pub enum Obfuscate { No, Yes, } -#[derive(clap::ValueEnum, Clone, Debug, PartialEq, Copy)] +#[derive(clap::ValueEnum, Clone, PartialEq, Debug, Copy)] pub enum EndpointType { Blaze, Omop, diff --git a/src/main.rs b/src/main.rs index 4e2d41d..dffb9a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,30 +9,33 @@ mod graceful_shutdown; mod logger; mod intermediate_rep; +mod task_processing; mod util; -use beam_lib::{TaskRequest, TaskResult}; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; +use beam_lib::{MsgId, TaskRequest, TaskResult}; use laplace_rs::ObfCache; +use task_processing::TaskQueue; +use tokio::sync::Mutex; use crate::util::{is_cql_tampered_with, obfuscate_counts_mr}; use crate::{config::CONFIG, errors::FocusError}; use blaze::CqlQuery; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::ops::DerefMut; use std::process::ExitCode; use std::str; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::{process::exit, time::Duration}; -use base64::{engine::general_purpose, Engine as _}; - use serde::{Deserialize, Serialize}; -use serde_json::from_slice; - use tracing::{debug, error, warn}; // result cache type SearchQuery = String; +type Obfuscated = bool; type Report = String; type Created = std::time::SystemTime; //epoch type BeamTask = TaskRequest; @@ -45,11 +48,39 @@ struct Metadata { execute: bool, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] struct ReportCache { - cache: HashMap, + cache: HashMap<(SearchQuery, Obfuscated), (Report, Created)>, } +impl ReportCache { + + pub fn new() -> Self { + let mut cache = HashMap::new(); + + if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() { + let lines = util::read_lines(filename.clone().to_string()); + match lines { + Ok(ok_lines) => { + for line in ok_lines { + let Ok(ok_line) = line else { + warn!("A line in the file {} is not readable", filename); + continue; + }; + cache.insert((ok_line.clone(), false), ("".into(), UNIX_EPOCH)); + cache.insert((ok_line, true), ("".into(), UNIX_EPOCH)); + } + }, + Err(_) => { + error!("The file {} cannot be opened", filename); //This shouldn't stop focus from running, it's just going to go to blaze every time, but that's not too slow + } + } + } + + Self {cache} + } + } + const REPORTCACHE_TTL: Duration = Duration::from_secs(86400); //24h #[tokio::main] @@ -73,33 +104,11 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { - let mut obf_cache: ObfCache = ObfCache { - cache: HashMap::new(), - }; - - let mut report_cache: ReportCache = ReportCache { - cache: HashMap::new(), - }; - - if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() { - let lines = util::read_lines(filename.clone().to_string()); - match lines { - Ok(ok_lines) => { - for line in ok_lines { - let Ok(ok_line) = line else { - warn!("A line in the file {} is not readable", filename); - continue; - }; - report_cache.cache.insert(ok_line, ("".into(), UNIX_EPOCH)); - } - } - Err(_) => { - error!("The file {} cannot be opened", filename); - exit(2); - } - } - } + // TODO: The report cache init should be an fn on the cache + let report_cache: ReportCache = ReportCache::new(); + let mut seen_tasks = Default::default(); + let mut task_queue = task_processing::spawn_task_workers(report_cache); let mut failures = 0; while failures < CONFIG.retry_count { if failures > 0 { @@ -122,9 +131,9 @@ async fn main_loop() -> ExitCode { //TODO health check } - if let Err(e) = process_tasks(&mut obf_cache, &mut report_cache).await { + if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await { warn!("Encountered the following error, while processing tasks: {e}"); - //failures += 1; + //failures += 1; //I believe this can be uncommented now } else { failures = 0; } @@ -136,158 +145,31 @@ async fn main_loop() -> ExitCode { ExitCode::from(22) } -async fn process_task( - task: &BeamTask, - obf_cache: &mut ObfCache, - report_cache: &mut ReportCache, -) -> Result { - debug!("Processing task {}", task.id); - - let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { - project: "default_obfuscation".to_string(), - execute: true, - }); - - if metadata.project == "exporter" { - let body = &task.body; - return Ok(run_exporter_query(task, body, metadata.execute).await)?; - } - - if CONFIG.endpoint_type == config::EndpointType::Blaze { - let query = parse_blaze_query(task)?; - if query.lang == "cql" { - // TODO: Change query.lang to an enum - - Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)? - } else { - warn!("Can't run queries with language {} in Blaze", query.lang); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with language {} and/or endpoint type {}", - query.lang, CONFIG.endpoint_type - ), - )) - } - } else if CONFIG.endpoint_type == config::EndpointType::Omop { - let decoded = decode_body(task)?; - let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = - from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - //TODO check that the language is ast - let query_decoded = general_purpose::STANDARD - .decode(intermediate_rep_query.query) - .map_err(FocusError::DecodeError)?; - let ast: ast::Ast = - from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - - Ok(run_intermediate_rep_query(task, ast).await)? - } else { - warn!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ), - )) - } -} - async fn process_tasks( - obf_cache: &mut ObfCache, - report_cache: &mut ReportCache, + task_queue: &mut TaskQueue, + seen: &mut HashSet, ) -> Result<(), FocusError> { debug!("Start processing tasks..."); let tasks = beam::retrieve_tasks().await?; for task in tasks { - let task_cloned = task.clone(); - let claiming = tokio::task::spawn(async move { beam::claim_task(&task_cloned).await }); - let res = process_task(&task, obf_cache, report_cache).await; - let error_msg = match res { - Err(FocusError::DecodeError(_)) | Err(FocusError::ParsingError(_)) => { - Some("Cannot parse query".to_string()) - } - Err(FocusError::LaplaceError(_)) => Some("Cannot obfuscate result".to_string()), - Err(ref e) => Some(format!("Cannot execute query: {}", e)), - Ok(_) => None, - }; - - let res = res.ok(); - let res = res.as_ref(); - - // Make sure that claiming the task is done before we update it again. - match claiming.await.unwrap() { - Ok(_) => {} - Err(FocusError::ConfigurationError(s)) => { - error!("FATAL: Unable to report back to Beam due to a configuration issue: {s}"); - return Err(FocusError::ConfigurationError(s)); - } - Err(FocusError::UnableToAnswerTask(e)) => { - warn!("Unable to report claimed task to Beam: {e}"); - } - Err(e) => { - warn!("Unknown error reporting claimed task back to Beam: {e}"); - } - } - - const MAX_TRIES: u32 = 3600; - for attempt in 0..MAX_TRIES { - let comm_result = if let Some(ref err_msg) = error_msg { - beam::fail_task(&task, err_msg).await - } else { - beam::answer_task(task.id, res.unwrap()).await - }; - match comm_result { - Ok(_) => break, - Err(FocusError::ConfigurationError(s)) => { - error!( - "FATAL: Unable to report back to Beam due to a configuration issue: {s}" - ); - return Err(FocusError::ConfigurationError(s)); - } - Err(FocusError::UnableToAnswerTask(e)) => { - warn!("Unable to report task result to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); - } - Err(e) => { - warn!("Unknown error reporting task result back to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); - } - } - tokio::time::sleep(Duration::from_secs(2)).await; + if seen.contains(&task.id) { + continue; } + seen.insert(task.id); + task_queue + .send(task) + .await + .expect("Receiver is never dropped"); } Ok(()) } -fn decode_body(task: &BeamTask) -> Result, FocusError> { - let decoded = general_purpose::STANDARD - .decode(&task.body) - .map_err(FocusError::DecodeError)?; - - Ok(decoded) -} - -fn parse_blaze_query(task: &BeamTask) -> Result { - let decoded = decode_body(task)?; - - let query: blaze::CqlQuery = - from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - - Ok(query) -} - async fn run_cql_query( task: &BeamTask, query: &CqlQuery, - obf_cache: &mut ObfCache, - report_cache: &mut ReportCache, + obf_cache: Arc>, + report_cache: Arc>, project: String, ) -> Result { let encoded_query = @@ -300,8 +182,14 @@ async fn run_cql_query( let mut key_exists = false; - let cached_report = report_cache.cache.get(encoded_query); - let report_from_cache = match cached_report { + let obfuscate = CONFIG.obfuscate == config::Obfuscate::Yes && !CONFIG.unobfuscated.contains(&project); + + let report_from_cache = match report_cache + .lock() + .await + .cache + .get(&(encoded_query.to_string(), obfuscate)) + { Some(existing_report) => { key_exists = true; if SystemTime::now().duration_since(existing_report.1).unwrap() < REPORTCACHE_TTL { @@ -320,32 +208,26 @@ async fn run_cql_query( let cql_result = blaze::run_cql_query(&query.lib, &query.measure).await?; - let cql_result_new: String = match CONFIG.obfuscate { - config::Obfuscate::Yes => { - if !CONFIG.unobfuscated.contains(&project) { - obfuscate_counts_mr( - &cql_result, - obf_cache, - CONFIG.obfuscate_zero, - CONFIG.obfuscate_below_10_mode, - CONFIG.delta_patient, - CONFIG.delta_specimen, - CONFIG.delta_diagnosis, - CONFIG.delta_procedures, - CONFIG.delta_medication_statements, - CONFIG.epsilon, - CONFIG.rounding_step, - )? - } else { - cql_result - } - } - config::Obfuscate::No => cql_result, + let cql_result_new: String = match obfuscate { + true => obfuscate_counts_mr( + &cql_result, + obf_cache.lock().await.deref_mut(), + CONFIG.obfuscate_zero, + CONFIG.obfuscate_below_10_mode, + CONFIG.delta_patient, + CONFIG.delta_specimen, + CONFIG.delta_diagnosis, + CONFIG.delta_procedures, + CONFIG.delta_medication_statements, + CONFIG.epsilon, + CONFIG.rounding_step, + )?, + false => cql_result, }; if key_exists { - report_cache.cache.insert( - encoded_query.to_string(), + report_cache.lock().await.cache.insert( + (encoded_query.to_string(), obfuscate), (cql_result_new.clone(), std::time::SystemTime::now()), ); } @@ -434,9 +316,7 @@ fn replace_cql_library(mut query: CqlQuery) -> Result { query.lib )))?; - let decoded_cql = general_purpose::STANDARD - .decode(old_data_string) - .map_err(FocusError::DecodeError)?; + let decoded_cql = util::base64_decode(old_data_string)?; let decoded_string = str::from_utf8(&decoded_cql) .map_err(|_| FocusError::ParsingError("CQL query was invalid".into()))?; @@ -452,7 +332,7 @@ fn replace_cql_library(mut query: CqlQuery) -> Result { }; let replaced_cql_str = util::replace_cql(decoded_string); - let replaced_cql_str_base64 = general_purpose::STANDARD.encode(replaced_cql_str); + let replaced_cql_str_base64 = BASE64.encode(replaced_cql_str); let new_data_value = serde_json::to_value(replaced_cql_str_base64) .expect("unable to turn base64 string into json value - this should not happen"); @@ -463,7 +343,7 @@ fn replace_cql_library(mut query: CqlQuery) -> Result { } fn beam_result(task: BeamTask, measure_report: String) -> Result { - let data = general_purpose::STANDARD.encode(measure_report.as_bytes()); + let data = BASE64.encode(measure_report.as_bytes()); Ok(beam::beam_result::succeeded( CONFIG.beam_app_id_long.clone(), vec![task.from], @@ -472,6 +352,7 @@ fn beam_result(task: BeamTask, measure_report: String) -> Result; + +pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { + let (tx, mut rx) = mpsc::channel(WORKER_BUFFER); + + let obf_cache = Arc::new(Mutex::new(ObfCache { + cache: HashMap::new(), + })); + + let report_cache: Arc> = Arc::new(Mutex::new(report_cache)); + + tokio::spawn(async move { + let semaphore = Arc::new(Semaphore::new(NUM_WORKERS)); + while let Some(task) = rx.recv().await { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let local_report_cache = report_cache.clone(); + let local_obf_cache = obf_cache.clone(); + tokio::spawn(async move { + handle_beam_task(task, local_obf_cache, local_report_cache).await; + drop(permit) + }); + } + }); + + tx +} + +async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, local_report_cache: Arc>) { + let task_cloned = task.clone(); + let claiming = tokio::task::spawn(async move { beam::claim_task(&task_cloned).await }); + let res = process_task(&task, local_obf_cache, local_report_cache).await; + let error_msg = match res { + Err(FocusError::DecodeError(_)) | Err(FocusError::ParsingError(_)) => { + Some("Cannot parse query".to_string()) + } + Err(FocusError::LaplaceError(_)) => Some("Cannot obfuscate result".to_string()), + Err(ref e) => Some(format!("Cannot execute query: {}", e)), + Ok(_) => None, + }; + + let res = res.ok(); + // Make sure that claiming the task is done before we update it again. + match claiming.await.unwrap() { + Ok(_) => {} + Err(FocusError::ConfigurationError(s)) => { + error!("FATAL: Unable to report back to Beam due to a configuration issue: {s}"); + } + Err(FocusError::UnableToAnswerTask(e)) => { + warn!("Unable to report claimed task to Beam: {e}"); + } + Err(e) => { + warn!("Unknown error reporting claimed task back to Beam: {e}"); + } + } + + const MAX_TRIES: u32 = 3600; + for attempt in 0..MAX_TRIES { + let comm_result = if let Some(ref err_msg) = error_msg { + beam::fail_task(&task, err_msg).await + } else { + beam::answer_task(task.id, res.as_ref().unwrap()).await + }; + match comm_result { + Ok(_) => break, + Err(FocusError::ConfigurationError(s)) => { + error!( + "FATAL: Unable to report back to Beam due to a configuration issue: {s}" + ); + } + Err(FocusError::UnableToAnswerTask(e)) => { + warn!("Unable to report task result to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); + } + Err(e) => { + warn!("Unknown error reporting task result back to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); + } + }; + tokio::time::sleep(Duration::from_secs(2)).await; + } +} + +async fn process_task( + task: &BeamTask, + obf_cache: Arc>, + report_cache: Arc>, +) -> Result { + debug!("Processing task {}", task.id); + + let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { + project: "default_obfuscation".to_string(), + execute: true, + }); + + if metadata.project == "exporter" { + let body = &task.body; + return Ok(run_exporter_query(task, body, metadata.execute).await)?; + } + + if CONFIG.endpoint_type == EndpointType::Blaze { + let query = parse_blaze_query(task)?; + if query.lang == "cql" { + // TODO: Change query.lang to an enum + + Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)? + } else { + warn!("Can't run queries with language {} in Blaze", query.lang); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Can't run queries with language {} and/or endpoint type {}", + query.lang, CONFIG.endpoint_type + ), + )) + } + } else if CONFIG.endpoint_type == EndpointType::Omop { + let decoded = util::base64_decode(&task.body)?; + let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = + serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; + //TODO check that the language is ast + let query_decoded = general_purpose::STANDARD + .decode(intermediate_rep_query.query) + .map_err(FocusError::DecodeError)?; + let ast: ast::Ast = + serde_json::from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; + + Ok(run_intermediate_rep_query(task, ast).await)? + } else { + warn!( + "Can't run queries with endpoint type {}", + CONFIG.endpoint_type + ); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Can't run queries with endpoint type {}", + CONFIG.endpoint_type + ), + )) + } +} diff --git a/src/util.rs b/src/util.rs index 2cd3da9..47b938d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,6 @@ use crate::errors::FocusError; +use base64::Engine as _; +use base64::engine::general_purpose; use laplace_rs::{get_from_cache_or_privatize, Bin, ObfCache, ObfuscateBelow10Mode}; use rand::thread_rng; use serde::{Deserialize, Serialize}; @@ -81,6 +83,12 @@ pub(crate) fn read_lines(filename: String) -> Result>, Ok(io::BufReader::new(file).lines()) } +pub(crate) fn base64_decode(data: impl AsRef<[u8]>) -> Result, FocusError> { + general_purpose::STANDARD + .decode(data) + .map_err(FocusError::DecodeError) +} + // REPLACE_MAP is built in build.rs include!(concat!(env!("OUT_DIR"), "/replace_map.rs")); From f19bd4898bf2918b2762d87996367d7513da5c8a Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Tue, 30 Jan 2024 16:40:51 +0100 Subject: [PATCH 07/19] obfuscation turned off for ehds2 --- README.md | 2 +- src/config.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 63aa672..24bbd2a 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in t DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no", default value: 2.1 EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1 ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10 -PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid; dktk_supervisors" +PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid;dktk_supervisors;exporter;ehds2" QUERIES_TO_CACHE_FILE_PATH = "resources/bbmri" # The path to the file containing BASE64 encoded queries whose results are to be cached, if not set, no results are cached PROVIDER = "name" #OMOP provider name PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" #Base64 encoded OMOP provider icon diff --git a/src/config.rs b/src/config.rs index 5dc7808..0561a95 100644 --- a/src/config.rs +++ b/src/config.rs @@ -124,7 +124,7 @@ struct CliArgs { rounding_step: usize, /// Projects for which the results are not to be obfuscated, separated by ; - #[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter")] + #[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter;ehds2")] projects_no_obfuscation: String, /// The path to the file containing BASE64 encoded queries whose results are to be cached From 9b41a6c575e404be541b673f6bbd6ef29dd4b229 Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Wed, 31 Jan 2024 14:51:52 +0100 Subject: [PATCH 08/19] counting errors again the issue with ttl higher than in beam was resolved months ago --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index dffb9a5..7792425 100644 --- a/src/main.rs +++ b/src/main.rs @@ -133,7 +133,7 @@ async fn main_loop() -> ExitCode { if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await { warn!("Encountered the following error, while processing tasks: {e}"); - //failures += 1; //I believe this can be uncommented now + failures += 1; } else { failures = 0; } From 367714c97e10ce65e5c7484d3776ca74de59de5e Mon Sep 17 00:00:00 2001 From: Martin Lablans <6804500+lablans@users.noreply.github.com> Date: Fri, 2 Feb 2024 09:00:38 +0100 Subject: [PATCH 09/19] Don't cache CQL maps --- build.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/build.rs b/build.rs index ac9d9fe..76667b0 100644 --- a/build.rs +++ b/build.rs @@ -46,7 +46,8 @@ fn main() { build_data::set_GIT_DIRTY(); build_data::set_BUILD_DATE(); build_data::set_BUILD_TIME(); - build_data::no_debug_rebuilds(); + // We must always run this build script as otherwise, we would cache old versions of CQL maps + //build_data::no_debug_rebuilds(); println!("cargo:rustc-env=SAMPLY_USER_AGENT=Samply.Focus.{}/{}", env!("CARGO_PKG_NAME"), version()); build_cqlmap(); From 0547c917109245cc9152c6d665aebff6e54e43d0 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Feb 2024 12:45:12 +0000 Subject: [PATCH 10/19] refactor: Improve error messages --- src/errors.rs | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 6cb4d42..3a3ba41 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,29 +2,29 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum FocusError { - #[error("Unable to post FHIR Library")] + #[error("Unable to post FHIR Library: {0}")] UnableToPostLibrary(reqwest::Error), - #[error("Unable to post FHIR Measure")] + #[error("Unable to post FHIR Measure: {0}")] UnableToPostMeasure(reqwest::Error), - #[error("FHIR Measure evaluation error in Reqwest")] + #[error("FHIR Measure evaluation error in Reqwest: {0}")] MeasureEvaluationErrorReqwest(reqwest::Error), - #[error("FHIR Measure evaluation error in Blaze")] + #[error("FHIR Measure evaluation error in Blaze: {0}")] MeasureEvaluationErrorBlaze(String), #[error("CQL query error")] CQLQueryError(), #[error("Unable to retrieve tasks from Beam: {0}")] UnableToRetrieveTasksHttp(beam_lib::BeamError), - #[error("Unable to answer task")] + #[error("Unable to answer task: {0}")] UnableToAnswerTask(beam_lib::BeamError), - #[error("Unable to set proxy settings")] + #[error("Unable to set proxy settings: {0}")] InvalidProxyConfig(reqwest::Error), - #[error("Decode error")] + #[error("Decode error: {0}")] DecodeError(base64::DecodeError), - #[error("Configuration error")] + #[error("Configuration error: {0}")] ConfigurationError(String), - #[error("Cannot open file")] + #[error("Cannot open file: {0}")] FileOpeningError(String), - #[error("Parsing error")] + #[error("Parsing error: {0}")] ParsingError(String), #[error("CQL tampered with: {0}")] CQLTemperedWithError(String), @@ -48,3 +48,16 @@ pub enum FocusError { MissingExporterEndpoint(), } + +impl FocusError { + /// Generate a descriptive error message that does not leak any sensitive data that might me contained inside the error value + pub fn user_faceing_error(&self) -> &'static str { + use FocusError::*; + // TODO: Add more match arms + match self { + DecodeError(_) | ParsingError(_) => "Cannot parse query.", + LaplaceError(_) => "Cannot obfuscate result.", + _ => "Failed to execute query." + } + } +} From f1eabc31028c64ecb4eafb1fc9838de7ac26bc1a Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Feb 2024 12:46:34 +0000 Subject: [PATCH 11/19] feat: Cancel claiming task --- src/beam.rs | 6 ++--- src/task_processing.rs | 58 ++++++++++++++++++++---------------------- 2 files changed, 30 insertions(+), 34 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 900ac1d..7979ef9 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -85,9 +85,9 @@ pub async fn retrieve_tasks() -> Result>, FocusError> { .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result { - debug!("Answer task with id: {task_id}"); - BEAM_CLIENT.put_result(result, &task_id) +pub async fn answer_task(result: &TaskResult) -> Result { + debug!("Answer task with id: {}", result.task); + BEAM_CLIENT.put_result(result, &result.task) .await .map_err(FocusError::UnableToAnswerTask) } diff --git a/src/task_processing.rs b/src/task_processing.rs index f5fee6e..6125063 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, collections::HashMap, time::Duration}; use base64::{engine::general_purpose, Engine as _}; use laplace_rs::ObfCache; use tokio::sync::{mpsc, Semaphore, Mutex}; -use tracing::{error, warn, debug}; +use tracing::{error, warn, debug, Instrument, info_span}; use crate::{ReportCache, errors::FocusError, beam, BeamTask, BeamResult, run_exporter_query, config::{EndpointType, CONFIG}, run_cql_query, intermediate_rep, ast, run_intermediate_rep_query, Metadata, blaze::parse_blaze_query, util}; @@ -13,7 +13,7 @@ const WORKER_BUFFER: usize = 32; pub type TaskQueue = mpsc::Sender; pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { - let (tx, mut rx) = mpsc::channel(WORKER_BUFFER); + let (tx, mut rx) = mpsc::channel::(WORKER_BUFFER); let obf_cache = Arc::new(Mutex::new(ObfCache { cache: HashMap::new(), @@ -28,7 +28,8 @@ pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { let local_report_cache = report_cache.clone(); let local_obf_cache = obf_cache.clone(); tokio::spawn(async move { - handle_beam_task(task, local_obf_cache, local_report_cache).await; + let span = info_span!("task handeling", %task.id); + handle_beam_task(task, local_obf_cache, local_report_cache).instrument(span).await; drop(permit) }); } @@ -38,41 +39,36 @@ pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { } async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, local_report_cache: Arc>) { - let task_cloned = task.clone(); - let claiming = tokio::task::spawn(async move { beam::claim_task(&task_cloned).await }); - let res = process_task(&task, local_obf_cache, local_report_cache).await; - let error_msg = match res { - Err(FocusError::DecodeError(_)) | Err(FocusError::ParsingError(_)) => { - Some("Cannot parse query".to_string()) + let task_claiming = beam::claim_task(&task); + let mut task_processing = std::pin::pin!(process_task(&task, local_obf_cache, local_report_cache)); + let task_result = tokio::select! { + // If task task proccessing happens before claiming is done drop the task claiming future + task_processed = &mut task_processing => { + task_processed + }, + task_claimed = task_claiming => { + if let Err(e) = task_claimed { + warn!("Failed to claim task: {e}"); + } else { + debug!("Successfully claimed task"); + }; + task_processing.await } - Err(FocusError::LaplaceError(_)) => Some("Cannot obfuscate result".to_string()), - Err(ref e) => Some(format!("Cannot execute query: {}", e)), - Ok(_) => None, }; - - let res = res.ok(); - // Make sure that claiming the task is done before we update it again. - match claiming.await.unwrap() { - Ok(_) => {} - Err(FocusError::ConfigurationError(s)) => { - error!("FATAL: Unable to report back to Beam due to a configuration issue: {s}"); - } - Err(FocusError::UnableToAnswerTask(e)) => { - warn!("Unable to report claimed task to Beam: {e}"); - } + let result = match task_result { + Ok(res) => res, Err(e) => { - warn!("Unknown error reporting claimed task back to Beam: {e}"); + warn!("Failed to execute query: {e}"); + if let Err(e) = beam::fail_task(&task, e.user_faceing_error()).await { + warn!("Failed to report failure to beam: {e}"); + } + return; } - } + }; const MAX_TRIES: u32 = 3600; for attempt in 0..MAX_TRIES { - let comm_result = if let Some(ref err_msg) = error_msg { - beam::fail_task(&task, err_msg).await - } else { - beam::answer_task(task.id, res.as_ref().unwrap()).await - }; - match comm_result { + match beam::answer_task(&result).await { Ok(_) => break, Err(FocusError::ConfigurationError(s)) => { error!( From 5ef8552dde726aba6be991b746f1299b13f2ba15 Mon Sep 17 00:00:00 2001 From: janskiba Date: Tue, 13 Feb 2024 12:59:36 +0000 Subject: [PATCH 12/19] fix: typos --- src/errors.rs | 2 +- src/task_processing.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/errors.rs b/src/errors.rs index 3a3ba41..1f16989 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -51,7 +51,7 @@ pub enum FocusError { impl FocusError { /// Generate a descriptive error message that does not leak any sensitive data that might me contained inside the error value - pub fn user_faceing_error(&self) -> &'static str { + pub fn user_facing_error(&self) -> &'static str { use FocusError::*; // TODO: Add more match arms match self { diff --git a/src/task_processing.rs b/src/task_processing.rs index 6125063..1f5da91 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -28,7 +28,7 @@ pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { let local_report_cache = report_cache.clone(); let local_obf_cache = obf_cache.clone(); tokio::spawn(async move { - let span = info_span!("task handeling", %task.id); + let span = info_span!("task handling", %task.id); handle_beam_task(task, local_obf_cache, local_report_cache).instrument(span).await; drop(permit) }); @@ -42,7 +42,7 @@ async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, let task_claiming = beam::claim_task(&task); let mut task_processing = std::pin::pin!(process_task(&task, local_obf_cache, local_report_cache)); let task_result = tokio::select! { - // If task task proccessing happens before claiming is done drop the task claiming future + // If task task processing happens before claiming is done drop the task claiming future task_processed = &mut task_processing => { task_processed }, @@ -59,7 +59,7 @@ async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, Ok(res) => res, Err(e) => { warn!("Failed to execute query: {e}"); - if let Err(e) = beam::fail_task(&task, e.user_faceing_error()).await { + if let Err(e) = beam::fail_task(&task, e.user_facing_error()).await { warn!("Failed to report failure to beam: {e}"); } return; From c8c23592143a4f3977110e637b6196114ca727b5 Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Tue, 13 Feb 2024 14:33:55 +0100 Subject: [PATCH 13/19] Update src/errors.rs --- src/errors.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/errors.rs b/src/errors.rs index 1f16989..297cc8e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -50,7 +50,7 @@ pub enum FocusError { } impl FocusError { - /// Generate a descriptive error message that does not leak any sensitive data that might me contained inside the error value + /// Generate a descriptive error message that does not leak any sensitive data that might be contained inside the error value pub fn user_facing_error(&self) -> &'static str { use FocusError::*; // TODO: Add more match arms From d0b5cd354915f8be0b9fd1efd17cd38044050576 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 14 Feb 2024 10:24:28 +0100 Subject: [PATCH 14/19] merge main into develop --- src/beam.rs | 9 +++++++-- src/task_processing.rs | 9 +++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 7979ef9..1d797fa 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -1,6 +1,7 @@ use std::time::Duration; use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString}; +use http::StatusCode; use once_cell::sync::Lazy; use serde::Serialize; use tracing::{debug, warn}; @@ -92,13 +93,17 @@ pub async fn answer_task(result: &TaskResult) -> Resu .map_err(FocusError::UnableToAnswerTask) } -pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result { +pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result<(), FocusError> { let body = body.into(); warn!("Reporting failed task with id {}: {}", task.id, body); let result = beam_result::perm_failed(CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, body); BEAM_CLIENT.put_result(&result, &task.id) .await - .map_err(FocusError::UnableToAnswerTask) + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } pub async fn claim_task(task: &TaskRequest) -> Result { diff --git a/src/task_processing.rs b/src/task_processing.rs index 1f5da91..dddaa1f 100644 --- a/src/task_processing.rs +++ b/src/task_processing.rs @@ -98,6 +98,15 @@ async fn process_task( execute: true, }); + if metadata.project == "focus-healthcheck" { + return Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + "healthy".into() + )); + } + if metadata.project == "exporter" { let body = &task.body; return Ok(run_exporter_query(task, body, metadata.execute).await)?; From 6108d3e0053e081724c967d9ea353d4044cc7ef0 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 14 Feb 2024 11:24:20 +0100 Subject: [PATCH 15/19] fixed merge, beam methods don't return bool --- src/beam.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/beam.rs b/src/beam.rs index 1d797fa..d67ef9f 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -86,11 +86,15 @@ pub async fn retrieve_tasks() -> Result>, FocusError> { .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(result: &TaskResult) -> Result { +pub async fn answer_task(result: &TaskResult) -> Result<(), FocusError> { debug!("Answer task with id: {}", result.task); BEAM_CLIENT.put_result(result, &result.task) - .await - .map_err(FocusError::UnableToAnswerTask) + .await + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result<(), FocusError> { From d4bcd132c2054b78116d83c0b06b8600aaaa651c Mon Sep 17 00:00:00 2001 From: MLambarki <57628671+MLambarki@users.noreply.github.com> Date: Thu, 22 Feb 2024 08:25:49 +0100 Subject: [PATCH 16/19] Bugfix: the CQL extension should prevent the sorting of null objects --- resources/cql/DKTK_STRAT_AGE_STRATIFIER | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resources/cql/DKTK_STRAT_AGE_STRATIFIER b/resources/cql/DKTK_STRAT_AGE_STRATIFIER index aab0cb3..6cb9744 100644 --- a/resources/cql/DKTK_STRAT_AGE_STRATIFIER +++ b/resources/cql/DKTK_STRAT_AGE_STRATIFIER @@ -1,7 +1,7 @@ define PrimaryDiagnosis: First( from [Condition] C -where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() +where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() and C.onset is not null sort by date from onset asc) define AgeClass: From 5620f7f82db391f343a31d1684a079fb3165ef1c Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Mon, 26 Feb 2024 16:33:30 +0100 Subject: [PATCH 17/19] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9694dd9..d6e5e40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "focus" -version = "0.4.0" +version = "0.5.0" edition = "2021" license = "Apache-2.0" From 5e0033f1c20e3da965ad4e7d7af07132591181cd Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 26 Feb 2024 20:59:05 +0100 Subject: [PATCH 18/19] updated readme --- README.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 24bbd2a..6b7ea89 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Focus -Focus is a Samply component ran on the sites, which distributes tasks from Beam.Proxy to the applications on the site and re-transmits the results through Samply.Beam. Currenly, only Samply.Blaze is supported as a target application, but Focus is easily extensible. +Focus is a Samply component ran on the sites, which distributes tasks from Beam.Proxy to the applications on the site and re-transmits the results through Samply.Beam. -It is possible to specify the queries whose results are to be cached to speed up retrieval. The cached results expire after 24 hours. +It is possible to specify Blaze queries whose results are to be cached to speed up retrieval. The cached results expire after 24 hours. ## Installation @@ -59,7 +59,13 @@ Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable t ## Usage -Creating a sample task using CURL: +Creating a sample focus healthcheck task using CURL (body can be any string and is ignored): + +```bash +curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"focus-healthcheck"},"body":"wie geht es"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks +``` + +Creating a sample task containing a Blaze query using CURL: ```bash curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-fefbffffeeff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"ewoJImxhbmciOiAiY3FsIiwKCSJsaWIiOiB7CgkJImNvbnRlbnQiOiBbCgkJCXsKCQkJCSJjb250ZW50VHlwZSI6ICJ0ZXh0L2NxbCIsCgkJCQkiZGF0YSI6ICJiR2xpY21GeWVTQlNaWFJ5YVdWMlpRcDFjMmx1WnlCR1NFbFNJSFpsY25OcGIyNGdKelF1TUM0d0p3cHBibU5zZFdSbElFWklTVkpJWld4d1pYSnpJSFpsY25OcGIyNGdKelF1TUM0d0p3b0tZMjlrWlhONWMzUmxiU0JzYjJsdVl6b2dKMmgwZEhBNkx5OXNiMmx1WXk1dmNtY25DbU52WkdWemVYTjBaVzBnYVdOa01UQTZJQ2RvZEhSd09pOHZhR3czTG05eVp5OW1hR2x5TDNOcFpDOXBZMlF0TVRBbkNtTnZaR1Z6ZVhOMFpXMGdVMkZ0Y0d4bFRXRjBaWEpwWVd4VWVYQmxPaUFuYUhSMGNITTZMeTltYUdseUxtSmliWEpwTG1SbEwwTnZaR1ZUZVhOMFpXMHZVMkZ0Y0d4bFRXRjBaWEpwWVd4VWVYQmxKd29LQ21OdmJuUmxlSFFnVUdGMGFXVnVkQW9LUWtKTlVrbGZVMVJTUVZSZlIwVk9SRVZTWDFOVVVrRlVTVVpKUlZJS0NrSkNUVkpKWDFOVVVrRlVYMFJGUmw5VFVFVkRTVTFGVGdwcFppQkpia2x1YVhScFlXeFFiM0IxYkdGMGFXOXVJSFJvWlc0Z1cxTndaV05wYldWdVhTQmxiSE5sSUh0OUlHRnpJRXhwYzNROFUzQmxZMmx0Wlc0K0NncENRazFTU1Y5VFZGSkJWRjlUUVUxUVRFVmZWRmxRUlY5VFZGSkJWRWxHU1VWU0NncENRazFTU1Y5VFZGSkJWRjlEVlZOVVQwUkpRVTVmVTFSU1FWUkpSa2xGVWdvS1FrSk5Va2xmVTFSU1FWUmZSRWxCUjA1UFUwbFRYMU5VVWtGVVNVWkpSVklLQ2tKQ1RWSkpYMU5VVWtGVVgwRkhSVjlUVkZKQlZFbEdTVVZTQ2dwQ1FrMVNTVjlUVkZKQlZGOUVSVVpmU1U1ZlNVNUpWRWxCVEY5UVQxQlZURUZVU1U5T0NuUnlkV1U9IgoJCQl9CgkJXSwKCQkicmVzb3VyY2VUeXBlIjogIkxpYnJhcnkiLAoJCSJzdGF0dXMiOiAiYWN0aXZlIiwKCQkidHlwZSI6IHsKCQkJImNvZGluZyI6IFsKCQkJCXsKCQkJCQkiY29kZSI6ICJsb2dpYy1saWJyYXJ5IiwKCQkJCQkic3lzdGVtIjogImh0dHA6Ly90ZXJtaW5vbG9neS5obDcub3JnL0NvZGVTeXN0ZW0vbGlicmFyeS10eXBlIgoJCQkJfQoJCQldCgkJfSwKCQkidXJsIjogInVybjp1dWlkOjdmZjUzMmFkLTY5ZTQtNDhlZC1hMmQzLTllZmFmYjYwOWY2MiIKCX0sCgkibWVhc3VyZSI6IHsKCQkiZ3JvdXAiOiBbCgkJCXsKCQkJCSJjb2RlIjogewoJCQkJCSJ0ZXh0IjogInBhdGllbnRzIgoJCQkJfSwKCQkJCSJwb3B1bGF0aW9uIjogWwoJCQkJCXsKCQkJCQkJImNvZGUiOiB7CgkJCQkJCQkiY29kaW5nIjogWwoJCQkJCQkJCXsKCQkJCQkJCQkJImNvZGUiOiAiaW5pdGlhbC1wb3B1bGF0aW9uIiwKCQkJCQkJCQkJInN5c3RlbSI6ICJodHRwOi8vdGVybWlub2xvZ3kuaGw3Lm9yZy9Db2RlU3lzdGVtL21lYXN1cmUtcG9wdWxhdGlvbiIKCQkJCQkJCQl9CgkJCQkJCQldCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIkluSW5pdGlhbFBvcHVsYXRpb24iLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsLWlkZW50aWZpZXIiCgkJCQkJCX0KCQkJCQl9CgkJCQldLAoJCQkJInN0cmF0aWZpZXIiOiBbCgkJCQkJewoJCQkJCQkiY29kZSI6IHsKCQkJCQkJCSJ0ZXh0IjogIkdlbmRlciIKCQkJCQkJfSwKCQkJCQkJImNyaXRlcmlhIjogewoJCQkJCQkJImV4cHJlc3Npb24iOiAiR2VuZGVyIiwKCQkJCQkJCSJsYW5ndWFnZSI6ICJ0ZXh0L2NxbCIKCQkJCQkJfQoJCQkJCX0sCgkJCQkJewoJCQkJCQkiY29kZSI6IHsKCQkJCQkJCSJ0ZXh0IjogIkFnZSIKCQkJCQkJfSwKCQkJCQkJImNyaXRlcmlhIjogewoJCQkJCQkJImV4cHJlc3Npb24iOiAiQWdlQ2xhc3MiLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfSwKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAiQ3VzdG9kaWFuIgoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJDdXN0b2RpYW4iLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9LAoJCQl7CgkJCQkiY29kZSI6IHsKCQkJCQkidGV4dCI6ICJkaWFnbm9zaXMiCgkJCQl9LAoJCQkJImV4dGVuc2lvbiI6IFsKCQkJCQl7CgkJCQkJCSJ1cmwiOiAiaHR0cDovL2hsNy5vcmcvZmhpci91cy9jcWZtZWFzdXJlcy9TdHJ1Y3R1cmVEZWZpbml0aW9uL2NxZm0tcG9wdWxhdGlvbkJhc2lzIiwKCQkJCQkJInZhbHVlQ29kZSI6ICJDb25kaXRpb24iCgkJCQkJfQoJCQkJXSwKCQkJCSJwb3B1bGF0aW9uIjogWwoJCQkJCXsKCQkJCQkJImNvZGUiOiB7CgkJCQkJCQkiY29kaW5nIjogWwoJCQkJCQkJCXsKCQkJCQkJCQkJImNvZGUiOiAiaW5pdGlhbC1wb3B1bGF0aW9uIiwKCQkJCQkJCQkJInN5c3RlbSI6ICJodHRwOi8vdGVybWlub2xvZ3kuaGw3Lm9yZy9Db2RlU3lzdGVtL21lYXN1cmUtcG9wdWxhdGlvbiIKCQkJCQkJCQl9CgkJCQkJCQldCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIkRpYWdub3NpcyIsCgkJCQkJCQkibGFuZ3VhZ2UiOiAidGV4dC9jcWwtaWRlbnRpZmllciIKCQkJCQkJfQoJCQkJCX0KCQkJCV0sCgkJCQkic3RyYXRpZmllciI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAiZGlhZ25vc2lzIgoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJEaWFnbm9zaXNDb2RlIiwKCQkJCQkJCSJsYW5ndWFnZSI6ICJ0ZXh0L2NxbC1pZGVudGlmaWVyIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9LAoJCQl7CgkJCQkiY29kZSI6IHsKCQkJCQkidGV4dCI6ICJzcGVjaW1lbiIKCQkJCX0sCgkJCQkiZXh0ZW5zaW9uIjogWwoJCQkJCXsKCQkJCQkJInVybCI6ICJodHRwOi8vaGw3Lm9yZy9maGlyL3VzL2NxZm1lYXN1cmVzL1N0cnVjdHVyZURlZmluaXRpb24vY3FmbS1wb3B1bGF0aW9uQmFzaXMiLAoJCQkJCQkidmFsdWVDb2RlIjogIlNwZWNpbWVuIgoJCQkJCX0KCQkJCV0sCgkJCQkicG9wdWxhdGlvbiI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJImNvZGluZyI6IFsKCQkJCQkJCQl7CgkJCQkJCQkJCSJjb2RlIjogImluaXRpYWwtcG9wdWxhdGlvbiIsCgkJCQkJCQkJCSJzeXN0ZW0iOiAiaHR0cDovL3Rlcm1pbm9sb2d5LmhsNy5vcmcvQ29kZVN5c3RlbS9tZWFzdXJlLXBvcHVsYXRpb24iCgkJCQkJCQkJfQoJCQkJCQkJXQoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJTcGVjaW1lbiIsCgkJCQkJCQkibGFuZ3VhZ2UiOiAidGV4dC9jcWwtaWRlbnRpZmllciIKCQkJCQkJfQoJCQkJCX0KCQkJCV0sCgkJCQkic3RyYXRpZmllciI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAic2FtcGxlX2tpbmQiCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIlNhbXBsZVR5cGUiLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9CgkJXSwKCQkibGlicmFyeSI6ICJ1cm46dXVpZDo3ZmY1MzJhZC02OWU0LTQ4ZWQtYTJkMy05ZWZhZmI2MDlmNjIiLAoJCSJyZXNvdXJjZVR5cGUiOiAiTWVhc3VyZSIsCgkJInNjb3JpbmciOiB7CgkJCSJjb2RpbmciOiBbCgkJCQl7CgkJCQkJImNvZGUiOiAiY29ob3J0IiwKCQkJCQkic3lzdGVtIjogImh0dHA6Ly90ZXJtaW5vbG9neS5obDcub3JnL0NvZGVTeXN0ZW0vbWVhc3VyZS1zY29yaW5nIgoJCQkJfQoJCQldCgkJfSwKCQkic3RhdHVzIjogImFjdGl2ZSIsCgkJInN1YmplY3RDb2RlYWJsZUNvbmNlcHQiOiB7CgkJCSJjb2RpbmciOiBbCgkJCQl7CgkJCQkJImNvZGUiOiAiUGF0aWVudCIsCgkJCQkJInN5c3RlbSI6ICJodHRwOi8vaGw3Lm9yZy9maGlyL3Jlc291cmNlLXR5cGVzIgoJCQkJfQoJCQldCgkJfSwKCQkidXJsIjogInVybjp1dWlkOjVlZThkZTczLTM0N2UtNDdjYS1hMDE0LWYyZTcxNzY3YWRmYyIKCX0KfQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks From e1252a8fc61bb56738b8befdb885ea0c22cd13e6 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 26 Feb 2024 21:03:52 +0100 Subject: [PATCH 19/19] formatting --- src/main.rs | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0b02a8f..64218b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -54,32 +54,31 @@ struct ReportCache { } impl ReportCache { - pub fn new() -> Self { let mut cache = HashMap::new(); - + if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() { let lines = util::read_lines(filename.clone().to_string()); match lines { Ok(ok_lines) => { - for line in ok_lines { - let Ok(ok_line) = line else { - warn!("A line in the file {} is not readable", filename); - continue; - }; - cache.insert((ok_line.clone(), false), ("".into(), UNIX_EPOCH)); - cache.insert((ok_line, true), ("".into(), UNIX_EPOCH)); - } - }, + for line in ok_lines { + let Ok(ok_line) = line else { + warn!("A line in the file {} is not readable", filename); + continue; + }; + cache.insert((ok_line.clone(), false), ("".into(), UNIX_EPOCH)); + cache.insert((ok_line, true), ("".into(), UNIX_EPOCH)); + } + } Err(_) => { - error!("The file {} cannot be opened", filename); //This shouldn't stop focus from running, it's just going to go to blaze every time, but that's not too slow + error!("The file {} cannot be opened", filename); //This shouldn't stop focus from running, it's just going to go to blaze every time, but that's not too slow } } } - - Self {cache} + + Self { cache } } - } +} const REPORTCACHE_TTL: Duration = Duration::from_secs(86400); //24h @@ -133,7 +132,7 @@ async fn main_loop() -> ExitCode { if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await { warn!("Encountered the following error, while processing tasks: {e}"); - failures += 1; + failures += 1; } else { failures = 0; } @@ -152,10 +151,8 @@ async fn process_tasks( debug!("Start processing tasks..."); let tasks = beam::retrieve_tasks().await?; for task in tasks { - if seen.contains(&task.id) { continue; - } seen.insert(task.id); task_queue @@ -183,7 +180,8 @@ async fn run_cql_query( let mut key_exists = false; - let obfuscate = CONFIG.obfuscate == config::Obfuscate::Yes && !CONFIG.unobfuscated.contains(&project); + let obfuscate = + CONFIG.obfuscate == config::Obfuscate::Yes && !CONFIG.unobfuscated.contains(&project); let report_from_cache = match report_cache .lock()