Skip to content

Commit

Permalink
Merge pull request #22 from rgraber/rsgraber/20230512-remove-signal-p…
Browse files Browse the repository at this point in the history
…aram

feat: make signal argument optional
  • Loading branch information
bmtcril committed May 15, 2023
2 parents 86c7f12 + 66dd7d0 commit 0ebee6a
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 49 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ Unreleased

*

[0.2.1] - 2023-05-12
************************************************

Changed
=======
* Deprecated ``signal`` argument in consumer (made optional in preparation for removal)

[0.1.1] - 2023-05-12
************************************************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
from edx_event_bus_redis.internal.consumer import RedisEventConsumer
from edx_event_bus_redis.internal.producer import create_producer

__version__ = '0.1.1'
__version__ = '0.2.1'

default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name
26 changes: 12 additions & 14 deletions edx_event_bus_redis/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ def _reconnect_to_db_if_needed():

class RedisEventConsumer(EventBusConsumer):
"""
Construct consumer for the given topic, group, and signal. The consumer can then
emit events from the event bus using the configured signal.
Construct consumer for the given topic and group. The consumer can then
emit events coming from the topic.
Note that the topic should be specified here *without* the optional environment prefix.
Expand All @@ -89,7 +89,7 @@ class RedisEventConsumer(EventBusConsumer):
Attributes:
topic: Topic/stream name.
group_id: consumer group name.
signal: openedx_events signal.
signal: DEPRECATED, will be removed in a future release
consumer_name: unique name for consumer within a group.
last_read_msg_id: Start reading msgs from a specific redis msg id.
check_backlog: flag to process all messages that were not read by this consumer group.
Expand All @@ -101,11 +101,10 @@ class RedisEventConsumer(EventBusConsumer):
consumer: consumer instance.
"""

def __init__(self, topic, group_id, signal, consumer_name, last_read_msg_id=None, check_backlog=False,
claim_msgs_older_than=None):
def __init__(self, topic, group_id, consumer_name, signal=None, # pylint: disable=unused-argument
last_read_msg_id=None, check_backlog=False, claim_msgs_older_than=None):
self.topic = topic
self.group_id = group_id
self.signal = signal
self.consumer_name = consumer_name
self.last_read_msg_id = last_read_msg_id
self.check_backlog = check_backlog
Expand All @@ -128,7 +127,7 @@ def _create_db(self) -> Database:

def _create_consumer(self, db: Database, full_topic: str) -> ConsumerGroupStream:
"""
Create a redis stream consumer group and a consumer for events of the given signal instance.
Create a redis stream consumer group and a consumer for the given topic
Returns
ConsumerGroupStream
Expand Down Expand Up @@ -193,7 +192,6 @@ def _consume_indefinitely(self):
run_context = {
'full_topic': self.full_topic,
'consumer_group': self.group_id,
'expected_signal': self.signal,
'consumer_name': self.consumer_name,
}

Expand Down Expand Up @@ -234,7 +232,7 @@ def _consume_indefinitely(self):
if redis_raw_msg:
if isinstance(redis_raw_msg, list):
redis_raw_msg = redis_raw_msg[0]
msg = RedisMessage.parse(redis_raw_msg, self.full_topic, expected_signal=self.signal)
msg = RedisMessage.parse(redis_raw_msg, self.full_topic)
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
self.emit_signals_from_message(msg)
Expand Down Expand Up @@ -281,7 +279,7 @@ def emit_signals_from_message(self, msg: RedisMessage):
# Raise an exception if any receivers errored out. This allows logging of the receivers
# along with partition, offset, etc. in record_event_consuming_error. Hopefully the
# receiver code is idempotent and we can just replay any messages that were involved.
self._check_receiver_results(send_results)
self._check_receiver_results(send_results, signal)

# At the very end, log that a message was processed successfully.
# Since we're single-threaded, no other information is needed;
Expand All @@ -290,12 +288,13 @@ def emit_signals_from_message(self, msg: RedisMessage):
if AUDIT_LOGGING_ENABLED.is_enabled():
logger.info('Message from Redis processed successfully')

def _check_receiver_results(self, send_results: list):
def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal):
"""
Raises exception if any of the receivers produced an exception.
Arguments:
send_results: Output of ``send_events``, a list of ``(receiver, response)`` tuples.
signal: The signal used to send the events
"""
error_descriptions = []
errors = []
Expand All @@ -318,7 +317,7 @@ def _check_receiver_results(self, send_results: list):
raise ReceiverError(
f"{len(error_descriptions)} receiver(s) out of {len(send_results)} "
"produced errors (stack trace elsewhere in logs) "
f"when handling signal {self.signal}: {', '.join(error_descriptions)}",
f"when handling signal {signal}: {', '.join(error_descriptions)}",
errors
)

Expand Down Expand Up @@ -356,7 +355,7 @@ def record_event_consuming_error(self, run_context, error, maybe_message):
Arguments:
run_context: Dictionary of contextual information: full_topic, consumer_group,
and expected_signal.
and consumer_name.
error: An exception instance
maybe_message: None if event could not be fetched or decoded, or a Redis Message if
one was successfully deserialized but could not be processed for some reason
Expand Down Expand Up @@ -385,7 +384,6 @@ def _add_message_monitoring(self, run_context, message, error=None):
Arguments:
run_context: Dictionary of contextual information: full_topic, consumer_group,
and expected_signal.
message: None if event could not be fetched or decoded, or a Message if one
was successfully deserialized but could not be processed for some reason
error: (Optional) An exception instance, or None if no error.
Expand Down
10 changes: 2 additions & 8 deletions edx_event_bus_redis/internal/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""
from typing import Dict, NamedTuple, Optional

from openedx_events.tooling import EventsMetadata, OpenEdxPublicSignal
from openedx_events.tooling import EventsMetadata

from edx_event_bus_redis.internal.utils import get_headers_from_metadata, get_metadata_from_headers

Expand Down Expand Up @@ -35,14 +35,13 @@ def to_binary_dict(self) -> Dict[bytes, bytes]:
return data

@classmethod
def parse(cls, msg: tuple, topic: str, expected_signal: Optional[OpenEdxPublicSignal] = None):
def parse(cls, msg: tuple, topic: str):
"""
Takes message from redis stream and parses it to return an instance of RedisMessage.
Args:
msg: Tuple with 1st item being msg_id and 2nd data from message.
topic: Stream name.
expected_signal [Optional]: If passed, the signal type is matched with type in msg.
Returns:
RedisMessage with msg_id
Expand All @@ -54,9 +53,4 @@ def parse(cls, msg: tuple, topic: str, expected_signal: Optional[OpenEdxPublicSi
except Exception as e:
raise UnusableMessageError(f"Error determining metadata from message headers: {e}") from e

if expected_signal and metadata.event_type != expected_signal.event_type:
raise UnusableMessageError(
f"Signal types do not match. Expected {expected_signal.event_type}. "
f"Received message of type {metadata.event_type}."
)
return cls(msg_id=msg_id, event_data=event_data_bytes, event_metadata=metadata, topic=topic)
8 changes: 1 addition & 7 deletions edx_event_bus_redis/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ def setUp(self):
self.event_consumer = RedisEventConsumer(
'local-some-topic',
'test_group_id',
self.signal,
consumer_name='test_group_id.c1',
check_backlog=True,
claim_msgs_older_than=10,
Expand Down Expand Up @@ -125,7 +124,6 @@ def test_consumer_creation(self, last_read_msg_id, check_backlog, mock_logger):
RedisEventConsumer(
'local-some-topic',
'test_group_id',
self.signal,
consumer_name='test_group_id.c1',
last_read_msg_id=last_read_msg_id,
check_backlog=check_backlog,
Expand Down Expand Up @@ -200,8 +198,6 @@ def test_consume_loop(self, is_pending, mock_sleep, mock_logger, mock_set_custom
assert "Error consuming event from Redis: ValueError('something broke') in context" in exc_log_msg
assert "full_topic='local-some-topic'" in exc_log_msg
assert "consumer_group='test_group_id'" in exc_log_msg
assert ("expected_signal=<OpenEdxPublicSignal: "
"org.openedx.learning.auth.session.login.completed.v1>") in exc_log_msg
assert "-- event details: " in exc_log_msg
assert str(self.normal_message) in exc_log_msg

Expand Down Expand Up @@ -385,8 +381,6 @@ def read_side_effect(*args, **kwargs):
assert f"Error consuming event from Redis: {repr(exception)} in context" in exc_log_msg
assert "full_topic='local-some-topic'" in exc_log_msg
assert "consumer_group='test_group_id'" in exc_log_msg
assert ("expected_signal=<OpenEdxPublicSignal: "
"org.openedx.learning.auth.session.login.completed.v1>") in exc_log_msg
assert "-- no event available" in exc_log_msg

expected_custom_attribute_calls = [
Expand Down Expand Up @@ -473,7 +467,7 @@ def test_malformed_receiver_errors(self):
(lambda x:x, Exception("for lambda")),
# This would actually raise an error inside send_robust(), but it will serve well enough for testing...
("not even a function", ValueError("just plain bad")),
])
], self.signal)
assert exc_info.value.args == (
"2 receiver(s) out of 2 produced errors (stack trace elsewhere in logs) "
"when handling signal <OpenEdxPublicSignal: "
Expand Down
20 changes: 1 addition & 19 deletions edx_event_bus_redis/internal/tests/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,6 @@ def test_no_event_data(self):
"__init__() missing 1 required positional argument: 'event_type'",
)

def test_unexpected_signal_type_in_msg(self):
msg = (
b'1',
{
b'id': b'629f9892-c258-11ed-8dac-1c83413013cb',
b'event_data': self.event_data_bytes,
b'type': b'incorrect-type',
}
)

with pytest.raises(UnusableMessageError) as excinfo:
RedisMessage.parse(msg, topic='some-local-topic', expected_signal=self.signal)

assert excinfo.value.args == (
"Signal types do not match. Expected org.openedx.learning.auth.session.login.completed.v1. "
"Received message of type incorrect-type.",
)

def test_bad_msg(self):
"""
Check that if we cannot process the message headers, we raise an UnusableMessageError
Expand All @@ -111,7 +93,7 @@ def test_bad_msg(self):
)

with pytest.raises(UnusableMessageError) as excinfo:
RedisMessage.parse(msg, topic='some-local-topic', expected_signal=self.signal)
RedisMessage.parse(msg, topic='some-local-topic')

assert excinfo.value.args == (
"Error determining metadata from message headers: badly formed hexadecimal UUID string",
Expand Down

0 comments on commit 0ebee6a

Please sign in to comment.