diff --git a/Cargo.toml b/Cargo.toml index 9694dd9..d6e5e40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "focus" -version = "0.4.0" +version = "0.5.0" edition = "2021" license = "Apache-2.0" diff --git a/README.md b/README.md index 63aa672..6b7ea89 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Focus -Focus is a Samply component ran on the sites, which distributes tasks from Beam.Proxy to the applications on the site and re-transmits the results through Samply.Beam. Currenly, only Samply.Blaze is supported as a target application, but Focus is easily extensible. +Focus is a Samply component ran on the sites, which distributes tasks from Beam.Proxy to the applications on the site and re-transmits the results through Samply.Beam. -It is possible to specify the queries whose results are to be cached to speed up retrieval. The cached results expire after 24 hours. +It is possible to specify Blaze queries whose results are to be cached to speed up retrieval. The cached results expire after 24 hours. ## Installation @@ -46,7 +46,7 @@ DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in t DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no", default value: 2.1 EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1 ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10 -PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid; dktk_supervisors" +PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid;dktk_supervisors;exporter;ehds2" QUERIES_TO_CACHE_FILE_PATH = "resources/bbmri" # The path to the file containing BASE64 encoded queries whose results are to be cached, if not set, no results are cached PROVIDER = "name" #OMOP provider name PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" #Base64 encoded OMOP provider icon @@ -59,7 +59,13 @@ Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable t ## Usage -Creating a sample task using CURL: +Creating a sample focus healthcheck task using CURL (body can be any string and is ignored): + +```bash +curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"focus-healthcheck"},"body":"wie geht es"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks +``` + +Creating a sample task containing a Blaze query using CURL: ```bash curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-fefbffffeeff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"ewoJImxhbmciOiAiY3FsIiwKCSJsaWIiOiB7CgkJImNvbnRlbnQiOiBbCgkJCXsKCQkJCSJjb250ZW50VHlwZSI6ICJ0ZXh0L2NxbCIsCgkJCQkiZGF0YSI6ICJiR2xpY21GeWVTQlNaWFJ5YVdWMlpRcDFjMmx1WnlCR1NFbFNJSFpsY25OcGIyNGdKelF1TUM0d0p3cHBibU5zZFdSbElFWklTVkpJWld4d1pYSnpJSFpsY25OcGIyNGdKelF1TUM0d0p3b0tZMjlrWlhONWMzUmxiU0JzYjJsdVl6b2dKMmgwZEhBNkx5OXNiMmx1WXk1dmNtY25DbU52WkdWemVYTjBaVzBnYVdOa01UQTZJQ2RvZEhSd09pOHZhR3czTG05eVp5OW1hR2x5TDNOcFpDOXBZMlF0TVRBbkNtTnZaR1Z6ZVhOMFpXMGdVMkZ0Y0d4bFRXRjBaWEpwWVd4VWVYQmxPaUFuYUhSMGNITTZMeTltYUdseUxtSmliWEpwTG1SbEwwTnZaR1ZUZVhOMFpXMHZVMkZ0Y0d4bFRXRjBaWEpwWVd4VWVYQmxKd29LQ21OdmJuUmxlSFFnVUdGMGFXVnVkQW9LUWtKTlVrbGZVMVJTUVZSZlIwVk9SRVZTWDFOVVVrRlVTVVpKUlZJS0NrSkNUVkpKWDFOVVVrRlVYMFJGUmw5VFVFVkRTVTFGVGdwcFppQkpia2x1YVhScFlXeFFiM0IxYkdGMGFXOXVJSFJvWlc0Z1cxTndaV05wYldWdVhTQmxiSE5sSUh0OUlHRnpJRXhwYzNROFUzQmxZMmx0Wlc0K0NncENRazFTU1Y5VFZGSkJWRjlUUVUxUVRFVmZWRmxRUlY5VFZGSkJWRWxHU1VWU0NncENRazFTU1Y5VFZGSkJWRjlEVlZOVVQwUkpRVTVmVTFSU1FWUkpSa2xGVWdvS1FrSk5Va2xmVTFSU1FWUmZSRWxCUjA1UFUwbFRYMU5VVWtGVVNVWkpSVklLQ2tKQ1RWSkpYMU5VVWtGVVgwRkhSVjlUVkZKQlZFbEdTVVZTQ2dwQ1FrMVNTVjlUVkZKQlZGOUVSVVpmU1U1ZlNVNUpWRWxCVEY5UVQxQlZURUZVU1U5T0NuUnlkV1U9IgoJCQl9CgkJXSwKCQkicmVzb3VyY2VUeXBlIjogIkxpYnJhcnkiLAoJCSJzdGF0dXMiOiAiYWN0aXZlIiwKCQkidHlwZSI6IHsKCQkJImNvZGluZyI6IFsKCQkJCXsKCQkJCQkiY29kZSI6ICJsb2dpYy1saWJyYXJ5IiwKCQkJCQkic3lzdGVtIjogImh0dHA6Ly90ZXJtaW5vbG9neS5obDcub3JnL0NvZGVTeXN0ZW0vbGlicmFyeS10eXBlIgoJCQkJfQoJCQldCgkJfSwKCQkidXJsIjogInVybjp1dWlkOjdmZjUzMmFkLTY5ZTQtNDhlZC1hMmQzLTllZmFmYjYwOWY2MiIKCX0sCgkibWVhc3VyZSI6IHsKCQkiZ3JvdXAiOiBbCgkJCXsKCQkJCSJjb2RlIjogewoJCQkJCSJ0ZXh0IjogInBhdGllbnRzIgoJCQkJfSwKCQkJCSJwb3B1bGF0aW9uIjogWwoJCQkJCXsKCQkJCQkJImNvZGUiOiB7CgkJCQkJCQkiY29kaW5nIjogWwoJCQkJCQkJCXsKCQkJCQkJCQkJImNvZGUiOiAiaW5pdGlhbC1wb3B1bGF0aW9uIiwKCQkJCQkJCQkJInN5c3RlbSI6ICJodHRwOi8vdGVybWlub2xvZ3kuaGw3Lm9yZy9Db2RlU3lzdGVtL21lYXN1cmUtcG9wdWxhdGlvbiIKCQkJCQkJCQl9CgkJCQkJCQldCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIkluSW5pdGlhbFBvcHVsYXRpb24iLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsLWlkZW50aWZpZXIiCgkJCQkJCX0KCQkJCQl9CgkJCQldLAoJCQkJInN0cmF0aWZpZXIiOiBbCgkJCQkJewoJCQkJCQkiY29kZSI6IHsKCQkJCQkJCSJ0ZXh0IjogIkdlbmRlciIKCQkJCQkJfSwKCQkJCQkJImNyaXRlcmlhIjogewoJCQkJCQkJImV4cHJlc3Npb24iOiAiR2VuZGVyIiwKCQkJCQkJCSJsYW5ndWFnZSI6ICJ0ZXh0L2NxbCIKCQkJCQkJfQoJCQkJCX0sCgkJCQkJewoJCQkJCQkiY29kZSI6IHsKCQkJCQkJCSJ0ZXh0IjogIkFnZSIKCQkJCQkJfSwKCQkJCQkJImNyaXRlcmlhIjogewoJCQkJCQkJImV4cHJlc3Npb24iOiAiQWdlQ2xhc3MiLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfSwKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAiQ3VzdG9kaWFuIgoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJDdXN0b2RpYW4iLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9LAoJCQl7CgkJCQkiY29kZSI6IHsKCQkJCQkidGV4dCI6ICJkaWFnbm9zaXMiCgkJCQl9LAoJCQkJImV4dGVuc2lvbiI6IFsKCQkJCQl7CgkJCQkJCSJ1cmwiOiAiaHR0cDovL2hsNy5vcmcvZmhpci91cy9jcWZtZWFzdXJlcy9TdHJ1Y3R1cmVEZWZpbml0aW9uL2NxZm0tcG9wdWxhdGlvbkJhc2lzIiwKCQkJCQkJInZhbHVlQ29kZSI6ICJDb25kaXRpb24iCgkJCQkJfQoJCQkJXSwKCQkJCSJwb3B1bGF0aW9uIjogWwoJCQkJCXsKCQkJCQkJImNvZGUiOiB7CgkJCQkJCQkiY29kaW5nIjogWwoJCQkJCQkJCXsKCQkJCQkJCQkJImNvZGUiOiAiaW5pdGlhbC1wb3B1bGF0aW9uIiwKCQkJCQkJCQkJInN5c3RlbSI6ICJodHRwOi8vdGVybWlub2xvZ3kuaGw3Lm9yZy9Db2RlU3lzdGVtL21lYXN1cmUtcG9wdWxhdGlvbiIKCQkJCQkJCQl9CgkJCQkJCQldCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIkRpYWdub3NpcyIsCgkJCQkJCQkibGFuZ3VhZ2UiOiAidGV4dC9jcWwtaWRlbnRpZmllciIKCQkJCQkJfQoJCQkJCX0KCQkJCV0sCgkJCQkic3RyYXRpZmllciI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAiZGlhZ25vc2lzIgoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJEaWFnbm9zaXNDb2RlIiwKCQkJCQkJCSJsYW5ndWFnZSI6ICJ0ZXh0L2NxbC1pZGVudGlmaWVyIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9LAoJCQl7CgkJCQkiY29kZSI6IHsKCQkJCQkidGV4dCI6ICJzcGVjaW1lbiIKCQkJCX0sCgkJCQkiZXh0ZW5zaW9uIjogWwoJCQkJCXsKCQkJCQkJInVybCI6ICJodHRwOi8vaGw3Lm9yZy9maGlyL3VzL2NxZm1lYXN1cmVzL1N0cnVjdHVyZURlZmluaXRpb24vY3FmbS1wb3B1bGF0aW9uQmFzaXMiLAoJCQkJCQkidmFsdWVDb2RlIjogIlNwZWNpbWVuIgoJCQkJCX0KCQkJCV0sCgkJCQkicG9wdWxhdGlvbiI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJImNvZGluZyI6IFsKCQkJCQkJCQl7CgkJCQkJCQkJCSJjb2RlIjogImluaXRpYWwtcG9wdWxhdGlvbiIsCgkJCQkJCQkJCSJzeXN0ZW0iOiAiaHR0cDovL3Rlcm1pbm9sb2d5LmhsNy5vcmcvQ29kZVN5c3RlbS9tZWFzdXJlLXBvcHVsYXRpb24iCgkJCQkJCQkJfQoJCQkJCQkJXQoJCQkJCQl9LAoJCQkJCQkiY3JpdGVyaWEiOiB7CgkJCQkJCQkiZXhwcmVzc2lvbiI6ICJTcGVjaW1lbiIsCgkJCQkJCQkibGFuZ3VhZ2UiOiAidGV4dC9jcWwtaWRlbnRpZmllciIKCQkJCQkJfQoJCQkJCX0KCQkJCV0sCgkJCQkic3RyYXRpZmllciI6IFsKCQkJCQl7CgkJCQkJCSJjb2RlIjogewoJCQkJCQkJInRleHQiOiAic2FtcGxlX2tpbmQiCgkJCQkJCX0sCgkJCQkJCSJjcml0ZXJpYSI6IHsKCQkJCQkJCSJleHByZXNzaW9uIjogIlNhbXBsZVR5cGUiLAoJCQkJCQkJImxhbmd1YWdlIjogInRleHQvY3FsIgoJCQkJCQl9CgkJCQkJfQoJCQkJXQoJCQl9CgkJXSwKCQkibGlicmFyeSI6ICJ1cm46dXVpZDo3ZmY1MzJhZC02OWU0LTQ4ZWQtYTJkMy05ZWZhZmI2MDlmNjIiLAoJCSJyZXNvdXJjZVR5cGUiOiAiTWVhc3VyZSIsCgkJInNjb3JpbmciOiB7CgkJCSJjb2RpbmciOiBbCgkJCQl7CgkJCQkJImNvZGUiOiAiY29ob3J0IiwKCQkJCQkic3lzdGVtIjogImh0dHA6Ly90ZXJtaW5vbG9neS5obDcub3JnL0NvZGVTeXN0ZW0vbWVhc3VyZS1zY29yaW5nIgoJCQkJfQoJCQldCgkJfSwKCQkic3RhdHVzIjogImFjdGl2ZSIsCgkJInN1YmplY3RDb2RlYWJsZUNvbmNlcHQiOiB7CgkJCSJjb2RpbmciOiBbCgkJCQl7CgkJCQkJImNvZGUiOiAiUGF0aWVudCIsCgkJCQkJInN5c3RlbSI6ICJodHRwOi8vaGw3Lm9yZy9maGlyL3Jlc291cmNlLXR5cGVzIgoJCQkJfQoJCQldCgkJfSwKCQkidXJsIjogInVybjp1dWlkOjVlZThkZTczLTM0N2UtNDdjYS1hMDE0LWYyZTcxNzY3YWRmYyIKCX0KfQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks diff --git a/build.rs b/build.rs index ac9d9fe..76667b0 100644 --- a/build.rs +++ b/build.rs @@ -46,7 +46,8 @@ fn main() { build_data::set_GIT_DIRTY(); build_data::set_BUILD_DATE(); build_data::set_BUILD_TIME(); - build_data::no_debug_rebuilds(); + // We must always run this build script as otherwise, we would cache old versions of CQL maps + //build_data::no_debug_rebuilds(); println!("cargo:rustc-env=SAMPLY_USER_AGENT=Samply.Focus.{}/{}", env!("CARGO_PKG_NAME"), version()); build_cqlmap(); diff --git a/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER b/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER index 7223b2d..a7daf87 100644 --- a/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER +++ b/resources/cql/BBMRI_STRAT_GENDER_STRATIFIER @@ -1,2 +1,4 @@ define Gender: - if (Patient.gender is null) then 'unknown' else Patient.gender + if (Patient.gender is null) then 'unknown' + else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other' + else Patient.gender \ No newline at end of file diff --git a/resources/cql/DKTK_STRAT_AGE_STRATIFIER b/resources/cql/DKTK_STRAT_AGE_STRATIFIER index aab0cb3..6cb9744 100644 --- a/resources/cql/DKTK_STRAT_AGE_STRATIFIER +++ b/resources/cql/DKTK_STRAT_AGE_STRATIFIER @@ -1,7 +1,7 @@ define PrimaryDiagnosis: First( from [Condition] C -where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() +where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() and C.onset is not null sort by date from onset asc) define AgeClass: diff --git a/resources/test/query_bbmri.cql b/resources/test/query_bbmri.cql index b080800..3493054 100644 --- a/resources/test/query_bbmri.cql +++ b/resources/test/query_bbmri.cql @@ -15,8 +15,9 @@ define AgeClass: define Gender: - if (Patient.gender is null) then 'unknown' else Patient.gender - + if (Patient.gender is null) then 'unknown' + else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other' + else Patient.gender define Custodian: First(from Specimen.extension E diff --git a/src/beam.rs b/src/beam.rs index 1d29c64..d67ef9f 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -86,15 +86,15 @@ pub async fn retrieve_tasks() -> Result>, FocusError> { .map_err(FocusError::UnableToRetrieveTasksHttp) } -pub async fn answer_task(task_id: MsgId, result: &TaskResult) -> Result<(), FocusError> { - debug!("Answer task with id: {task_id}"); - BEAM_CLIENT.put_result(result, &task_id) - .await - .map(|_| ()) - .or_else(|e| match e { - beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), - other => Err(FocusError::UnableToAnswerTask(other)) - }) +pub async fn answer_task(result: &TaskResult) -> Result<(), FocusError> { + debug!("Answer task with id: {}", result.task); + BEAM_CLIENT.put_result(result, &result.task) + .await + .map(|_| ()) + .or_else(|e| match e { + beam_lib::BeamError::UnexpectedStatus(s) if s == StatusCode::NOT_FOUND => Ok(()), + other => Err(FocusError::UnableToAnswerTask(other)) + }) } pub async fn fail_task(task: &TaskRequest, body: impl Into) -> Result<(), FocusError> { diff --git a/src/blaze.rs b/src/blaze.rs index 1ef9870..6e5f310 100644 --- a/src/blaze.rs +++ b/src/blaze.rs @@ -4,7 +4,9 @@ use serde::Serialize; use serde_json::Value; use tracing::{debug, warn}; +use crate::BeamTask; use crate::errors::FocusError; +use crate::util; use crate::util::get_json_field; use crate::config::CONFIG; @@ -120,3 +122,9 @@ pub async fn run_cql_query(library: &Value, measure: &Value) -> Result Result { + let decoded = util::base64_decode(&task.body)?; + serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string())) +} diff --git a/src/config.rs b/src/config.rs index f5910d3..0561a95 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,13 +11,13 @@ use tracing::{debug, info, warn}; use crate::errors::FocusError; -#[derive(clap::ValueEnum, Clone, Debug)] +#[derive(clap::ValueEnum, Clone, PartialEq, Debug)] pub enum Obfuscate { No, Yes, } -#[derive(clap::ValueEnum, Clone, Debug, PartialEq, Copy)] +#[derive(clap::ValueEnum, Clone, PartialEq, Debug, Copy)] pub enum EndpointType { Blaze, Omop, @@ -124,7 +124,7 @@ struct CliArgs { rounding_step: usize, /// Projects for which the results are not to be obfuscated, separated by ; - #[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter")] + #[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter;ehds2")] projects_no_obfuscation: String, /// The path to the file containing BASE64 encoded queries whose results are to be cached diff --git a/src/errors.rs b/src/errors.rs index 6cb4d42..297cc8e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,29 +2,29 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum FocusError { - #[error("Unable to post FHIR Library")] + #[error("Unable to post FHIR Library: {0}")] UnableToPostLibrary(reqwest::Error), - #[error("Unable to post FHIR Measure")] + #[error("Unable to post FHIR Measure: {0}")] UnableToPostMeasure(reqwest::Error), - #[error("FHIR Measure evaluation error in Reqwest")] + #[error("FHIR Measure evaluation error in Reqwest: {0}")] MeasureEvaluationErrorReqwest(reqwest::Error), - #[error("FHIR Measure evaluation error in Blaze")] + #[error("FHIR Measure evaluation error in Blaze: {0}")] MeasureEvaluationErrorBlaze(String), #[error("CQL query error")] CQLQueryError(), #[error("Unable to retrieve tasks from Beam: {0}")] UnableToRetrieveTasksHttp(beam_lib::BeamError), - #[error("Unable to answer task")] + #[error("Unable to answer task: {0}")] UnableToAnswerTask(beam_lib::BeamError), - #[error("Unable to set proxy settings")] + #[error("Unable to set proxy settings: {0}")] InvalidProxyConfig(reqwest::Error), - #[error("Decode error")] + #[error("Decode error: {0}")] DecodeError(base64::DecodeError), - #[error("Configuration error")] + #[error("Configuration error: {0}")] ConfigurationError(String), - #[error("Cannot open file")] + #[error("Cannot open file: {0}")] FileOpeningError(String), - #[error("Parsing error")] + #[error("Parsing error: {0}")] ParsingError(String), #[error("CQL tampered with: {0}")] CQLTemperedWithError(String), @@ -48,3 +48,16 @@ pub enum FocusError { MissingExporterEndpoint(), } + +impl FocusError { + /// Generate a descriptive error message that does not leak any sensitive data that might be contained inside the error value + pub fn user_facing_error(&self) -> &'static str { + use FocusError::*; + // TODO: Add more match arms + match self { + DecodeError(_) | ParsingError(_) => "Cannot parse query.", + LaplaceError(_) => "Cannot obfuscate result.", + _ => "Failed to execute query." + } + } +} diff --git a/src/main.rs b/src/main.rs index 4f93d7f..64218b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,30 +9,33 @@ mod graceful_shutdown; mod logger; mod intermediate_rep; +mod task_processing; mod util; -use beam_lib::{TaskRequest, TaskResult}; +use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; +use beam_lib::{MsgId, TaskRequest, TaskResult}; use laplace_rs::ObfCache; +use task_processing::TaskQueue; +use tokio::sync::Mutex; use crate::util::{is_cql_tampered_with, obfuscate_counts_mr}; use crate::{config::CONFIG, errors::FocusError}; use blaze::CqlQuery; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::ops::DerefMut; use std::process::ExitCode; use std::str; +use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use std::{process::exit, time::Duration}; -use base64::{engine::general_purpose, Engine as _}; - use serde::{Deserialize, Serialize}; -use serde_json::from_slice; - use tracing::{debug, error, warn}; // result cache type SearchQuery = String; +type Obfuscated = bool; type Report = String; type Created = std::time::SystemTime; //epoch type BeamTask = TaskRequest; @@ -45,9 +48,36 @@ struct Metadata { execute: bool, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] struct ReportCache { - cache: HashMap, + cache: HashMap<(SearchQuery, Obfuscated), (Report, Created)>, +} + +impl ReportCache { + pub fn new() -> Self { + let mut cache = HashMap::new(); + + if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() { + let lines = util::read_lines(filename.clone().to_string()); + match lines { + Ok(ok_lines) => { + for line in ok_lines { + let Ok(ok_line) = line else { + warn!("A line in the file {} is not readable", filename); + continue; + }; + cache.insert((ok_line.clone(), false), ("".into(), UNIX_EPOCH)); + cache.insert((ok_line, true), ("".into(), UNIX_EPOCH)); + } + } + Err(_) => { + error!("The file {} cannot be opened", filename); //This shouldn't stop focus from running, it's just going to go to blaze every time, but that's not too slow + } + } + } + + Self { cache } + } } const REPORTCACHE_TTL: Duration = Duration::from_secs(86400); //24h @@ -73,33 +103,11 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { - let mut obf_cache: ObfCache = ObfCache { - cache: HashMap::new(), - }; - - let mut report_cache: ReportCache = ReportCache { - cache: HashMap::new(), - }; - - if let Some(filename) = CONFIG.queries_to_cache_file_path.clone() { - let lines = util::read_lines(filename.clone().to_string()); - match lines { - Ok(ok_lines) => { - for line in ok_lines { - let Ok(ok_line) = line else { - warn!("A line in the file {} is not readable", filename); - continue; - }; - report_cache.cache.insert(ok_line, ("".into(), UNIX_EPOCH)); - } - } - Err(_) => { - error!("The file {} cannot be opened", filename); - exit(2); - } - } - } + // TODO: The report cache init should be an fn on the cache + let report_cache: ReportCache = ReportCache::new(); + let mut seen_tasks = Default::default(); + let mut task_queue = task_processing::spawn_task_workers(report_cache); let mut failures = 0; while failures < CONFIG.retry_count { if failures > 0 { @@ -122,9 +130,9 @@ async fn main_loop() -> ExitCode { //TODO health check } - if let Err(e) = process_tasks(&mut obf_cache, &mut report_cache).await { + if let Err(e) = process_tasks(&mut task_queue, &mut seen_tasks).await { warn!("Encountered the following error, while processing tasks: {e}"); - //failures += 1; + failures += 1; } else { failures = 0; } @@ -136,167 +144,30 @@ async fn main_loop() -> ExitCode { ExitCode::from(22) } -async fn process_task( - task: &BeamTask, - obf_cache: &mut ObfCache, - report_cache: &mut ReportCache, -) -> Result { - debug!("Processing task {}", task.id); - - let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { - project: "default_obfuscation".to_string(), - execute: true, - }); - - if metadata.project == "focus-healthcheck" { - return Ok(beam::beam_result::succeeded( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - "healthy".into() - )); - } - - if metadata.project == "exporter" { - let body = &task.body; - return Ok(run_exporter_query(task, body, metadata.execute).await)?; - } - - if CONFIG.endpoint_type == config::EndpointType::Blaze { - let query = parse_blaze_query(task)?; - if query.lang == "cql" { - // TODO: Change query.lang to an enum - - Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)? - } else { - warn!("Can't run queries with language {} in Blaze", query.lang); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with language {} and/or endpoint type {}", - query.lang, CONFIG.endpoint_type - ), - )) - } - } else if CONFIG.endpoint_type == config::EndpointType::Omop { - let decoded = decode_body(task)?; - let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = - from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - //TODO check that the language is ast - let query_decoded = general_purpose::STANDARD - .decode(intermediate_rep_query.query) - .map_err(FocusError::DecodeError)?; - let ast: ast::Ast = - from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - - Ok(run_intermediate_rep_query(task, ast).await)? - } else { - warn!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ), - )) - } -} - async fn process_tasks( - obf_cache: &mut ObfCache, - report_cache: &mut ReportCache, + task_queue: &mut TaskQueue, + seen: &mut HashSet, ) -> Result<(), FocusError> { debug!("Start processing tasks..."); - let tasks = beam::retrieve_tasks().await?; for task in tasks { - let task_cloned = task.clone(); - let claiming = tokio::task::spawn(async move { beam::claim_task(&task_cloned).await }); - let res = process_task(&task, obf_cache, report_cache).await; - let error_msg = match res { - Err(FocusError::DecodeError(_)) | Err(FocusError::ParsingError(_)) => { - Some("Cannot parse query".to_string()) - } - Err(FocusError::LaplaceError(_)) => Some("Cannot obfuscate result".to_string()), - Err(ref e) => Some(format!("Cannot execute query: {}", e)), - Ok(_) => None, - }; - - let res = res.ok(); - let res = res.as_ref(); - - // Make sure that claiming the task is done before we update it again. - match claiming.await.unwrap() { - Ok(_) => {} - Err(FocusError::ConfigurationError(s)) => { - error!("FATAL: Unable to report back to Beam due to a configuration issue: {s}"); - return Err(FocusError::ConfigurationError(s)); - } - Err(FocusError::UnableToAnswerTask(e)) => { - warn!("Unable to report claimed task to Beam: {e}"); - } - Err(e) => { - warn!("Unknown error reporting claimed task back to Beam: {e}"); - } - } - - const MAX_TRIES: u32 = 150; - for attempt in 0..MAX_TRIES { - let comm_result = if let Some(ref err_msg) = error_msg { - beam::fail_task(&task, err_msg).await - } else { - beam::answer_task(task.id, res.unwrap()).await - }; - match comm_result { - Ok(_) => break, - Err(FocusError::ConfigurationError(s)) => { - error!( - "FATAL: Unable to report back to Beam due to a configuration issue: {s}" - ); - return Err(FocusError::ConfigurationError(s)); - } - Err(FocusError::UnableToAnswerTask(e)) => { - warn!("Unable to report task result to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); - } - Err(e) => { - warn!("Unknown error reporting task result back to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); - } - } - tokio::time::sleep(Duration::from_secs(2)).await; + if seen.contains(&task.id) { + continue; } + seen.insert(task.id); + task_queue + .send(task) + .await + .expect("Receiver is never dropped"); } Ok(()) } -fn decode_body(task: &BeamTask) -> Result, FocusError> { - let decoded = general_purpose::STANDARD - .decode(&task.body) - .map_err(FocusError::DecodeError)?; - - Ok(decoded) -} - -fn parse_blaze_query(task: &BeamTask) -> Result { - let decoded = decode_body(task)?; - - let query: blaze::CqlQuery = - from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; - - Ok(query) -} - async fn run_cql_query( task: &BeamTask, query: &CqlQuery, - obf_cache: &mut ObfCache, - report_cache: &mut ReportCache, + obf_cache: Arc>, + report_cache: Arc>, project: String, ) -> Result { let encoded_query = @@ -309,8 +180,15 @@ async fn run_cql_query( let mut key_exists = false; - let cached_report = report_cache.cache.get(encoded_query); - let report_from_cache = match cached_report { + let obfuscate = + CONFIG.obfuscate == config::Obfuscate::Yes && !CONFIG.unobfuscated.contains(&project); + + let report_from_cache = match report_cache + .lock() + .await + .cache + .get(&(encoded_query.to_string(), obfuscate)) + { Some(existing_report) => { key_exists = true; if SystemTime::now().duration_since(existing_report.1).unwrap() < REPORTCACHE_TTL { @@ -329,32 +207,26 @@ async fn run_cql_query( let cql_result = blaze::run_cql_query(&query.lib, &query.measure).await?; - let cql_result_new: String = match CONFIG.obfuscate { - config::Obfuscate::Yes => { - if !CONFIG.unobfuscated.contains(&project) { - obfuscate_counts_mr( - &cql_result, - obf_cache, - CONFIG.obfuscate_zero, - CONFIG.obfuscate_below_10_mode, - CONFIG.delta_patient, - CONFIG.delta_specimen, - CONFIG.delta_diagnosis, - CONFIG.delta_procedures, - CONFIG.delta_medication_statements, - CONFIG.epsilon, - CONFIG.rounding_step, - )? - } else { - cql_result - } - } - config::Obfuscate::No => cql_result, + let cql_result_new: String = match obfuscate { + true => obfuscate_counts_mr( + &cql_result, + obf_cache.lock().await.deref_mut(), + CONFIG.obfuscate_zero, + CONFIG.obfuscate_below_10_mode, + CONFIG.delta_patient, + CONFIG.delta_specimen, + CONFIG.delta_diagnosis, + CONFIG.delta_procedures, + CONFIG.delta_medication_statements, + CONFIG.epsilon, + CONFIG.rounding_step, + )?, + false => cql_result, }; if key_exists { - report_cache.cache.insert( - encoded_query.to_string(), + report_cache.lock().await.cache.insert( + (encoded_query.to_string(), obfuscate), (cql_result_new.clone(), std::time::SystemTime::now()), ); } @@ -443,9 +315,7 @@ fn replace_cql_library(mut query: CqlQuery) -> Result { query.lib )))?; - let decoded_cql = general_purpose::STANDARD - .decode(old_data_string) - .map_err(FocusError::DecodeError)?; + let decoded_cql = util::base64_decode(old_data_string)?; let decoded_string = str::from_utf8(&decoded_cql) .map_err(|_| FocusError::ParsingError("CQL query was invalid".into()))?; @@ -461,7 +331,7 @@ fn replace_cql_library(mut query: CqlQuery) -> Result { }; let replaced_cql_str = util::replace_cql(decoded_string); - let replaced_cql_str_base64 = general_purpose::STANDARD.encode(replaced_cql_str); + let replaced_cql_str_base64 = BASE64.encode(replaced_cql_str); let new_data_value = serde_json::to_value(replaced_cql_str_base64) .expect("unable to turn base64 string into json value - this should not happen"); @@ -472,7 +342,7 @@ fn replace_cql_library(mut query: CqlQuery) -> Result { } fn beam_result(task: BeamTask, measure_report: String) -> Result { - let data = general_purpose::STANDARD.encode(measure_report.as_bytes()); + let data = BASE64.encode(measure_report.as_bytes()); Ok(beam::beam_result::succeeded( CONFIG.beam_app_id_long.clone(), vec![task.from], @@ -481,6 +351,7 @@ fn beam_result(task: BeamTask, measure_report: String) -> Result; + +pub fn spawn_task_workers(report_cache: ReportCache) -> TaskQueue { + let (tx, mut rx) = mpsc::channel::(WORKER_BUFFER); + + let obf_cache = Arc::new(Mutex::new(ObfCache { + cache: HashMap::new(), + })); + + let report_cache: Arc> = Arc::new(Mutex::new(report_cache)); + + tokio::spawn(async move { + let semaphore = Arc::new(Semaphore::new(NUM_WORKERS)); + while let Some(task) = rx.recv().await { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let local_report_cache = report_cache.clone(); + let local_obf_cache = obf_cache.clone(); + tokio::spawn(async move { + let span = info_span!("task handling", %task.id); + handle_beam_task(task, local_obf_cache, local_report_cache).instrument(span).await; + drop(permit) + }); + } + }); + + tx +} + +async fn handle_beam_task(task: BeamTask, local_obf_cache: Arc>, local_report_cache: Arc>) { + let task_claiming = beam::claim_task(&task); + let mut task_processing = std::pin::pin!(process_task(&task, local_obf_cache, local_report_cache)); + let task_result = tokio::select! { + // If task task processing happens before claiming is done drop the task claiming future + task_processed = &mut task_processing => { + task_processed + }, + task_claimed = task_claiming => { + if let Err(e) = task_claimed { + warn!("Failed to claim task: {e}"); + } else { + debug!("Successfully claimed task"); + }; + task_processing.await + } + }; + let result = match task_result { + Ok(res) => res, + Err(e) => { + warn!("Failed to execute query: {e}"); + if let Err(e) = beam::fail_task(&task, e.user_facing_error()).await { + warn!("Failed to report failure to beam: {e}"); + } + return; + } + }; + + const MAX_TRIES: u32 = 3600; + for attempt in 0..MAX_TRIES { + match beam::answer_task(&result).await { + Ok(_) => break, + Err(FocusError::ConfigurationError(s)) => { + error!( + "FATAL: Unable to report back to Beam due to a configuration issue: {s}" + ); + } + Err(FocusError::UnableToAnswerTask(e)) => { + warn!("Unable to report task result to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); + } + Err(e) => { + warn!("Unknown error reporting task result back to Beam: {e}. Retrying (attempt {attempt}/{MAX_TRIES})."); + } + }; + tokio::time::sleep(Duration::from_secs(2)).await; + } +} + +async fn process_task( + task: &BeamTask, + obf_cache: Arc>, + report_cache: Arc>, +) -> Result { + debug!("Processing task {}", task.id); + + let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { + project: "default_obfuscation".to_string(), + execute: true, + }); + + if metadata.project == "focus-healthcheck" { + return Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + "healthy".into() + )); + } + + if metadata.project == "exporter" { + let body = &task.body; + return Ok(run_exporter_query(task, body, metadata.execute).await)?; + } + + if CONFIG.endpoint_type == EndpointType::Blaze { + let query = parse_blaze_query(task)?; + if query.lang == "cql" { + // TODO: Change query.lang to an enum + + Ok(run_cql_query(task, &query, obf_cache, report_cache, metadata.project).await)? + } else { + warn!("Can't run queries with language {} in Blaze", query.lang); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Can't run queries with language {} and/or endpoint type {}", + query.lang, CONFIG.endpoint_type + ), + )) + } + } else if CONFIG.endpoint_type == EndpointType::Omop { + let decoded = util::base64_decode(&task.body)?; + let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = + serde_json::from_slice(&decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; + //TODO check that the language is ast + let query_decoded = general_purpose::STANDARD + .decode(intermediate_rep_query.query) + .map_err(FocusError::DecodeError)?; + let ast: ast::Ast = + serde_json::from_slice(&query_decoded).map_err(|e| FocusError::ParsingError(e.to_string()))?; + + Ok(run_intermediate_rep_query(task, ast).await)? + } else { + warn!( + "Can't run queries with endpoint type {}", + CONFIG.endpoint_type + ); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Can't run queries with endpoint type {}", + CONFIG.endpoint_type + ), + )) + } +} diff --git a/src/util.rs b/src/util.rs index 668fde7..47b938d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,6 @@ use crate::errors::FocusError; +use base64::Engine as _; +use base64::engine::general_purpose; use laplace_rs::{get_from_cache_or_privatize, Bin, ObfCache, ObfuscateBelow10Mode}; use rand::thread_rng; use serde::{Deserialize, Serialize}; @@ -81,6 +83,12 @@ pub(crate) fn read_lines(filename: String) -> Result>, Ok(io::BufReader::new(file).lines()) } +pub(crate) fn base64_decode(data: impl AsRef<[u8]>) -> Result, FocusError> { + general_purpose::STANDARD + .decode(data) + .map_err(FocusError::DecodeError) +} + // REPLACE_MAP is built in build.rs include!(concat!(env!("OUT_DIR"), "/replace_map.rs")); @@ -411,7 +419,7 @@ mod test { let decoded_library = "BBMRI_STRAT_GENDER_STRATIFIER"; - let expected_result = "define Gender:\n if (Patient.gender is null) then 'unknown' else Patient.gender\n"; + let expected_result = "define Gender:\n if (Patient.gender is null) then 'unknown'\n else if (Patient.gender != 'male' and Patient.gender != 'female' and Patient.gender != 'other' and Patient.gender != 'unknown') then 'other'\n else Patient.gender"; assert_eq!(replace_cql(decoded_library), expected_result); let decoded_library = "BBMRI_STRAT_CUSTODIAN_STRATIFIER";