add ability to switch off/on creation of parquet dwh #1074

Open

wants to merge 8 commits into base: master
39 changes: 28 additions & 11 deletions cloudbuild.yaml
@@ -118,21 +118,11 @@ steps:
args: [ '-U', 'admin', '-d', 'postgres', '-h', 'hapi-fhir-db', '-p', '5432',
'-c', 'CREATE DATABASE views;']

# Resetting FHIR sink server
- name: 'docker/compose'
id: 'Turn down FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch HAPI FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'up','--force-recreate', '-d' ]

- name: 'docker/compose'
id: 'Bring up controller and Spark containers'
env:
- PIPELINE_CONFIG=/workspace/docker/config
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
- FHIRDATA_SINKFHIRSERVERURL=http://sink-server:8080/fhir
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '-d' ]

@@ -149,9 +139,36 @@ steps:
# - -c
# - docker logs pipeline-controller

- name: 'docker/compose'
id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

# Resetting Sink FHIR server
Collaborator:
It seems that these new tests are adding 15+ minutes to the e2e test run-time; I think changes in PR #947 had a similar effect too and we should try to reduce this. How about doing the sync test in one of the scenarios only and see how much it reduces the run-time? Maybe we can have only one scenario where sync is on and Parquet generation is off. Please also make sure that the incremental mode is tested in that scenario.

- name: 'docker/compose'
id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync'
args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d']

# Spinning up only the pipeline controller for FHIR server to FHIR server sync
- name: 'docker/compose'
id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync'
env:
- PIPELINE_CONFIG=/workspace/docker/config_fhir_sink
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ]

- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}'
id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'
env:
- DWH_TYPE="FHIR"

- name: 'docker/compose'
id: 'Bring down controller and Spark containers'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ]
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Turn down HAPI Source and Sink Servers'
2 changes: 0 additions & 2 deletions docker/compose-controller-spark-sql-single.yaml
@@ -62,8 +62,6 @@ services:
- ${DWH_ROOT}:/dwh
environment:
- JAVA_OPTS=$JAVA_OPTS
# This is to turn this on in e2e but leave it off in the default config.
- FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL
ports:
- '8090:8080'
networks:
2 changes: 2 additions & 0 deletions docker/config/application.yaml
@@ -25,6 +25,8 @@ fhirdata:
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
#Whether to create a Parquet DWH or not
Collaborator:
You can probably drop this comment as we have a reference to pipelines/controller/config/application.yaml at the top for all comments.

createParquetDwh: true
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
59 changes: 59 additions & 0 deletions docker/config_fhir_sink/application.yaml
@@ -0,0 +1,59 @@
#
Collaborator:
It seems that most of the content of this directory is a copy of config dir. Can you reuse those config files and only override the values that you need to change, e.g., with command-line arguments?

# Copyright 2020-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# See `pipelines/controller/config/application.yaml` for full documentation
# of these options.
# This config is meant to be used by `compose-controller-spark-sql.yaml`.
fhirdata:
# 172.17.0.1 is an example docker network interface ip address;
# `hapi-server` is another docker example where a container with that name is
# running on the same docker network.
# fhirServerUrl: "http://172.17.0.1:8091/fhir"
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
#Whether to create a Parquet DWH or not
createParquetDwh: false
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
# There is no Questionnaire in our test FHIR server, but it is added to
# prevent regression of https://github.com/google/fhir-data-pipes/issues/785.
# TODO: add resource table creation to e2e tests.
resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure"
numThreads: 1
autoGenerateFlinkConfiguration: true
createHiveResourceTables: false
#thriftserverHiveConfig: "config/thriftserver-hive-config_local.json"
#hiveResourceViewsDir: "config/views"
# structureDefinitionsPath: "config/profile-definitions"
structureDefinitionsPath: "classpath:/r4-us-core-definitions"
fhirVersion: "R4"
rowGroupSizeForParquetFiles: 33554432 # 32mb
#viewDefinitionsDir: "config/views"
#sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"
sinkFhirServerUrl: "http://sink-server:8080/fhir"
#sinkUserName: "hapi"
#sinkPassword: "hapi123"
recursiveDepth: 1

# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated
# list to expose selected ones
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,pipeline-metrics
31 changes: 31 additions & 0 deletions docker/config_fhir_sink/flink-conf.yaml
@@ -0,0 +1,31 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir.

# This is needed to prevent an "Insufficient number of network buffers"
# exceptions when running the merger on large input with many workers.
taskmanager.memory.network.max: 256mb

# This is needed to be able to process large resources, otherwise in JDBC
# mode we may get the following exception:
# "The record exceeds the maximum size of a sort buffer ..."
taskmanager.memory.managed.size: 256mb

# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately
# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240
execution.attached: false

# This is required to track the pipeline metrics when FlinkRunner is used.
execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener
9 changes: 9 additions & 0 deletions docker/config_fhir_sink/hapi-postgres-config_local.json
@@ -0,0 +1,9 @@
{
"jdbcDriverClass": "org.postgresql.Driver",
"databaseService" : "postgresql",
"databaseHostName" : "hapi-fhir-db",
"databasePort" : "5432",
"databaseUser" : "admin",
"databasePassword" : "admin",
"databaseName" : "hapi"
}
3 changes: 2 additions & 1 deletion e2e-tests/controller-spark/Dockerfile
@@ -23,5 +23,6 @@ COPY parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar
ENV PARQUET_SUBDIR="dwh"
ENV DOCKER_NETWORK="--use_docker_network"
ENV HOME_DIR="/workspace/e2e-tests/controller-spark"
ENV DWH_TYPE="PARQUET"

ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK}
ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} ${DWH_TYPE}
54 changes: 39 additions & 15 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -65,7 +65,13 @@ function validate_args() {
# anything that needs printing
#################################################
function print_message() {
local print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:"
local print_prefix=""
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:"
else
print_prefix="E2E TEST FOR CONTROLLER FHIR SERVER TO FHIR SERVER SYNC:"
fi
echo "${print_prefix} $*"
}

@@ -88,6 +94,7 @@ function print_message() {
function setup() {
HOME_PATH=$1
PARQUET_SUBDIR=$2
DWH_TYPE=$4
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
SINK_FHIR_SERVER_URL='http://localhost:8098'
PIPELINE_CONTROLLER_URL='http://localhost:8090'
@@ -187,7 +194,7 @@ function run_pipeline() {
#######################################################################
function check_parquet() {
local isIncremental=$1
local runtime="15 minute"
local runtime="5 minute"
local end_time=$(date -ud "$runtime" +%s)
local output="${HOME_PATH}/${PARQUET_SUBDIR}"
local timeout=true
@@ -224,7 +231,7 @@ function check_parquet() {
timeout=false
break
else
sleep 20
sleep 10
fi
fi
done
@@ -412,8 +419,14 @@ setup "$@"
fhir_source_query
sleep 50
run_pipeline "FULL"
check_parquet false
test_fhir_sink "FULL"
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet false
else
# Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server
sleep 900
test_fhir_sink "FULL"
fi

clear

@@ -425,16 +438,27 @@ update_resource
sleep 60
# Incremental run.
run_pipeline "INCREMENTAL"
check_parquet true
fhir_source_query
test_fhir_sink "INCREMENTAL"

validate_resource_tables
validate_resource_tables_data
validate_updated_resource
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet true
else
fhir_source_query
# Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server
sleep 300
test_fhir_sink "INCREMENTAL"
fi

if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
validate_resource_tables
validate_resource_tables_data
validate_updated_resource

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

fi

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

print_message "END!!"
@@ -142,7 +142,7 @@ public void writeResource(HapiRowDescriptor element)

numFetchedResourcesMap.get(resourceType).inc(1);

if (!parquetFile.isEmpty()) {
if (parquetUtil != null) {
startTime = System.currentTimeMillis();
parquetUtil.write(resource);
totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
@@ -21,7 +21,6 @@
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.exception.ProfileException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig;
import com.google.fhir.analytics.model.DatabaseConfiguration;
import com.google.fhir.analytics.view.ViewApplicationException;
@@ -88,6 +87,8 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

protected final String parquetFile;

protected final Boolean createParquetDwh;

private final int secondsToFlush;

private final int rowGroupSize;
@@ -130,6 +131,7 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {
this.oAuthClientSecret = options.getFhirServerOAuthClientSecret();
this.stageIdentifier = stageIdentifier;
this.parquetFile = options.getOutputParquetPath();
this.createParquetDwh = options.isCreateParquetDwh();
this.secondsToFlush = options.getSecondsToFlushParquetFiles();
this.rowGroupSize = options.getRowGroupSizeForParquetFiles();
this.viewDefinitionsDir = options.getViewDefinitionsDir();
@@ -200,7 +202,7 @@ public void setup() throws SQLException, ProfileException {
oAuthClientSecret,
fhirContext);
fhirSearchUtil = new FhirSearchUtil(fetchUtil);
if (!Strings.isNullOrEmpty(parquetFile)) {
if (createParquetDwh) {
Collaborator:
Do we have a sanity check if createParquetDwh is true but parquetFile is null or empty?
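
A minimal sketch of such a guard, using the Guava Preconditions and Strings helpers already used elsewhere in this PR; where exactly it should run (option parsing vs. setup()) is left open, and the field names simply mirror this change:

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

/** Hypothetical sanity check; not an existing class in the codebase. */
final class ParquetOptionsCheck {
  // Fails fast when Parquet DWH creation is requested but no output path is set.
  static void check(boolean createParquetDwh, String parquetFile) {
    Preconditions.checkState(
        !createParquetDwh || !Strings.isNullOrEmpty(parquetFile),
        "createParquetDwh is true but outputParquetPath is null or empty!");
  }
}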

parquetUtil =
new ParquetUtil(
fhirContext.getVersion().getVersion(),
@@ -260,4 +260,10 @@ public interface FhirEtlOptions extends BasePipelineOptions {
String getSourceNDJsonFilePattern();

void setSourceNDJsonFilePattern(String value);

@Description("Flag to switch off/on creation of a parquet DWH")
Collaborator:
Suggested change
@Description("Flag to switch off/on creation of a parquet DWH")
@Description("Flag to switch off/on creation of parquet files; can be turned off when syncing from a FHIR server to another.")

@Default.Boolean(true)
Boolean isCreateParquetDwh();

void setCreateParquetDwh(Boolean value);
}
3 changes: 3 additions & 0 deletions pipelines/controller/config/application.yaml
@@ -73,6 +73,9 @@ fhirdata:
# that directory too, such that files created by the pipelines are readable by
# the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`.
dwhRootPrefix: "dwh/controller_DEV_DWH"
#Whether to create a Parquet DWH or not.In case of syncying between a FHIR server to FHIR server ,
Collaborator:
nit: Please also break long lines at 80 chars for YAML files (I know we have not followed it everywhere in this file but we should).

Suggested change
#Whether to create a Parquet DWH or not.In case of syncying between a FHIR server to FHIR server ,
# Whether to create a Parquet DWH or not. In case of syncing from a FHIR server to another, if Parquet files are not needed, their generation can be switched off by this flag.

# generation of parquet DWH could be switched off/on
createParquetDwh: true

# The schedule for automatic incremental pipeline runs.
# Uses the Spring CronExpression format, i.e.,
@@ -118,6 +118,8 @@ public class DataProperties {

private int recursiveDepth;

private boolean createParquetDwh;

@PostConstruct
void validateProperties() {
CronExpression.parse(incrementalSchedule);
@@ -127,6 +129,9 @@ void validateProperties() {
"At least one of fhirServerUrl or dbConfig should be set!");
Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!");

if (!Strings.isNullOrEmpty(dbConfig)) {
if (!Strings.isNullOrEmpty(fhirServerUrl)) {
logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!");
@@ -138,6 +143,7 @@ void validateProperties() {
logger.info("Using FHIR-search mode since dbConfig is not set.");
}
Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty());
Preconditions.checkState(!createHiveResourceTables || createParquetDwh);
Collaborator:
I think there are more config sanity checks that need to be done, e.g., when we are not generating Parquet files, generation of views should be disabled as well.
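
A hedged sketch of what those extra guards could look like; the dependency of Hive tables and view generation on the Parquet DWH is an assumption, and viewDefinitionsDir is only a stand-in for the actual DataProperties field:

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

/** Sketch of additional config sanity checks for validateProperties(). */
final class DwhConfigChecks {
  static void check(
      boolean createParquetDwh, boolean createHiveResourceTables, String viewDefinitionsDir) {
    // Hive resource tables are built from the Parquet DWH, so they need it to be on.
    Preconditions.checkState(
        createParquetDwh || !createHiveResourceTables,
        "createHiveResourceTables requires createParquetDwh to be true!");
    // View generation is assumed to read from Parquet output as well.
    Preconditions.checkState(
        createParquetDwh || Strings.isNullOrEmpty(viewDefinitionsDir),
        "viewDefinitionsDir is set but createParquetDwh is false!");
  }
}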

}

private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) {
@@ -213,6 +219,8 @@ PipelineConfig createBatchOptions() {
Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix);

options.setCreateParquetDwh(createParquetDwh);
Collaborator:
Have you tested the incremental pipeline when this flag is turned off? In particular, does the mergerPipelines logic here work fine? I think we need extra logic in PipelineManager to handle these edge cases.
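
One possible shape for that extra logic, sketched with hypothetical method names (runIncrementalPipelineAndMerge and runIncrementalFhirSinkSync are placeholders, not existing PipelineManager methods):

/** Sketch of skipping the Parquet merger when no DWH is being created. */
final class IncrementalRunSketch {
  // createParquetDwh comes from DataProperties in this PR.
  void runIncremental(boolean createParquetDwh) {
    if (createParquetDwh) {
      // Normal path: run the incremental pipeline, then merge the new Parquet
      // files into the previous DWH snapshot.
      runIncrementalPipelineAndMerge();
    } else {
      // No Parquet DWH to merge into: only push incremental changes to the
      // sink FHIR server and skip the merger entirely.
      runIncrementalFhirSinkSync();
    }
  }

  private void runIncrementalPipelineAndMerge() {}

  private void runIncrementalFhirSinkSync() {}
}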


PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);

// Get hold of thrift server parquet directory from dwhRootPrefix config.
@@ -230,6 +238,7 @@ List<ConfigFields> getConfigParams() {
return List.of(
new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""),
new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""),
new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""),
new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""),
new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""),
new ConfigFields(