This directory contains a reference Cloud Dataflow pipeline to convert HL7v2 messages to FHIR resources. Please note that additional configurations and hardening are required before processing PHI data with this pipeline.
- Have a Linux (Ubuntu & Debian preferred) machine ready.
- Add your public key to GitHub.
- Install the latest GCloud SDK.
- Create a project.
- Create an HL7v2 Store.
- Make sure to use beta endpoints and provide
NotificationConfig
s and a schematizedParserConfig
.
- Make sure to use beta endpoints and provide
- Create a FHIR Store.
- Set enableUpdateCreate and disableReferentialIntegrity for the FHIR store.
- Enable Cloud Dataflow API.
- (Highly recommended) Enable audit logging.
Make sure you have enough permissions to run Cloud Dataflow jobs.
The Cloud Dataflow Controller Service Account needs the following permissions.
roles/pubsub.subscriber
.- To listen for PubSub notifications from new messages. The service account only needs the role on the specific PubSub subscription.
roles/healthcare.hl7V2Consumer
.- To access messages in your HL7v2 store. The service account only needs the role on the source HL7v2 Store.
roles/healthcare.fhirResourceEditor
.- To write transformed resources to your FHIR store. The service account only needs this role on the target FHIR Store.
roles/storage.objectAdmin
.- To access mapping and harmonization configurations on GCS. The service account needs this role on all GCS buckets that the mappings reside in.
Build a fat JAR of the pipeline by running the following from the project directory.
- Please make sure gradle is added to PATH before running the following commands.
# Generate wrapper classes.
gradle wrapper --gradle-version 6.7.1
./gradlew shadowJar
A JAR file should be generated in build/libs
folder.
Now run the pipeline with the following command:
# Please set the environment variables in the following command.
java -jar build/libs/converter-0.1.0-all.jar --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
--readErrorPath="gs://${ERROR_BUCKET?}/read/" \
--writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
--mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
--mappingPath="gs://${MAPPING_BUCKET?}/mapping.textproto" \
--fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
--runner=DataflowRunner \
--region=${REGION?} \
--project=${PROJECT?}
A few notes:
- By default, streaming pipelines do not have autoscaling enabled, please use
either
--enableStreamingEngine
(recommended) or a combination of--autoscalingAlgorithm=THROUGHPUT_BASED
and--maxNumWorkers=N
to manually enable it. See this page for more details. - For production use, we recommend enabling agent metrics by appending
--experiments=enable_stackdriver_agent_metrics
as an option (you will need to grantroles/monitoring.metricWriter
to Dataflow controller service account as well), see this page for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g.--numberOfWorkerHarnessThreads=10
. You can tune the limit based on your workload. - To generate a template instead of running the pipeline, add
--stagingLocation=gs://${STAGING_LOCATION} --templateLocation=gs://${TEMPLATE_LOCATION}
to the above command. See here
Please take a look at the PipelineRunner
class to see the concrete meaning of
each argument.
You should be able to verify that a Dataflow pipeline is running from the cloud console UI. Data should start flowing through the pipeline and arrive at the FHIR Store, use the SearchResources API to verify that FHIR Resources are written correctly.
This directory contains a reference Cloud Dataflow pipeline to convert a DICOM Study to a FHIR ImagingStudy resource.
- Have a Linux (Ubuntu & Debian preferred) machine ready.
- Add your public key to GitHub.
- Install the latest GCloud SDK.
- Create a project.
- Create a DICOM Store.
- Create an R4 FHIR Store.
- Set disableReferentialIntegrity for the FHIR store.
- Enable Cloud Dataflow API.
Make sure you have enough permissions to run Cloud Dataflow jobs.
The Cloud Dataflow Controller Service Account needs the following permissions.
roles/pubsub.subscriber
.- To listen for PubSub notifications from new messages. The service account only needs the role on the specific PubSub subscription.
roles/healthcare.dicomEditor
.- To access metadata of DICOM stores.
roles/healthcare.fhirResourceEditor
.- To write transformed resources to your FHIR store. The service account only needs this role on the target FHIR Store.
roles/storage.objectAdmin
.- To access mapping and harmonization configurations on GCS. The service account needs this role on all GCS buckets that the mappings reside in.
Build a fat JAR of the pipeline by running the following from the project directory.
- Please make sure gradle is added to PATH before running the following commands.
# Generate wrapper classes.
gradle wrapper
./gradlew shadowJar -PmainClass=com.google.cloud.healthcare.etl.runner.dicomtofhir.DicomToFhirStreamingRunner
A JAR file should be generated in build/libs
folder.
Now run the pipeline with the following command:
# Please set the environment variables in the following command.
java -jar build/libs/converter-0.1.0-all.jar --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
--readErrorPath="gs://${ERROR_BUCKET?}/read/" \
--writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
--mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
--mappingPath="gs://${MAPPING_BUCKET?}/main.textproto" \
--fhirStore="projects/${PROJECT?}/locations/${LOCATION}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
--runner=DataflowRunner \
--region=${REGION?} \
--project=${PROJECT?}
A few notes:
- By default, streaming pipelines do not have autoscaling enabled, please use
either
--enableStreamingEngine
(recommended) or a combination of--autoscalingAlgorithm=THROUGHPUT_BASED
and--maxNumWorkers=N
to manually enable it. See this page for more details. - For production use, we recommend enabling agent metrics by appending
--experiments=enable_stackdriver_agent_metrics
as an option (you will need to grantroles/monitoring.metricWriter
to Dataflow controller service account as well), see this page for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g.--numberOfWorkerHarnessThreads=10
. You can tune the limit based on your workload. - To generate a template instead of running the pipeline, add
--stagingLocation=gs://${STAGING_LOCATION} --templateLocation=gs://${TEMPLATE_LOCATION}
to the above command. See here - The mappingPath file (main.textproto) configures the mapping library. Ensure that the paths inside the file exist (References the following repository: https://github.com/GoogleCloudPlatform/healthcare-data-harmonization/). The required binaries should be installed by the build JAR command. There is a sample main.textproto at src/main/java/com/google/cloud/healthcare/etl/runner/dicomtofhir/main.textproto, if specifying GCS (non-local) paths use
gcs_location:
instead oflocal_path:
. - As the mappings do not assign an ID to the mapped FHIR resource, each input creates a new output in the FHIR store. TODO: evaluate maintaining an ID for DICOM Instances.
Please take a look at the PipelineRunner
class to see the concrete meaning of
each argument.
You should be able to verify that a Dataflow pipeline is running from the cloud console UI. Data should start flowing through the pipeline and arrive at the FHIR Store, use the SearchResources API to verify that FHIR Resources are written correctly.
Please file GitHub issues if you encounter any problems.