add ability to switch off/on creation of parquet dwh
mozzy11 committed May 29, 2024
1 parent c1dd688 commit 9ffab3a
Showing 12 changed files with 194 additions and 30 deletions.
38 changes: 28 additions & 10 deletions cloudbuild.yaml
@@ -118,15 +118,6 @@ steps:
args: [ '-U', 'admin', '-d', 'postgres', '-h', 'hapi-fhir-db', '-p', '5432',
'-c', 'CREATE DATABASE views;']

# Resetting FHIR sink server
- name: 'docker/compose'
id: 'Turn down FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch HAPI FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'up','--force-recreate', '-d' ]

- name: 'docker/compose'
id: 'Bring up controller and Spark containers'
env:
@@ -148,9 +139,36 @@ steps:
# - -c
# - docker logs pipeline-controller

- name: 'docker/compose'
id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

# Resetting Sink FHIR server
- name: 'docker/compose'
id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync'
args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d']

# Spinning up only the pipeline controller for FHIR server to FHIR server sync
- name: 'docker/compose'
id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync'
env:
- PIPELINE_CONFIG=/workspace/docker/config_fhir_sink
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ]

- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}'
id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'
env:
- DWH_TYPE="FHIR"

- name: 'docker/compose'
id: 'Bring down controller and Spark containers'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ]
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Turn down HAPI Source and Sink Servers'
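For reference, the new FHIR-server-to-FHIR-server steps can be approximated locally with plain docker-compose; a rough sketch, assuming the repository root as working directory (the build itself uses /workspace paths for PIPELINE_CONFIG and DWH_ROOT):

# Recreate the sink HAPI FHIR server.
docker-compose -f ./docker/sink-compose.yml up --force-recreate -d

# Point the controller at the sink-oriented config and start only the
# pipeline-controller service, without the Spark/Thrift dependencies.
export PIPELINE_CONFIG="$(pwd)/docker/config_fhir_sink"
export DWH_ROOT="$(pwd)/e2e-tests/controller-spark/dwh"
docker-compose -f ./docker/compose-controller-spark-sql-single.yaml up \
  --force-recreate --no-deps -d pipeline-controller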
4 changes: 3 additions & 1 deletion docker/config/application.yaml
@@ -25,6 +25,8 @@ fhirdata:
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
#Whether to create a Parquet DWH or not
createParquetDwh: true
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
Expand All @@ -43,7 +45,7 @@ fhirdata:
rowGroupSizeForParquetFiles: 33554432 # 32mb
viewDefinitionsDir: "config/views"
sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"
sinkFhirServerUrl: "http://sink-server:8080/fhir"
#sinkFhirServerUrl: "http://sink-server:8080/fhir"
#sinkUserName: "hapi"
#sinkPassword: "hapi123"
recursiveDepth: 1
59 changes: 59 additions & 0 deletions docker/config_fhir_sink/application.yaml
@@ -0,0 +1,59 @@
#
# Copyright 2020-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# See `pipelines/controller/config/application.yaml` for full documentation
# of these options.
# This config is meant to be used by `compose-controller-spark-sql.yaml`.
fhirdata:
# 172.17.0.1 is an example docker network interface ip address;
# `hapi-server` is another docker example where a container with that name is
# running on the same docker network.
# fhirServerUrl: "http://172.17.0.1:8091/fhir"
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
#Whether to create a Parquet DWH or not
createParquetDwh: false
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
# There is no Questionnaire in our test FHIR server, but it is added to
# prevent regression of https://github.com/google/fhir-data-pipes/issues/785.
# TODO: add resource table creation to e2e tests.
resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure"
numThreads: 1
autoGenerateFlinkConfiguration: true
createHiveResourceTables: false
#thriftserverHiveConfig: "config/thriftserver-hive-config_local.json"
#hiveResourceViewsDir: "config/views"
# structureDefinitionsPath: "config/profile-definitions"
structureDefinitionsPath: "classpath:/r4-us-core-definitions"
fhirVersion: "R4"
rowGroupSizeForParquetFiles: 33554432 # 32mb
#viewDefinitionsDir: "config/views"
#sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"
sinkFhirServerUrl: "http://sink-server:8080/fhir"
#sinkUserName: "hapi"
#sinkPassword: "hapi123"
recursiveDepth: 1

# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated
# list to expose selected ones
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,pipeline-metrics
31 changes: 31 additions & 0 deletions docker/config_fhir_sink/flink-conf.yaml
@@ -0,0 +1,31 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir.

# This is needed to prevent an "Insufficient number of network buffers"
# exceptions when running the merger on large input with many workers.
taskmanager.memory.network.max: 256mb

# This is needed to be able to process large resources, otherwise in JDBC
# mode we may get the following exception:
# "The record exceeds the maximum size of a sort buffer ..."
taskmanager.memory.managed.size: 256mb

# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately
# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240
execution.attached: false

# This is required to track the pipeline metrics when FlinkRunner is used.
execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener
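As the header comment says, this file is only picked up when FLINK_CONF_DIR points at its parent directory; a minimal sketch (the exact path inside the pipeline-controller container is an assumption):

# Make Flink read the config above; adjust the path to wherever config_fhir_sink is mounted.
export FLINK_CONF_DIR=/workspace/docker/config_fhir_sink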
9 changes: 9 additions & 0 deletions docker/config_fhir_sink/hapi-postgres-config_local.json
@@ -0,0 +1,9 @@
{
"jdbcDriverClass": "org.postgresql.Driver",
"databaseService" : "postgresql",
"databaseHostName" : "hapi-fhir-db",
"databasePort" : "5432",
"databaseUser" : "admin",
"databasePassword" : "admin",
"databaseName" : "hapi"
}
3 changes: 2 additions & 1 deletion e2e-tests/controller-spark/Dockerfile
@@ -23,5 +23,6 @@ COPY parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar
ENV PARQUET_SUBDIR="dwh"
ENV DOCKER_NETWORK="--use_docker_network"
ENV HOME_DIR="/workspace/e2e-tests/controller-spark"
ENV DWH_TYPE="PARQUET"

ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK}
ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} ${DWH_TYPE}
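The image now defaults DWH_TYPE to PARQUET, and the cloudbuild step above overrides it with FHIR for the FHIR-server-to-FHIR-server run; a hedged local sketch, with image tag and docker network as placeholders:

# Default run: validates the Parquet DWH produced by the controller.
docker run --network <docker-network> <repository>/e2e-tests/controller-spark:<tag>

# Sink-sync run: skips the Parquet checks and validates the sink FHIR server instead.
docker run --network <docker-network> -e DWH_TYPE=FHIR <repository>/e2e-tests/controller-spark:<tag>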
54 changes: 39 additions & 15 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -65,7 +65,13 @@ function validate_args() {
# anything that needs printing
#################################################
function print_message() {
local print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:"
local print_prefix=""
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:"
else
print_prefix="E2E TEST FOR CONTROLLER FHIR SERVER TO FHIR SERVER SYNC:"
fi
echo "${print_prefix} $*"
}

@@ -88,6 +94,7 @@ function print_message() {
function setup() {
HOME_PATH=$1
PARQUET_SUBDIR=$2
DWH_TYPE=$4
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
SINK_FHIR_SERVER_URL='http://localhost:8098'
PIPELINE_CONTROLLER_URL='http://localhost:8090'
@@ -187,7 +194,7 @@ function run_pipeline() {
#######################################################################
function check_parquet() {
local isIncremental=$1
local runtime="15 minute"
local runtime="5 minute"
local end_time=$(date -ud "$runtime" +%s)
local output="${HOME_PATH}/${PARQUET_SUBDIR}"
local timeout=true
@@ -224,7 +231,7 @@ function check_parquet() {
timeout=false
break
else
sleep 20
sleep 10
fi
fi
done
@@ -412,8 +419,14 @@ setup "$@"
fhir_source_query
sleep 50
run_pipeline "FULL"
check_parquet false
test_fhir_sink "FULL"
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet false
else
# Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server
sleep 900
test_fhir_sink "FULL"
fi

clear

@@ -425,16 +438,27 @@ update_resource
sleep 60
# Incremental run.
run_pipeline "INCREMENTAL"
check_parquet true
fhir_source_query
test_fhir_sink "INCREMENTAL"

validate_resource_tables
validate_resource_tables_data
validate_updated_resource
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet true
else
fhir_source_query
  # Provide enough buffer time for the INCREMENTAL pipeline to completely run before testing the sink FHIR server
sleep 300
test_fhir_sink "INCREMENTAL"
fi

if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
validate_resource_tables
validate_resource_tables_data
validate_updated_resource

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

fi

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

print_message "END!!"
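The script now takes DWH_TYPE as its fourth positional argument, wired through the updated Dockerfile ENTRYPOINT above; a sketch of invoking it directly, assuming the same arguments the e2e image passes:

# Parquet validation (original behaviour).
./controller_spark_sql_validation.sh /workspace/e2e-tests/controller-spark dwh --use_docker_network PARQUET

# FHIR-server-to-FHIR-server validation: waits for the sync and queries the
# sink HAPI FHIR server instead of checking Parquet output.
./controller_spark_sql_validation.sh /workspace/e2e-tests/controller-spark dwh --use_docker_network FHIR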
@@ -142,7 +142,7 @@ public void writeResource(HapiRowDescriptor element)

numFetchedResourcesMap.get(resourceType).inc(1);

if (!parquetFile.isEmpty()) {
if (parquetUtil != null) {
startTime = System.currentTimeMillis();
parquetUtil.write(resource);
totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
Expand Down
@@ -21,7 +21,6 @@
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.exception.ProfileException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig;
import com.google.fhir.analytics.model.DatabaseConfiguration;
import com.google.fhir.analytics.view.ViewApplicationException;
@@ -88,6 +87,8 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

protected final String parquetFile;

protected final Boolean createParquetDwh;

private final int secondsToFlush;

private final int rowGroupSize;
@@ -130,6 +131,7 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {
this.oAuthClientSecret = options.getFhirServerOAuthClientSecret();
this.stageIdentifier = stageIdentifier;
this.parquetFile = options.getOutputParquetPath();
this.createParquetDwh = options.isCreateParquetDwh();
this.secondsToFlush = options.getSecondsToFlushParquetFiles();
this.rowGroupSize = options.getRowGroupSizeForParquetFiles();
this.viewDefinitionsDir = options.getViewDefinitionsDir();
@@ -200,7 +202,7 @@ public void setup() throws SQLException, ProfileException {
oAuthClientSecret,
fhirContext);
fhirSearchUtil = new FhirSearchUtil(fetchUtil);
if (!Strings.isNullOrEmpty(parquetFile)) {
if (createParquetDwh) {
parquetUtil =
new ParquetUtil(
fhirContext.getVersion().getVersion(),
@@ -260,4 +260,10 @@ public interface FhirEtlOptions extends BasePipelineOptions {
String getSourceNDJsonFilePattern();

void setSourceNDJsonFilePattern(String value);

@Description("Flag to switch off/on creation of a parquet DWH")
@Default.Boolean(true)
Boolean isCreateParquetDwh();

void setCreateParquetDwh(Boolean value);
}
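Since isCreateParquetDwh is a regular Beam PipelineOptions getter with @Default.Boolean(true), the option should surface as a --createParquetDwh flag on the batch pipeline's command line; a hedged sketch (the bundled jar path is an assumption):

# Run the batch pipeline without producing a Parquet DWH.
java -jar pipelines/batch/target/batch-bundled.jar \
  --fhirServerUrl=http://localhost:8091/fhir \
  --createParquetDwh=false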
3 changes: 3 additions & 0 deletions pipelines/controller/config/application.yaml
@@ -72,6 +72,9 @@ fhirdata:
# that directory too, such that files created by the pipelines are readable by
# the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`.
dwhRootPrefix: "dwh/controller_DEV_DWH"
# Whether to create a Parquet DWH or not. When syncing from one FHIR server to another,
# generation of the Parquet DWH can be switched off/on.
createParquetDwh: true

# The schedule for automatic incremental pipeline runs.
# Uses the Spring CronExpression format, i.e.,
@@ -118,6 +118,8 @@ public class DataProperties {

private int recursiveDepth;

private boolean createParquetDwh;

@PostConstruct
void validateProperties() {
CronExpression.parse(incrementalSchedule);
@@ -127,6 +129,9 @@ void validateProperties() {
"At least one of fhirServerUrl or dbConfig should be set!");
Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!");

if (!Strings.isNullOrEmpty(dbConfig)) {
if (!Strings.isNullOrEmpty(fhirServerUrl)) {
logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!");
@@ -138,6 +143,7 @@ void validateProperties() {
logger.info("Using FHIR-search mode since dbConfig is not set.");
}
Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty());
Preconditions.checkState(!createHiveResourceTables || createParquetDwh);
}

private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) {
@@ -213,6 +219,8 @@ PipelineConfig createBatchOptions() {
Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix);

options.setCreateParquetDwh(createParquetDwh);

PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);

// Get hold of thrift server parquet directory from dwhRootPrefix config.
@@ -230,6 +238,7 @@ List<ConfigFields> getConfigParams() {
return List.of(
new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""),
new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""),
new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""),
new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""),
new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""),
new ConfigFields(
Expand Down
