Skip to content

Commit

Permalink
Merge branch 'feature/sql' of github.com:samply/focus into feature/sql
Browse files Browse the repository at this point in the history
  • Loading branch information
enola-dkfz committed Jul 22, 2024
2 parents f55cfbb + dff9045 commit c5b647f
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 50 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ rand = { default-features = false, version = "0.8.5" }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] }
sqlx-pgrow-serde = "0.2.0"
tokio-retry = "0.3"
tryhard = "0.5"

# Logging
tracing = { version = "0.1.37", default_features = false }
Expand Down
55 changes: 6 additions & 49 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool};
use sqlx_pgrow_serde::SerMapPgRow;
use std::collections::HashMap;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
use tokio_retry::Retry;
use tracing::{debug, info, warn};
use std::{collections::HashMap, time::Duration};
use tracing::{warn, info, debug};


#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SqlQuery {
Expand All @@ -18,11 +17,7 @@ include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs"));
pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result<PgPool, FocusError> {
info!("Trying to establish a PostgreSQL connection pool");

let retry_strategy = ExponentialBackoff::from_millis(1000)
.map(jitter)
.take(max_attempts as usize);

Retry::spawn(retry_strategy, || async {
tryhard::retry_fn(|| async {
info!("Attempting to connect to PostgreSQL");
PgPoolOptions::new()
.max_connections(10)
Expand All @@ -33,14 +28,11 @@ pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result<P
FocusError::CannotConnectToDatabase(e.to_string())
})
})
.retries(max_attempts)
.exponential_backoff(Duration::from_secs(2))
.await
}

pub async fn healthcheck(pool: &PgPool) -> bool {
let res = run_query(pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap()).await; //this file exists, safe to unwrap
res.is_ok()
}

pub async fn run_query(pool: &PgPool, query: &str) -> Result<Vec<PgRow>, FocusError> {
sqlx::query(query)
.fetch_all(pool)
Expand Down Expand Up @@ -70,38 +62,3 @@ pub fn serialize_rows(rows: Vec<PgRow>) -> Result<Value, FocusError> {

Ok(Value::Array(rows_json))
}

#[cfg(test)]
mod test {
use super::*;

#[tokio::test]
#[ignore] //TODO mock DB
async fn connect_healthcheck() {
let pool =
get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1)
.await
.unwrap();

assert!(healthcheck(&pool).await);
}

#[tokio::test]
#[ignore] //TODO mock DB
async fn serialize() {
let pool =
get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1)
.await
.unwrap();

let rows = run_query(&pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap())
.await
.unwrap();

let rows_json = serialize_rows(rows).unwrap();

assert!(rows_json.is_array());

assert_ne!(rows_json[0]["hasindexes"], Value::Null);
}
}

0 comments on commit c5b647f

Please sign in to comment.