From 66dd7d002b5ab710c69458bbb972f2e278f9ede4 Mon Sep 17 00:00:00 2001 From: Rebecca Graber Date: Fri, 12 May 2023 14:29:41 -0400 Subject: [PATCH] feat: make signal argument optional --- CHANGELOG.rst | 7 +++++ edx_event_bus_redis/__init__.py | 2 +- edx_event_bus_redis/internal/consumer.py | 26 +++++++++---------- edx_event_bus_redis/internal/message.py | 10 ++----- .../internal/tests/test_consumer.py | 8 +----- .../internal/tests/test_message.py | 20 +------------- 6 files changed, 24 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 5bccb74..1f2ffb2 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,13 @@ Unreleased * +[0.2.1] - 2023-05-12 +************************************************ + +Changed +======= +* Deprecated ``signal`` argument in consumer (made optional in preparation for removal) + [0.1.1] - 2023-05-12 ************************************************ diff --git a/edx_event_bus_redis/__init__.py b/edx_event_bus_redis/__init__.py index 1618ceb..b0d6179 100644 --- a/edx_event_bus_redis/__init__.py +++ b/edx_event_bus_redis/__init__.py @@ -5,6 +5,6 @@ from edx_event_bus_redis.internal.consumer import RedisEventConsumer from edx_event_bus_redis.internal.producer import create_producer -__version__ = '0.1.1' +__version__ = '0.2.1' default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name diff --git a/edx_event_bus_redis/internal/consumer.py b/edx_event_bus_redis/internal/consumer.py index 02a2ebe..ff6bd42 100644 --- a/edx_event_bus_redis/internal/consumer.py +++ b/edx_event_bus_redis/internal/consumer.py @@ -79,8 +79,8 @@ def _reconnect_to_db_if_needed(): class RedisEventConsumer(EventBusConsumer): """ - Construct consumer for the given topic, group, and signal. The consumer can then - emit events from the event bus using the configured signal. + Construct consumer for the given topic and group. The consumer can then + emit events coming from the topic. Note that the topic should be specified here *without* the optional environment prefix. @@ -89,7 +89,7 @@ class RedisEventConsumer(EventBusConsumer): Attributes: topic: Topic/stream name. group_id: consumer group name. - signal: openedx_events signal. + signal: DEPRECATED, will be removed in a future release consumer_name: unique name for consumer within a group. last_read_msg_id: Start reading msgs from a specific redis msg id. check_backlog: flag to process all messages that were not read by this consumer group. @@ -101,11 +101,10 @@ class RedisEventConsumer(EventBusConsumer): consumer: consumer instance. """ - def __init__(self, topic, group_id, signal, consumer_name, last_read_msg_id=None, check_backlog=False, - claim_msgs_older_than=None): + def __init__(self, topic, group_id, consumer_name, signal=None, # pylint: disable=unused-argument + last_read_msg_id=None, check_backlog=False, claim_msgs_older_than=None): self.topic = topic self.group_id = group_id - self.signal = signal self.consumer_name = consumer_name self.last_read_msg_id = last_read_msg_id self.check_backlog = check_backlog @@ -128,7 +127,7 @@ def _create_db(self) -> Database: def _create_consumer(self, db: Database, full_topic: str) -> ConsumerGroupStream: """ - Create a redis stream consumer group and a consumer for events of the given signal instance. + Create a redis stream consumer group and a consumer for the given topic Returns ConsumerGroupStream @@ -193,7 +192,6 @@ def _consume_indefinitely(self): run_context = { 'full_topic': self.full_topic, 'consumer_group': self.group_id, - 'expected_signal': self.signal, 'consumer_name': self.consumer_name, } @@ -234,7 +232,7 @@ def _consume_indefinitely(self): if redis_raw_msg: if isinstance(redis_raw_msg, list): redis_raw_msg = redis_raw_msg[0] - msg = RedisMessage.parse(redis_raw_msg, self.full_topic, expected_signal=self.signal) + msg = RedisMessage.parse(redis_raw_msg, self.full_topic) # Before processing, make sure our db connection is still active _reconnect_to_db_if_needed() self.emit_signals_from_message(msg) @@ -281,7 +279,7 @@ def emit_signals_from_message(self, msg: RedisMessage): # Raise an exception if any receivers errored out. This allows logging of the receivers # along with partition, offset, etc. in record_event_consuming_error. Hopefully the # receiver code is idempotent and we can just replay any messages that were involved. - self._check_receiver_results(send_results) + self._check_receiver_results(send_results, signal) # At the very end, log that a message was processed successfully. # Since we're single-threaded, no other information is needed; @@ -290,12 +288,13 @@ def emit_signals_from_message(self, msg: RedisMessage): if AUDIT_LOGGING_ENABLED.is_enabled(): logger.info('Message from Redis processed successfully') - def _check_receiver_results(self, send_results: list): + def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal): """ Raises exception if any of the receivers produced an exception. Arguments: send_results: Output of ``send_events``, a list of ``(receiver, response)`` tuples. + signal: The signal used to send the events """ error_descriptions = [] errors = [] @@ -318,7 +317,7 @@ def _check_receiver_results(self, send_results: list): raise ReceiverError( f"{len(error_descriptions)} receiver(s) out of {len(send_results)} " "produced errors (stack trace elsewhere in logs) " - f"when handling signal {self.signal}: {', '.join(error_descriptions)}", + f"when handling signal {signal}: {', '.join(error_descriptions)}", errors ) @@ -356,7 +355,7 @@ def record_event_consuming_error(self, run_context, error, maybe_message): Arguments: run_context: Dictionary of contextual information: full_topic, consumer_group, - and expected_signal. + and consumer_name. error: An exception instance maybe_message: None if event could not be fetched or decoded, or a Redis Message if one was successfully deserialized but could not be processed for some reason @@ -385,7 +384,6 @@ def _add_message_monitoring(self, run_context, message, error=None): Arguments: run_context: Dictionary of contextual information: full_topic, consumer_group, - and expected_signal. message: None if event could not be fetched or decoded, or a Message if one was successfully deserialized but could not be processed for some reason error: (Optional) An exception instance, or None if no error. diff --git a/edx_event_bus_redis/internal/message.py b/edx_event_bus_redis/internal/message.py index 661f08d..1ae371e 100644 --- a/edx_event_bus_redis/internal/message.py +++ b/edx_event_bus_redis/internal/message.py @@ -3,7 +3,7 @@ """ from typing import Dict, NamedTuple, Optional -from openedx_events.tooling import EventsMetadata, OpenEdxPublicSignal +from openedx_events.tooling import EventsMetadata from edx_event_bus_redis.internal.utils import get_headers_from_metadata, get_metadata_from_headers @@ -35,14 +35,13 @@ def to_binary_dict(self) -> Dict[bytes, bytes]: return data @classmethod - def parse(cls, msg: tuple, topic: str, expected_signal: Optional[OpenEdxPublicSignal] = None): + def parse(cls, msg: tuple, topic: str): """ Takes message from redis stream and parses it to return an instance of RedisMessage. Args: msg: Tuple with 1st item being msg_id and 2nd data from message. topic: Stream name. - expected_signal [Optional]: If passed, the signal type is matched with type in msg. Returns: RedisMessage with msg_id @@ -54,9 +53,4 @@ def parse(cls, msg: tuple, topic: str, expected_signal: Optional[OpenEdxPublicSi except Exception as e: raise UnusableMessageError(f"Error determining metadata from message headers: {e}") from e - if expected_signal and metadata.event_type != expected_signal.event_type: - raise UnusableMessageError( - f"Signal types do not match. Expected {expected_signal.event_type}. " - f"Received message of type {metadata.event_type}." - ) return cls(msg_id=msg_id, event_data=event_data_bytes, event_metadata=metadata, topic=topic) diff --git a/edx_event_bus_redis/internal/tests/test_consumer.py b/edx_event_bus_redis/internal/tests/test_consumer.py index 3d05ca4..bb78cb5 100644 --- a/edx_event_bus_redis/internal/tests/test_consumer.py +++ b/edx_event_bus_redis/internal/tests/test_consumer.py @@ -80,7 +80,6 @@ def setUp(self): self.event_consumer = RedisEventConsumer( 'local-some-topic', 'test_group_id', - self.signal, consumer_name='test_group_id.c1', check_backlog=True, claim_msgs_older_than=10, @@ -125,7 +124,6 @@ def test_consumer_creation(self, last_read_msg_id, check_backlog, mock_logger): RedisEventConsumer( 'local-some-topic', 'test_group_id', - self.signal, consumer_name='test_group_id.c1', last_read_msg_id=last_read_msg_id, check_backlog=check_backlog, @@ -200,8 +198,6 @@ def test_consume_loop(self, is_pending, mock_sleep, mock_logger, mock_set_custom assert "Error consuming event from Redis: ValueError('something broke') in context" in exc_log_msg assert "full_topic='local-some-topic'" in exc_log_msg assert "consumer_group='test_group_id'" in exc_log_msg - assert ("expected_signal=") in exc_log_msg assert "-- event details: " in exc_log_msg assert str(self.normal_message) in exc_log_msg @@ -385,8 +381,6 @@ def read_side_effect(*args, **kwargs): assert f"Error consuming event from Redis: {repr(exception)} in context" in exc_log_msg assert "full_topic='local-some-topic'" in exc_log_msg assert "consumer_group='test_group_id'" in exc_log_msg - assert ("expected_signal=") in exc_log_msg assert "-- no event available" in exc_log_msg expected_custom_attribute_calls = [ @@ -473,7 +467,7 @@ def test_malformed_receiver_errors(self): (lambda x:x, Exception("for lambda")), # This would actually raise an error inside send_robust(), but it will serve well enough for testing... ("not even a function", ValueError("just plain bad")), - ]) + ], self.signal) assert exc_info.value.args == ( "2 receiver(s) out of 2 produced errors (stack trace elsewhere in logs) " "when handling signal