Skip to content

Commit

Permalink
wip: refactor: use utility from openedex_events to reset application …
Browse files Browse the repository at this point in the history
…state
  • Loading branch information
navinkarkera committed Oct 16, 2023
1 parent 2c6e41d commit f267bda
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 21 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Unreleased

*

[0.3.3] - 2023-10-13
************************************************

Added
=====
* Use utility from openedx_events to reset application state before processing
event

[0.3.2] - 2023-09-01
************************************************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
from edx_event_bus_redis.internal.consumer import RedisEventConsumer
from edx_event_bus_redis.internal.producer import create_producer

__version__ = '0.3.2'
__version__ = '0.3.3'

default_app_config = 'edx_event_bus_redis.apps.EdxEventBusRedisConfig' # pylint: disable=invalid-name
22 changes: 4 additions & 18 deletions edx_event_bus_redis/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from edx_toggles.toggles import SettingToggle
from openedx_events.event_bus import EventBusConsumer
from openedx_events.event_bus.avro.deserializer import deserialize_bytes_to_event_data
from openedx_events.tooling import OpenEdxPublicSignal
from openedx_events.tooling import OpenEdxPublicSignal, prepare_for_new_work_cycle
from redis.exceptions import ConnectionError as RedisConnectionError
from redis.exceptions import ResponseError
from walrus import Database
Expand Down Expand Up @@ -70,21 +70,6 @@ class EventConsumptionException(Exception):
"""


def _reconnect_to_db_if_needed():
"""
Reconnects the db connection if needed.
This is important because Django only does connection validity/age checks as part of
its request/response cycle, which isn't in effect for the consume-loop. If we don't
force these checks, a broken connection will remain broken indefinitely. For most
consumers, this will cause event processing to fail.
"""
has_connection = bool(connection.connection)
requires_reconnect = has_connection and not connection.is_usable()
if requires_reconnect:
connection.connect()


class RedisEventConsumer(EventBusConsumer):
"""
Construct consumer for the given topic and group. The consumer can then
Expand Down Expand Up @@ -240,8 +225,9 @@ def _consume_indefinitely(self):
if isinstance(redis_raw_msg, list):
redis_raw_msg = redis_raw_msg[0]
msg = RedisMessage.parse(redis_raw_msg, self.full_topic)
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
# Before processing, try to make sure our application state is cleaned
# up as would happen at the start of a Django request/response cycle.
prepare_for_new_work_cycle()
self.emit_signals_from_message(msg)
consecutive_errors = 0

Expand Down
4 changes: 2 additions & 2 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
-c constraints.txt

Django # Web application framework
# openedx-events 8.0.0 removes an argument from consumer initialization
openedx-events>=8.0.0 # Events API
# openedx-events 9.0.1 adds utility to reset application state
openedx-events>=9.0.1 # Events API
edx_django_utils
edx_toggles

Expand Down

0 comments on commit f267bda

Please sign in to comment.