From f7dddc1e4cb536c6e9a9f895f1b843150fd9778a Mon Sep 17 00:00:00 2001 From: Rachel Hagerman <110480692+rlhagerm@users.noreply.github.com> Date: Wed, 13 Sep 2023 13:18:25 -0500 Subject: [PATCH] Python: SNS FIFO topic and standard queue scenario (#5366) * Updated FIFO example --------- Co-authored-by: David Souther --- .doc_gen/metadata/sns_metadata.yaml | 14 +- python/example_code/sns/README.md | 25 ++ python/example_code/sns/sns_fifo_topic.py | 256 ++++++++++++++++++ .../sns/test/test_sns_fifo_topic.py | 104 +++++++ python/test_tools/sns_stubber.py | 12 +- 5 files changed, 408 insertions(+), 3 deletions(-) create mode 100644 python/example_code/sns/sns_fifo_topic.py create mode 100644 python/example_code/sns/test/test_sns_fifo_topic.py diff --git a/.doc_gen/metadata/sns_metadata.yaml b/.doc_gen/metadata/sns_metadata.yaml index fcad04d8e3e..b0dabf2f4bd 100644 --- a/.doc_gen/metadata/sns_metadata.yaml +++ b/.doc_gen/metadata/sns_metadata.yaml @@ -1288,6 +1288,17 @@ sns_PublishFifoTopic: the end. snippet_tags: - sns.java2.PriceUpdateExample.display + Python: + versions: + - sdk_version: 3 + github: python/example_code/sns + excerpts: + - description: + Create an &SNS; FIFO topic, subscribe &SQS; FIFO and standard queues to the topic, + and publish a message to the topic. + snippet_tags: + - python.example_code.sns.Scenario_SubscribeFifoTopic + - python.example_code.sns.FifoTopicWrapper SAP ABAP: versions: - sdk_version: 1 @@ -1300,7 +1311,8 @@ sns_PublishFifoTopic: snippet_tags: - sns.abapv1.publish_message_to_fifo_queue services: - sns: {} + sns: {CreateTopic, Publish, Subscribe} + sqs: {} sns_PublishLargeMessage: title: Publish a large message to &SNS; with &S3; using an &AWS; SDK title_abbrev: Publish a large message diff --git a/python/example_code/sns/README.md b/python/example_code/sns/README.md index b1a6581fb92..8ff997fbc9c 100644 --- a/python/example_code/sns/README.md +++ b/python/example_code/sns/README.md @@ -49,6 +49,13 @@ Code excerpts that show you how to call individual service functions. * [Set a filter policy](sns_basics.py#L130) (`SetSubscriptionAttributes`) * [Subscribe an email address to a topic](sns_basics.py#L80) (`Subscribe`) +### Scenarios + +Code examples that show you how to accomplish a specific task by calling multiple +functions within the same service. + +* [Create and publish to a FIFO topic](sns_fifo_topic.py) + ### Cross-service examples Sample applications that work across multiple AWS services. @@ -70,6 +77,24 @@ python sns_basics.py +#### Create and publish to a FIFO topic + +This example shows you how to create and publish to a FIFO Amazon SNS topic. + + + + + +Start the example by running the following at a command prompt: + +``` +python sns_fifo_topic.py +``` + + + + + ### Tests ⚠ Running tests might result in charges to your AWS account. diff --git a/python/example_code/sns/sns_fifo_topic.py b/python/example_code/sns/sns_fifo_topic.py new file mode 100644 index 00000000000..2c264ea4de7 --- /dev/null +++ b/python/example_code/sns/sns_fifo_topic.py @@ -0,0 +1,256 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Purpose + +Demonstrates subscribing Amazon Simple Queue Service (Amazon SQS) +queues to a FIFO (First-In-First-Out) Amazon Simple Notification Service (Amazon SNS) topic. +""" + +import logging +import uuid +import boto3 +import json +from botocore.exceptions import ClientError +from sns_basics import SnsWrapper + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.sns.FifoTopicWrapper] +class FifoTopicWrapper: + """Encapsulates Amazon SNS FIFO topic and subscription functions.""" + + def __init__(self, sns_resource): + """ + :param sns_resource: A Boto3 Amazon SNS resource. + """ + self.sns_resource = sns_resource + + # snippet-start:[python.example_code.sns.CreateFifoTopic] + def create_fifo_topic(self, topic_name): + """ + Create a FIFO topic. + Topic names must be made up of only uppercase and lowercase ASCII letters, + numbers, underscores, and hyphens, and must be between 1 and 256 characters long. + For a FIFO topic, the name must end with the .fifo suffix. + + :param topic_name: The name for the topic. + :return: The new topic. + """ + try: + topic = self.sns_resource.create_topic( + Name=topic_name, + Attributes={ + 'FifoTopic': str(True), + 'ContentBasedDeduplication': str(False) + } + ) + logger.info("Created FIFO topic with name=%s.", topic_name) + return topic + except ClientError as error: + logger.exception("Couldn't create topic with name=%s!", topic_name) + raise error + + # snippet-end:[python.example_code.sns.CreateFifoTopic] + + # snippet-start:[python.example_code.sns.AddTopicPolicy] + @staticmethod + def add_access_policy(queue, topic_arn): + """ + Add the necessary access policy to a queue, so + it can receive messages from a topic. + + :param queue: The queue resource. + :param topic_arn: The ARN of the topic. + :return: None. + """ + try: + queue.set_attributes( + Attributes={ + 'Policy': json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Sid': 'test-sid', + 'Effect': 'Allow', + 'Principal': {'AWS': '*'}, + 'Action': 'SQS:SendMessage', + 'Resource': queue.attributes['QueueArn'], + 'Condition': {'ArnLike': {'aws:SourceArn': topic_arn}} + }] + }) + } + ) + logger.info("Added trust policy to the queue.") + except ClientError as error: + logger.exception("Couldn't add trust policy to the queue!") + raise error + + # snippet-end:[python.example_code.sns.AddTopicPolicy] + + # snippet-start:[python.example_code.sns.SubscribeQueueToTopic] + @staticmethod + def subscribe_queue_to_topic(topic, queue_arn): + """ + Subscribe a queue to a topic. + + :param topic: The topic resource. + :param queue_arn: The ARN of the queue. + :return: The subscription resource. + """ + try: + subscription = topic.subscribe( + Protocol='sqs', + Endpoint=queue_arn, + ) + logger.info("The queue is subscribed to the topic.") + return subscription + except ClientError as error: + logger.exception("Couldn't subscribe queue to topic!") + raise error + + # snippet-end:[python.example_code.sns.SubscribeQueueToTopic] + + # snippet-start:[python.example_code.sns.PublishToTopic] + @staticmethod + def publish_price_update(topic, payload, group_id): + """ + Compose and publish a message that updates the wholesale price. + + :param topic: The topic to publish to. + :param payload: The message to publish. + :param group_id: The group ID for the message. + :return: The ID of the message. + """ + try: + att_dict = {'business': {'DataType': 'String', 'StringValue': 'wholesale'}} + dedup_id = uuid.uuid4() + response = topic.publish( + Subject='Price Update', + Message=payload, + MessageAttributes=att_dict, + MessageGroupId=group_id, + MessageDeduplicationId=str(dedup_id)) + message_id = response['MessageId'] + logger.info( + "Published message to topic %s.", topic.arn) + except ClientError as error: + logger.exception("Couldn't publish message to topic %s.", topic.arn) + raise error + return message_id + + # snippet-end:[python.example_code.sns.PublishToTopic] + + # snippet-start:[python.example_code.sns.DeleteQueue] + @staticmethod + def delete_queue(queue): + """ + Removes an SQS queue. When run against an AWS account, it can take up to + 60 seconds before the queue is actually deleted. + + :param queue: The queue to delete. + :return: None + """ + try: + queue.delete() + logger.info("Deleted queue with URL=%s.", queue.url) + except ClientError as error: + logger.exception("Couldn't delete queue with URL=%s!", queue.url) + raise error + +# snippet-end:[python.example_code.sns.DeleteQueue] + +# snippet-end:[python.example_code.sns.FifoTopicWrapper] + + +# snippet-start:[python.example_code.sns.Scenario_SubscribeFifoTopic] +def usage_demo(): + """Shows how to subscribe queues to a FIFO topic.""" + print('-' * 88) + print("Welcome to the `Subscribe queues to a FIFO topic` demo!") + print('-' * 88) + + sns = boto3.resource('sns') + sqs = boto3.resource('sqs') + fifo_topic_wrapper = FifoTopicWrapper(sns) + sns_wrapper = SnsWrapper(sns) + + prefix = 'sqs-subscribe-demo-' + queues = set() + subscriptions = set() + + wholesale_queue = sqs.create_queue( + QueueName=prefix + 'wholesale.fifo', + Attributes={ + 'MaximumMessageSize': str(4096), + 'ReceiveMessageWaitTimeSeconds': str(10), + 'VisibilityTimeout': str(300), + 'FifoQueue': str(True), + 'ContentBasedDeduplication': str(True), + } + ) + queues.add(wholesale_queue) + print(f"Created FIFO queue with URL: {wholesale_queue.url}.") + + retail_queue = sqs.create_queue( + QueueName=prefix + 'retail.fifo', + Attributes={ + 'MaximumMessageSize': str(4096), + 'ReceiveMessageWaitTimeSeconds': str(10), + 'VisibilityTimeout': str(300), + 'FifoQueue': str(True), + 'ContentBasedDeduplication': str(True) + } + ) + queues.add(retail_queue) + print(f"Created FIFO queue with URL: {retail_queue.url}.") + + analytics_queue = sqs.create_queue( + QueueName=prefix + 'analytics', + Attributes={} + ) + queues.add(analytics_queue) + print(f"Created standard queue with URL: {analytics_queue.url}.") + + topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") + print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") + + for q in queues: + fifo_topic_wrapper.add_access_policy(q, topic.attributes['TopicArn']) + + print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") + + for q in queues: + sub = fifo_topic_wrapper.subscribe_queue_to_topic(topic, q.attributes['QueueArn']) + subscriptions.add(sub) + + print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") + + input("Press Enter to publish a message to the topic.") + + message_id = fifo_topic_wrapper.publish_price_update(topic, '{"product": 214, "price": 79.99}', "Consumables") + + print(f"Published price update with message ID: {message_id}.") + + # Clean up the subscriptions, queues, and topic. + input("Press Enter to clean up resources.") + for s in subscriptions: + sns_wrapper.delete_subscription(s) + + sns_wrapper.delete_topic(topic) + + for q in queues: + fifo_topic_wrapper.delete_queue(q) + + print(f"Deleted subscriptions, queues, and topic.") + + print("Thanks for watching!") + print('-' * 88) + + +# snippet-end:[python.example_code.sns.Scenario_SubscribeFifoTopic] + + +if __name__ == '__main__': + usage_demo() diff --git a/python/example_code/sns/test/test_sns_fifo_topic.py b/python/example_code/sns/test/test_sns_fifo_topic.py new file mode 100644 index 00000000000..d99183b9bf3 --- /dev/null +++ b/python/example_code/sns/test/test_sns_fifo_topic.py @@ -0,0 +1,104 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for sns_fifo_topic.py +""" + +import json +import boto3 +from botocore.exceptions import ClientError +from botocore.stub import ANY +import pytest + +from sns_fifo_topic import FifoTopicWrapper + +TOPIC_ARN = 'arn:aws:sns:REGION:123456789012:topic/test-name' +SUBSCRIPTION_ARN = 'arn:aws:sns:REGION:123456789012:subscription/sub-name' +QUEUE_URL = 'https://REGION.amazonaws.com/123456789012/queue-name' +QUEUE_ARN = 'arn:aws:sqs:REGION:123456789012:queue/queue-name' + + +@pytest.mark.parametrize('error_code', [None, 'TestException']) +def test_create_fifo_topic(make_stubber, error_code): + sns_resource = boto3.resource('sns') + sns_stubber = make_stubber(sns_resource.meta.client) + fifo_topic_wrapper = FifoTopicWrapper(sns_resource) + name = 'test-name.fifo' + attributes = {'ContentBasedDeduplication': 'False', 'FifoTopic': 'True'} + + sns_stubber.stub_create_topic(name, TOPIC_ARN, topic_attributes=attributes, error_code=error_code) + + if error_code is None: + got_topic = fifo_topic_wrapper.create_fifo_topic(name) + assert got_topic.arn == TOPIC_ARN + else: + with pytest.raises(ClientError) as exc_info: + fifo_topic_wrapper.create_fifo_topic(name) + assert exc_info.value.response['Error']['Code'] == error_code + +@pytest.mark.parametrize('error_code', [None, 'TestException']) +def test_subscribe_queue_to_topic(make_stubber, error_code): + sns_resource = boto3.resource('sns') + sns_stubber = make_stubber(sns_resource.meta.client) + topic = sns_resource.Topic(TOPIC_ARN) + fifo_topic_wrapper = FifoTopicWrapper(sns_resource) + + sns_stubber.stub_subscribe(topic.arn, 'sqs', QUEUE_ARN, SUBSCRIPTION_ARN, False, error_code=error_code) + + if error_code is None: + got_subscription = fifo_topic_wrapper.subscribe_queue_to_topic(topic, QUEUE_ARN) + assert got_subscription.arn == SUBSCRIPTION_ARN + else: + with pytest.raises(ClientError) as exc_info: + fifo_topic_wrapper.subscribe_queue_to_topic(topic, QUEUE_ARN) + assert exc_info.value.response['Error']['Code'] == error_code + + +@pytest.mark.parametrize('error_code', [None, 'TestException']) +def test_publish_price_update(make_stubber, error_code): + sns_resource = boto3.resource('sns') + sns_stubber = make_stubber(sns_resource.meta.client) + fifo_topic_wrapper = FifoTopicWrapper(sns_resource) + topic = sns_resource.Topic(TOPIC_ARN) + message = 'test message' + message_id = 'msg-id' + group_id = 'group-id' + attributes = {'business': 'wholesale'} + dedup_id = 'dedup-id' + subject = 'Price Update' + + sns_stubber.stub_publish( + message, message_id, + topic_arn=TOPIC_ARN, + subject=subject, + group_id=group_id, + dedup_id=dedup_id, + message_attributes=attributes, + error_code=error_code) + + if error_code is None: + got_message_id = fifo_topic_wrapper.publish_price_update(topic, message, group_id) + assert got_message_id == message_id + else: + with pytest.raises(ClientError) as exc_info: + fifo_topic_wrapper.publish_price_update(topic, message, group_id) + assert exc_info.value.response['Error']['Code'] == error_code + +@pytest.mark.parametrize('error_code', [None, 'TestException']) +def test_delete_queue(make_stubber, error_code): + sqs_resource = boto3.resource('sqs') + sns_resource = boto3.resource('sns') + sqs_stubber = make_stubber(sqs_resource.meta.client) + queue = sqs_resource.Queue(QUEUE_URL) + fifo_topic_wrapper = FifoTopicWrapper(sns_resource) + + sqs_stubber.stub_delete_queue(QUEUE_URL, error_code=error_code) + + if error_code is None: + fifo_topic_wrapper.delete_queue(queue) + else: + with pytest.raises(ClientError) as exc_info: + fifo_topic_wrapper.delete_queue(queue) + assert exc_info.value.response['Error']['Code'] == error_code + diff --git a/python/test_tools/sns_stubber.py b/python/test_tools/sns_stubber.py index 6f3c71617ec..7c7a9383ed0 100644 --- a/python/test_tools/sns_stubber.py +++ b/python/test_tools/sns_stubber.py @@ -6,6 +6,7 @@ """ import json +from botocore.stub import ANY from test_tools.example_stubber import ExampleStubber @@ -29,9 +30,11 @@ def __init__(self, client, use_stubs=True): """ super().__init__(client, use_stubs) - def stub_create_topic(self, topic_name, topic_arn, error_code=None): + def stub_create_topic(self, topic_name, topic_arn, topic_attributes=None, error_code=None): expected_params = {'Name': topic_name} response = {f'TopicArn': topic_arn} + if topic_attributes is not None: + expected_params['Attributes'] = topic_attributes self._stub_bifurcator( 'create_topic', expected_params, response, error_code=error_code) @@ -89,7 +92,7 @@ def stub_unsubscribe(self, subscription_arn, error_code=None): def stub_publish( self, message, message_id, topic_arn=None, phone_number=None, subject=None, - message_structure=None, message_attributes=None, error_code=None): + group_id=None, dedup_id=None, message_structure=None, message_attributes=None, error_code=None): expected_params = {'Message': message} if topic_arn is not None: expected_params['TopicArn'] = topic_arn @@ -97,6 +100,10 @@ def stub_publish( expected_params['PhoneNumber'] = phone_number if subject is not None: expected_params['Subject'] = subject + if group_id is not None: + expected_params['MessageGroupId'] = group_id + if dedup_id is not None: + expected_params['MessageDeduplicationId'] = ANY if message_structure is not None: expected_params['MessageStructure'] = message_structure if message_attributes is not None: @@ -110,3 +117,4 @@ def stub_publish( response = {'MessageId': message_id} self._stub_bifurcator( 'publish', expected_params, response, error_code=error_code) +