Skip to content

Commit

Permalink
Added FHIR Sink Config Properties to the Pipeline Controller (#947)
Browse files Browse the repository at this point in the history
  • Loading branch information
mozzy11 authored May 29, 2024
1 parent 1b7199b commit c1dd688
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 21 deletions.
9 changes: 9 additions & 0 deletions cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ steps:
args: [ '-U', 'admin', '-d', 'postgres', '-h', 'hapi-fhir-db', '-p', '5432',
'-c', 'CREATE DATABASE views;']

# Resetting FHIR sink server
- name: 'docker/compose'
id: 'Turn down FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch HAPI FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'up','--force-recreate', '-d' ]

- name: 'docker/compose'
id: 'Bring up controller and Spark containers'
env:
Expand Down
3 changes: 3 additions & 0 deletions docker/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ fhirdata:
rowGroupSizeForParquetFiles: 33554432 # 32mb
viewDefinitionsDir: "config/views"
sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"
sinkFhirServerUrl: "http://sink-server:8080/fhir"
#sinkUserName: "hapi"
#sinkPassword: "hapi123"
recursiveDepth: 1

# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated
Expand Down
4 changes: 1 addition & 3 deletions docker/sink-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
version: '2.4'
services:
sink-server:
# Using an older version as the latest image broke our build.
# image: "hapiproject/hapi:latest"
image: "hapiproject/hapi:v6.1.0"
image: "hapiproject/hapi:latest"
container_name: sink-server
environment:
- spring.datasource.hikari.maximum-pool-size=50
Expand Down
96 changes: 81 additions & 15 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ function print_message() {
# HOME_PATH
# PARQUET_SUBDIR
# SOURCE_FHIR_SERVER_URL
# SINK_FHIR_SERVER_URL
# PIPELINE_CONTROLLER_URL
# THRIFTSERVER_URL
# Arguments:
Expand All @@ -88,15 +89,50 @@ function setup() {
HOME_PATH=$1
PARQUET_SUBDIR=$2
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
SINK_FHIR_SERVER_URL='http://localhost:8098'
PIPELINE_CONTROLLER_URL='http://localhost:8090'
THRIFTSERVER_URL='localhost:10001'
if [[ $3 = "--use_docker_network" ]]; then
SOURCE_FHIR_SERVER_URL='http://hapi-server:8080'
SINK_FHIR_SERVER_URL='http://sink-server:8080'
PIPELINE_CONTROLLER_URL='http://pipeline-controller:8080'
THRIFTSERVER_URL='spark:10000'
fi
}

#######################################################################
# This function queries fhir server and writes results to json files.
# Globals:
# HOME_PATH
# PARQUET_SUBDIR
# Arguments:
# server_url: url of the source fhir server.
# patient_json_file : file to write Patient results
# encounter_json_file : file to write Encounter results
# obs_json_file : file to write Observation results
#######################################################################
function query_fhir_server(){
  #######################################
  # Queries a FHIR server for Patient/Encounter/Observation counts
  # (via ?_summary=count) and writes each JSON response to a file
  # under ${HOME_PATH}/${PARQUET_SUBDIR}.
  # Globals:
  #   HOME_PATH (read)
  #   PARQUET_SUBDIR (read)
  # Arguments:
  #   $1 - base url of the FHIR server (without the /fhir suffix)
  #   $2 - file to write Patient count results
  #   $3 - file to write Encounter count results
  #   $4 - file to write Observation count results
  #######################################
  local query_param="?_summary=count"
  local server_url=$1
  # Output files in the same order as the resource types below.
  local output_files=("$2" "$3" "$4")
  local resource_types=(Patient Encounter Observation)
  local i

  print_message "Finding number of patients, encounters and obs in FHIR server"

  # Single loop replaces three near-identical curl invocations.
  # NOTE(review): credentials hapi:hapi are hard-coded test credentials —
  # assumed intentional for the e2e environment.
  for i in "${!resource_types[@]}"; do
    curl -L -X GET -u hapi:hapi --connect-timeout 5 --max-time 20 \
      "${server_url}/fhir/${resource_types[$i]}${query_param}" 2>/dev/null \
      >"${HOME_PATH}/${PARQUET_SUBDIR}/${output_files[$i]}"
  done
}

#################################################
# Function to count resources in source fhir server
# Globals:
Expand All @@ -108,26 +144,14 @@ function setup() {
# TOTAL_TEST_OBS
#################################################
function fhir_source_query() {
local query_param="?_summary=count"
local fhir_username="hapi"
local fhir_password="hapi"
local fhir_url_extension="/fhir"

curl -L -X GET -u $fhir_username:$fhir_password --connect-timeout 5 --max-time 20 \
"${SOURCE_FHIR_SERVER_URL}${fhir_url_extension}/Patient${query_param}" 2>/dev/null \
>"${HOME_PATH}/${PARQUET_SUBDIR}/patients.json"
query_fhir_server "${SOURCE_FHIR_SERVER_URL}" "patients.json" "encounters.json" "obs.json"
TOTAL_TEST_PATIENTS=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/patients.json")
print_message "Total FHIR source test patients ---> ${TOTAL_TEST_PATIENTS}"

curl -L -X GET -u $fhir_username:$fhir_password --connect-timeout 5 --max-time 20 \
"${SOURCE_FHIR_SERVER_URL}${fhir_url_extension}/Encounter${query_param}" \
2>/dev/null >"${HOME_PATH}/${PARQUET_SUBDIR}/encounters.json"
TOTAL_TEST_ENCOUNTERS=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/encounters.json")
print_message "Total FHIR source test encounters ---> ${TOTAL_TEST_ENCOUNTERS}"

curl -L -X GET -u $fhir_username:$fhir_password --connect-timeout 5 --max-time 20 \
"${SOURCE_FHIR_SERVER_URL}${fhir_url_extension}/Observation${query_param}" \
2>/dev/null >"${HOME_PATH}/${PARQUET_SUBDIR}/obs.json"
TOTAL_TEST_OBS=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/obs.json")
print_message "Total FHIR source test obs ---> ${TOTAL_TEST_OBS}"
}
Expand Down Expand Up @@ -163,7 +187,7 @@ function run_pipeline() {
#######################################################################
function check_parquet() {
local isIncremental=$1
local runtime="5 minute"
local runtime="15 minute"
local end_time=$(date -ud "$runtime" +%s)
local output="${HOME_PATH}/${PARQUET_SUBDIR}"
local timeout=true
Expand Down Expand Up @@ -200,7 +224,7 @@ function check_parquet() {
timeout=false
break
else
sleep 10
sleep 20
fi
fi
done
Expand Down Expand Up @@ -345,23 +369,65 @@ function validate_updated_resource() {
fi
}


#################################################
# Function that counts resources in FHIR server and compares output to what is
# in the source FHIR server
# Globals:
# HOME_PATH
# PARQUET_SUBDIR
# SINK_FHIR_SERVER_URL
# TOTAL_TEST_PATIENTS
# TOTAL_TEST_ENCOUNTERS
# TOTAL_TEST_OBS
#################################################
function test_fhir_sink(){
  #################################################
  # Counts resources in the sink FHIR server and compares them against
  # the counts previously captured from the source FHIR server.
  # Exits the script with status 1 on mismatch.
  # Globals:
  #   HOME_PATH (read)
  #   PARQUET_SUBDIR (read)
  #   SINK_FHIR_SERVER_URL (read)
  #   TOTAL_TEST_PATIENTS (read)
  #   TOTAL_TEST_ENCOUNTERS (read)
  #   TOTAL_TEST_OBS (read)
  # Arguments:
  #   $1 - run mode label used in log messages (e.g. FULL, INCREMENTAL)
  #################################################
  local runMode=$1
  local total_patients_sinked_fhir
  local total_encounters_sinked_fhir
  local total_obs_sinked_fhir

  query_fhir_server "${SINK_FHIR_SERVER_URL}" "patients-sink.json" "encounters-sink.json" "obs-sink.json"

  print_message "Counting number of patients, encounters and obs sinked to fhir files"

  # Declarations are separated from the assignments so a jq failure is not
  # masked by the always-zero exit status of `local`.
  total_patients_sinked_fhir=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/patients-sink.json")
  print_message "Total patients sinked to fhir ---> ${total_patients_sinked_fhir}"

  total_encounters_sinked_fhir=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/encounters-sink.json")
  print_message "Total encounters sinked to fhir ---> ${total_encounters_sinked_fhir}"

  total_obs_sinked_fhir=$(jq '.total' "${HOME_PATH}/${PARQUET_SUBDIR}/obs-sink.json")
  print_message "Total observations sinked to fhir ---> ${total_obs_sinked_fhir}"

  # All three counts must match the source-server counts for the sink
  # to be considered successful.
  if [[ "${total_patients_sinked_fhir}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters_sinked_fhir}" \
    == "${TOTAL_TEST_ENCOUNTERS}" && "${total_obs_sinked_fhir}" == "${TOTAL_TEST_OBS}" ]] \
    ; then
    print_message "FHIR SERVER SINK EXECUTED SUCCESSFULLY USING ${runMode} MODE"
  else
    print_message "FHIR SERVER SINK TEST FAILED USING ${runMode} MODE"
    exit 1
  fi
}

# ---- Main test flow ----
# Validate CLI args and initialize server URLs; setup honors an optional
# third argument "--use_docker_network" to switch to in-network hostnames.
validate_args "$@"
setup "$@"
# Capture resource counts from the source FHIR server for later comparison.
fhir_source_query
sleep 50
# Full (batch) run: produces Parquet output and syncs to the sink FHIR server.
run_pipeline "FULL"
check_parquet false
test_fhir_sink "FULL"

clear

# Mutate the source server so the incremental run has changes to pick up.
add_resource
update_resource

# Provide enough buffer time before triggering the incremental run so that the previous full run
# completes fully (including creation of hive tables)
sleep 60
# Incremental run.
run_pipeline "INCREMENTAL"
check_parquet true
# Re-query the source (now including the added/updated resources) and verify
# the sink server caught up after the incremental run.
fhir_source_query
test_fhir_sink "INCREMENTAL"

validate_resource_tables
validate_resource_tables_data
Expand Down
4 changes: 2 additions & 2 deletions e2e-tests/pipeline_validation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ function test_fhir_sink() {
local enc_obs_query_param="?_summary=count"

if [[ -n ${STREAMING} ]]; then
patient_query_param="?given=Alberta625"
enc_obs_query_param="?subject.given=Alberta625"
patient_query_param="?given=Alberta625&_summary=count"
enc_obs_query_param="?subject.given=Alberta625&_summary=count"
fi
print_message "Finding number of patients, encounters and obs in FHIR server"

Expand Down
6 changes: 6 additions & 0 deletions pipelines/controller/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ fhirdata:
# empty string disables this feature.
sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"

# The base URL of the sink FHIR server. If not set, only a Parquet DWH is created.
sinkFhirServerUrl: "http://172.17.0.1:8098/fhir"
# The following user-name/password should be set if the sink FHIR server supports Basic Auth.
#sinkUserName: "hapi"
#sinkPassword: "hapi123"

# The maximum depth for traversing StructureDefinitions in Parquet schema
# generation (if it is non-positive, the default 1 will be used). Note in most
# cases, the default 1 is sufficient and increasing that can result in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public class DataProperties {

private String fhirServerOAuthClientSecret;

private String sinkFhirServerUrl;

public String sinkUserName;

public String sinkPassword;

private String structureDefinitionsPath;

private int rowGroupSizeForParquetFiles;
Expand Down Expand Up @@ -196,6 +202,12 @@ PipelineConfig createBatchOptions() {
options.setRowGroupSizeForParquetFiles(rowGroupSizeForParquetFiles);
}

if (!Strings.isNullOrEmpty(sinkFhirServerUrl)) {
options.setFhirSinkPath(sinkFhirServerUrl);
options.setSinkUserName(Strings.nullToEmpty(sinkUserName));
options.setSinkPassword(Strings.nullToEmpty(sinkPassword));
}

// Using underscore for suffix as hyphens are discouraged in hive table names.
String timestampSuffix =
Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
Expand Down Expand Up @@ -230,6 +242,9 @@ List<ConfigFields> getConfigParams() {
new ConfigFields("fhirdata.dbConfig", dbConfig, "", ""),
new ConfigFields("fhirdata.viewDefinitionsDir", viewDefinitionsDir, "", ""),
new ConfigFields("fhirdata.sinkDbConfigPath", sinkDbConfigPath, "", ""),
new ConfigFields("fhirdata.fhirSinkPath", sinkFhirServerUrl, "", ""),
new ConfigFields("fhirdata.sinkUserName", sinkUserName, "", ""),
new ConfigFields("fhirdata.sinkPassword", sinkPassword, "", ""),
new ConfigFields("fhirdata.structureDefinitionsPath", structureDefinitionsPath, "", ""),
new ConfigFields("fhirdata.fhirVersion", fhirVersion.name(), "", ""),
new ConfigFields(
Expand Down
2 changes: 1 addition & 1 deletion synthea-hiv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ the generator.

## Uploader

To upload to a GCP FHIR Store, run the following commands in your terminal
To upload to a GCP FHIR Store, run the following commands in your terminal
(Python3 is needed):

```bash
Expand Down

0 comments on commit c1dd688

Please sign in to comment.