From b4ba8e7ea04ee6f3eff88809d7190f2a35c43927 Mon Sep 17 00:00:00 2001
From: Steven Meyer <108885656+meyertst-aws@users.noreply.github.com>
Date: Wed, 20 Nov 2024 17:00:07 -0500
Subject: [PATCH 1/4] First commit for HealthLake
---
python/example_code/healthlake/README.md | 184 +++++++
.../healthlake/health_lake_wrapper.py | 138 +++++
.../example_code/healthlake/requirements.txt | 3 +
.../example_code/healthlake/test/conftest.py | 13 +
.../test/test_medical_imaging_basics.py | 482 ++++++++++++++++++
5 files changed, 820 insertions(+)
create mode 100644 python/example_code/healthlake/README.md
create mode 100644 python/example_code/healthlake/health_lake_wrapper.py
create mode 100644 python/example_code/healthlake/requirements.txt
create mode 100644 python/example_code/healthlake/test/conftest.py
create mode 100644 python/example_code/healthlake/test/test_medical_imaging_basics.py
diff --git a/python/example_code/healthlake/README.md b/python/example_code/healthlake/README.md
new file mode 100644
index 00000000000..46974e528ce
--- /dev/null
+++ b/python/example_code/healthlake/README.md
@@ -0,0 +1,184 @@
+# HealthImaging code examples for the SDK for Python
+
+## Overview
+
+Shows how to use the AWS SDK for Python (Boto3) to work with AWS HealthImaging.
+
+
+
+
+_HealthImaging is a HIPAA-eligible service that helps health care providers and their medical imaging ISV partners store, transform, and apply machine learning to medical images._
+
+## ⚠ Important
+
+* Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/).
+* Running the tests might result in charges to your AWS account.
+* We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege).
+* This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services).
+
+
+
+
+## Code examples
+
+### Prerequisites
+
+For prerequisites, see the [README](../../README.md#Prerequisites) in the `python` folder.
+
+Install the packages required by these examples by running the following in a virtual environment:
+
+```
+python -m pip install -r requirements.txt
+```
+
+
+
+
+### Get started
+
+- [Hello HealthImaging](imaging_set_and_frames_workflow/hello.py#L4) (`ListDatastores`)
+
+
+### Single actions
+
+Code excerpts that show you how to call individual service functions.
+
+- [CopyImageSet](health_lake_wrapper.py#L417)
+- [CreateDatastore](health_lake_wrapper.py#L31)
+- [DeleteDatastore](health_lake_wrapper.py#L104)
+- [DeleteImageSet](health_lake_wrapper.py#L489)
+- [GetDICOMImportJob](health_lake_wrapper.py#L158)
+- [GetDatastore](health_lake_wrapper.py#L54)
+- [GetImageFrame](health_lake_wrapper.py#L318)
+- [GetImageSet](health_lake_wrapper.py#L241)
+- [GetImageSetMetadata](health_lake_wrapper.py#L274)
+- [ListDICOMImportJobs](health_lake_wrapper.py#L183)
+- [ListDatastores](health_lake_wrapper.py#L79)
+- [ListImageSetVersions](health_lake_wrapper.py#L350)
+- [ListTagsForResource](health_lake_wrapper.py#L556)
+- [SearchImageSets](health_lake_wrapper.py#L211)
+- [StartDICOMImportJob](health_lake_wrapper.py#L124)
+- [TagResource](health_lake_wrapper.py#L514)
+- [UntagResource](health_lake_wrapper.py#L534)
+- [UpdateImageSetMetadata](health_lake_wrapper.py#L381)
+
+### Scenarios
+
+Code examples that show you how to accomplish a specific task by calling multiple
+functions within the same service.
+
+- [Get started with image sets and image frames](imaging_set_and_frames_workflow/imaging_set_and_frames.py)
+- [Tagging a data store](tagging_data_stores.py)
+- [Tagging an image set](tagging_image_sets.py)
+
+
+
+
+
+## Run the examples
+
+### Instructions
+
+
+
+
+
+#### Hello HealthImaging
+
+This example shows you how to get started using HealthImaging.
+
+```
+python imaging_set_and_frames_workflow/hello.py
+```
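+
+Hello HealthImaging centers on a single `ListDatastores` call. The following is a minimal
+sketch of that call, assuming the Boto3 `medical-imaging` client; the response key names
+are taken from the HealthImaging API and are worth verifying against the current Boto3
+documentation.
+
+```
+import boto3
+
+# HealthImaging is exposed in Boto3 under the service name "medical-imaging".
+client = boto3.client("medical-imaging")
+
+# List the HealthImaging data stores in the current account and Region.
+response = client.list_datastores()
+for datastore in response["datastoreSummaries"]:
+    print(f"{datastore['datastoreName']}: {datastore['datastoreStatus']}")
+```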
+
+
+#### Get started with image sets and image frames
+
+This example shows you how to import DICOM files and download image frames in HealthImaging.
+The implementation is structured as a workflow command-line application.
+
+
+- Set up resources for a DICOM import.
+- Import DICOM files into a data store.
+- Retrieve the image set IDs for the import job.
+- Retrieve the image frame IDs for the image sets.
+- Download, decode and verify the image frames.
+- Clean up resources.
+
+
+
+
+Start the example by running the following at a command prompt:
+
+```
+python imaging_set_and_frames_workflow/imaging_set_and_frames.py
+```
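+
+The import step of this workflow comes down to a single `StartDICOMImportJob` call. The
+following is a minimal sketch of that call, assuming the data store, the IAM role, and the
+S3 buckets already exist; the IDs, ARN, and bucket names are placeholders, and the
+parameter and response field names are worth verifying against the current Boto3
+documentation.
+
+```
+import boto3
+
+client = boto3.client("medical-imaging")
+
+# Start a DICOM import job. All identifiers below are placeholders.
+response = client.start_dicom_import_job(
+    jobName="example-import-job",
+    datastoreId="12345678901234567890123456789012",
+    dataAccessRoleArn="arn:aws:iam::111122223333:role/ImportJobRole",
+    inputS3Uri="s3://amzn-s3-demo-bucket/dicom-input/",
+    outputS3Uri="s3://amzn-s3-demo-bucket/import-output/",
+)
+
+# The job ID is used later to poll the job status and locate the imported image sets.
+print(response["jobId"])
+```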
+
+
+
+
+
+#### Tagging a data store
+
+This example shows you how to tag a HealthImaging data store.
+
+
+
+
+
+Start the example by running the following at a command prompt:
+
+```
+python tagging_data_stores.py
+```
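+
+Tagging a data store is a single `TagResource` call against the data store ARN. The
+following is a minimal sketch of adding, listing, and removing a tag, assuming the data
+store already exists; the ARN is a placeholder.
+
+```
+import boto3
+
+client = boto3.client("medical-imaging")
+
+# The data store ARN below is a placeholder.
+datastore_arn = (
+    "arn:aws:medical-imaging:us-east-1:111122223333:datastore/"
+    "12345678901234567890123456789012"
+)
+
+# Add a tag, list the tags on the data store, then remove the tag.
+client.tag_resource(resourceArn=datastore_arn, tags={"Deployment": "Development"})
+print(client.list_tags_for_resource(resourceArn=datastore_arn)["tags"])
+client.untag_resource(resourceArn=datastore_arn, tagKeys=["Deployment"])
+```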
+
+
+
+
+
+#### Tagging an image set
+
+This example shows you how to tag a HealthImaging image set.
+
+
+
+
+
+Start the example by running the following at a command prompt:
+
+```
+python tagging_image_sets.py
+```
+
+
+
+
+
+### Tests
+
+⚠ Running tests might result in charges to your AWS account.
+
+
+To find instructions for running these tests, see the [README](../../README.md#Tests)
+in the `python` folder.
+
+
+
+
+
+
+## Additional resources
+
+- [HealthImaging Developer Guide](https://docs.aws.amazon.com/healthimaging/latest/devguide/what-is.html)
+- [HealthImaging API Reference](https://docs.aws.amazon.com/healthimaging/latest/APIReference/Welcome.html)
+- [SDK for Python HealthImaging reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/medical-imaging.html)
+
+
+
+
+---
+
+Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+SPDX-License-Identifier: Apache-2.0
\ No newline at end of file
diff --git a/python/example_code/healthlake/health_lake_wrapper.py b/python/example_code/healthlake/health_lake_wrapper.py
new file mode 100644
index 00000000000..f0941183b83
--- /dev/null
+++ b/python/example_code/healthlake/health_lake_wrapper.py
@@ -0,0 +1,138 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Purpose
+
+Shows how to use the AWS SDK for Python (Boto3) to manage and invoke AWS HealthImaging
+functions.
+"""
+
+from boto3 import client
+import logging
+
+import boto3
+from botocore.exceptions import ClientError
+
+import time
+
+logger = logging.getLogger(__name__)
+
+
+# snippet-start:[python.example_code.medical-imaging.HealthLakeWrapper]
+class HealthLakeWrapper:
+ def __init__(self, health_lake_client):
+ self.health_lake_client = health_lake_client
+
+
+ # snippet-start:[python.example_code.medical-imaging.HealthLakeWrapper.decl]
+ @classmethod
+ def from_client(cls) -> "HealthLakeWrapper":
+ """
+ Creates a HealthLakeWrapper instance with a default AWS HealthLake client.
+
+ :return: An instance of HealthLakeWrapper initialized with the default HealthLake client.
+ """
+ kms_client = boto3.client("healthlake")
+ return cls(kms_client)
+
+ # snippet-end:[python.example_code.medical-imaging.HealthLakeWrapper.decl]
+
+ # snippet-start:[python.example_code.medical-imaging.CreateFHIRDatastore]
+ def create_fihr_datastore(self, datastore_name: str, sse_configuration : dict[str, any] = None,
+ identity_provider_configuration : dict[str, any] = None) -> str:
+ """
+ Creates a new HealthLake datastore.
+ When creating a SMART on FHIR datastore, the following parameters are required:
+ - sse_configuration: The server-side encryption configuration for a SMART on FHIR-enabled data store.
+ - identity_provider_configuration: The identity provider configuration for a SMART on FHIR-enabled data store.
+
+ :param datastore_name: The name of the data store.
+ :param sse_configuration: The server-side encryption configuration for a SMART on FHIR-enabled data store.
+ :param identity_provider_configuration: The identity provider configuration for a SMART on FHIR-enabled data store.
+ :return: The datastore ID.
+ """
+ try:
+ parameters = {
+ 'DatastoreName': datastore_name,
+ 'DatastoreTypeVersion' : 'R4'
+ }
+ if sse_configuration is not None and identity_provider_configuration is not None:
+ # Creating a SMART on FHIR-enabled data store
+ parameters['SseConfiguration'] = sse_configuration
+ parameters['IdentityProviderConfiguration'] = identity_provider_configuration
+
+ response = self.health_lake_client.create_fhir_datastore(**parameters)
+ return response['datastoreId']
+ except ClientError as err:
+ logger.exception("Couldn't create datastore %s. Here's why",
+ datastore_name, err.response["Error"]["Message"])
+ raise
+
+ # snippet-end:[python.example_code.medical-imaging.CreateFHIRDatastore]
+
+ # snippet-start:[python.example_code.medical-imaging.DescribeFHIRDatastore]
+ def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]:
+ """
+ Describes a HealthLake datastore.
+ :param datastore_id: The datastore ID.
+ :return: The datastore description.
+ """
+ try:
+ response = self.health_lake_client.describe_fhir_datastore(
+ DatastoreId=datastore_id)
+ return response['DatastoreProperties']
+ except ClientError as err:
+ logger.exception("Couldn't describe datastore with ID %s. Here's why",
+ datastore_id, err.response["Error"]["Message"])
+ raise
+
+ # snippet-end:[python.example_code.medical-imaging.HealthLakeWrapper]
+
+ def wait_datastore_active(self, datastore_id: str) -> None:
+ """
+ Waits for a HealthLake datastore to become active.
+ :param datastore_id: The datastore ID.
+ """
+ counter = 0
+ max_count_minutes = 40 # It can take a while to create a datastore, so we'll wait up to 40 minutes.
+ data_store_active = False
+ while counter < max_count_minutes:
+ datastore = self.health_lake_client.describe_fhir_datastore(
+ DatastoreId=datastore_id)
+ if datastore["DatastoreProperties"]["DatastoreStatus"] == "ACTIVE":
+ data_store_active = True
+ break
+ else:
+ counter += 1
+ time.sleep(60)
+
+ if data_store_active :
+ logger.info("Datastore with ID %s is active after %d minutes.", datastore_id, counter)
+ else:
+ raise ClientError("Datastore with ID %s is not active after %d minutes.", datastore_id, counter)
+
+ try:
+ waiter = self.health_lake_client.get_waiter("datastore_active")
+ waiter.wait(DatastoreId=datastore_id)
+ except ClientError as err:
+ logger.exception("Data store with ID %s failed to become active. Here's why",
+ datastore_id, err.response["Error"]["Message"])
+ raise
+
+ def health_lake_demo(self) -> None:
+ use_smart_data_store = False
+ testing_code = True
+
+ datastore_name = "health_imaging_datastore"
+ if use_smart_data_store:
+ pass
+ else:
+ data_store_id = self.health_imaging_client.list_datastores(
+ maxResults=1
+ )['datastoreResults'][0]['datastoreId']
+
+
+if __name__ == "__main__":
+ health_lake_wrapper = HealthLakeWrapper.from_client()
+
diff --git a/python/example_code/healthlake/requirements.txt b/python/example_code/healthlake/requirements.txt
new file mode 100644
index 00000000000..624c2cdf438
--- /dev/null
+++ b/python/example_code/healthlake/requirements.txt
@@ -0,0 +1,3 @@
+boto3>=1.34.149
+pytest>=7.2.1
+botocore>=1.34.149
\ No newline at end of file
diff --git a/python/example_code/healthlake/test/conftest.py b/python/example_code/healthlake/test/conftest.py
new file mode 100644
index 00000000000..2d0663bfbd2
--- /dev/null
+++ b/python/example_code/healthlake/test/conftest.py
@@ -0,0 +1,13 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Contains common test fixtures used to run AWS HealthImaging
+tests.
+"""
+
+import sys
+
+# This is needed so Python can find test_tools on the path.
+sys.path.append("../..")
+from test_tools.fixtures.common import *
diff --git a/python/example_code/healthlake/test/test_medical_imaging_basics.py b/python/example_code/healthlake/test/test_medical_imaging_basics.py
new file mode 100644
index 00000000000..7956e8978ec
--- /dev/null
+++ b/python/example_code/healthlake/test/test_medical_imaging_basics.py
@@ -0,0 +1,482 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Unit tests for medical_imaging_basics functions.
+"""
+
+import os
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+from health_lake_wrapper import MedicalImagingWrapper
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_create_datastore(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_name = "test-datastore"
+ datastore_id = "abcdedf1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_create_datastore(
+ datastore_name, datastore_id, error_code=error_code
+ )
+
+ if error_code is None:
+ got_datastore_id = wrapper.create_datastore(datastore_name)
+ assert got_datastore_id == datastore_id
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.create_datastore(datastore_name)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_get_datastore_properties(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_get_datastore_properties(
+ datastore_id, error_code=error_code
+ )
+
+ if error_code is None:
+ got_properties = wrapper.get_datastore_properties(datastore_id)
+ assert got_properties["datastoreId"] == datastore_id
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.get_datastore_properties(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_datastores(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ medical_imaging_stubber.stub_list_datastores(datastore_id, error_code=error_code)
+
+ if error_code is None:
+ datastores = wrapper.list_datastores()
+ assert datastores[0]["datastoreId"] == datastore_id
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_datastores()
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_delete_datastore(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_delete_data_store(datastore_id, error_code=error_code)
+
+ if error_code is None:
+ wrapper.delete_datastore(datastore_id)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.delete_datastore(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_start_dicom_import_job(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ job_id = "cccccc1234567890abcdef123456789"
+ job_name = "job_name"
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ role_arn = "arn:aws:iam::111111111111:role/dicom_import"
+ input_s3_uri = "s3://healthimaging-source/CRStudy/"
+ output_s3_uri = "s3://healthimaging-destination/CRStudy/"
+
+ medical_imaging_stubber.stub_start_dicom_import_job(
+ job_name,
+ datastore_id,
+ role_arn,
+ input_s3_uri,
+ output_s3_uri,
+ job_id,
+ error_code=error_code,
+ )
+
+ if error_code is None:
+ result = wrapper.start_dicom_import_job(
+ job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri
+ )
+ assert result == job_id
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.start_dicom_import_job(
+ job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri
+ )
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_get_dicom_import_job(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ job_id = "cccccc1234567890abcdef123456789"
+ job_status = "TESTING"
+
+ medical_imaging_stubber.stub_get_dicom_import_job(
+ job_id, datastore_id, job_status, error_code=error_code
+ )
+
+ if error_code is None:
+ result = wrapper.get_dicom_import_job(datastore_id, job_id)
+ assert result["jobStatus"] == job_status
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.get_dicom_import_job(datastore_id, job_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_dicom_import_jobs(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_list_dicom_import_jobs(
+ datastore_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.list_dicom_import_jobs(datastore_id)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_dicom_import_jobs(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_search_image_sets(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ search_filter = {
+ "filters": [
+ {
+ "values": [
+ {"createdAt": "2023-09-13T14:13:39.302000-04:00"},
+ {"createdAt": "2023-09-13T14:13:39.302000-04:00"},
+ ],
+ "operator": "BETWEEN",
+ }
+ ]
+ }
+ medical_imaging_stubber.stub_search_image_sets(
+ datastore_id, search_filter, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.search_image_sets(datastore_id, search_filter)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.search_image_sets(datastore_id, search_filter)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_get_image_set(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+ version_id = "1"
+
+ medical_imaging_stubber.stub_get_image_set(
+ datastore_id, image_set_id, version_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.get_image_set(datastore_id, image_set_id, version_id)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.get_image_set(datastore_id, image_set_id, version_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_get_image_set_metadata(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+ test_file = "med-imag-test_file_1234.gzip"
+ medical_imaging_stubber.stub_get_image_set_metadata(
+ datastore_id, image_set_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.get_image_set_metadata(test_file, datastore_id, image_set_id)
+ assert os.path.exists(test_file)
+ os.remove(test_file)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.get_image_set_metadata(test_file, datastore_id, image_set_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_get_pixel_data(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+ image_frame_id = "cccccc1234567890abcdef123456789"
+ test_file = "med-imag-test_file_789654.jph"
+ medical_imaging_stubber.stub_get_pixel_data(
+ datastore_id, image_set_id, image_frame_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.get_pixel_data(test_file, datastore_id, image_set_id, image_frame_id)
+ assert os.path.exists(test_file)
+ os.remove(test_file)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.get_pixel_data(
+ test_file, datastore_id, image_set_id, image_frame_id
+ )
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_image_set_versions(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_list_image_set_versions(
+ datastore_id, image_set_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.list_image_set_versions(datastore_id, image_set_id)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_image_set_versions(datastore_id, image_set_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_update_image_set_metadata(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+ version_id = "1"
+ force = False
+ metadata = {
+ "DICOMUpdates": {
+ "updatableAttributes": '{"SchemaVersion":1.1,"Patient":{"DICOM":{"PatientName":"Garcia^Gloria"}}}'
+ }
+ }
+
+ medical_imaging_stubber.stub_update_image_set_metadata(
+ datastore_id, image_set_id, version_id, metadata, force, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.update_image_set_metadata(
+ datastore_id, image_set_id, version_id, metadata, force
+ )
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.update_image_set_metadata(
+ datastore_id, image_set_id, version_id, metadata, force
+ )
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_copy_image_set_without_destination(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+ version_id = "1"
+ new_image_set_id = "cccccc1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_copy_image_set_without_destination(
+ datastore_id, image_set_id, version_id, new_image_set_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.copy_image_set(datastore_id, image_set_id, version_id)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.copy_image_set(datastore_id, image_set_id, version_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_copy_image_set_with_destination(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+ version_id = "1"
+ destination_image_set_id = "cccccc1234567890abcdef123456789"
+ destination_version_id = "1"
+ force = True
+ subset_Id = "cccccc1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_copy_image_set_with_destination(
+ datastore_id,
+ image_set_id,
+ version_id,
+ destination_image_set_id,
+ destination_version_id,
+ force,
+ subset_Id,
+ error_code=error_code,
+ )
+
+ if error_code is None:
+ wrapper.copy_image_set(
+ datastore_id,
+ image_set_id,
+ version_id,
+ destination_image_set_id,
+ destination_version_id,
+ force,
+ [subset_Id],
+ )
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.copy_image_set(
+ datastore_id,
+ image_set_id,
+ version_id,
+ destination_image_set_id,
+ destination_version_id,
+ force,
+ [subset_Id],
+ )
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_delete_image_set(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+ image_set_id = "cccccc1234567890abcdef123456789"
+
+ medical_imaging_stubber.stub_delete_image_set(
+ datastore_id, image_set_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.delete_image_set(datastore_id, image_set_id)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.delete_image_set(datastore_id, image_set_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_tag_resource(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ resource_arn = (
+ "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image"
+ "-set/cccccc1234567890abcdef123456789 "
+ )
+ tags = {"test-key": "test-value"}
+
+ medical_imaging_stubber.stub_tag_resource(resource_arn, tags, error_code=error_code)
+
+ if error_code is None:
+ wrapper.tag_resource(resource_arn, tags)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.tag_resource(resource_arn, tags)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_untag_resource(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ resource_arn = (
+ "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image"
+ "-set/cccccc1234567890abcdef123456789 "
+ )
+ tag_keys = ["test-key"]
+
+ medical_imaging_stubber.stub_untag_resource(
+ resource_arn, tag_keys, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.untag_resource(resource_arn, tag_keys)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.untag_resource(resource_arn, tag_keys)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_tags_for_resource(make_stubber, error_code):
+ medical_imaging_client = boto3.client("medical-imaging")
+ medical_imaging_stubber = make_stubber(medical_imaging_client)
+ wrapper = MedicalImagingWrapper(medical_imaging_client)
+ resource_arn = (
+ "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image"
+ "-set/cccccc1234567890abcdef123456789 "
+ )
+
+ medical_imaging_stubber.stub_list_tags_for_resource(
+ resource_arn, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.list_tags_for_resource(resource_arn)
+
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_tags_for_resource(resource_arn)
+ assert exc_info.value.response["Error"]["Code"] == error_code
From 1f0b213b035bed950d83dbd7648bb7f6499c8377 Mon Sep 17 00:00:00 2001
From: Steven Meyer <108885656+meyertst-aws@users.noreply.github.com>
Date: Thu, 21 Nov 2024 12:38:10 -0500
Subject: [PATCH 2/4] finished except for metadata and tests
---
.doc_gen/metadata/healthlake_metadata.yaml | 19 +
.../healthlake/health_lake_wrapper.py | 643 ++++++++++++++++--
2 files changed, 610 insertions(+), 52 deletions(-)
create mode 100644 .doc_gen/metadata/healthlake_metadata.yaml
diff --git a/.doc_gen/metadata/healthlake_metadata.yaml b/.doc_gen/metadata/healthlake_metadata.yaml
new file mode 100644
index 00000000000..04108e7ba25
--- /dev/null
+++ b/.doc_gen/metadata/healthlake_metadata.yaml
@@ -0,0 +1,19 @@
+# zexi 0.4.0
+healthlake_CreateFHIRDatastore:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.CreateFHIRDatastore
+ - description: >
+ The following code shows the parameters for SMART on
+ FHIR-enabled HealthLake data store.
+ snippet_tags:
+ - python.example_code.healthlake.CreateFHIRDatastore.smart
+ services:
+ healthlake: {CreateFHIRDatastore}
diff --git a/python/example_code/healthlake/health_lake_wrapper.py b/python/example_code/healthlake/health_lake_wrapper.py
index f0941183b83..c7ad67a6760 100644
--- a/python/example_code/healthlake/health_lake_wrapper.py
+++ b/python/example_code/healthlake/health_lake_wrapper.py
@@ -4,28 +4,29 @@
"""
Purpose
-Shows how to use the AWS SDK for Python (Boto3) to manage and invoke AWS HealthImaging
+Shows how to use the AWS SDK for Python (Boto3) to manage and invoke AWS HealthLake
functions.
"""
+from datetime import datetime, timedelta
from boto3 import client
import logging
+import json
import boto3
from botocore.exceptions import ClientError
-
import time
logger = logging.getLogger(__name__)
-# snippet-start:[python.example_code.medical-imaging.HealthLakeWrapper]
+# snippet-start:[python.example_code.healthlake.HealthLakeWrapper]
class HealthLakeWrapper:
- def __init__(self, health_lake_client):
+ def __init__(self, health_lake_client: client):
self.health_lake_client = health_lake_client
-
- # snippet-start:[python.example_code.medical-imaging.HealthLakeWrapper.decl]
+ # snippet-start:[python.example_code.healthlake.HealthLakeWrapper.decl]
@classmethod
def from_client(cls) -> "HealthLakeWrapper":
"""
@@ -36,11 +37,15 @@ def from_client(cls) -> "HealthLakeWrapper":
kms_client = boto3.client("healthlake")
return cls(kms_client)
- # snippet-end:[python.example_code.medical-imaging.HealthLakeWrapper.decl]
+ # snippet-end:[python.example_code.healthlake.HealthLakeWrapper.decl]
- # snippet-start:[python.example_code.medical-imaging.CreateFHIRDatastore]
- def create_fihr_datastore(self, datastore_name: str, sse_configuration : dict[str, any] = None,
- identity_provider_configuration : dict[str, any] = None) -> str:
+ # snippet-start:[python.example_code.healthlake.CreateFHIRDatastore]
+ def create_fihr_datastore(
+ self,
+ datastore_name: str,
+ sse_configuration: dict[str, any] = None,
+ identity_provider_configuration: dict[str, any] = None,
+ ) -> dict[str, str]:
"""
Creates a new HealthLake datastore.
When creating a SMART on FHIR datastore, the following parameters are required:
@@ -50,28 +55,33 @@ def create_fihr_datastore(self, datastore_name: str, sse_configuration : dict[st
:param datastore_name: The name of the data store.
:param sse_configuration: The server-side encryption configuration for a SMART on FHIR-enabled data store.
:param identity_provider_configuration: The identity provider configuration for a SMART on FHIR-enabled data store.
- :return: The datastore ID.
+ :return: A dictionary containing the data store information.
"""
try:
- parameters = {
- 'DatastoreName': datastore_name,
- 'DatastoreTypeVersion' : 'R4'
- }
- if sse_configuration is not None and identity_provider_configuration is not None:
+ parameters = {"DatastoreName": datastore_name, "DatastoreTypeVersion": "R4"}
+ if (
+ sse_configuration is not None
+ and identity_provider_configuration is not None
+ ):
# Creating a SMART on FHIR-enabled data store
- parameters['SseConfiguration'] = sse_configuration
- parameters['IdentityProviderConfiguration'] = identity_provider_configuration
+ parameters["SseConfiguration"] = sse_configuration
+ parameters[
+ "IdentityProviderConfiguration"
+ ] = identity_provider_configuration
response = self.health_lake_client.create_fhir_datastore(**parameters)
- return response['datastoreId']
+ return response
except ClientError as err:
- logger.exception("Couldn't create datastore %s. Here's why",
- datastore_name, err.response["Error"]["Message"])
+ logger.exception(
+                "Couldn't create datastore %s. Here's why: %s",
+ datastore_name,
+ err.response["Error"]["Message"],
+ )
raise
- # snippet-end:[python.example_code.medical-imaging.CreateFHIRDatastore]
+ # snippet-end:[python.example_code.healthlake.CreateFHIRDatastore]
- # snippet-start:[python.example_code.medical-imaging.DescribeFHIRDatastore]
+ # snippet-start:[python.example_code.healthlake.DescribeFHIRDatastore]
def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]:
"""
Describes a HealthLake datastore.
@@ -80,14 +90,354 @@ def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]:
"""
try:
response = self.health_lake_client.describe_fhir_datastore(
- DatastoreId=datastore_id)
- return response['DatastoreProperties']
+ DatastoreId=datastore_id
+ )
+ return response["DatastoreProperties"]
+ except ClientError as err:
+ logger.exception(
+                "Couldn't describe datastore with ID %s. Here's why: %s",
+ datastore_id,
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-start:[python.example_code.healthlake.ListFHIRDatastores]
+ def list_fhir_datastores(self) -> list[dict[str, any]]:
+ """
+ Lists all HealthLake datastores.
+ :return: A list of datastore descriptions.
+ """
+ try:
+ next_token = None
+ datastores = []
+ while True:
+ parameters = {}
+ if next_token is not None:
+ parameters["NextToken"] = next_token
+ response = self.health_lake_client.list_fhir_datastores(**parameters)
+ datastores.extend(response["DatastorePropertiesList"])
+ if "NextToken" in response:
+ next_token = response["NextToken"]
+ else:
+ break
+            return datastores
+ except ClientError as err:
+ logger.exception(
+                "Couldn't list datastores. Here's why: %s", err.response["Error"]["Message"]
+ )
+ raise
+
+ # snippet-start:[python.example_code.healthlake.DeleteFHIRDatastore]
+ def delete_fhir_datastore(self, datastore_id: str) -> None:
+ """
+ Deletes a HealthLake datastore.
+ :param datastore_id: The datastore ID.
+ """
+ try:
+ self.health_lake_client.delete_fhir_datastore(DatastoreId=datastore_id)
+ except ClientError as err:
+ logger.exception(
+                "Couldn't delete datastore with ID %s. Here's why: %s",
+ datastore_id,
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.DeleteFHIRDatastore]
+
+ # snippet-start:[python.example_code.healthlake.StartFHIRImportJob]
+ def start_fihr_import_job(
+ self,
+ job_name: str,
+ datastore_id: str,
+ input_s3_uri: str,
+ job_output_s3_uri: str,
+ kms_key_id: str,
+ data_access_role_arn: str,
+ ) -> dict[str, str]:
+ """
+ Starts a HealthLake import job.
+ :param job_name: The import job name.
+ :param datastore_id: The datastore ID.
+ :param input_s3_uri: The input S3 URI.
+ :param job_output_s3_uri: The job output S3 URI.
+ :param kms_key_id: The KMS key ID associated with the output S3 bucket.
+ :param data_access_role_arn: The data access role ARN.
+ :return: The import job.
+ """
+ try:
+ response = self.health_lake_client.start_fhir_import_job(
+ JobName=job_name,
+ InputDataConfig={"S3Uri": input_s3_uri},
+ JobOutputDataConfig={
+ "S3Configuration": {
+ "S3Uri": job_output_s3_uri,
+ "KmsKeyId": kms_key_id,
+ }
+ },
+ DataAccessRoleArn=data_access_role_arn,
+ DatastoreId=datastore_id,
+ )
+ return response
+ except ClientError as err:
+ logger.exception(
+                "Couldn't start import job. Here's why: %s",
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.StartFHIRImportJob]
+
+ # snippet-start:[python.example_code.healthlake.DescribeFHIRImportJob]
+ def describe_fihr_import_job(
+ self, datastore_id: str, job_id: str
+ ) -> dict[str, any]:
+ """
+ Describes a HealthLake import job.
+ :param datastore_id: The datastore ID.
+ :param job_id: The import job ID.
+ :return: The import job description.
+ """
+ try:
+ response = self.health_lake_client.describe_fhir_import_job(
+ DatastoreId=datastore_id, JobId=job_id
+ )
+ return response["ImportJobProperties"]
+ except ClientError as err:
+ logger.exception(
+                "Couldn't describe import job with ID %s. Here's why: %s",
+ job_id,
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.DescribeFHIRImportJob]
+
+ # snippet-start:[python.example_code.healthlake.ListFHIRDatastoreImportJobs]
+ def list_fhir_import_jobs(
+ self,
+ datastore_id: str,
+ job_name: str = None,
+ job_status: str = None,
+ submitted_before: datetime = None,
+ submitted_after: datetime = None,
+ ) -> list[dict[str, any]]:
+ """
+ Lists HealthLake import jobs satisfying the conditions.
+ :param datastore_id: The datastore ID.
+ :param job_name: The import job name.
+ :param job_status: The import job status.
+ :param submitted_before: The import job submitted before the specified date.
+ :param submitted_after: The import job submitted after the specified date.
+ :return: A list of import jobs.
+ """
+ try:
+ parameters = {"DatastoreId": datastore_id}
+ if job_name is not None:
+ parameters["JobName"] = job_name
+ if job_status is not None:
+ parameters["JobStatus"] = job_status
+ if submitted_before is not None:
+ parameters["SubmittedBefore"] = submitted_before
+ if submitted_after is not None:
+ parameters["SubmittedAfter"] = submitted_after
+ next_token = None
+ jobs = []
+ while True:
+ if next_token is not None:
+ parameters["NextToken"] = next_token
+ response = self.health_lake_client.list_fhir_import_jobs(**parameters)
+ jobs.extend(response["ImportJobPropertiesList"])
+ if "NextToken" in response:
+ next_token = response["NextToken"]
+ else:
+ break
+ return jobs
+ except ClientError as err:
+ logger.exception(
+                "Couldn't list import jobs. Here's why: %s",
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.ListFHIRDatastoreImportJobs]
+
+ # snippet-start:[python.example_code.healthlake.StartFHIRExportJob]
+ def start_fhir_export_job(
+ self,
+ job_name: str,
+ datastore_id: str,
+ output_s3_uri: str,
+ kms_key_id: str,
+ data_access_role_arn: str,
+ ) -> dict[str, str]:
+ """
+ Starts a HealthLake export job.
+ :param job_name: The export job name.
+ :param datastore_id: The datastore ID.
+ :param output_s3_uri: The output S3 URI.
+ :param kms_key_id: The KMS key ID associated with the output S3 bucket.
+ :param data_access_role_arn: The data access role ARN.
+ :return: The export job.
+ """
+ try:
+ response = self.health_lake_client.start_fhir_export_job(
+ OutputDataConfig={
+ "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id}
+ },
+ DataAccessRoleArn=data_access_role_arn,
+ DatastoreId=datastore_id,
+ JobName=job_name,
+ )
+
+ return response
+ except ClientError as err:
+ logger.exception(
+                "Couldn't start export job. Here's why: %s",
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.StartFHIRExportJob]
+
+ # snippet-start:[python.example_code.healthlake.DescribeFHIRExportJob]
+ def describe_fhir_export_job(
+ self, datastore_id: str, job_id: str
+ ) -> dict[str, any]:
+ """
+ Describes a HealthLake export job.
+ :param datastore_id: The datastore ID.
+ :param job_id: The export job ID.
+ :return: The export job description.
+ """
+ try:
+ response = self.health_lake_client.describe_fhir_export_job(
+ DatastoreId=datastore_id, JobId=job_id
+ )
+ return response["ExportJobProperties"]
+ except ClientError as err:
+ logger.exception(
+                "Couldn't describe export job with ID %s. Here's why: %s",
+ job_id,
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.DescribeFHIRExportJob]
+
+ # snippet-start:[python.example_code.healthlake.ListFHIRExportJobs]
+ def list_fhir_export_jobs(
+ self,
+ datastore_id: str,
+ job_name: str = None,
+ job_status: str = None,
+ submitted_before: datetime = None,
+ submitted_after: datetime = None,
+ ) -> list[dict[str, any]]:
+ """
+ Lists HealthLake export jobs satisfying the conditions.
+ :param datastore_id: The datastore ID.
+ :param job_name: The export job name.
+ :param job_status: The export job status.
+ :param submitted_before: The export job submitted before the specified date.
+ :param submitted_after: The export job submitted after the specified date.
+ :return: A list of export jobs.
+ """
+ try:
+ parameters = {"DatastoreId": datastore_id}
+ if job_name is not None:
+ parameters["JobName"] = job_name
+ if job_status is not None:
+ parameters["JobStatus"] = job_status
+ if submitted_before is not None:
+ parameters["SubmittedBefore"] = submitted_before
+ if submitted_after is not None:
+ parameters["SubmittedAfter"] = submitted_after
+ next_token = None
+ jobs = []
+ while True:
+ if next_token is not None:
+ parameters["NextToken"] = next_token
+ response = self.health_lake_client.list_fhir_export_jobs(**parameters)
+ jobs.extend(response["ExportJobPropertiesList"])
+ if "NextToken" in response:
+ next_token = response["NextToken"]
+ else:
+ break
+ return jobs
+ except ClientError as err:
+ logger.exception(
+                "Couldn't list export jobs. Here's why: %s",
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.ListFHIRExportJobs]
+
+ # snippet-start:[python.example_code.healthlake.TagResource]
+ def tag_resource(self, resource_arn: str, tags: list[dict[str, str]]) -> None:
+ """
+ Tags a HealthLake resource.
+ :param resource_arn: The resource ARN.
+ :param tags: The tags to add to the resource.
+ """
+ try:
+ self.health_lake_client.tag_resource(ResourceARN=resource_arn, Tags=tags)
+ except ClientError as err:
+ logger.exception(
+                "Couldn't tag resource %s. Here's why: %s",
+ resource_arn,
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.TagResource]
+
+ # snippet-start:[python.example_code.healthlake.ListTagsForResource]
+ def list_tags_for_resource(self, resource_arn: str) -> dict[str, str]:
+ """
+ Lists the tags for a HealthLake resource.
+ :param resource_arn: The resource ARN.
+ :return: The tags for the resource.
+ """
+ try:
+ response = self.health_lake_client.list_tags_for_resource(
+ ResourceARN=resource_arn
+ )
+ return response["Tags"]
+ except ClientError as err:
+ logger.exception(
+                "Couldn't list tags for resource %s. Here's why: %s",
+ resource_arn,
+ err.response["Error"]["Message"],
+ )
+ raise
+
+ # snippet-end:[python.example_code.healthlake.ListTagsForResource]
+
+ # snippet-start:[python.example_code.healthlake.UntagResource]
+ def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None:
+ """
+ Untags a HealthLake resource.
+ :param resource_arn: The resource ARN.
+ :param tag_keys: The tag keys to remove from the resource.
+ """
+ try:
+ self.health_lake_client.untag_resource(
+ ResourceARN=resource_arn, TagKeys=tag_keys
+ )
except ClientError as err:
- logger.exception("Couldn't describe datastore with ID %s. Here's why",
- datastore_id, err.response["Error"]["Message"])
+ logger.exception(
+                "Couldn't untag resource %s. Here's why: %s",
+ resource_arn,
+ err.response["Error"]["Message"],
+ )
raise
- # snippet-end:[python.example_code.medical-imaging.HealthLakeWrapper]
+ # snippet-end:[python.example_code.healthlake.UntagResource]
+
+ # snippet-end:[python.example_code.healthlake.HealthLakeWrapper]
def wait_datastore_active(self, datastore_id: str) -> None:
"""
@@ -95,44 +445,233 @@ def wait_datastore_active(self, datastore_id: str) -> None:
:param datastore_id: The datastore ID.
"""
counter = 0
- max_count_minutes = 40 # It can take a while to create a datastore, so we'll wait up to 40 minutes.
- data_store_active = False
+ max_count_minutes = 40 # It can take a while to create a datastore, so we'll wait up to 40 minutes.
+ status = "CREATING"
while counter < max_count_minutes:
datastore = self.health_lake_client.describe_fhir_datastore(
- DatastoreId=datastore_id)
- if datastore["DatastoreProperties"]["DatastoreStatus"] == "ACTIVE":
- data_store_active = True
+ DatastoreId=datastore_id
+ )
+ status = datastore["DatastoreProperties"]["DatastoreStatus"]
+ if status == "ACTIVE" or status == "CREATE_FAILED":
break
else:
+ print(f"data store {status}, minutes {counter}")
counter += 1
time.sleep(60)
- if data_store_active :
- logger.info("Datastore with ID %s is active after %d minutes.", datastore_id, counter)
+ if status == "ACTIVE":
+ print(
+ f"Datastore with ID {datastore_id} is active after {counter} minutes."
+ )
+ elif status == "CREATE_FAILED":
+            raise RuntimeError(
+                f"Create datastore with ID {datastore_id} failed after {counter} minutes."
+            )
else:
- raise ClientError("Datastore with ID %s is not active after %d minutes.", datastore_id, counter)
+            raise RuntimeError(
+                f"Datastore with ID {datastore_id} is not active after {counter} minutes."
+            )
- try:
- waiter = self.health_lake_client.get_waiter("datastore_active")
- waiter.wait(DatastoreId=datastore_id)
- except ClientError as err:
- logger.exception("Data store with ID %s failed to become active. Here's why",
- datastore_id, err.response["Error"]["Message"])
- raise
+ def wait_import_job_complete(self, datastore_id: str, job_id: str) -> None:
+ """
+ Waits for a HealthLake import job to complete.
+ :param datastore_id: The datastore ID.
+ :param job_id: The import job ID.
+ """
+ counter = 0
+        max_count_minutes = 20  # Wait up to 20 minutes for the import job to complete.
+ status = "IN_PROGRESS"
+ while counter < max_count_minutes:
+ job = self.describe_fihr_import_job(datastore_id, job_id)
+ status = job["JobStatus"]
+ if status == "COMPLETED" or status == "COMPLETED_WITH_ERRORS":
+ break
+ else:
+ print(f"Import job {status}, minutes {counter}")
+ counter += 1
+ time.sleep(60)
+
+ if status == "COMPLETED":
+ print(f"Import job with ID {job_id} is completed after {counter} minutes.")
+ elif status == "COMPLETED_WITH_ERRORS":
+ print(
+ f"Import job with ID {job_id} is completed with errors after {counter} minutes."
+ )
+ else:
+            raise RuntimeError(
+                f"Import job with ID {job_id} is not completed after {counter} minutes."
+            )
+
+ def wait_export_job_complete(self, datastore_id: str, job_id: str) -> None:
+ """
+ Waits for a HealthLake export job to complete.
+ :param datastore_id: The datastore ID.
+ :param job_id: The export job ID.
+ """
+ counter = 0
+        max_count_minutes = 20  # Wait up to 20 minutes for the export job to complete.
+ status = "IN_PROGRESS"
+ while counter < max_count_minutes:
+ job = self.describe_fhir_export_job(datastore_id, job_id)
+ status = job["JobStatus"]
+ if status == "COMPLETED" or status == "COMPLETED_WITH_ERRORS":
+ break
+ else:
+ print(f"Export job {status}, minutes {counter}")
+ counter += 1
+ time.sleep(60)
+ if status == "COMPLETED":
+ print(f"Export job with ID {job_id} is completed after {counter} minutes.")
+ elif status == "COMPLETED_WITH_ERRORS":
+ print(
+ f"Export job with ID {job_id} is completed with errors after {counter} minutes."
+ )
+ else:
+            raise RuntimeError(
+                f"Export job with ID {job_id} is not completed after {counter} minutes."
+            )
def health_lake_demo(self) -> None:
- use_smart_data_store = False
- testing_code = True
+        use_smart_on_fhir_data_store = True
+
+ datastore_name = "health_imaging_datastore2"
+        if use_smart_on_fhir_data_store:
+ # snippet-start:[python.example_code.healthlake.CreateFHIRDatastore.smart]
+ sse_configuration = {
+ "KmsEncryptionConfig": {"CmkType": "AWS_OWNED_KMS_KEY"}
+ }
- datastore_name = "health_imaging_datastore"
- if use_smart_data_store:
- pass
+ metadata = {
+ "issuer": "https://ehr.example.com",
+ "jwks_uri": "https://ehr.example.com/.well-known/jwks.json",
+ "authorization_endpoint": "https://ehr.example.com/auth/authorize",
+ "token_endpoint": "https://ehr.token.com/auth/token",
+ "token_endpoint_auth_methods_supported": [
+ "client_secret_basic",
+ "foo",
+ ],
+ "grant_types_supported": ["client_credential", "foo"],
+ "registration_endpoint": "https://ehr.example.com/auth/register",
+ "scopes_supported": ["openId", "profile", "launch"],
+ "response_types_supported": ["code"],
+ "management_endpoint": "https://ehr.example.com/user/manage",
+ "introspection_endpoint": "https://ehr.example.com/user/introspect",
+ "revocation_endpoint": "https://ehr.example.com/user/revoke",
+ "code_challenge_methods_supported": ["S256"],
+ "capabilities": [
+ "launch-ehr",
+ "sso-openid-connect",
+ "client-public",
+ ],
+ }
+            identity_provider_configuration = {
+ "AuthorizationStrategy": "SMART_ON_FHIR_V1",
+ "FineGrainedAuthorizationEnabled": True,
+ "IdpLambdaArn": "arn:aws:lambda:us-east-1:123502194722:function:healthlaketest37-ahl-introspec:active",
+ "Metadata": json.dumps(metadata),
+ }
+ data_store = self.create_fihr_datastore(
+                datastore_name, sse_configuration, identity_provider_configuration
+ )
+ # snippet-end:[python.example_code.healthlake.CreateFHIRDatastore.smart]
else:
- data_store_id = self.health_imaging_client.list_datastores(
- maxResults=1
- )['datastoreResults'][0]['datastoreId']
+ data_store = self.create_fihr_datastore(datastore_name)
+
+ data_store_id = data_store["DatastoreId"]
+ data_store_arn = data_store["DatastoreArn"]
+
+ self.wait_datastore_active(data_store_id)
+ data_stores = self.list_fhir_datastores()
+
+ print(f"{len(data_stores)} data store(s) found.")
+ for data_store in data_stores:
+ if data_store["DatastoreId"] == data_store_id:
+ logger.info(
+ "Datastore with ID %s is %s.",
+ data_store_id,
+ data_store["DatastoreStatus"],
+ )
+ break
+ tags = [{"Key": "TagKey", "Value": "TagValue"}]
+
+ self.tag_resource(data_store_arn, tags)
+
+ tags = self.list_tags_for_resource(data_store_arn)
+ print(f"{len(tags)} tag(s) found.")
+ for tag in tags:
+ print(f"Tag key: {tag['Key']}, value: {tag['Value']}")
+
+ keys = []
+ for tag in tags:
+ keys.append(tag["Key"])
+
+ self.untag_resource(data_store_arn, keys)
+
+ job_name = "my_import_job"
+ input_s3_uri = (
+ "s3://health-lake-test-827365/import/examples/patient_example_chalmers.json"
+ )
+ output_s3_uri = "s3://health-lake-test-827365/import/output/"
+ kms_key_id = "arn:aws:kms:us-east-1:123502194722:key/b7f645cb-e564-4981-8672-9e012d1ff1a0"
+ data_access_role_arn = (
+ "arn:aws:iam::123502194722:role/healthlaketest37-ahl-full-access"
+ )
+ import_job = self.start_fihr_import_job(
+ job_name,
+ data_store_id,
+ input_s3_uri,
+ output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ )
+
+ import_job_id = import_job["JobId"]
+ print(f"Started import job with ID: {import_job_id}")
+
+ self.wait_import_job_complete(data_store_id, import_job_id)
+
+ import_jobs = self.list_fhir_import_jobs(
+ data_store_id, submitted_after=datetime.now() - timedelta(days=1)
+ )
+ print(f"{len(import_jobs)} import job(s) found.")
+ for import_job in import_jobs:
+ print(
+ f"Job id: {import_job['JobId']}, status: {import_job['JobStatus']}, submit time: {import_job['SubmitTime']}"
+ )
+
+ job_name = "my_export_job"
+ output_s3_uri = "s3://health-lake-test-827365/export/output/"
+ export_job = self.start_fhir_export_job(
+ job_name, data_store_id, output_s3_uri, kms_key_id, data_access_role_arn
+ )
+
+ export_job_id = export_job["JobId"]
+ print(f"Started export job with ID: {export_job_id}")
+ self.wait_export_job_complete(data_store_id, export_job_id)
+
+ export_jobs = self.list_fhir_export_jobs(
+ data_store_id, submitted_after=datetime.now() - timedelta(days=1)
+ )
+ print(f"{len(export_jobs)} export job(s) found.")
+ for export_job in export_jobs:
+ print(
+ f"Job id: {export_job['JobId']}, status: {export_job['JobStatus']}, submit time: {export_job['SubmitTime']}"
+ )
+
+
+# self.delete_fhir_datastore(data_store_id)
if __name__ == "__main__":
health_lake_wrapper = HealthLakeWrapper.from_client()
-
+ health_lake_wrapper.health_lake_demo()
From bf806c358b338396d5ae6a21f9c319c16ea40655 Mon Sep 17 00:00:00 2001
From: Steven Meyer <108885656+meyertst-aws@users.noreply.github.com>
Date: Thu, 21 Nov 2024 12:48:15 -0500
Subject: [PATCH 3/4] updating metadata
---
.doc_gen/metadata/healthlake_metadata.yaml | 156 ++++++++++++++++++
.../healthlake/health_lake_wrapper.py | 3 +
2 files changed, 159 insertions(+)
diff --git a/.doc_gen/metadata/healthlake_metadata.yaml b/.doc_gen/metadata/healthlake_metadata.yaml
index 04108e7ba25..6e159338307 100644
--- a/.doc_gen/metadata/healthlake_metadata.yaml
+++ b/.doc_gen/metadata/healthlake_metadata.yaml
@@ -17,3 +17,159 @@ healthlake_CreateFHIRDatastore:
- python.example_code.healthlake.CreateFHIRDatastore.smart
services:
healthlake: {CreateFHIRDatastore}
+healthlake_DescribeFHIRDatastore:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.DescribeFHIRDatastore
+ services:
+ healthlake: {DescribeFHIRDatastore}
+healthlake_ListFHIRDatastores:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.ListFHIRDatastores
+ services:
+ healthlake: {ListFHIRDatastores}
+healthlake_DeleteFHIRDatastore:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.DeleteFHIRDatastore
+ services:
+ healthlake: {DeleteFHIRDatastore}
+healthlake_StartFHIRImportJob:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.StartFHIRImportJob
+ services:
+ healthlake: {StartFHIRImportJob}
+healthlake_DescribeFHIRImportJob:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.DescribeFHIRImportJob
+ services:
+ healthlake: {DescribeFHIRImportJob}
+healthlake_ListFHIRDatastoreImportJobs:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.ListFHIRDatastoreImportJobs
+ services:
+ healthlake: {ListFHIRDatastoreImportJobs}
+healthlake_StartFHIRExportJob:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.StartFHIRExportJob
+ services:
+ healthlake: {StartFHIRExportJob}
+healthlake_DescribeFHIRExportJob:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.DescribeFHIRExportJob
+ services:
+ healthlake: {DescribeFHIRExportJob}
+healthlake_ListFHIRExportJobs:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.ListFHIRExportJobs
+ services:
+ healthlake: {ListFHIRExportJobs}
+healthlake_TagResource:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.TagResource
+ services:
+ healthlake: {TagResource}
+healthlake_ListTagsForResource:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.ListTagsForResource
+ services:
+ healthlake: {ListTagsForResource}
+healthlake_UntagResource:
+ languages:
+ Python:
+ versions:
+ - sdk_version: 3
+ github: python/example_code/healthlake
+ excerpts:
+ - description:
+ snippet_tags:
+ - python.example_code.healthlake.HealthLakeWrapper.decl
+ - python.example_code.healthlake.UntagResource
+ services:
+ healthlake: {UntagResource}
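
Each `snippet_tags` entry above points at a region of `health_lake_wrapper.py` delimited by matching `snippet-start`/`snippet-end` markers, which the doc generator extracts. A minimal sketch of that convention (illustrative only; the docstring and body here are placeholders, and the real declaration lives in the wrapper file):

```
# snippet-start:[python.example_code.healthlake.HealthLakeWrapper.decl]
class HealthLakeWrapper:
    """Encapsulates AWS HealthLake (FHIR) operations."""

    def __init__(self, health_lake_client):
        self.health_lake_client = health_lake_client
# snippet-end:[python.example_code.healthlake.HealthLakeWrapper.decl]
```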
diff --git a/python/example_code/healthlake/health_lake_wrapper.py b/python/example_code/healthlake/health_lake_wrapper.py
index c7ad67a6760..d5aea3febdc 100644
--- a/python/example_code/healthlake/health_lake_wrapper.py
+++ b/python/example_code/healthlake/health_lake_wrapper.py
@@ -101,6 +101,8 @@ def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]:
)
raise
+ # snippet-end:[python.example_code.healthlake.DescribeFHIRDatastore]
+
# snippet-start:[python.example_code.healthlake.ListFHIRDatastores]
def list_fhir_datastores(self) -> list[dict[str, any]]:
"""
@@ -127,6 +129,7 @@ def list_fhir_datastores(self) -> list[dict[str, any]]:
"Couldn't list datastores. Here's why", err.response["Error"]["Message"]
)
raise
+ # snippet-end:[python.example_code.healthlake.ListFHIRDatastores]
# snippet-start:[python.example_code.healthlake.DeleteFHIRDatastore]
def delete_fhir_datastore(self, datastore_id: str) -> None:
From 7f535f549b3fd7196e9e779f96ea94b79b380eed Mon Sep 17 00:00:00 2001
From: Steven Meyer <108885656+meyertst-aws@users.noreply.github.com>
Date: Thu, 21 Nov 2024 16:25:52 -0500
Subject: [PATCH 4/4] tests done
---
.doc_gen/metadata/healthlake_metadata.yaml | 2 +-
python/example_code/healthlake/README.md | 127 +----
.../healthlake/health_lake_wrapper.py | 69 +--
.../example_code/healthlake/test/conftest.py | 8 +-
.../test/test_health_lake_wrapper.py | 289 +++++++++++
.../test/test_medical_imaging_basics.py | 482 ------------------
python/test_tools/healthlake_stubber.py | 345 +++++++++++++
python/test_tools/stubber_factory.py | 3 +
8 files changed, 700 insertions(+), 625 deletions(-)
create mode 100644 python/example_code/healthlake/test/test_health_lake_wrapper.py
delete mode 100644 python/example_code/healthlake/test/test_medical_imaging_basics.py
create mode 100644 python/test_tools/healthlake_stubber.py
diff --git a/.doc_gen/metadata/healthlake_metadata.yaml b/.doc_gen/metadata/healthlake_metadata.yaml
index 6e159338307..850302d60cf 100644
--- a/.doc_gen/metadata/healthlake_metadata.yaml
+++ b/.doc_gen/metadata/healthlake_metadata.yaml
@@ -11,7 +11,7 @@ healthlake_CreateFHIRDatastore:
- python.example_code.healthlake.HealthLakeWrapper.decl
- python.example_code.healthlake.CreateFHIRDatastore
- description: >
- The following code shows the parameters for SMART on
+ The following code shows an example of parameters for a SMART on
FHIR-enabled HealthLake data store.
snippet_tags:
- python.example_code.healthlake.CreateFHIRDatastore.smart
diff --git a/python/example_code/healthlake/README.md b/python/example_code/healthlake/README.md
index 46974e528ce..6cc8b900788 100644
--- a/python/example_code/healthlake/README.md
+++ b/python/example_code/healthlake/README.md
@@ -1,13 +1,13 @@
-# HealthImaging code examples for the SDK for Python
+# HealthLake code examples for the SDK for Python
## Overview
-Shows how to use the AWS SDK for Python (Boto3) to work with AWS HealthImaging.
+Shows how to use the AWS SDK for Python (Boto3) to work with AWS HealthLake.
-_HealthImaging is a HIPAA-eligible service that helps health care providers and their medical imaging ISV partners store, transform, and apply machine learning to medical images._
+_HealthLake is a HIPAA-eligible service that provides FHIR APIs to help healthcare and life sciences companies securely store, transform, query, and analyze health data._
## ⚠ Important
@@ -34,42 +34,23 @@ python -m pip install -r requirements.txt
-### Get started
-
-- [Hello HealthImaging](imaging_set_and_frames_workflow/hello.py#L4) (`ListDatastores`)
-
-
### Single actions
Code excerpts that show you how to call individual service functions.
-- [CopyImageSet](health_lake_wrapper.py#L417)
-- [CreateDatastore](health_lake_wrapper.py#L31)
-- [DeleteDatastore](health_lake_wrapper.py#L104)
-- [DeleteImageSet](health_lake_wrapper.py#L489)
-- [GetDICOMImportJob](health_lake_wrapper.py#L158)
-- [GetDatastore](health_lake_wrapper.py#L54)
-- [GetImageFrame](health_lake_wrapper.py#L318)
-- [GetImageSet](health_lake_wrapper.py#L241)
-- [GetImageSetMetadata](health_lake_wrapper.py#L274)
-- [ListDICOMImportJobs](health_lake_wrapper.py#L183)
-- [ListDatastores](health_lake_wrapper.py#L79)
-- [ListImageSetVersions](health_lake_wrapper.py#L350)
-- [ListTagsForResource](health_lake_wrapper.py#L556)
-- [SearchImageSets](health_lake_wrapper.py#L211)
-- [StartDICOMImportJob](health_lake_wrapper.py#L124)
-- [TagResource](health_lake_wrapper.py#L514)
-- [UntagResource](health_lake_wrapper.py#L534)
-- [UpdateImageSetMetadata](health_lake_wrapper.py#L381)
-
-### Scenarios
-
-Code examples that show you how to accomplish a specific task by calling multiple
-functions within the same service.
-
-- [Get started with image sets and image frames](imaging_set_and_frames_workflow/imaging_set_and_frames.py)
-- [Tagging a data store](tagging_data_stores.py)
-- [Tagging an image set](tagging_image_sets.py)
+- [CreateFHIRDatastore](health_lake_wrapper.py#L42)
+- [DeleteFHIRDatastore](health_lake_wrapper.py#L136)
+- [DescribeFHIRDatastore](health_lake_wrapper.py#L84)
+- [DescribeFHIRExportJob](health_lake_wrapper.py#L310)
+- [DescribeFHIRImportJob](health_lake_wrapper.py#L197)
+- [ListFHIRDatastores](health_lake_wrapper.py#L106)
+- [ListFHIRExportJobs](health_lake_wrapper.py#L335)
+- [ListFHIRImportJobs](health_lake_wrapper.py#L222)
+- [ListTagsForResource](health_lake_wrapper.py#L404)
+- [StartFHIRExportJob](health_lake_wrapper.py#L272)
+- [StartFHIRImportJob](health_lake_wrapper.py#L154)
+- [TagResource](health_lake_wrapper.py#L385)
+- [UntagResource](health_lake_wrapper.py#L426)
@@ -83,77 +64,7 @@ functions within the same service.
-#### Hello HealthImaging
-
-This example shows you how to get started using HealthImaging.
-
-```
-python imaging_set_and_frames_workflow/hello.py
-```
-
-
-#### Get started with image sets and image frames
-
-This example shows you how to import DICOM files and download image frames in HealthImaging.
- The implementation is structured as a workflow command-line
- application.
-
-
-- Set up resources for a DICOM import.
-- Import DICOM files into a data store.
-- Retrieve the image set IDs for the import job.
-- Retrieve the image frame IDs for the image sets.
-- Download, decode and verify the image frames.
-- Clean up resources.
-
-
-
-
-Start the example by running the following at a command prompt:
-
-```
-python imaging_set_and_frames_workflow/imaging_set_and_frames.py
-```
-
-
-
-
-
-#### Tagging a data store
-
-This example shows you how to tag a HealthImaging data store.
-
-
-
-
-
-Start the example by running the following at a command prompt:
-
-```
-python tagging_data_stores.py
-```
-
-
-
-
-
-#### Tagging an image set
-
-This example shows you how to tag a HealthImaging image set.
-
-
-
-
-
-Start the example by running the following at a command prompt:
-
-```
-python tagging_image_sets.py
-```
-
-
-
### Tests
@@ -170,9 +81,9 @@ in the `python` folder.
## Additional resources
-- [HealthImaging Developer Guide](https://docs.aws.amazon.com/healthimaging/latest/devguide/what-is.html)
-- [HealthImaging API Reference](https://docs.aws.amazon.com/healthimaging/latest/APIReference/Welcome.html)
-- [SDK for Python HealthImaging reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/medical-imaging.html)
+- [HealthLake Developer Guide](https://docs.aws.amazon.com/healthlake/latest/devguide/what-is-amazon-health-lake.html)
+- [HealthLake API Reference](https://docs.aws.amazon.com/healthlake/latest/APIReference/Welcome.html)
+- [SDK for Python HealthLake reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/healthlake.html)
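
With the scenario walkthroughs removed, a short usage sketch may help readers orient themselves. It assumes the wrapper added in this change (`health_lake_wrapper.py` exposing `HealthLakeWrapper.from_client()`) and AWS credentials with a default Region configured for Boto3:

```
# Usage sketch (not part of the examples): list FHIR data stores with the wrapper.
from health_lake_wrapper import HealthLakeWrapper

wrapper = HealthLakeWrapper.from_client()
for datastore in wrapper.list_fhir_datastores():
    print(f"{datastore['DatastoreName']}: {datastore['DatastoreStatus']}")
```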
diff --git a/python/example_code/healthlake/health_lake_wrapper.py b/python/example_code/healthlake/health_lake_wrapper.py
index d5aea3febdc..84231ddb80b 100644
--- a/python/example_code/healthlake/health_lake_wrapper.py
+++ b/python/example_code/healthlake/health_lake_wrapper.py
@@ -34,13 +34,13 @@ def from_client(cls) -> "HealthLakeWrapper":
:return: An instance of HealthLakeWrapper initialized with the default HealthLake client.
"""
- kms_client = boto3.client("healthlake")
- return cls(kms_client)
+ health_lake_client = boto3.client("healthlake")
+ return cls(health_lake_client)
# snippet-end:[python.example_code.healthlake.HealthLakeWrapper.decl]
# snippet-start:[python.example_code.healthlake.CreateFHIRDatastore]
- def create_fihr_datastore(
+ def create_fhir_datastore(
self,
datastore_name: str,
sse_configuration: dict[str, any] = None,
@@ -73,7 +73,7 @@ def create_fihr_datastore(
return response
except ClientError as err:
logger.exception(
- "Couldn't create datastore %s. Here's why",
+ "Couldn't create datastore %s. Here's why %s",
datastore_name,
err.response["Error"]["Message"],
)
@@ -95,7 +95,7 @@ def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]:
return response["DatastoreProperties"]
except ClientError as err:
logger.exception(
- "Couldn't describe datastore with ID %s. Here's why",
+ "Couldn't describe datastore with ID %s. Here's why %s",
datastore_id,
err.response["Error"]["Message"],
)
@@ -112,6 +112,8 @@ def list_fhir_datastores(self) -> list[dict[str, any]]:
try:
next_token = None
datastores = []
+
+ # Loop through paginated results.
while True:
parameters = {}
if next_token is not None:
@@ -122,11 +124,11 @@ def list_fhir_datastores(self) -> list[dict[str, any]]:
next_token = response["NextToken"]
else:
break
- response = self.health_lake_client.list_fhir_datastores()
- return response["DatastorePropertiesList"]
+
+ return datastores
except ClientError as err:
logger.exception(
- "Couldn't list datastores. Here's why", err.response["Error"]["Message"]
+ "Couldn't list datastores. Here's why %s", err.response["Error"]["Message"]
)
raise
# snippet-end:[python.example_code.healthlake.ListFHIRDatastores]
@@ -141,7 +143,7 @@ def delete_fhir_datastore(self, datastore_id: str) -> None:
self.health_lake_client.delete_fhir_datastore(DatastoreId=datastore_id)
except ClientError as err:
logger.exception(
- "Couldn't delete datastore with ID %s. Here's why",
+ "Couldn't delete datastore with ID %s. Here's why %s",
datastore_id,
err.response["Error"]["Message"],
)
@@ -150,7 +152,7 @@ def delete_fhir_datastore(self, datastore_id: str) -> None:
# snippet-end:[python.example_code.healthlake.DeleteFHIRDatastore]
# snippet-start:[python.example_code.healthlake.StartFHIRImportJob]
- def start_fihr_import_job(
+ def start_fhir_import_job(
self,
job_name: str,
datastore_id: str,
@@ -185,7 +187,7 @@ def start_fihr_import_job(
return response
except ClientError as err:
logger.exception(
- "Couldn't start import job. Here's why",
+ "Couldn't start import job. Here's why %s",
err.response["Error"]["Message"],
)
raise
@@ -193,7 +195,7 @@ def start_fihr_import_job(
# snippet-end:[python.example_code.healthlake.StartFHIRImportJob]
# snippet-start:[python.example_code.healthlake.DescribeFHIRImportJob]
- def describe_fihr_import_job(
+ def describe_fhir_import_job(
self, datastore_id: str, job_id: str
) -> dict[str, any]:
"""
@@ -209,7 +211,7 @@ def describe_fihr_import_job(
return response["ImportJobProperties"]
except ClientError as err:
logger.exception(
- "Couldn't describe import job with ID %s. Here's why",
+ "Couldn't describe import job with ID %s. Here's why %s",
job_id,
err.response["Error"]["Message"],
)
@@ -247,6 +249,7 @@ def list_fhir_import_jobs(
parameters["SubmittedAfter"] = submitted_after
next_token = None
jobs = []
+ # Loop through paginated results.
while True:
if next_token is not None:
parameters["NextToken"] = next_token
@@ -259,7 +262,7 @@ def list_fhir_import_jobs(
return jobs
except ClientError as err:
logger.exception(
- "Couldn't list import jobs. Here's why",
+ "Couldn't list import jobs. Here's why %s",
err.response["Error"]["Message"],
)
raise
@@ -297,7 +300,7 @@ def start_fhir_export_job(
return response
except ClientError as err:
logger.exception(
- "Couldn't start export job. Here's why",
+ "Couldn't start export job. Here's why %s",
err.response["Error"]["Message"],
)
raise
@@ -321,7 +324,7 @@ def describe_fhir_export_job(
return response["ExportJobProperties"]
except ClientError as err:
logger.exception(
- "Couldn't describe export job with ID %s. Here's why",
+ "Couldn't describe export job with ID %s. Here's why %s",
job_id,
err.response["Error"]["Message"],
)
@@ -359,6 +362,7 @@ def list_fhir_export_jobs(
parameters["SubmittedAfter"] = submitted_after
next_token = None
jobs = []
+ # Loop through paginated results.
while True:
if next_token is not None:
parameters["NextToken"] = next_token
@@ -371,7 +375,7 @@ def list_fhir_export_jobs(
return jobs
except ClientError as err:
logger.exception(
- "Couldn't list export jobs. Here's why",
+ "Couldn't list export jobs. Here's why %s",
err.response["Error"]["Message"],
)
raise
@@ -389,7 +393,7 @@ def tag_resource(self, resource_arn: str, tags: list[dict[str, str]]) -> None:
self.health_lake_client.tag_resource(ResourceARN=resource_arn, Tags=tags)
except ClientError as err:
logger.exception(
- "Couldn't tag resource %s. Here's why",
+ "Couldn't tag resource %s. Here's why %s",
resource_arn,
err.response["Error"]["Message"],
)
@@ -411,7 +415,7 @@ def list_tags_for_resource(self, resource_arn: str) -> dict[str, str]:
return response["Tags"]
except ClientError as err:
logger.exception(
- "Couldn't list tags for resource %s. Here's why",
+ "Couldn't list tags for resource %s. Here's why %s",
resource_arn,
err.response["Error"]["Message"],
)
@@ -432,7 +436,7 @@ def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None:
)
except ClientError as err:
logger.exception(
- "Couldn't untag resource %s. Here's why",
+ "Couldn't untag resource %s. Here's why %s",
resource_arn,
err.response["Error"]["Message"],
)
@@ -491,7 +495,7 @@ def wait_import_job_complete(self, datastore_id: str, job_id: str) -> None:
)
status = "IN_PROGRESS"
while counter < max_count_minutes:
- job = self.describe_fihr_import_job(datastore_id, job_id)
+ job = self.describe_fhir_import_job(datastore_id, job_id)
status = job["JobStatus"]
if status == "COMPLETED" or status == "COMPLETED_WITH_ERRORS":
break
@@ -545,15 +549,15 @@ def wait_export_job_complete(self, datastore_id: str, job_id: str) -> None:
)
def health_lake_demo(self) -> None:
- use_smart_on_fihr_data_store = True
+ use_smart_on_fhir_data_store = True
datastore_name = "health_imaging_datastore2"
- if use_smart_on_fihr_data_store:
+ if use_smart_on_fhir_data_store:
# snippet-start:[python.example_code.healthlake.CreateFHIRDatastore.smart]
sse_configuration = {
"KmsEncryptionConfig": {"CmkType": "AWS_OWNED_KMS_KEY"}
}
-
+ # TODO: Update the metadata to match your environment.
metadata = {
"issuer": "https://ehr.example.com",
"jwks_uri": "https://ehr.example.com/.well-known/jwks.json",
@@ -577,18 +581,19 @@ def health_lake_demo(self) -> None:
"client-public",
],
}
- indentity_provider_configuration = {
+ # TODO: Update the IdpLambdaArn.
+ identity_provider_configuration = {
"AuthorizationStrategy": "SMART_ON_FHIR_V1",
"FineGrainedAuthorizationEnabled": True,
- "IdpLambdaArn": "arn:aws:lambda:us-east-1:123502194722:function:healthlaketest37-ahl-introspec:active",
+ "IdpLambdaArn": "arn:aws:lambda:your-region:your-account-id:function:your-lambda-name",
"Metadata": json.dumps(metadata),
}
- data_store = self.create_fihr_datastore(
- datastore_name, sse_configuration, indentity_provider_configuration
+ data_store = self.create_fhir_datastore(
+ datastore_name, sse_configuration, identity_provider_configuration
)
# snippet-end:[python.example_code.healthlake.CreateFHIRDatastore.smart]
else:
- data_store = self.create_fihr_datastore(datastore_name)
+ data_store = self.create_fhir_datastore(datastore_name)
data_store_id = data_store["DatastoreId"]
data_store_arn = data_store["DatastoreArn"]
@@ -629,7 +634,7 @@ def health_lake_demo(self) -> None:
data_access_role_arn = (
"arn:aws:iam::123502194722:role/healthlaketest37-ahl-full-access"
)
- import_job = self.start_fihr_import_job(
+ import_job = self.start_fhir_import_job(
job_name,
data_store_id,
input_s3_uri,
@@ -671,8 +676,8 @@ def health_lake_demo(self) -> None:
f"Job id: {export_job['JobId']}, status: {export_job['JobStatus']}, submit time: {export_job['SubmitTime']}"
)
-
-# self.delete_fhir_datastore(data_store_id)
+ self.delete_fhir_datastore(data_store_id)
+ print(f"Data store with ID {data_store_id} deleted.")
if __name__ == "__main__":
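
The `# Loop through paginated results.` comments added above all follow the same `NextToken` pattern. A compact, self-contained sketch of that pattern against the `healthlake` client (an illustration, not code copied from the wrapper; it assumes configured credentials and Region):

```
import boto3

client = boto3.client("healthlake")

datastores = []
next_token = None
while True:
    # Pass NextToken only after the first page.
    params = {} if next_token is None else {"NextToken": next_token}
    response = client.list_fhir_datastores(**params)
    datastores.extend(response["DatastorePropertiesList"])
    next_token = response.get("NextToken")
    if next_token is None:
        break

print(f"Found {len(datastores)} data store(s).")
```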
diff --git a/python/example_code/healthlake/test/conftest.py b/python/example_code/healthlake/test/conftest.py
index 2d0663bfbd2..0faa3f8991e 100644
--- a/python/example_code/healthlake/test/conftest.py
+++ b/python/example_code/healthlake/test/conftest.py
@@ -7,7 +7,11 @@
"""
import sys
+import os
+
+script_dir = os.path.dirname(os.path.abspath(__file__))
# This is needed so Python can find test_tools on the path.
-sys.path.append("../..")
-from test_tools.fixtures.common import *
+sys.path.append(os.path.join(script_dir, "../../.."))
+
+from test_tools.fixtures.common import *
\ No newline at end of file
diff --git a/python/example_code/healthlake/test/test_health_lake_wrapper.py b/python/example_code/healthlake/test/test_health_lake_wrapper.py
new file mode 100644
index 00000000000..8c2f4dbb097
--- /dev/null
+++ b/python/example_code/healthlake/test/test_health_lake_wrapper.py
@@ -0,0 +1,289 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Unit tests for health_lake_wrapper functions.
+"""
+
+import os
+import sys
+
+import boto3
+import pytest
+from botocore.exceptions import ClientError
+
+script_dir = os.path.dirname(os.path.abspath(__file__))
+
+# Append parent directory to import health_lake_wrapper.
+sys.path.append(os.path.join(script_dir, ".."))
+from health_lake_wrapper import HealthLakeWrapper
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_create_fhir_datastore(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_name = "test-datastore"
+ datastore_id = "abcdedf1234567890abcdef123456789"
+
+ healthlake_stubber.stub_create_fhir_datastore(
+ datastore_name, datastore_id, error_code=error_code
+ )
+
+ if error_code is None:
+ response = wrapper.create_fhir_datastore(datastore_name)
+ assert response["DatastoreId"] == datastore_id
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.create_fhir_datastore(datastore_name)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_describe_fhir_datastore(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_id = "abcdedf1234567890abcdef123456789"
+
+ healthlake_stubber.stub_describe_fhir_datastore(datastore_id, error_code=error_code)
+
+ if error_code is None:
+ response = wrapper.describe_fhir_datastore(datastore_id)
+ assert response["DatastoreId"] == datastore_id
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.describe_fhir_datastore(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_fhir_datastores(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+
+ healthlake_stubber.stub_list_fhir_datastores(error_code=error_code)
+
+ if error_code is None:
+ response = wrapper.list_fhir_datastores()
+ assert len(response) == 1
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_fhir_datastores()
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_delete_fhir_datastore(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+
+ healthlake_stubber.stub_delete_fhir_datastore(datastore_id, error_code=error_code)
+
+ if error_code is None:
+ wrapper.delete_fhir_datastore(datastore_id)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.delete_fhir_datastore(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_start_fhir_import_job(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ job_name = "test-job"
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ input_s3_uri = "s3://amzn-s3-demo-bucket/test-data"
+ job_output_s3_uri = "s3://amzn-s3-demo-bucket/test-output"
+ kms_key_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
+ data_access_role_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+
+ healthlake_stubber.stub_start_fhir_import_job(
+ job_name,
+ datastore_id,
+ input_s3_uri,
+ job_output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ error_code=error_code,
+ )
+
+ if error_code is None:
+ wrapper.start_fhir_import_job(
+ job_name,
+ datastore_id,
+ input_s3_uri,
+ job_output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ )
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.start_fhir_import_job(
+ job_name,
+ datastore_id,
+ input_s3_uri,
+ job_output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ )
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_describe_fhir_import_job(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ job_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+
+ healthlake_stubber.stub_describe_fhir_import_job(
+ datastore_id, job_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.describe_fhir_import_job(datastore_id, job_id)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.describe_fhir_import_job(datastore_id, job_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_fhir_import_jobs(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+
+ healthlake_stubber.stub_list_fhir_import_jobs(datastore_id, error_code=error_code)
+
+ if error_code is None:
+ wrapper.list_fhir_import_jobs(datastore_id)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_fhir_import_jobs(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_start_fhir_export_job(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ job_name = "test-job"
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ output_s3_uri = "s3://amzn-s3-demo-bucket/test-output"
+ data_access_role_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ kms_key_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
+
+ healthlake_stubber.stub_start_fhir_export_job(
+ job_name,
+ datastore_id,
+ output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ error_code=error_code,
+ )
+
+ if error_code is None:
+ wrapper.start_fhir_export_job(
+ job_name,
+ datastore_id,
+ output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ )
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.start_fhir_export_job(
+ job_name,
+ datastore_id,
+ output_s3_uri,
+ kms_key_id,
+ data_access_role_arn,
+ )
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_fhir_export_jobs(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+
+ healthlake_stubber.stub_list_fhir_export_jobs(datastore_id, error_code=error_code)
+
+ if error_code is None:
+ wrapper.list_fhir_export_jobs(datastore_id)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_fhir_export_jobs(datastore_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_describe_fhir_export_job(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ job_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+
+ healthlake_stubber.stub_describe_fhir_export_job(
+ datastore_id, job_id, error_code=error_code
+ )
+
+ if error_code is None:
+ wrapper.describe_fhir_export_job(datastore_id, job_id)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.describe_fhir_export_job(datastore_id, job_id)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_tag_resource(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ tags = [{"Key" :"test-key", "Value" : "test-value"}]
+ healthlake_stubber.stub_tag_resource(resource_arn, tags, error_code=error_code)
+ if error_code is None:
+ wrapper.tag_resource(resource_arn, tags)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.tag_resource(resource_arn, tags)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_untag_resource(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ tag_keys = ["test-key"]
+ healthlake_stubber.stub_untag_resource(resource_arn, tag_keys, error_code=error_code)
+ if error_code is None:
+ wrapper.untag_resource(resource_arn, tag_keys)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.untag_resource(resource_arn, tag_keys)
+ assert exc_info.value.response["Error"]["Code"] == error_code
+
+@pytest.mark.parametrize("error_code", [None, "TestException"])
+def test_list_tags_for_resource(make_stubber, error_code):
+ healthlake_client = boto3.client("healthlake")
+ healthlake_stubber = make_stubber(healthlake_client)
+ wrapper = HealthLakeWrapper(healthlake_client)
+ resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
+ healthlake_stubber.stub_list_tags_for_resource(resource_arn, error_code=error_code)
+ if error_code is None:
+ wrapper.list_tags_for_resource(resource_arn)
+ else:
+ with pytest.raises(ClientError) as exc_info:
+ wrapper.list_tags_for_resource(resource_arn)
+ assert exc_info.value.response["Error"]["Code"] == error_code
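
For readers unfamiliar with the `make_stubber` fixture these tests rely on, the stubbing is presumably built on botocore's `Stubber`. A minimal standalone sketch of the same success/error pattern (an assumption for illustration; the fake credentials keep the client from touching AWS):

```
import boto3
from botocore.stub import Stubber

client = boto3.client(
    "healthlake",
    region_name="us-east-1",
    aws_access_key_id="testing",
    aws_secret_access_key="testing",
)
stubber = Stubber(client)

# Success case: queue a canned response with the expected request parameters.
stubber.add_response("list_fhir_datastores", {"DatastorePropertiesList": []}, {})
# Error case would instead queue a client error, for example:
# stubber.add_client_error("list_fhir_datastores", "TestException")

with stubber:
    response = client.list_fhir_datastores()
    assert response["DatastorePropertiesList"] == []
```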
diff --git a/python/example_code/healthlake/test/test_medical_imaging_basics.py b/python/example_code/healthlake/test/test_medical_imaging_basics.py
deleted file mode 100644
index 7956e8978ec..00000000000
--- a/python/example_code/healthlake/test/test_medical_imaging_basics.py
+++ /dev/null
@@ -1,482 +0,0 @@
-# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-"""
-Unit tests for medical_imaging_basics functions.
-"""
-
-import os
-
-import boto3
-import pytest
-from botocore.exceptions import ClientError
-
-from health_lake_wrapper import MedicalImagingWrapper
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_create_datastore(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_name = "test-datastore"
- datastore_id = "abcdedf1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_create_datastore(
- datastore_name, datastore_id, error_code=error_code
- )
-
- if error_code is None:
- got_datastore_id = wrapper.create_datastore(datastore_name)
- assert got_datastore_id == datastore_id
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.create_datastore(datastore_name)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_get_datastore_properties(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_get_datastore_properties(
- datastore_id, error_code=error_code
- )
-
- if error_code is None:
- got_properties = wrapper.get_datastore_properties(datastore_id)
- assert got_properties["datastoreId"] == datastore_id
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.get_datastore_properties(datastore_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_list_datastores(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- medical_imaging_stubber.stub_list_datastores(datastore_id, error_code=error_code)
-
- if error_code is None:
- datastores = wrapper.list_datastores()
- assert datastores[0]["datastoreId"] == datastore_id
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.list_datastores()
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_delete_datastore(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_delete_data_store(datastore_id, error_code=error_code)
-
- if error_code is None:
- wrapper.delete_datastore(datastore_id)
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.delete_datastore(datastore_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_start_dicom_import_job(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- job_id = "cccccc1234567890abcdef123456789"
- job_name = "job_name"
- datastore_id = "abcdedf1234567890abcdef123456789"
- role_arn = "arn:aws:iam::111111111111:role/dicom_import"
- input_s3_uri = "s3://healthimaging-source/CRStudy/"
- output_s3_uri = "s3://healthimaging-destination/CRStudy/"
-
- medical_imaging_stubber.stub_start_dicom_import_job(
- job_name,
- datastore_id,
- role_arn,
- input_s3_uri,
- output_s3_uri,
- job_id,
- error_code=error_code,
- )
-
- if error_code is None:
- result = wrapper.start_dicom_import_job(
- job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri
- )
- assert result == job_id
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.start_dicom_import_job(
- job_name, datastore_id, role_arn, input_s3_uri, output_s3_uri
- )
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_get_dicom_import_job(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- job_id = "cccccc1234567890abcdef123456789"
- job_status = "TESTING"
-
- medical_imaging_stubber.stub_get_dicom_import_job(
- job_id, datastore_id, job_status, error_code=error_code
- )
-
- if error_code is None:
- result = wrapper.get_dicom_import_job(datastore_id, job_id)
- assert result["jobStatus"] == job_status
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.get_dicom_import_job(datastore_id, job_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_list_dicom_import_jobs(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_list_dicom_import_jobs(
- datastore_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.list_dicom_import_jobs(datastore_id)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.list_dicom_import_jobs(datastore_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_search_mage_sets(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- search_filter = {
- "filters": [
- {
- "values": [
- {"createdAt": "2023-09-13T14:13:39.302000-04:00"},
- {"createdAt": "2023-09-13T14:13:39.302000-04:00"},
- ],
- "operator": "BETWEEN",
- }
- ]
- }
- medical_imaging_stubber.stub_search_image_sets(
- datastore_id, search_filter, error_code=error_code
- )
-
- if error_code is None:
- wrapper.search_image_sets(datastore_id, search_filter)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.search_image_sets(datastore_id, search_filter)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_get_image_set(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
- version_id = "1"
-
- medical_imaging_stubber.stub_get_image_set(
- datastore_id, image_set_id, version_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.get_image_set(datastore_id, image_set_id, version_id)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.get_image_set(datastore_id, image_set_id, version_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_get_image_set_metadata(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
- test_file = "med-imag-test_file_1234.gzip"
- medical_imaging_stubber.stub_get_image_set_metadata(
- datastore_id, image_set_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.get_image_set_metadata(test_file, datastore_id, image_set_id)
- assert os.path.exists(test_file)
- os.remove(test_file)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.get_image_set_metadata(test_file, datastore_id, image_set_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_get_pixel_data(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
- image_frame_id = "cccccc1234567890abcdef123456789"
- test_file = "med-imag-test_file_789654.jph"
- medical_imaging_stubber.stub_get_pixel_data(
- datastore_id, image_set_id, image_frame_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.get_pixel_data(test_file, datastore_id, image_set_id, image_frame_id)
- assert os.path.exists(test_file)
- os.remove(test_file)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.get_pixel_data(
- test_file, datastore_id, image_set_id, image_frame_id
- )
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_list_image_set_versions(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_list_image_set_versions(
- datastore_id, image_set_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.list_image_set_versions(datastore_id, image_set_id)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.list_image_set_versions(datastore_id, image_set_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_update_image_set_metadata(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
- version_id = "1"
- force = False
- metadata = {
- "DICOMUpdates": {
- "updatableAttributes": '{"SchemaVersion":1.1,"Patient":{"DICOM":{"PatientName":"Garcia^Gloria"}}}'
- }
- }
-
- medical_imaging_stubber.stub_update_image_set_metadata(
- datastore_id, image_set_id, version_id, metadata, force, error_code=error_code
- )
-
- if error_code is None:
- wrapper.update_image_set_metadata(
- datastore_id, image_set_id, version_id, metadata, force
- )
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.update_image_set_metadata(
- datastore_id, image_set_id, version_id, metadata, force
- )
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_copy_image_set_without_destination(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
- version_id = "1"
- new_image_set_id = "cccccc1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_copy_image_set_without_destination(
- datastore_id, image_set_id, version_id, new_image_set_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.copy_image_set(datastore_id, image_set_id, version_id)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.copy_image_set(datastore_id, image_set_id, version_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_copy_image_set_with_destination(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
- version_id = "1"
- destination_image_set_id = "cccccc1234567890abcdef123456789"
- destination_version_id = "1"
- force = True
- subset_Id = "cccccc1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_copy_image_set_with_destination(
- datastore_id,
- image_set_id,
- version_id,
- destination_image_set_id,
- destination_version_id,
- force,
- subset_Id,
- error_code=error_code,
- )
-
- if error_code is None:
- wrapper.copy_image_set(
- datastore_id,
- image_set_id,
- version_id,
- destination_image_set_id,
- destination_version_id,
- force,
- [subset_Id],
- )
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.copy_image_set(
- datastore_id,
- image_set_id,
- version_id,
- destination_image_set_id,
- destination_version_id,
- force,
- [subset_Id],
- )
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_delete_image_set(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- datastore_id = "abcdedf1234567890abcdef123456789"
- image_set_id = "cccccc1234567890abcdef123456789"
-
- medical_imaging_stubber.stub_delete_image_set(
- datastore_id, image_set_id, error_code=error_code
- )
-
- if error_code is None:
- wrapper.delete_image_set(datastore_id, image_set_id)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.delete_image_set(datastore_id, image_set_id)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_tag_resource(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- resource_arn = (
- "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image"
- "-set/cccccc1234567890abcdef123456789 "
- )
- tags = {"test-key": "test-value"}
-
- medical_imaging_stubber.stub_tag_resource(resource_arn, tags, error_code=error_code)
-
- if error_code is None:
- wrapper.tag_resource(resource_arn, tags)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.tag_resource(resource_arn, tags)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_untag_resource(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- resource_arn = (
- "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image"
- "-set/cccccc1234567890abcdef123456789 "
- )
- tag_keys = ["test-key"]
-
- medical_imaging_stubber.stub_untag_resource(
- resource_arn, tag_keys, error_code=error_code
- )
-
- if error_code is None:
- wrapper.untag_resource(resource_arn, tag_keys)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.untag_resource(resource_arn, tag_keys)
- assert exc_info.value.response["Error"]["Code"] == error_code
-
-
-@pytest.mark.parametrize("error_code", [None, "TestException"])
-def test_list_tags_for_resource(make_stubber, error_code):
- medical_imaging_client = boto3.client("medical-imaging")
- medical_imaging_stubber = make_stubber(medical_imaging_client)
- wrapper = MedicalImagingWrapper(medical_imaging_client)
- resource_arn = (
- "arn:aws:medical-imaging:us-east-1:123456789012:datastore/abcdedf1234567890abcdef123456789/image"
- "-set/cccccc1234567890abcdef123456789 "
- )
-
- medical_imaging_stubber.stub_list_tags_for_resource(
- resource_arn, error_code=error_code
- )
-
- if error_code is None:
- wrapper.list_tags_for_resource(resource_arn)
-
- else:
- with pytest.raises(ClientError) as exc_info:
- wrapper.list_tags_for_resource(resource_arn)
- assert exc_info.value.response["Error"]["Code"] == error_code
diff --git a/python/test_tools/healthlake_stubber.py b/python/test_tools/healthlake_stubber.py
new file mode 100644
index 00000000000..5d85af53f5a
--- /dev/null
+++ b/python/test_tools/healthlake_stubber.py
@@ -0,0 +1,345 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+"""
+Stub functions that are used by the AWS HealthLake unit tests.
+
+When tests are run against an actual AWS account, the stubber class does not
+set up stubs and passes all calls through to the Boto 3 client.
+"""
+
+import io
+import json
+from botocore.stub import ANY
+from boto3 import client
+
+from test_tools.example_stubber import ExampleStubber
+
+from datetime import timedelta, timezone, datetime
+
+
+class HealthLakeStubber(ExampleStubber):
+ """
+ A class that implements a variety of stub functions that are used by the
+ AWS HealthLake unit tests.
+
+ The stubbed functions all expect certain parameters to be passed to them as
+ part of the tests, and will raise errors when the actual parameters differ from
+ the expected.
+ """
+
+ def __init__(self, healthlake_client: client, use_stubs=True) -> None:
+ """
+ Initializes the object with a specific client and configures it for
+ stubbing or AWS passthrough.
+
+ :param healthlake_client: A Boto 3 AWS HealthLake client.
+ :param use_stubs: When True, use stubs to intercept requests. Otherwise,
+ pass requests through to AWS.
+ """
+ super().__init__(healthlake_client, use_stubs)
+
+ def stub_create_fhir_datastore(
+ self, data_store_name: str, data_store_id: str, error_code: str = None
+ ) -> None:
+ expected_params = {
+ "DatastoreName": data_store_name,
+ "DatastoreTypeVersion": "R4",
+ }
+
+ response = {
+ "DatastoreId": data_store_id,
+ "DatastoreArn": "datastore_arn",
+ "DatastoreStatus": "CREATING",
+ "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/",
+ }
+
+ self._stub_bifurcator(
+ "create_fhir_datastore", expected_params, response, error_code=error_code
+ )
+
+ def stub_describe_fhir_datastore(
+ self, data_store_id, error_code: str = None
+ ) -> None:
+ expected_params = {"DatastoreId": data_store_id}
+
+ response = {
+ "DatastoreProperties": {
+ "DatastoreId": data_store_id,
+ "DatastoreArn": "datastore_arn",
+ "DatastoreStatus": "ACTIVE",
+ "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/",
+ "CreatedAt": datetime.now(timezone.utc),
+ "DatastoreName": "datastore_name",
+ "DatastoreTypeVersion": "R4",
+ }
+ }
+
+ self._stub_bifurcator(
+ "describe_fhir_datastore", expected_params, response, error_code=error_code
+ )
+
+ def stub_list_fhir_datastores(self, error_code: str = None) -> None:
+ expected_params = {}
+
+ response = {
+ "DatastorePropertiesList": [
+ {
+ "DatastoreId": "6407b9ae4c2def3cb6f1a46a0Example",
+ "DatastoreArn": "datastore_arn",
+ "DatastoreStatus": "ACTIVE",
+ "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/6407b9ae4c2def3cb6f1a46a0Example/r4/",
+ "CreatedAt": datetime.now(timezone.utc),
+ "DatastoreName": "datastore_name",
+ "DatastoreTypeVersion": "R4",
+ }
+ ]
+ }
+
+ self._stub_bifurcator(
+ "list_fhir_datastores", expected_params, response, error_code=error_code
+ )
+
+ def stub_delete_fhir_datastore(self, data_store_id, error_code: str = None) -> None:
+ expected_params = {"DatastoreId": data_store_id}
+
+ response = {
+ "DatastoreId": data_store_id,
+ "DatastoreArn": "datastore_arn",
+ "DatastoreStatus": "DELETING",
+ "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/",
+ }
+
+ self._stub_bifurcator(
+ "delete_fhir_datastore", expected_params, response, error_code=error_code
+ )
+
+ def stub_start_fhir_import_job(
+ self,
+ job_name: str,
+ data_store_id: str,
+ input_s3_uri: str,
+ output_s3_uri: str,
+ kms_key_id: str,
+ data_access_role_arn: str,
+ error_code: str = None,
+ ) -> None:
+ expected_params = {
+ "JobName": job_name,
+ "InputDataConfig": {
+ "S3Uri": input_s3_uri,
+ },
+ "DatastoreId": data_store_id,
+ "JobOutputDataConfig": {
+ "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id}
+ },
+ "DatastoreId": data_store_id,
+ "DataAccessRoleArn": data_access_role_arn,
+ }
+
+ response = {
+ "JobId": "my_import_job",
+ "JobStatus": "SUBMITTED",
+ "DatastoreId": data_store_id,
+ }
+
+ self._stub_bifurcator(
+ "start_fhir_import_job", expected_params, response, error_code=error_code
+ )
+
+ def stub_describe_fhir_import_job(
+ self, datastore_id, job_id, error_code: str = None
+ ):
+ expected_params = {"DatastoreId": datastore_id, "JobId": job_id}
+
+ response = {
+ "ImportJobProperties": {
+ "JobId": job_id,
+ "JobName": "my_import_job",
+ "JobStatus": "COMPLETED",
+ "DatastoreId": datastore_id,
+ "SubmitTime": datetime.now(timezone.utc),
+ "EndTime": datetime.now(timezone.utc),
+ "InputDataConfig": {
+ "S3Uri": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
+ },
+ "JobOutputDataConfig": {
+ "S3Configuration": {
+ "S3Uri": "s3://amzn-s3-demo-bucket-827365/import/output/",
+ "KmsKeyId": "kms_key_id",
+ }
+ },
+ "JobProgressReport": {
+ "TotalNumberOfScannedFiles": 123,
+ "TotalSizeOfScannedFilesInMB": 123.0,
+ "TotalNumberOfImportedFiles": 123,
+ "TotalNumberOfResourcesScanned": 123,
+ "TotalNumberOfResourcesImported": 123,
+ "TotalNumberOfResourcesWithCustomerError": 123,
+ "TotalNumberOfFilesReadWithCustomerError": 123,
+ "Throughput": 123.0,
+ },
+ "DataAccessRoleArn": "data_access_role_arn",
+ "Message": "Import job completed successfully",
+ }
+ }
+
+ self._stub_bifurcator(
+ "describe_fhir_import_job", expected_params, response, error_code=error_code
+ )
+
+ def stub_list_fhir_import_jobs(self, data_store_id, error_code: str = None):
+ expected_params = {"DatastoreId": data_store_id}
+
+ response = {
+ "ImportJobPropertiesList": [
+ {
+ "JobId": "my_import_job",
+ "JobName": "my_import_job",
+ "JobStatus": "COMPLETED",
+ "DatastoreId": data_store_id,
+ "SubmitTime": datetime.now(timezone.utc),
+ "EndTime": datetime.now(timezone.utc),
+ "InputDataConfig": {
+ "S3Uri": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX",
+ },
+ "JobOutputDataConfig": {
+ "S3Configuration": {
+ "S3Uri": "s3://amzn-s3-demo-bucket-827365/import/output/",
+ "KmsKeyId": "kms_key_id",
+ }
+ },
+ "JobProgressReport": {
+ "TotalNumberOfScannedFiles": 123,
+ "TotalSizeOfScannedFilesInMB": 123.0,
+ "TotalNumberOfImportedFiles": 123,
+ "TotalNumberOfResourcesScanned": 123,
+ "TotalNumberOfResourcesImported": 123,
+ "TotalNumberOfResourcesWithCustomerError": 123,
+ "TotalNumberOfFilesReadWithCustomerError": 123,
+ "Throughput": 123.0,
+ },
+ "DataAccessRoleArn": "data_access_role_arn",
+ "Message": "Import job completed successfully",
+ }
+ ]
+ }
+
+ self._stub_bifurcator(
+ "list_fhir_import_jobs", expected_params, response, error_code=error_code
+ )
+
+ def stub_start_fhir_export_job(
+ self,
+ job_name: str,
+ data_store_id: str,
+ output_s3_uri: str,
+ kms_key_id: str,
+ data_access_role_arn: str,
+ error_code: str = None,
+ ) -> None:
+ expected_params = {
+ "JobName": job_name,
+ "OutputDataConfig": {
+ "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id}
+ },
+ "DatastoreId": data_store_id,
+ "DataAccessRoleArn": data_access_role_arn,
+ }
+
+ response = {
+ "JobId": "my_export_job",
+ "JobStatus": "SUBMITTED",
+ "DatastoreId": data_store_id,
+ }
+
+ self._stub_bifurcator(
+ "start_fhir_export_job", expected_params, response, error_code=error_code
+ )
+
+ def stub_list_fhir_export_jobs(self, data_store_id, error_code: str = None):
+ expected_params = {"DatastoreId": data_store_id}
+
+ response = {
+ "ExportJobPropertiesList": [
+ {
+ "JobId": "my_export_job",
+ "JobName": "my_export_job",
+ "JobStatus": "COMPLETED",
+ "DatastoreId": data_store_id,
+ "SubmitTime": datetime.now(timezone.utc),
+ "EndTime": datetime.now(timezone.utc),
+ "OutputDataConfig": {
+ "S3Configuration": {
+ "S3Uri": "s3://amzn-s3-demo-bucket-827365/export/output/",
+ "KmsKeyId": "kms_key_id",
+ }
+ },
+ "DataAccessRoleArn": "data_access_role_arn",
+ "Message": "Export job completed successfully",
+ }
+ ]
+ }
+
+ self._stub_bifurcator(
+ "list_fhir_export_jobs", expected_params, response, error_code=error_code
+ )
+
+ def stub_describe_fhir_export_job(
+ self, datastore_id, job_id, error_code: str = None
+ ):
+ expected_params = {"DatastoreId": datastore_id, "JobId": job_id}
+
+ response = {
+ "ExportJobProperties": {
+ "JobId": job_id,
+ "JobName": "my_export_job",
+ "JobStatus": "COMPLETED",
+ "DatastoreId": datastore_id,
+ "SubmitTime": datetime.now(timezone.utc),
+ "EndTime": datetime.now(timezone.utc),
+ "OutputDataConfig": {
+ "S3Configuration": {
+ "S3Uri": "s3://amzn-s3-demo-bucket-827365/export/output/",
+ "KmsKeyId": "kms_key_id",
+ }
+ },
+ "DataAccessRoleArn": "data_access_role_arn",
+ "Message": "Export job completed successfully",
+ }
+ }
+
+ self._stub_bifurcator(
+ "describe_fhir_export_job", expected_params, response, error_code=error_code
+ )
+
+    def stub_tag_resource(self, resource_arn: str, tags: list[dict[str, str]], error_code: str = None) -> None:
+ expected_params = {
+ "ResourceARN": resource_arn,
+ "Tags": tags,
+ }
+
+ response = {}
+
+ self._stub_bifurcator("tag_resource", expected_params, response, error_code=error_code)
+
+ def stub_untag_resource(self, resource_arn: str, tag_keys: list[str], error_code: str = None) -> None:
+ expected_params = {
+ "ResourceARN": resource_arn,
+ "TagKeys": tag_keys,
+ }
+ response = {}
+ self._stub_bifurcator("untag_resource", expected_params, response, error_code=error_code)
+
+    def stub_list_tags_for_resource(self, resource_arn: str, error_code: str = None) -> None:
+ expected_params = {
+ "ResourceARN": resource_arn,
+ }
+
+ response = {
+ "Tags": [{"Key" :"test-key", "Value" : "test-value"}]
+ }
+
+ self._stub_bifurcator(
+ "list_tags_for_resource", expected_params, response, error_code=error_code
+ )
\ No newline at end of file
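
`HealthLakeStubber` delegates every stub to `ExampleStubber._stub_bifurcator`, which ships with the shared test tools and is not part of this patch. Its general shape, assuming it wraps botocore's `Stubber` (a hedged sketch, not the repository's implementation):

```
from botocore.stub import Stubber


def stub_bifurcator_sketch(stubber: Stubber, method, expected_params, response, error_code=None):
    """Queue either a canned success response or a client error for one call."""
    if error_code is None:
        stubber.add_response(method, response, expected_params)
    else:
        stubber.add_client_error(
            method, service_error_code=error_code, expected_params=expected_params
        )
```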
diff --git a/python/test_tools/stubber_factory.py b/python/test_tools/stubber_factory.py
index e0164656a80..e21b17a232b 100644
--- a/python/test_tools/stubber_factory.py
+++ b/python/test_tools/stubber_factory.py
@@ -33,6 +33,7 @@
from test_tools.glacier_stubber import GlacierStubber
from test_tools.glue_stubber import GlueStubber
from test_tools.iam_stubber import IamStubber
+from test_tools.healthlake_stubber import HealthLakeStubber
from test_tools.keyspaces_stubber import KeyspacesStubber
from test_tools.kinesis_stubber import KinesisStubber
from test_tools.kinesis_analytics_v2_stubber import KinesisAnalyticsV2Stubber
@@ -121,6 +122,8 @@ def stubber_factory(service_name):
return GlueStubber
elif service_name == "iam":
return IamStubber
+ elif service_name == "healthlake":
+ return HealthLakeStubber
elif service_name == "keyspaces":
return KeyspacesStubber
elif service_name == "kinesis":