Skip to content

Commit

Permalink
Merge pull request #158 from samply/feature/sql
Browse files Browse the repository at this point in the history
Feature/sql
  • Loading branch information
enola-dkfz authored Jul 22, 2024
2 parents 28c6a63 + c5b647f commit b921140
Show file tree
Hide file tree
Showing 8 changed files with 295 additions and 81 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ license = "Apache-2.0"
[dependencies]
base64 = "0.22.1"
http = "0.2"
reqwest = { version = "0.11", default_features = false, features = ["json", "default-tls"] }
reqwest = { version = "0.11", default-features = false, features = ["json", "default-tls"] }
serde = { version = "1.0.152", features = ["serde_derive"] }
serde_json = "1.0"
thiserror = "1.0.38"
Expand All @@ -21,6 +21,9 @@ laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" }
uuid = "1.8.0"
rand = { default-features = false, version = "0.8.5" }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] }
sqlx-pgrow-serde = "0.2.0"
tryhard = "0.5"

# Logging
tracing = { version = "0.1.37", default_features = false }
Expand All @@ -32,6 +35,7 @@ once_cell = "1.18"
# Command Line Interface
clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] }


[features]
default = []
bbmri = []
Expand Down
36 changes: 21 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,25 @@ BEAM_APP_ID_LONG = "app1.broker.example.com"
### Optional variables

```bash
RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks, default value: 32
ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", default value: "blaze"
RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks; default value: 32
ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", "sql", "blaze-and-sql"; default value: "blaze"
EXPORTER_URL = " https://exporter.site/" # The exporter URL
OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no", default value: "yes"
OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no", default value: 1
DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no", default value: 1
DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no", default value: 20
DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no", default value: 3
DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in the Procedures stratifier, has no effect if OBFUSCATE = "no", default value: 1.7
DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no", default value: 2.1
DELTA_HISTO = "20." # Sensitivity parameter for obfuscating the counts in the Histo stratifier, has no effect if OBFUSCATE = "no", default value: 20
EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1
ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10
PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid;dktk_supervisors;exporter;ehds2"
OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no"; default value: "yes"
OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no"; default value: 1
DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no"; default value: 1
DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no"; default value: 20
DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no"; default value: 3
DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in the Procedures stratifier, has no effect if OBFUSCATE = "no"; default value: 1.7
DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no"; default value: 2.1
DELTA_HISTO = "20." # Sensitivity parameter for obfuscating the counts in the Histo stratifier, has no effect if OBFUSCATE = "no"; default value: 20
EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no"; default value: 0.1
ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no"; default value: 10
PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ";" ; default value: "exliquid;dktk_supervisors;exporter;ehds2"
QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base64 encoded queries whose results are to be cached. If not set, no results are cached
PROVIDER = "name" #OMOP provider name
PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded OMOP provider icon
PROVIDER = "name" #EUCAIM provider name
PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon
AUTH_HEADER = "ApiKey XXXX" #Authorization header
POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Postgres connection string
```

Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`.
Expand Down Expand Up @@ -80,6 +81,11 @@ Creating a sample task containing an abstract syntax tree (AST) query using curl
curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"bbmri"},"body":"eyJsYW5nIjoiYXN0IiwicGF5bG9hZCI6ImV5SmhjM1FpT25zaWIzQmxjbUZ1WkNJNklrOVNJaXdpWTJocGJHUnlaVzRpT2x0N0ltOXdaWEpoYm1RaU9pSkJUa1FpTENKamFHbHNaSEpsYmlJNlczc2liM0JsY21GdVpDSTZJazlTSWl3aVkyaHBiR1J5Wlc0aU9sdDdJbXRsZVNJNkltZGxibVJsY2lJc0luUjVjR1VpT2lKRlVWVkJURk1pTENKemVYTjBaVzBpT2lJaUxDSjJZV3gxWlNJNkltMWhiR1VpZlN4N0ltdGxlU0k2SW1kbGJtUmxjaUlzSW5SNWNHVWlPaUpGVVZWQlRGTWlMQ0p6ZVhOMFpXMGlPaUlpTENKMllXeDFaU0k2SW1abGJXRnNaU0o5WFgxZGZWMTlMQ0pwWkNJNkltRTJaakZqWTJZekxXVmlaakV0TkRJMFppMDVaRFk1TFRSbE5XUXhNelZtTWpNME1DSjkifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks
```

Creating a sample SQL task for a `SELECT_TABLES` query using curl:
```bash
curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RBQkxFUyJ9"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks
```

Creating a sample [Exporter](https://github.com/samply/exporter) "execute" task containing an Exporter query using curl:

```bash
Expand Down
25 changes: 25 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,30 @@ fn build_cqlmap() {
).unwrap();
}

fn build_sqlmap() {
let path = Path::new(&env::var("OUT_DIR").unwrap()).join("sql_replace_map.rs");
let mut file = BufWriter::new(File::create(path).unwrap());

write!(&mut file, r#"
static SQL_REPLACE_MAP: once_cell::sync::Lazy<HashMap<&'static str, &'static str>> = once_cell::sync::Lazy::new(|| {{
let mut map = HashMap::new();
"#).unwrap();

for sqlfile in std::fs::read_dir(Path::new("resources/sql")).unwrap() {
let sqlfile = sqlfile.unwrap();
let sqlfilename = sqlfile.file_name().to_str().unwrap().to_owned();
let sqlcontent = std::fs::read_to_string(sqlfile.path()).unwrap();
write!(&mut file, r####"
map.insert(r###"{sqlfilename}"###, r###"{sqlcontent}"###);
"####).unwrap();
}

writeln!(&mut file, "
map
}});"
).unwrap();
}

fn main() {
build_data::set_GIT_COMMIT_SHORT();
build_data::set_GIT_DIRTY();
Expand All @@ -51,4 +75,5 @@ fn main() {
println!("cargo:rustc-env=SAMPLY_USER_AGENT=Samply.Focus.{}/{}", env!("CARGO_PKG_NAME"), version());

build_cqlmap();
build_sqlmap();
}
1 change: 1 addition & 0 deletions resources/sql/SELECT_TABLES
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT * FROM pg_catalog.pg_tables
26 changes: 19 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::path::PathBuf;
use std::fmt;
use std::path::PathBuf;

use beam_lib::AppId;
use clap::Parser;
Expand All @@ -10,7 +10,6 @@ use tracing::{debug, info, warn};

use crate::errors::FocusError;


#[derive(clap::ValueEnum, Clone, PartialEq, Debug)]
pub enum Obfuscate {
No,
Expand All @@ -21,18 +20,21 @@ pub enum Obfuscate {
pub enum EndpointType {
Blaze,
Omop,
BlazeAndSql,
Sql,
}

impl fmt::Display for EndpointType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
EndpointType::Blaze => write!(f, "blaze"),
EndpointType::Blaze => write!(f, "blaze"),
EndpointType::Omop => write!(f, "omop"),
EndpointType::BlazeAndSql => write!(f, "blaze_and_sql"),
EndpointType::Sql => write!(f, "sql"),
}
}
}


pub(crate) static CONFIG: Lazy<Config> = Lazy::new(|| {
debug!("Loading config");
Config::load().unwrap_or_else(|e| {
Expand Down Expand Up @@ -128,7 +130,12 @@ struct CliArgs {
rounding_step: usize,

/// Projects for which the results are not to be obfuscated, separated by ;
#[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter;ehds2")]
#[clap(
long,
env,
value_parser,
default_value = "exliquid;dktk_supervisors;exporter;ehds2"
)]
projects_no_obfuscation: String,

/// Path to a file containing BASE64 encoded queries whose results are to be cached
Expand All @@ -142,7 +149,7 @@ struct CliArgs {
/// OMOP provider name
#[clap(long, env, value_parser)]
provider: Option<String>,

/// Base64 encoded OMOP provider icon
#[clap(long, env, value_parser)]
provider_icon: Option<String>,
Expand All @@ -151,6 +158,9 @@ struct CliArgs {
#[clap(long, env, value_parser)]
auth_header: Option<String>,

/// Database connection string
#[clap(long, env, value_parser)]
postgres_connection_string: Option<String>,
}

pub(crate) struct Config {
Expand Down Expand Up @@ -178,6 +188,7 @@ pub(crate) struct Config {
pub provider: Option<String>,
pub provider_icon: Option<String>,
pub auth_header: Option<String>,
pub postgres_connection_string: Option<String>,
}

impl Config {
Expand Down Expand Up @@ -219,6 +230,7 @@ impl Config {
provider: cli_args.provider,
provider_icon: cli_args.provider_icon,
auth_header: cli_args.auth_header,
postgres_connection_string: cli_args.postgres_connection_string,
client,
};
Ok(config)
Expand Down Expand Up @@ -274,7 +286,7 @@ pub fn prepare_reqwest_client(certs: &Vec<Certificate>) -> Result<reqwest::Clien
),
"all_proxy" => proxies.push(
Proxy::all(v)
.map_err( FocusError::InvalidProxyConfig)?
.map_err(FocusError::InvalidProxyConfig)?
.no_proxy(no_proxy.clone()),
),
_ => (),
Expand Down
64 changes: 64 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use crate::errors::FocusError;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool};
use sqlx_pgrow_serde::SerMapPgRow;
use std::{collections::HashMap, time::Duration};
use tracing::{warn, info, debug};


#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SqlQuery {
pub payload: String,
}

include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs"));

pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result<PgPool, FocusError> {
info!("Trying to establish a PostgreSQL connection pool");

tryhard::retry_fn(|| async {
info!("Attempting to connect to PostgreSQL");
PgPoolOptions::new()
.max_connections(10)
.connect(pg_url)
.await
.map_err(|e| {
warn!("Failed to connect to PostgreSQL: {}", e);
FocusError::CannotConnectToDatabase(e.to_string())
})
})
.retries(max_attempts)
.exponential_backoff(Duration::from_secs(2))
.await
}

pub async fn run_query(pool: &PgPool, query: &str) -> Result<Vec<PgRow>, FocusError> {
sqlx::query(query)
.fetch_all(pool)
.await
.map_err(FocusError::ErrorExecutingQuery)
}

pub async fn process_sql_task(pool: &PgPool, key: &str) -> Result<Vec<PgRow>, FocusError> {
debug!("Executing query with key = {}", &key);
let sql_query = SQL_REPLACE_MAP.get(&key);
let Some(query) = sql_query else {
return Err(FocusError::QueryNotAllowed(key.into()));
};
debug!("Executing query {}", &query);

run_query(pool, query).await
}

pub fn serialize_rows(rows: Vec<PgRow>) -> Result<Value, FocusError> {
let mut rows_json: Vec<Value> = Vec::with_capacity(rows.len());

for row in rows {
let row = SerMapPgRow::from(row);
let row_json = serde_json::to_value(&row)?;
rows_json.push(row_json);
}

Ok(Value::Array(rows_json))
}
8 changes: 8 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ pub enum FocusError {
MissingExporterEndpoint,
#[error("Missing Exporter Task Type")]
MissingExporterTaskType,
#[error("Cannot connect to database: {0}")]
CannotConnectToDatabase(String),
#[error("Error executing query: {0}")]
ErrorExecutingQuery(sqlx::Error),
#[error("QueryResultBad: {0}")]
QueryResultBad(String),
#[error("Query not allowed: {0}")]
QueryNotAllowed(String),
}

impl FocusError {
Expand Down
Loading

0 comments on commit b921140

Please sign in to comment.