-
Notifications
You must be signed in to change notification settings - Fork 5.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Python: SNS FIFO topic and standard queue scenario (#5366)
* Updated FIFO example --------- Co-authored-by: David Souther <[email protected]>
- Loading branch information
1 parent
fdc7da5
commit f7dddc1
Showing
5 changed files
with
408 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
""" | ||
Purpose | ||
Demonstrates subscribing Amazon Simple Queue Service (Amazon SQS) | ||
queues to a FIFO (First-In-First-Out) Amazon Simple Notification Service (Amazon SNS) topic. | ||
""" | ||
|
||
import logging | ||
import uuid | ||
import boto3 | ||
import json | ||
from botocore.exceptions import ClientError | ||
from sns_basics import SnsWrapper | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
# snippet-start:[python.example_code.sns.FifoTopicWrapper] | ||
class FifoTopicWrapper: | ||
"""Encapsulates Amazon SNS FIFO topic and subscription functions.""" | ||
|
||
def __init__(self, sns_resource): | ||
""" | ||
:param sns_resource: A Boto3 Amazon SNS resource. | ||
""" | ||
self.sns_resource = sns_resource | ||
|
||
# snippet-start:[python.example_code.sns.CreateFifoTopic] | ||
def create_fifo_topic(self, topic_name): | ||
""" | ||
Create a FIFO topic. | ||
Topic names must be made up of only uppercase and lowercase ASCII letters, | ||
numbers, underscores, and hyphens, and must be between 1 and 256 characters long. | ||
For a FIFO topic, the name must end with the .fifo suffix. | ||
:param topic_name: The name for the topic. | ||
:return: The new topic. | ||
""" | ||
try: | ||
topic = self.sns_resource.create_topic( | ||
Name=topic_name, | ||
Attributes={ | ||
'FifoTopic': str(True), | ||
'ContentBasedDeduplication': str(False) | ||
} | ||
) | ||
logger.info("Created FIFO topic with name=%s.", topic_name) | ||
return topic | ||
except ClientError as error: | ||
logger.exception("Couldn't create topic with name=%s!", topic_name) | ||
raise error | ||
|
||
# snippet-end:[python.example_code.sns.CreateFifoTopic] | ||
|
||
# snippet-start:[python.example_code.sns.AddTopicPolicy] | ||
@staticmethod | ||
def add_access_policy(queue, topic_arn): | ||
""" | ||
Add the necessary access policy to a queue, so | ||
it can receive messages from a topic. | ||
:param queue: The queue resource. | ||
:param topic_arn: The ARN of the topic. | ||
:return: None. | ||
""" | ||
try: | ||
queue.set_attributes( | ||
Attributes={ | ||
'Policy': json.dumps({ | ||
'Version': '2012-10-17', | ||
'Statement': [{ | ||
'Sid': 'test-sid', | ||
'Effect': 'Allow', | ||
'Principal': {'AWS': '*'}, | ||
'Action': 'SQS:SendMessage', | ||
'Resource': queue.attributes['QueueArn'], | ||
'Condition': {'ArnLike': {'aws:SourceArn': topic_arn}} | ||
}] | ||
}) | ||
} | ||
) | ||
logger.info("Added trust policy to the queue.") | ||
except ClientError as error: | ||
logger.exception("Couldn't add trust policy to the queue!") | ||
raise error | ||
|
||
# snippet-end:[python.example_code.sns.AddTopicPolicy] | ||
|
||
# snippet-start:[python.example_code.sns.SubscribeQueueToTopic] | ||
@staticmethod | ||
def subscribe_queue_to_topic(topic, queue_arn): | ||
""" | ||
Subscribe a queue to a topic. | ||
:param topic: The topic resource. | ||
:param queue_arn: The ARN of the queue. | ||
:return: The subscription resource. | ||
""" | ||
try: | ||
subscription = topic.subscribe( | ||
Protocol='sqs', | ||
Endpoint=queue_arn, | ||
) | ||
logger.info("The queue is subscribed to the topic.") | ||
return subscription | ||
except ClientError as error: | ||
logger.exception("Couldn't subscribe queue to topic!") | ||
raise error | ||
|
||
# snippet-end:[python.example_code.sns.SubscribeQueueToTopic] | ||
|
||
# snippet-start:[python.example_code.sns.PublishToTopic] | ||
@staticmethod | ||
def publish_price_update(topic, payload, group_id): | ||
""" | ||
Compose and publish a message that updates the wholesale price. | ||
:param topic: The topic to publish to. | ||
:param payload: The message to publish. | ||
:param group_id: The group ID for the message. | ||
:return: The ID of the message. | ||
""" | ||
try: | ||
att_dict = {'business': {'DataType': 'String', 'StringValue': 'wholesale'}} | ||
dedup_id = uuid.uuid4() | ||
response = topic.publish( | ||
Subject='Price Update', | ||
Message=payload, | ||
MessageAttributes=att_dict, | ||
MessageGroupId=group_id, | ||
MessageDeduplicationId=str(dedup_id)) | ||
message_id = response['MessageId'] | ||
logger.info( | ||
"Published message to topic %s.", topic.arn) | ||
except ClientError as error: | ||
logger.exception("Couldn't publish message to topic %s.", topic.arn) | ||
raise error | ||
return message_id | ||
|
||
# snippet-end:[python.example_code.sns.PublishToTopic] | ||
|
||
# snippet-start:[python.example_code.sns.DeleteQueue] | ||
@staticmethod | ||
def delete_queue(queue): | ||
""" | ||
Removes an SQS queue. When run against an AWS account, it can take up to | ||
60 seconds before the queue is actually deleted. | ||
:param queue: The queue to delete. | ||
:return: None | ||
""" | ||
try: | ||
queue.delete() | ||
logger.info("Deleted queue with URL=%s.", queue.url) | ||
except ClientError as error: | ||
logger.exception("Couldn't delete queue with URL=%s!", queue.url) | ||
raise error | ||
|
||
# snippet-end:[python.example_code.sns.DeleteQueue] | ||
|
||
# snippet-end:[python.example_code.sns.FifoTopicWrapper] | ||
|
||
|
||
# snippet-start:[python.example_code.sns.Scenario_SubscribeFifoTopic] | ||
def usage_demo(): | ||
"""Shows how to subscribe queues to a FIFO topic.""" | ||
print('-' * 88) | ||
print("Welcome to the `Subscribe queues to a FIFO topic` demo!") | ||
print('-' * 88) | ||
|
||
sns = boto3.resource('sns') | ||
sqs = boto3.resource('sqs') | ||
fifo_topic_wrapper = FifoTopicWrapper(sns) | ||
sns_wrapper = SnsWrapper(sns) | ||
|
||
prefix = 'sqs-subscribe-demo-' | ||
queues = set() | ||
subscriptions = set() | ||
|
||
wholesale_queue = sqs.create_queue( | ||
QueueName=prefix + 'wholesale.fifo', | ||
Attributes={ | ||
'MaximumMessageSize': str(4096), | ||
'ReceiveMessageWaitTimeSeconds': str(10), | ||
'VisibilityTimeout': str(300), | ||
'FifoQueue': str(True), | ||
'ContentBasedDeduplication': str(True), | ||
} | ||
) | ||
queues.add(wholesale_queue) | ||
print(f"Created FIFO queue with URL: {wholesale_queue.url}.") | ||
|
||
retail_queue = sqs.create_queue( | ||
QueueName=prefix + 'retail.fifo', | ||
Attributes={ | ||
'MaximumMessageSize': str(4096), | ||
'ReceiveMessageWaitTimeSeconds': str(10), | ||
'VisibilityTimeout': str(300), | ||
'FifoQueue': str(True), | ||
'ContentBasedDeduplication': str(True) | ||
} | ||
) | ||
queues.add(retail_queue) | ||
print(f"Created FIFO queue with URL: {retail_queue.url}.") | ||
|
||
analytics_queue = sqs.create_queue( | ||
QueueName=prefix + 'analytics', | ||
Attributes={} | ||
) | ||
queues.add(analytics_queue) | ||
print(f"Created standard queue with URL: {analytics_queue.url}.") | ||
|
||
topic = fifo_topic_wrapper.create_fifo_topic("price-updates-topic.fifo") | ||
print(f"Created FIFO topic: {topic.attributes['TopicArn']}.") | ||
|
||
for q in queues: | ||
fifo_topic_wrapper.add_access_policy(q, topic.attributes['TopicArn']) | ||
|
||
print(f"Added access policies for topic: {topic.attributes['TopicArn']}.") | ||
|
||
for q in queues: | ||
sub = fifo_topic_wrapper.subscribe_queue_to_topic(topic, q.attributes['QueueArn']) | ||
subscriptions.add(sub) | ||
|
||
print(f"Subscribed queues to topic: {topic.attributes['TopicArn']}.") | ||
|
||
input("Press Enter to publish a message to the topic.") | ||
|
||
message_id = fifo_topic_wrapper.publish_price_update(topic, '{"product": 214, "price": 79.99}', "Consumables") | ||
|
||
print(f"Published price update with message ID: {message_id}.") | ||
|
||
# Clean up the subscriptions, queues, and topic. | ||
input("Press Enter to clean up resources.") | ||
for s in subscriptions: | ||
sns_wrapper.delete_subscription(s) | ||
|
||
sns_wrapper.delete_topic(topic) | ||
|
||
for q in queues: | ||
fifo_topic_wrapper.delete_queue(q) | ||
|
||
print(f"Deleted subscriptions, queues, and topic.") | ||
|
||
print("Thanks for watching!") | ||
print('-' * 88) | ||
|
||
|
||
# snippet-end:[python.example_code.sns.Scenario_SubscribeFifoTopic] | ||
|
||
|
||
if __name__ == '__main__': | ||
usage_demo() |
Oops, something went wrong.