generated from timoguin/repo-template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
sqs.py
95 lines (74 loc) · 2.85 KB
/
sqs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from dataclasses import dataclass, field
from hashlib import md5
import logging
from typing import Optional
from .base import ModelBase
from ..utils.validators import is_valid_json
# Attach a no-op handler to this module's logger so that importing the module
# never emits log output (or "no handler" warnings) unless the application
# configures logging itself.
_module_logger = logging.getLogger(__name__)
_null_handler = logging.NullHandler()
_module_logger.addHandler(_null_handler)
@dataclass
class SqsMessageAttributes(ModelBase):
    """Attributes attached to an SQS message.

    The first four fields are required. The remaining fields default to
    ``None`` when they are not supplied.
    """

    # TODO: Unsure if some of these strings will need to be cast to int or
    # other types
    approximate_receive_count: str
    approximate_first_receive_timestamp: str
    sender_id: str
    sent_timestamp: str

    # Optional attribute; not consulted by ``is_fifo``.
    aws_trace_header: Optional[str] = field(default=None)

    # Optional attributes added when using FIFO queues
    message_deduplication_id: Optional[str] = field(default=None)
    message_group_id: Optional[str] = field(default=None)
    sequence_number: Optional[str] = field(default=None)

    # Names of the attributes that must all be set for ``is_fifo`` to be True.
    # Deliberately left un-annotated so ``@dataclass`` treats it as a plain
    # class attribute rather than a field; a tuple keeps it immutable.
    _FIFO_ATTRIBUTES = (
        "message_deduplication_id",
        "message_group_id",
        "sequence_number",
    )

    @property
    def is_fifo(self) -> bool:
        """Check if there are FIFO-specific attributes.

        Returns:
            True only when every attribute named in ``_FIFO_ATTRIBUTES`` is
            not ``None``; False as soon as one of them is missing.
        """
        return all(
            getattr(self, name) is not None for name in self._FIFO_ATTRIBUTES
        )
@dataclass
class SqsCustomMessageAttributeDefinition(ModelBase):
    """Type definition and value for a custom message attribute"""

    # Declared type of the attribute, kept as a string.
    # NOTE(review): presumably the SQS data type name (e.g. "String") - confirm.
    data_type: str
    # The attribute's value, kept as a string.
    string_value: str
@dataclass
class SqsMessage(ModelBase):
    """Schema for an SQS message.

    Besides the raw fields, the class exposes read-only properties for
    checking the body checksum, detecting FIFO messages and testing whether
    the body is JSON.
    """

    body: str
    md5_of_body: str
    message_id: str
    receipt_handle: str
    attributes: Optional[SqsMessageAttributes]
    message_attributes: Optional[dict[str, SqsCustomMessageAttributeDefinition]]
    event_source: Optional[str]
    event_source_arn: Optional[str]
    aws_region: Optional[str]

    # Optional
    md5_of_message_attributes: Optional[str] = field(default=None)

    @property
    def is_fifo(self) -> bool:
        """Check if a message came from a FIFO queue.

        Returns:
            False when the message carries no ``attributes``; otherwise the
            value of ``SqsMessageAttributes.is_fifo``.
        """
        attrs = self.attributes
        if attrs is None:
            return False
        # ``is_fifo`` is a property on SqsMessageAttributes, so it must be
        # read, not called: calling it would invoke the resulting bool.
        return attrs.is_fifo

    @property
    def calculated_md5_of_message_attributes(self) -> str:
        """Calculate the MD5 checksum of the message attributes.

        Raises:
            NotImplementedError: always, until the digest algorithm is
                implemented.
        """
        # TODO: Figure out the algorithm to build a digest of the message attributes.
        #   View the following documentation for details:
        #   https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-attributes-md5-message-digest-calculation # noqa
        raise NotImplementedError

    @property
    def is_md5_of_message_attributes_valid(self) -> bool:
        """Check the stored attribute checksum against the calculated one.

        Raises:
            NotImplementedError: currently always, because
                ``calculated_md5_of_message_attributes`` is not implemented.
        """
        return (
            self.md5_of_message_attributes == self.calculated_md5_of_message_attributes
        )

    @property
    def is_md5_of_body_valid(self) -> bool:
        """Check that ``md5_of_body`` matches the MD5 hex digest of the body.

        The body is encoded as UTF-8 before hashing.
        """
        return md5(self.body.encode("utf-8")).hexdigest() == self.md5_of_body

    @property
    def is_body_json(self) -> bool:
        """Check if the message body is a JSON string"""
        return is_valid_json(self.body)