From b02e5c95cc98589fb9c6c8546f4d498239e3360a Mon Sep 17 00:00:00 2001 From: Steven Meyer <108885656+meyertst-aws@users.noreply.github.com> Date: Fri, 22 Nov 2024 18:30:45 -0500 Subject: [PATCH] Python: HealthLake examples for action (#7099) --- .doc_gen/metadata/healthlake_metadata.yaml | 175 +++++ python/example_code/healthlake/README.md | 95 +++ .../healthlake/health_lake_wrapper.py | 685 ++++++++++++++++++ .../example_code/healthlake/requirements.txt | 3 + .../example_code/healthlake/test/conftest.py | 17 + .../test/test_health_lake_wrapper.py | 289 ++++++++ python/test_tools/healthlake_stubber.py | 345 +++++++++ python/test_tools/stubber_factory.py | 3 + 8 files changed, 1612 insertions(+) create mode 100644 .doc_gen/metadata/healthlake_metadata.yaml create mode 100644 python/example_code/healthlake/README.md create mode 100644 python/example_code/healthlake/health_lake_wrapper.py create mode 100644 python/example_code/healthlake/requirements.txt create mode 100644 python/example_code/healthlake/test/conftest.py create mode 100644 python/example_code/healthlake/test/test_health_lake_wrapper.py create mode 100644 python/test_tools/healthlake_stubber.py diff --git a/.doc_gen/metadata/healthlake_metadata.yaml b/.doc_gen/metadata/healthlake_metadata.yaml new file mode 100644 index 00000000000..850302d60cf --- /dev/null +++ b/.doc_gen/metadata/healthlake_metadata.yaml @@ -0,0 +1,175 @@ +# zexi 0.4.0 +healthlake_CreateFHIRDatastore: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.CreateFHIRDatastore + - description: > + The following code shows an example of parameters for a SMART on + FHIR-enabled HealthLake data store. + snippet_tags: + - python.example_code.healthlake.CreateFHIRDatastore.smart + services: + healthlake: {CreateFHIRDatastore} +healthlake_DescribeFHIRDatastore: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.DescribeFHIRDatastore + services: + healthlake: {DescribeFHIRDatastore} +healthlake_ListFHIRDatastores: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.ListFHIRDatastores + services: + healthlake: {ListFHIRDatastores} +healthlake_DeleteFHIRDatastore: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.DeleteFHIRDatastore + services: + healthlake: {DeleteFHIRDatastore} +healthlake_StartFHIRImportJob: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.StartFHIRImportJob + services: + healthlake: {StartFHIRImportJob} +healthlake_DescribeFHIRImportJob: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.DescribeFHIRImportJob + services: + healthlake: {DescribeFHIRImportJob} +healthlake_ListFHIRDatastoreImportJobs: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.ListFHIRDatastoreImportJobs + services: + healthlake: {ListFHIRDatastoreImportJobs} +healthlake_StartFHIRExportJob: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.StartFHIRExportJob + services: + healthlake: {StartFHIRExportJob} +healthlake_DescribeFHIRExportJob: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.DescribeFHIRExportJob + services: + healthlake: {DescribeFHIRExportJob} +healthlake_ListFHIRExportJobs: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.ListFHIRExportJobs + services: + healthlake: {ListFHIRExportJobs} +healthlake_TagResource: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.TagResource + services: + healthlake: {TagResource} +healthlake_ListTagsForResource: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.ListTagsForResource + services: + healthlake: {ListTagsForResource} +healthlake_UntagResource: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/healthlake + excerpts: + - description: + snippet_tags: + - python.example_code.healthlake.HealthLakeWrapper.decl + - python.example_code.healthlake.UntagResource + services: + healthlake: {UntagResource} diff --git a/python/example_code/healthlake/README.md b/python/example_code/healthlake/README.md new file mode 100644 index 00000000000..6cc8b900788 --- /dev/null +++ b/python/example_code/healthlake/README.md @@ -0,0 +1,95 @@ +# HealthLake code examples for the SDK for Python + +## Overview + +Shows how to use the AWS SDK for Python (Boto3) to work with AWS HealthLake. + + + + +_HealthLake _ + +## ⚠ Important + +* Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/). +* Running the tests might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + + + +## Code examples + +### Prerequisites + +For prerequisites, see the [README](../../README.md#Prerequisites) in the `python` folder. + +Install the packages required by these examples by running the following in a virtual environment: + +``` +python -m pip install -r requirements.txt +``` + + + + +### Single actions + +Code excerpts that show you how to call individual service functions. + +- [CreateFHIRDatastore](health_lake_wrapper.py#L42) +- [DeleteFHIRDatastore](health_lake_wrapper.py#L136) +- [DescribeFHIRDatastore](health_lake_wrapper.py#L84) +- [DescribeFHIRExportJob](health_lake_wrapper.py#L310) +- [DescribeFHIRImportJob](health_lake_wrapper.py#L197) +- [ListFHIRDatastoreImportJobs](health_lake_wrapper.py#L222) +- [ListFHIRDatastores](health_lake_wrapper.py#L106) +- [ListFHIRExportJobs](health_lake_wrapper.py#L335) +- [ListTagsForResource](health_lake_wrapper.py#L404) +- [StartFHIRExportJob](health_lake_wrapper.py#L272) +- [StartFHIRImportJob](health_lake_wrapper.py#L154) +- [TagResource](health_lake_wrapper.py#L385) +- [UntagResource](health_lake_wrapper.py#L426) + + + + + +## Run the examples + +### Instructions + + + + + + + +### Tests + +⚠ Running tests might result in charges to your AWS account. + + +To find instructions for running these tests, see the [README](../../README.md#Tests) +in the `python` folder. + + + + + + +## Additional resources + +- [HealthLake Developer Guide](https://docs.aws.amazon.com/healthlake/latest/devguide/what-is-amazon-health-lake.html) +- [HealthLake API Reference](https://docs.aws.amazon.com/healthlake/latest/APIReference/Welcome.html) +- [SDK for Python HealthLake reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/medical-imaging.html) + + + + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 \ No newline at end of file diff --git a/python/example_code/healthlake/health_lake_wrapper.py b/python/example_code/healthlake/health_lake_wrapper.py new file mode 100644 index 00000000000..84231ddb80b --- /dev/null +++ b/python/example_code/healthlake/health_lake_wrapper.py @@ -0,0 +1,685 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Shows how to use the AWS SDK for Python (Boto3) to manage and invoke AWS HealthLake +functions. +""" +from datetime import datetime, timedelta +from importlib.metadata import metadata + +from boto3 import client +import logging +import json + +import boto3 +from botocore.exceptions import ClientError +import time + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.healthlake.HealthLakeWrapper] +class HealthLakeWrapper: + def __init__(self, health_lake_client: client): + self.health_lake_client = health_lake_client + + # snippet-start:[python.example_code.healthlake.HealthLakeWrapper.decl] + @classmethod + def from_client(cls) -> "HealthLakeWrapper": + """ + Creates a HealthLakeWrapper instance with a default AWS HealthLake client. + + :return: An instance of HealthLakeWrapper initialized with the default HealthLake client. + """ + health_lake_client = boto3.client("healthlake") + return cls(health_lake_client) + + # snippet-end:[python.example_code.healthlake.HealthLakeWrapper.decl] + + # snippet-start:[python.example_code.healthlake.CreateFHIRDatastore] + def create_fhir_datastore( + self, + datastore_name: str, + sse_configuration: dict[str, any] = None, + identity_provider_configuration: dict[str, any] = None, + ) -> dict[str, str]: + """ + Creates a new HealthLake datastore. + When creating a SMART on FHIR datastore, the following parameters are required: + - sse_configuration: The server-side encryption configuration for a SMART on FHIR-enabled data store. + - identity_provider_configuration: The identity provider configuration for a SMART on FHIR-enabled data store. + + :param datastore_name: The name of the data store. + :param sse_configuration: The server-side encryption configuration for a SMART on FHIR-enabled data store. + :param identity_provider_configuration: The identity provider configuration for a SMART on FHIR-enabled data store. + :return: A dictionary containing the data store information. + """ + try: + parameters = {"DatastoreName": datastore_name, "DatastoreTypeVersion": "R4"} + if ( + sse_configuration is not None + and identity_provider_configuration is not None + ): + # Creating a SMART on FHIR-enabled data store + parameters["SseConfiguration"] = sse_configuration + parameters[ + "IdentityProviderConfiguration" + ] = identity_provider_configuration + + response = self.health_lake_client.create_fhir_datastore(**parameters) + return response + except ClientError as err: + logger.exception( + "Couldn't create datastore %s. Here's why %s", + datastore_name, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.CreateFHIRDatastore] + + # snippet-start:[python.example_code.healthlake.DescribeFHIRDatastore] + def describe_fhir_datastore(self, datastore_id: str) -> dict[str, any]: + """ + Describes a HealthLake datastore. + :param datastore_id: The datastore ID. + :return: The datastore description. + """ + try: + response = self.health_lake_client.describe_fhir_datastore( + DatastoreId=datastore_id + ) + return response["DatastoreProperties"] + except ClientError as err: + logger.exception( + "Couldn't describe datastore with ID %s. Here's why %s", + datastore_id, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.DescribeFHIRDatastore] + + # snippet-start:[python.example_code.healthlake.ListFHIRDatastores] + def list_fhir_datastores(self) -> list[dict[str, any]]: + """ + Lists all HealthLake datastores. + :return: A list of datastore descriptions. + """ + try: + next_token = None + datastores = [] + + # Loop through paginated results. + while True: + parameters = {} + if next_token is not None: + parameters["NextToken"] = next_token + response = self.health_lake_client.list_fhir_datastores(**parameters) + datastores.extend(response["DatastorePropertiesList"]) + if "NextToken" in response: + next_token = response["NextToken"] + else: + break + + return datastores + except ClientError as err: + logger.exception( + "Couldn't list datastores. Here's why %s", err.response["Error"]["Message"] + ) + raise + # snippet-end:[python.example_code.healthlake.ListFHIRDatastores] + + # snippet-start:[python.example_code.healthlake.DeleteFHIRDatastore] + def delete_fhir_datastore(self, datastore_id: str) -> None: + """ + Deletes a HealthLake datastore. + :param datastore_id: The datastore ID. + """ + try: + self.health_lake_client.delete_fhir_datastore(DatastoreId=datastore_id) + except ClientError as err: + logger.exception( + "Couldn't delete datastore with ID %s. Here's why %s", + datastore_id, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.DeleteFHIRDatastore] + + # snippet-start:[python.example_code.healthlake.StartFHIRImportJob] + def start_fhir_import_job( + self, + job_name: str, + datastore_id: str, + input_s3_uri: str, + job_output_s3_uri: str, + kms_key_id: str, + data_access_role_arn: str, + ) -> dict[str, str]: + """ + Starts a HealthLake import job. + :param job_name: The import job name. + :param datastore_id: The datastore ID. + :param input_s3_uri: The input S3 URI. + :param job_output_s3_uri: The job output S3 URI. + :param kms_key_id: The KMS key ID associated with the output S3 bucket. + :param data_access_role_arn: The data access role ARN. + :return: The import job. + """ + try: + response = self.health_lake_client.start_fhir_import_job( + JobName=job_name, + InputDataConfig={"S3Uri": input_s3_uri}, + JobOutputDataConfig={ + "S3Configuration": { + "S3Uri": job_output_s3_uri, + "KmsKeyId": kms_key_id, + } + }, + DataAccessRoleArn=data_access_role_arn, + DatastoreId=datastore_id, + ) + return response + except ClientError as err: + logger.exception( + "Couldn't start import job. Here's why %s", + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.StartFHIRImportJob] + + # snippet-start:[python.example_code.healthlake.DescribeFHIRImportJob] + def describe_fhir_import_job( + self, datastore_id: str, job_id: str + ) -> dict[str, any]: + """ + Describes a HealthLake import job. + :param datastore_id: The datastore ID. + :param job_id: The import job ID. + :return: The import job description. + """ + try: + response = self.health_lake_client.describe_fhir_import_job( + DatastoreId=datastore_id, JobId=job_id + ) + return response["ImportJobProperties"] + except ClientError as err: + logger.exception( + "Couldn't describe import job with ID %s. Here's why %s", + job_id, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.DescribeFHIRImportJob] + + # snippet-start:[python.example_code.healthlake.ListFHIRDatastoreImportJobs] + def list_fhir_import_jobs( + self, + datastore_id: str, + job_name: str = None, + job_status: str = None, + submitted_before: datetime = None, + submitted_after: datetime = None, + ) -> list[dict[str, any]]: + """ + Lists HealthLake import jobs satisfying the conditions. + :param datastore_id: The datastore ID. + :param job_name: The import job name. + :param job_status: The import job status. + :param submitted_before: The import job submitted before the specified date. + :param submitted_after: The import job submitted after the specified date. + :return: A list of import jobs. + """ + try: + parameters = {"DatastoreId": datastore_id} + if job_name is not None: + parameters["JobName"] = job_name + if job_status is not None: + parameters["JobStatus"] = job_status + if submitted_before is not None: + parameters["SubmittedBefore"] = submitted_before + if submitted_after is not None: + parameters["SubmittedAfter"] = submitted_after + next_token = None + jobs = [] + # Loop through paginated results. + while True: + if next_token is not None: + parameters["NextToken"] = next_token + response = self.health_lake_client.list_fhir_import_jobs(**parameters) + jobs.extend(response["ImportJobPropertiesList"]) + if "NextToken" in response: + next_token = response["NextToken"] + else: + break + return jobs + except ClientError as err: + logger.exception( + "Couldn't list import jobs. Here's why %s", + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.ListFHIRDatastoreImportJobs] + + # snippet-start:[python.example_code.healthlake.StartFHIRExportJob] + def start_fhir_export_job( + self, + job_name: str, + datastore_id: str, + output_s3_uri: str, + kms_key_id: str, + data_access_role_arn: str, + ) -> dict[str, str]: + """ + Starts a HealthLake export job. + :param job_name: The export job name. + :param datastore_id: The datastore ID. + :param output_s3_uri: The output S3 URI. + :param kms_key_id: The KMS key ID associated with the output S3 bucket. + :param data_access_role_arn: The data access role ARN. + :return: The export job. + """ + try: + response = self.health_lake_client.start_fhir_export_job( + OutputDataConfig={ + "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id} + }, + DataAccessRoleArn=data_access_role_arn, + DatastoreId=datastore_id, + JobName=job_name, + ) + + return response + except ClientError as err: + logger.exception( + "Couldn't start export job. Here's why %s", + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.StartFHIRExportJob] + + # snippet-start:[python.example_code.healthlake.DescribeFHIRExportJob] + def describe_fhir_export_job( + self, datastore_id: str, job_id: str + ) -> dict[str, any]: + """ + Describes a HealthLake export job. + :param datastore_id: The datastore ID. + :param job_id: The export job ID. + :return: The export job description. + """ + try: + response = self.health_lake_client.describe_fhir_export_job( + DatastoreId=datastore_id, JobId=job_id + ) + return response["ExportJobProperties"] + except ClientError as err: + logger.exception( + "Couldn't describe export job with ID %s. Here's why %s", + job_id, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.DescribeFHIRExportJob] + + # snippet-start:[python.example_code.healthlake.ListFHIRExportJobs] + def list_fhir_export_jobs( + self, + datastore_id: str, + job_name: str = None, + job_status: str = None, + submitted_before: datetime = None, + submitted_after: datetime = None, + ) -> list[dict[str, any]]: + """ + Lists HealthLake export jobs satisfying the conditions. + :param datastore_id: The datastore ID. + :param job_name: The export job name. + :param job_status: The export job status. + :param submitted_before: The export job submitted before the specified date. + :param submitted_after: The export job submitted after the specified date. + :return: A list of export jobs. + """ + try: + parameters = {"DatastoreId": datastore_id} + if job_name is not None: + parameters["JobName"] = job_name + if job_status is not None: + parameters["JobStatus"] = job_status + if submitted_before is not None: + parameters["SubmittedBefore"] = submitted_before + if submitted_after is not None: + parameters["SubmittedAfter"] = submitted_after + next_token = None + jobs = [] + # Loop through paginated results. + while True: + if next_token is not None: + parameters["NextToken"] = next_token + response = self.health_lake_client.list_fhir_export_jobs(**parameters) + jobs.extend(response["ExportJobPropertiesList"]) + if "NextToken" in response: + next_token = response["NextToken"] + else: + break + return jobs + except ClientError as err: + logger.exception( + "Couldn't list export jobs. Here's why %s", + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.ListFHIRExportJobs] + + # snippet-start:[python.example_code.healthlake.TagResource] + def tag_resource(self, resource_arn: str, tags: list[dict[str, str]]) -> None: + """ + Tags a HealthLake resource. + :param resource_arn: The resource ARN. + :param tags: The tags to add to the resource. + """ + try: + self.health_lake_client.tag_resource(ResourceARN=resource_arn, Tags=tags) + except ClientError as err: + logger.exception( + "Couldn't tag resource %s. Here's why %s", + resource_arn, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.TagResource] + + # snippet-start:[python.example_code.healthlake.ListTagsForResource] + def list_tags_for_resource(self, resource_arn: str) -> dict[str, str]: + """ + Lists the tags for a HealthLake resource. + :param resource_arn: The resource ARN. + :return: The tags for the resource. + """ + try: + response = self.health_lake_client.list_tags_for_resource( + ResourceARN=resource_arn + ) + return response["Tags"] + except ClientError as err: + logger.exception( + "Couldn't list tags for resource %s. Here's why %s", + resource_arn, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.ListTagsForResource] + + # snippet-start:[python.example_code.healthlake.UntagResource] + def untag_resource(self, resource_arn: str, tag_keys: list[str]) -> None: + """ + Untags a HealthLake resource. + :param resource_arn: The resource ARN. + :param tag_keys: The tag keys to remove from the resource. + """ + try: + self.health_lake_client.untag_resource( + ResourceARN=resource_arn, TagKeys=tag_keys + ) + except ClientError as err: + logger.exception( + "Couldn't untag resource %s. Here's why %s", + resource_arn, + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.healthlake.UntagResource] + + # snippet-end:[python.example_code.healthlake.HealthLakeWrapper] + + def wait_datastore_active(self, datastore_id: str) -> None: + """ + Waits for a HealthLake datastore to become active. + :param datastore_id: The datastore ID. + """ + counter = 0 + max_count_minutes = 40 # It can take a while to create a datastore, so we'll wait up to 40 minutes. + status = "CREATING" + while counter < max_count_minutes: + datastore = self.health_lake_client.describe_fhir_datastore( + DatastoreId=datastore_id + ) + status = datastore["DatastoreProperties"]["DatastoreStatus"] + if status == "ACTIVE" or status == "CREATE_FAILED": + break + else: + print(f"data store {status}, minutes {counter}") + counter += 1 + time.sleep(60) + + if status == "ACTIVE": + print( + f"Datastore with ID {datastore_id} is active after {counter} minutes." + ) + elif status == "CREATE_FAILED": + raise ClientError( + "Create datastore with ID %s failed after %d minutes.", + datastore_id, + counter, + ) + else: + raise ClientError( + "Datastore with ID %s is not active after %d minutes.", + datastore_id, + counter, + ) + + def wait_import_job_complete(self, datastore_id: str, job_id: str) -> None: + """ + Waits for a HealthLake import job to complete. + :param datastore_id: The datastore ID. + :param job_id: The import job ID. + """ + counter = 0 + max_count_minutes = ( + 20 + ) + status = "IN_PROGRESS" + while counter < max_count_minutes: + job = self.describe_fhir_import_job(datastore_id, job_id) + status = job["JobStatus"] + if status == "COMPLETED" or status == "COMPLETED_WITH_ERRORS": + break + else: + print(f"Import job {status}, minutes {counter}") + counter += 1 + time.sleep(60) + + if status == "COMPLETED": + print(f"Import job with ID {job_id} is completed after {counter} minutes.") + elif status == "COMPLETED_WITH_ERRORS": + print( + f"Import job with ID {job_id} is completed with errors after {counter} minutes." + ) + else: + raise ClientError( + "Import job with ID %s is not completed after %d minutes.", + job_id, + counter, + ) + + def wait_export_job_complete(self, datastore_id: str, job_id: str) -> None: + """ + Waits for a HealthLake export job to complete. + :param datastore_id: The datastore ID. + :param job_id: The export job ID. + """ + counter = 0 + max_count_minutes = ( + 20 + ) + status = "IN_PROGRESS" + while counter < max_count_minutes: + job = self.describe_fhir_export_job(datastore_id, job_id) + status = job["JobStatus"] + if status == "COMPLETED" or status == "COMPLETED_WITH_ERRORS": + break + else: + print(f"Export job {status}, minutes {counter}") + counter += 1 + time.sleep(60) + if status == "COMPLETED": + print(f"Export job with ID {job_id} is completed after {counter} minutes.") + elif status == "COMPLETED_WITH_ERRORS": + print( + f"Export job with ID {job_id} is completed with errors after {counter} minutes." + ) + else: + raise ClientError( + "Job with ID %s is not completed after %d minutes.", job_id, counter + ) + + def health_lake_demo(self) -> None: + use_smart_on_fhir_data_store = True + + datastore_name = "health_imaging_datastore2" + if use_smart_on_fhir_data_store: + # snippet-start:[python.example_code.healthlake.CreateFHIRDatastore.smart] + sse_configuration = { + "KmsEncryptionConfig": {"CmkType": "AWS_OWNED_KMS_KEY"} + } + # TODO: Update the metadata to match your environment. + metadata = { + "issuer": "https://ehr.example.com", + "jwks_uri": "https://ehr.example.com/.well-known/jwks.json", + "authorization_endpoint": "https://ehr.example.com/auth/authorize", + "token_endpoint": "https://ehr.token.com/auth/token", + "token_endpoint_auth_methods_supported": [ + "client_secret_basic", + "foo", + ], + "grant_types_supported": ["client_credential", "foo"], + "registration_endpoint": "https://ehr.example.com/auth/register", + "scopes_supported": ["openId", "profile", "launch"], + "response_types_supported": ["code"], + "management_endpoint": "https://ehr.example.com/user/manage", + "introspection_endpoint": "https://ehr.example.com/user/introspect", + "revocation_endpoint": "https://ehr.example.com/user/revoke", + "code_challenge_methods_supported": ["S256"], + "capabilities": [ + "launch-ehr", + "sso-openid-connect", + "client-public", + ], + } + # TODO: Update the IdpLambdaArn. + identity_provider_configuration = { + "AuthorizationStrategy": "SMART_ON_FHIR_V1", + "FineGrainedAuthorizationEnabled": True, + "IdpLambdaArn": "arn:aws:lambda:your-region:your-account-id:function:your-lambda-name", + "Metadata": json.dumps(metadata), + } + data_store = self.create_fhir_datastore( + datastore_name, sse_configuration, identity_provider_configuration + ) + # snippet-end:[python.example_code.healthlake.CreateFHIRDatastore.smart] + else: + data_store = self.create_fhir_datastore(datastore_name) + + data_store_id = data_store["DatastoreId"] + data_store_arn = data_store["DatastoreArn"] + + self.wait_datastore_active(data_store_id) + data_stores = self.list_fhir_datastores() + + print(f"{len(data_stores)} data store(s) found.") + for data_store in data_stores: + if data_store["DatastoreId"] == data_store_id: + logger.info( + "Datastore with ID %s is %s.", + data_store_id, + data_store["DatastoreStatus"], + ) + break + tags = [{"Key": "TagKey", "Value": "TagValue"}] + + self.tag_resource(data_store_arn, tags) + + tags = self.list_tags_for_resource(data_store_arn) + print(f"{len(tags)} tag(s) found.") + for tag in tags: + print(f"Tag key: {tag['Key']}, value: {tag['Value']}") + + keys = [] + for tag in tags: + keys.append(tag["Key"]) + + self.untag_resource(data_store_arn, keys) + + job_name = "my_import_job" + input_s3_uri = ( + "s3://health-lake-test-827365/import/examples/patient_example_chalmers.json" + ) + output_s3_uri = "s3://health-lake-test-827365/import/output/" + kms_key_id = "arn:aws:kms:us-east-1:123502194722:key/b7f645cb-e564-4981-8672-9e012d1ff1a0" + data_access_role_arn = ( + "arn:aws:iam::123502194722:role/healthlaketest37-ahl-full-access" + ) + import_job = self.start_fhir_import_job( + job_name, + data_store_id, + input_s3_uri, + output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + + import_job_id = import_job["JobId"] + print(f"Started import job with ID: {import_job_id}") + + self.wait_import_job_complete(data_store_id, import_job_id) + + import_jobs = self.list_fhir_import_jobs( + data_store_id, submitted_after=datetime.now() - timedelta(days=1) + ) + print(f"{len(import_jobs)} import job(s) found.") + for import_job in import_jobs: + print( + f"Job id: {import_job['JobId']}, status: {import_job['JobStatus']}, submit time: {import_job['SubmitTime']}" + ) + + job_name = "my_export_job" + output_s3_uri = "s3://health-lake-test-827365/export/output/" + export_job = self.start_fhir_export_job( + job_name, data_store_id, output_s3_uri, kms_key_id, data_access_role_arn + ) + + export_job_id = export_job["JobId"] + print(f"Started export job with ID: {export_job_id}") + self.wait_export_job_complete(data_store_id, export_job_id) + + export_jobs = self.list_fhir_export_jobs( + data_store_id, submitted_after=datetime.now() - timedelta(days=1) + ) + print(f"{len(export_jobs)} export job(s) found.") + for export_job in export_jobs: + print( + f"Job id: {export_job['JobId']}, status: {export_job['JobStatus']}, submit time: {export_job['SubmitTime']}" + ) + + self.delete_fhir_datastore(data_store_id) + print(f"Data store with ID {data_store_id} deleted.") + + +if __name__ == "__main__": + health_lake_wrapper = HealthLakeWrapper.from_client() + health_lake_wrapper.health_lake_demo() diff --git a/python/example_code/healthlake/requirements.txt b/python/example_code/healthlake/requirements.txt new file mode 100644 index 00000000000..624c2cdf438 --- /dev/null +++ b/python/example_code/healthlake/requirements.txt @@ -0,0 +1,3 @@ +boto3>=1.34.149 +pytest>=7.2.1 +botocore>=1.34.149 \ No newline at end of file diff --git a/python/example_code/healthlake/test/conftest.py b/python/example_code/healthlake/test/conftest.py new file mode 100644 index 00000000000..0faa3f8991e --- /dev/null +++ b/python/example_code/healthlake/test/conftest.py @@ -0,0 +1,17 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Contains common test fixtures used to run AWS HealthImaging +tests. +""" + +import sys +import os + +script_dir = os.path.dirname(os.path.abspath(__file__)) + +# This is needed so Python can find test_tools on the path. +sys.path.append(os.path.join(script_dir, "../../..")) + +from test_tools.fixtures.common import * \ No newline at end of file diff --git a/python/example_code/healthlake/test/test_health_lake_wrapper.py b/python/example_code/healthlake/test/test_health_lake_wrapper.py new file mode 100644 index 00000000000..8c2f4dbb097 --- /dev/null +++ b/python/example_code/healthlake/test/test_health_lake_wrapper.py @@ -0,0 +1,289 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for health_lake_wrapper functions. +""" + +import os +import sys + +import boto3 +import pytest +from botocore.exceptions import ClientError + +script_dir = os.path.dirname(os.path.abspath(__file__)) + +# Append parent directory to import health_lake_wrapper. +sys.path.append(os.path.join(script_dir, "..")) +from health_lake_wrapper import HealthLakeWrapper + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_create_fhir_datastore(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_name = "test-datastore" + datastore_id = "abcdedf1234567890abcdef123456789" + + healthlake_stubber.stub_create_fhir_datastore( + datastore_name, datastore_id, error_code=error_code + ) + + if error_code is None: + response = wrapper.create_fhir_datastore(datastore_name) + assert response["DatastoreId"] == datastore_id + else: + with pytest.raises(ClientError) as exc_info: + wrapper.create_fhir_datastore(datastore_name) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_describe_fhir_datastore(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "abcdedf1234567890abcdef123456789" + + healthlake_stubber.stub_describe_fhir_datastore(datastore_id, error_code=error_code) + + if error_code is None: + response = wrapper.describe_fhir_datastore(datastore_id) + assert response["DatastoreId"] == datastore_id + else: + with pytest.raises(ClientError) as exc_info: + wrapper.describe_fhir_datastore(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_fhir_datastores(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + + healthlake_stubber.stub_list_fhir_datastores(error_code=error_code) + + if error_code is None: + response = wrapper.list_fhir_datastores() + assert len(response) == 1 + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_fhir_datastores() + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_delete_fhir_datastore(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_delete_fhir_datastore(datastore_id, error_code=error_code) + + if error_code is None: + wrapper.delete_fhir_datastore(datastore_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.delete_fhir_datastore(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_start_fhir_import_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + job_name = "test-job" + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + input_s3_uri = "s3://amzn-s3-demo-bucket/test-data" + job_output_s3_uri = "s3://amzn-s3-demo-bucket/test-output" + kms_key_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" + data_access_role_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_start_fhir_import_job( + job_name, + datastore_id, + input_s3_uri, + job_output_s3_uri, + kms_key_id, + data_access_role_arn, + error_code=error_code, + ) + + if error_code is None: + wrapper.start_fhir_import_job( + job_name, + datastore_id, + input_s3_uri, + job_output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.start_fhir_import_job( + job_name, + datastore_id, + input_s3_uri, + job_output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_describe_fhir_import_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + job_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_describe_fhir_import_job( + datastore_id, job_id, error_code=error_code + ) + + if error_code is None: + wrapper.describe_fhir_import_job(datastore_id, job_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.describe_fhir_import_job(datastore_id, job_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_fhir_import_jobs(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_list_fhir_import_jobs(datastore_id, error_code=error_code) + + if error_code is None: + wrapper.list_fhir_import_jobs(datastore_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_fhir_import_jobs(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_start_fhir_export_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + job_name = "test-job" + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + output_s3_uri = "s3://amzn-s3-demo-bucket/test-output" + data_access_role_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + kms_key_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX" + + healthlake_stubber.stub_start_fhir_export_job( + job_name, + datastore_id, + output_s3_uri, + kms_key_id, + data_access_role_arn, + error_code=error_code, + ) + + if error_code is None: + wrapper.start_fhir_export_job( + job_name, + datastore_id, + output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.start_fhir_export_job( + job_name, + datastore_id, + output_s3_uri, + kms_key_id, + data_access_role_arn, + ) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_fhir_export_jobs(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_list_fhir_export_jobs(datastore_id, error_code=error_code) + + if error_code is None: + wrapper.list_fhir_export_jobs(datastore_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_fhir_export_jobs(datastore_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_describe_fhir_export_job(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + datastore_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + job_id = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + + healthlake_stubber.stub_describe_fhir_export_job( + datastore_id, job_id, error_code=error_code + ) + + if error_code is None: + wrapper.describe_fhir_export_job(datastore_id, job_id) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.describe_fhir_export_job(datastore_id, job_id) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_tag_resource(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + tags = [{"Key" :"test-key", "Value" : "test-value"}] + healthlake_stubber.stub_tag_resource(resource_arn, tags, error_code=error_code) + if error_code is None: + wrapper.tag_resource(resource_arn, tags) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.tag_resource(resource_arn, tags) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_untag_resource(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + tag_keys = ["test-key"] + healthlake_stubber.stub_untag_resource(resource_arn, tag_keys, error_code=error_code) + if error_code is None: + wrapper.untag_resource(resource_arn, tag_keys) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.untag_resource(resource_arn, tag_keys) + assert exc_info.value.response["Error"]["Code"] == error_code + +@pytest.mark.parametrize("error_code", [None, "TestException"]) +def test_list_tags_for_resource(make_stubber, error_code): + healthlake_client = boto3.client("healthlake") + healthlake_stubber = make_stubber(healthlake_client) + wrapper = HealthLakeWrapper(healthlake_client) + resource_arn = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + healthlake_stubber.stub_list_tags_for_resource(resource_arn, error_code=error_code) + if error_code is None: + wrapper.list_tags_for_resource(resource_arn) + else: + with pytest.raises(ClientError) as exc_info: + wrapper.list_tags_for_resource(resource_arn) + assert exc_info.value.response["Error"]["Code"] == error_code diff --git a/python/test_tools/healthlake_stubber.py b/python/test_tools/healthlake_stubber.py new file mode 100644 index 00000000000..5d85af53f5a --- /dev/null +++ b/python/test_tools/healthlake_stubber.py @@ -0,0 +1,345 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Stub functions that are used by the AWS HealthLake unit tests. + +When tests are run against an actual AWS account, the stubber class does not +set up stubs and passes all calls through to the Boto 3 client. +""" + +import io +import json +from botocore.stub import ANY +from boto3 import client + +from test_tools.example_stubber import ExampleStubber + +from datetime import timedelta, timezone, datetime + + +class HealthLakeStubber(ExampleStubber): + """ + A class that implements a variety of stub functions that are used by the + AWS HealthLake unit tests. + + The stubbed functions all expect certain parameters to be passed to them as + part of the tests, and will raise errors when the actual parameters differ from + the expected. + """ + + def __init__(self, healthlake_client: client, use_stubs=True) -> None: + """ + Initializes the object with a specific client and configures it for + stubbing or AWS passthrough. + + :param healthlake_client: A Boto 3 AWS HealthLake client. + :param use_stubs: When True, use stubs to intercept requests. Otherwise, + pass requests through to AWS. + """ + super().__init__(healthlake_client, use_stubs) + + def stub_create_fhir_datastore( + self, data_store_name: str, data_store_id: str, error_code: str = None + ) -> None: + expected_params = { + "DatastoreName": data_store_name, + "DatastoreTypeVersion": "R4", + } + + response = { + "DatastoreId": data_store_id, + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "CREATING", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/", + } + + self._stub_bifurcator( + "create_fhir_datastore", expected_params, response, error_code=error_code + ) + + def stub_describe_fhir_datastore( + self, data_store_id, error_code: str = None + ) -> None: + expected_params = {"DatastoreId": data_store_id} + + response = { + "DatastoreProperties": { + "DatastoreId": data_store_id, + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "ACTIVE", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/", + "CreatedAt": datetime.now(timezone.utc), + "DatastoreName": "datastore_name", + "DatastoreTypeVersion": "R4", + } + } + + self._stub_bifurcator( + "describe_fhir_datastore", expected_params, response, error_code=error_code + ) + + def stub_list_fhir_datastores(self, error_code: str = None) -> None: + expected_params = {} + + response = { + "DatastorePropertiesList": [ + { + "DatastoreId": "6407b9ae4c2def3cb6f1a46a0Example", + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "ACTIVE", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/6407b9ae4c2def3cb6f1a46a0Example/r4/", + "CreatedAt": datetime.now(timezone.utc), + "DatastoreName": "datastore_name", + "DatastoreTypeVersion": "R4", + } + ] + } + + self._stub_bifurcator( + "list_fhir_datastores", expected_params, response, error_code=error_code + ) + + def stub_delete_fhir_datastore(self, data_store_id, error_code: str = None) -> None: + expected_params = {"DatastoreId": data_store_id} + + response = { + "DatastoreId": data_store_id, + "DatastoreArn": "datastore_arn", + "DatastoreStatus": "DELETING", + "DatastoreEndpoint": f"https://healthlake.us-east-1.amazonaws.com/datastore/{data_store_id}/r4/", + } + + self._stub_bifurcator( + "delete_fhir_datastore", expected_params, response, error_code=error_code + ) + + def stub_start_fhir_import_job( + self, + job_name: str, + data_store_id: str, + input_s3_uri: str, + output_s3_uri: str, + kms_key_id: str, + data_access_role_arn: str, + error_code: str = None, + ) -> None: + expected_params = { + "JobName": job_name, + "InputDataConfig": { + "S3Uri": input_s3_uri, + }, + "DatastoreId": data_store_id, + "JobOutputDataConfig": { + "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id} + }, + "DatastoreId": data_store_id, + "DataAccessRoleArn": data_access_role_arn, + } + + response = { + "JobId": "my_import_job", + "JobStatus": "SUBMITTED", + "DatastoreId": data_store_id, + } + + self._stub_bifurcator( + "start_fhir_import_job", expected_params, response, error_code=error_code + ) + + def stub_describe_fhir_import_job( + self, datastore_id, job_id, error_code: str = None + ): + expected_params = {"DatastoreId": datastore_id, "JobId": job_id} + + response = { + "ImportJobProperties": { + "JobId": job_id, + "JobName": "my_import_job", + "JobStatus": "COMPLETED", + "DatastoreId": datastore_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "InputDataConfig": { + "S3Uri": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", + }, + "JobOutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/import/output/", + "KmsKeyId": "kms_key_id", + } + }, + "JobProgressReport": { + "TotalNumberOfScannedFiles": 123, + "TotalSizeOfScannedFilesInMB": 123.0, + "TotalNumberOfImportedFiles": 123, + "TotalNumberOfResourcesScanned": 123, + "TotalNumberOfResourcesImported": 123, + "TotalNumberOfResourcesWithCustomerError": 123, + "TotalNumberOfFilesReadWithCustomerError": 123, + "Throughput": 123.0, + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Import job completed successfully", + } + } + + self._stub_bifurcator( + "describe_fhir_import_job", expected_params, response, error_code=error_code + ) + + def stub_list_fhir_import_jobs(self, data_store_id, error_code: str = None): + expected_params = {"DatastoreId": data_store_id} + + response = { + "ImportJobPropertiesList": [ + { + "JobId": "my_import_job", + "JobName": "my_import_job", + "JobStatus": "COMPLETED", + "DatastoreId": data_store_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "InputDataConfig": { + "S3Uri": "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX", + }, + "JobOutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/import/output/", + "KmsKeyId": "kms_key_id", + } + }, + "JobProgressReport": { + "TotalNumberOfScannedFiles": 123, + "TotalSizeOfScannedFilesInMB": 123.0, + "TotalNumberOfImportedFiles": 123, + "TotalNumberOfResourcesScanned": 123, + "TotalNumberOfResourcesImported": 123, + "TotalNumberOfResourcesWithCustomerError": 123, + "TotalNumberOfFilesReadWithCustomerError": 123, + "Throughput": 123.0, + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Import job completed successfully", + } + ] + } + + self._stub_bifurcator( + "list_fhir_import_jobs", expected_params, response, error_code=error_code + ) + + def stub_start_fhir_export_job( + self, + job_name: str, + data_store_id: str, + output_s3_uri: str, + kms_key_id: str, + data_access_role_arn: str, + error_code: str = None, + ) -> None: + expected_params = { + "JobName": job_name, + "OutputDataConfig": { + "S3Configuration": {"S3Uri": output_s3_uri, "KmsKeyId": kms_key_id} + }, + "DatastoreId": data_store_id, + "DataAccessRoleArn": data_access_role_arn, + } + + response = { + "JobId": "my_export_job", + "JobStatus": "SUBMITTED", + "DatastoreId": data_store_id, + } + + self._stub_bifurcator( + "start_fhir_export_job", expected_params, response, error_code=error_code + ) + + def stub_list_fhir_export_jobs(self, data_store_id, error_code: str = None): + expected_params = {"DatastoreId": data_store_id} + + response = { + "ExportJobPropertiesList": [ + { + "JobId": "my_export_job", + "JobName": "my_export_job", + "JobStatus": "COMPLETED", + "DatastoreId": data_store_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "OutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/export/output/", + "KmsKeyId": "kms_key_id", + } + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Export job completed successfully", + } + ] + } + + self._stub_bifurcator( + "list_fhir_export_jobs", expected_params, response, error_code=error_code + ) + + def stub_describe_fhir_export_job( + self, datastore_id, job_id, error_code: str = None + ): + expected_params = {"DatastoreId": datastore_id, "JobId": job_id} + + response = { + "ExportJobProperties": { + "JobId": job_id, + "JobName": "my_export_job", + "JobStatus": "COMPLETED", + "DatastoreId": datastore_id, + "SubmitTime": datetime.now(timezone.utc), + "EndTime": datetime.now(timezone.utc), + "OutputDataConfig": { + "S3Configuration": { + "S3Uri": "s3://amzn-s3-demo-bucket-827365/export/output/", + "KmsKeyId": "kms_key_id", + } + }, + "DataAccessRoleArn": "data_access_role_arn", + "Message": "Export job completed successfully", + } + } + + self._stub_bifurcator( + "describe_fhir_export_job", expected_params, response, error_code=error_code + ) + + def stub_tag_resource(self, resource_arn: str, tags: dict[str, str], error_code: str = None) -> None: + expected_params = { + "ResourceARN": resource_arn, + "Tags": tags, + } + + response = {} + + self._stub_bifurcator("tag_resource", expected_params, response, error_code=error_code) + + def stub_untag_resource(self, resource_arn: str, tag_keys: list[str], error_code: str = None) -> None: + expected_params = { + "ResourceARN": resource_arn, + "TagKeys": tag_keys, + } + response = {} + self._stub_bifurcator("untag_resource", expected_params, response, error_code=error_code) + + def stub_list_tags_for_resource(self, resource_arn: str, error_code: str = None) -> dict[str, str]: + expected_params = { + "ResourceARN": resource_arn, + } + + response = { + "Tags": [{"Key" :"test-key", "Value" : "test-value"}] + } + + self._stub_bifurcator( + "list_tags_for_resource", expected_params, response, error_code=error_code + ) \ No newline at end of file diff --git a/python/test_tools/stubber_factory.py b/python/test_tools/stubber_factory.py index e0164656a80..e21b17a232b 100644 --- a/python/test_tools/stubber_factory.py +++ b/python/test_tools/stubber_factory.py @@ -33,6 +33,7 @@ from test_tools.glacier_stubber import GlacierStubber from test_tools.glue_stubber import GlueStubber from test_tools.iam_stubber import IamStubber +from test_tools.healthlake_stubber import HealthLakeStubber from test_tools.keyspaces_stubber import KeyspacesStubber from test_tools.kinesis_stubber import KinesisStubber from test_tools.kinesis_analytics_v2_stubber import KinesisAnalyticsV2Stubber @@ -121,6 +122,8 @@ def stubber_factory(service_name): return GlueStubber elif service_name == "iam": return IamStubber + elif service_name == "healthlake": + return HealthLakeStubber elif service_name == "keyspaces": return KeyspacesStubber elif service_name == "kinesis":