From f6bb6ae53b4f8f6fa7ea7eb112e837de740c8f90 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Wed, 21 Feb 2024 15:01:11 -0500 Subject: [PATCH 1/8] feat: send events in batch from routers fix: add time_to_send method chore: quality fixes --- .../backends/events_router.py | 55 +++++++++++- .../backends/tests/test_events_router.py | 86 ++++++++++++++++++- event_routing_backends/helpers.py | 2 +- .../commands/transform_tracking_logs.py | 2 +- event_routing_backends/settings/common.py | 4 +- event_routing_backends/settings/production.py | 12 +++ pylintrc | 8 +- requirements/base.in | 1 + requirements/base.txt | 13 ++- requirements/ci.txt | 2 +- requirements/dev.txt | 23 +++-- requirements/doc.txt | 21 +++-- requirements/quality.txt | 21 +++-- requirements/test.txt | 21 +++-- test_settings.py | 3 + 15 files changed, 239 insertions(+), 35 deletions(-) diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 85832f4d..546e2152 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -1,8 +1,12 @@ """ Generic router to send events to hosts. 
""" +import json import logging +from datetime import datetime, timedelta +from django.conf import settings +from django_redis import get_redis_connection from eventtracking.processors.exceptions import EventEmissionExit from event_routing_backends.helpers import get_business_critical_events @@ -10,6 +14,9 @@ logger = logging.getLogger(__name__) +EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}' +EVENTS_ROUTER_LAST_SENT_FORMAT = 'last_sent_{}' + class EventsRouter: """ @@ -26,6 +33,8 @@ def __init__(self, processors=None, backend_name=None): """ self.processors = processors if processors else [] self.backend_name = backend_name + self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name) + self.last_sent_key = EVENTS_ROUTER_LAST_SENT_FORMAT.format(self.backend_name) def configure_host(self, host, router): """ @@ -154,8 +163,27 @@ def send(self, event): Arguments: event (dict): the original event dictionary """ - event_routes = self.prepare_to_send([event]) + if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED: + redis = get_redis_connection() + batch = self.queue_event(redis, event) + if not batch: + return + + try: + redis.set(self.last_sent_key, datetime.now().isoformat()) + self.bulk_send([json.loads(queued_event.decode('utf-8')) for queued_event in batch]) + except Exception: # pylint: disable=broad-except + logger.exception( + 'Exception occurred while trying to bulk dispatch {} events.'.format( + len(batch) + ), + exc_info=True + ) + logger.info('Re sending the batched events to the queue.') + redis.lpush(self.queue_name, *batch) + return + event_routes = self.prepare_to_send([event]) for events_for_route in event_routes.values(): for event_name, updated_event, host, is_business_critical in events_for_route: if is_business_critical: @@ -173,6 +201,31 @@ def send(self, event): host['host_configurations'], ) + def queue_event(self, redis, event): + """ + Queue the event to be sent to configured routers. 
+ + """ + event["timestamp"] = event["timestamp"].isoformat() + queue_size = redis.lpush(self.queue_name, json.dumps(event)) + logger.info(f'Event {event["name"]} has been queued for batching. Queue size: {queue_size}') + + if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis): + batch = redis.rpop(self.queue_name, queue_size) + return batch + + return None + + def time_to_send(self, redis): + """ + Check if it is time to send the batched events. + """ + last_sent = redis.get(self.last_sent_key) + if not last_sent: + return True + time_passed = (datetime.now() - datetime.fromisoformat(last_sent.decode('utf-8'))) + return time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL) + def process_event(self, event): """ Process the event through this router's processors. diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index 45cc35ff..8b9c5a44 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -1,11 +1,14 @@ """ Test the EventsRouter """ +import datetime +import json +from copy import copy from unittest.mock import MagicMock, call, patch, sentinel import ddt from django.conf import settings -from django.test import TestCase +from django.test import TestCase, override_settings from edx_django_utils.cache.utils import TieredCache from eventtracking.processors.exceptions import EventEmissionExit from tincan.statement import Statement @@ -257,6 +260,87 @@ def test_duplicate_xapi_event_id(self, mocked_logger): mocked_logger.info.mock_calls ) + @override_settings( + EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True, + EVENT_ROUTING_BACKEND_BATCH_SIZE=2 + ) + @patch('event_routing_backends.backends.events_router.get_redis_connection') + @patch('event_routing_backends.backends.events_router.logger') + 
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send') + def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection): + router = EventsRouter(processors=[], backend_name='test') + redis_mock = MagicMock() + mock_get_redis_connection.return_value = redis_mock + redis_mock.lpush.return_value = None + event1 = copy(self.transformed_event) + event1["timestamp"] = datetime.datetime.now() + event2 = copy(self.transformed_event) + event2["timestamp"] = datetime.datetime.now() + events = [event1, event2] + formatted_events = [] + for event in events: + formatted_event = copy(event) + formatted_event["timestamp"] = formatted_event["timestamp"].isoformat() + formatted_events.append(json.dumps(formatted_event).encode('utf-8')) + + redis_mock.rpop.return_value = formatted_events + redis_mock.lpush.return_value = 1 + redis_mock.get.return_value.decode.return_value = datetime.datetime.now().isoformat() + + router.send(event1) + redis_mock.lpush.return_value = 2 + router.send(event2) + + redis_mock.lpush.assert_any_call(router.queue_name, json.dumps(event1)) + redis_mock.rpop.assert_any_call(router.queue_name, settings.EVENT_ROUTING_BACKEND_BATCH_SIZE) + mock_logger.info.assert_any_call( + f"Event {self.transformed_event['name']} has been queued for batching. 
Queue size: 1" + ) + mock_bulk_send.assert_any_call(events) + + @override_settings( + EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True, + EVENT_ROUTING_BACKEND_BATCH_SIZE=2 + ) + @patch('event_routing_backends.backends.events_router.get_redis_connection') + @patch('event_routing_backends.backends.events_router.logger') + @patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send') + @patch('event_routing_backends.backends.events_router.EventsRouter.queue_event') + def test_send_event_with_bulk_exception( + self, + mock_queue_event, + mock_bulk_send, + mock_logger, + mock_get_redis_connection + ): + router = EventsRouter(processors=[], backend_name='test') + redis_mock = MagicMock() + mock_get_redis_connection.return_value = redis_mock + mock_queue_event.return_value = [1] + mock_bulk_send.side_effect = EventNotDispatched + + router.send(self.transformed_event) + + mock_logger.exception.assert_called_once_with( + 'Exception occurred while trying to bulk dispatch {} events.'.format( + 1 + ), + exc_info=True + ) + mock_logger.info.assert_called_once_with( + 'Re sending the batched events to the queue.' 
+ ) + redis_mock.lpush.assert_called_once_with(router.queue_name, *[1]) + + @override_settings( + EVENT_ROUTING_BACKEND_BATCH_INTERVAL=1, + ) + def test_time_to_send_no_data(self): + router = EventsRouter(processors=[], backend_name='test') + redis_mock = MagicMock() + redis_mock.get.return_value = None + self.assertTrue(router.time_to_send(redis_mock)) + @ddt.ddt class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests diff --git a/event_routing_backends/helpers.py b/event_routing_backends/helpers.py index 4cbda2e4..1b0fcdd0 100644 --- a/event_routing_backends/helpers.py +++ b/event_routing_backends/helpers.py @@ -108,7 +108,7 @@ def get_user(username_or_id): if username and not user: try: user = get_potentially_retired_user_by_username(username) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except logger.info('User with username "%s" does not exist.%s', username, ex) return user diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py index b11df861..b2009a61 100644 --- a/event_routing_backends/management/commands/transform_tracking_logs.py +++ b/event_routing_backends/management/commands/transform_tracking_logs.py @@ -41,7 +41,7 @@ def _get_chunks(source, file, start_byte, end_byte): break # Catching all exceptions here because there's no telling what all # the possible errors from different libcloud providers are. 
- except Exception as e: + except Exception as e: # pylint: disable=broad-except print(e) if try_number == num_retries: print(f"Try {try_number}: Error occurred downloading, giving up.") diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index ed8de5cb..424bd427 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -15,7 +15,9 @@ def plugin_settings(settings): settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30 settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3 settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1 - + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = 100 + settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 60 # .. setting_name: XAPI_AGENT_IFI_TYPE # .. setting_default: 'external_id' # .. setting_description: This setting can be used to specify the type of inverse functional identifier diff --git a/event_routing_backends/settings/production.py b/event_routing_backends/settings/production.py index 2cd9e926..646bfc02 100644 --- a/event_routing_backends/settings/production.py +++ b/event_routing_backends/settings/production.py @@ -15,6 +15,18 @@ def plugin_settings(settings): 'EVENT_ROUTING_BACKEND_COUNTDOWN', settings.EVENT_ROUTING_BACKEND_COUNTDOWN ) + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCH_SIZE', + settings.EVENT_ROUTING_BACKEND_BATCH_SIZE + ) + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCHING_ENABLED', + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED + ) + settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL = settings.ENV_TOKENS.get( + 'EVENT_ROUTING_BACKEND_BATCH_INTERVAL', + settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL + ) settings.CALIPER_EVENTS_ENABLED = settings.ENV_TOKENS.get( 'CALIPER_EVENTS_ENABLED', settings.CALIPER_EVENTS_ENABLED diff --git a/pylintrc b/pylintrc index 
0f382171..5e90ef14 100644 --- a/pylintrc +++ b/pylintrc @@ -64,7 +64,7 @@ # SERIOUSLY. # # ------------------------------ -# Generated by edx-lint version: 5.2.5 +# Generated by edx-lint version: 5.3.6 # ------------------------------ [MASTER] ignore = migrations @@ -259,6 +259,7 @@ enable = useless-suppression, disable = bad-indentation, + broad-exception-raised, consider-using-f-string, duplicate-code, file-ignored, @@ -380,7 +381,4 @@ import-graph = ext-import-graph = int-import-graph = -[EXCEPTIONS] -overgeneral-exceptions = Exception - -# 6076f75771fc24c52dcc7314fbed2a99eacaf5f0 +# 056eb70bf90e13f94ca9ae8ac1fa84836c7139ba diff --git a/requirements/base.in b/requirements/base.in index f9fdf05e..96c2d8a3 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -16,3 +16,4 @@ edx-celeryutils apache-libcloud # For bulk event log loading fasteners # Locking tools, required by apache-libcloud, but somehow not installed with it openedx-filters +django-redis diff --git a/requirements/base.txt b/requirements/base.txt index 231a8394..9cc87233 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -12,6 +12,8 @@ apache-libcloud==3.8.0 # via -r requirements/base.in asgiref==3.7.2 # via django +async-timeout==4.0.3 + # via redis attrs==23.2.0 # via openedx-events backports-zoneinfo[tzdata]==0.2.1 @@ -48,7 +50,7 @@ click-repl==0.3.0 # via celery code-annotations==1.6.0 # via edx-toggles -cryptography==42.0.3 +cryptography==42.0.4 # via django-fernet-fields-v2 django==3.2.24 # via @@ -58,6 +60,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -77,6 +80,8 @@ django-fernet-fields-v2==0.9 # via -r requirements/base.in django-model-utils==4.4.0 # via edx-celeryutils +django-redis==5.4.0 + # via -r requirements/base.in django-waffle==4.1.0 # via # edx-django-utils @@ -119,7 +124,7 @@ markupsafe==2.1.5 # via jinja2 newrelic==9.6.0 # via edx-django-utils 
-openedx-events==9.5.1 +openedx-events==9.5.2 # via event-tracking openedx-filters==1.6.0 # via -r requirements/base.in @@ -152,6 +157,8 @@ pytz==2024.1 # tincan pyyaml==6.0.1 # via code-annotations +redis==5.0.1 + # via django-redis requests==2.31.0 # via # -r requirements/base.in @@ -181,7 +188,7 @@ tzdata==2024.1 # via # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via requests vine==5.1.0 # via diff --git a/requirements/ci.txt b/requirements/ci.txt index 742e39c1..bd57292b 100644 --- a/requirements/ci.txt +++ b/requirements/ci.txt @@ -32,7 +32,7 @@ tomli==2.0.1 # via # pyproject-api # tox -tox==4.12.1 +tox==4.13.0 # via -r requirements/ci.in virtualenv==20.25.0 # via tox diff --git a/requirements/dev.txt b/requirements/dev.txt index c7207bb8..b6303e42 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -25,6 +25,10 @@ astroid==3.0.3 # -r requirements/quality.txt # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/quality.txt + # redis attrs==23.2.0 # via # -r requirements/quality.txt @@ -106,11 +110,11 @@ colorama==0.4.6 # via # -r requirements/ci.txt # tox -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via # -r requirements/quality.txt # pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/quality.txt # django-fernet-fields-v2 @@ -136,6 +140,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -159,6 +164,8 @@ django-model-utils==4.4.0 # via # -r requirements/quality.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/quality.txt django-waffle==4.1.0 # via # -r requirements/quality.txt @@ -264,7 +271,7 @@ newrelic==9.6.0 # via # -r requirements/quality.txt # edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/quality.txt # event-tracking @@ -363,7 +370,7 @@ pyproject-hooks==1.0.0 # -r requirements/pip-tools.txt # build # pip-tools 
-pytest==8.0.0 +pytest==8.0.1 # via # -r requirements/quality.txt # pytest-cov @@ -393,6 +400,10 @@ pyyaml==6.0.1 # -r requirements/quality.txt # code-annotations # edx-i18n-tools +redis==5.0.1 + # via + # -r requirements/quality.txt + # django-redis requests==2.31.0 # via # -r requirements/quality.txt @@ -441,7 +452,7 @@ tomlkit==0.12.3 # via # -r requirements/quality.txt # pylint -tox==4.12.1 +tox==4.13.0 # via -r requirements/ci.txt typing-extensions==4.9.0 # via @@ -461,7 +472,7 @@ tzdata==2024.1 # -r requirements/quality.txt # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/quality.txt # requests diff --git a/requirements/doc.txt b/requirements/doc.txt index 2513b488..e8b5ec94 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -22,6 +22,10 @@ asgiref==3.7.2 # via # -r requirements/test.txt # django +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis attrs==23.2.0 # via # -r requirements/test.txt @@ -86,11 +90,11 @@ code-annotations==1.6.0 # via # -r requirements/test.txt # edx-toggles -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via # -r requirements/test.txt # pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/test.txt # django-fernet-fields-v2 @@ -105,6 +109,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -127,6 +132,8 @@ django-model-utils==4.4.0 # via # -r requirements/test.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/test.txt django-waffle==4.1.0 # via # -r requirements/test.txt @@ -241,7 +248,7 @@ newrelic==9.6.0 # edx-django-utils nh3==0.2.15 # via readme-renderer -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/test.txt # event-tracking @@ -297,7 +304,7 @@ pynacl==1.5.0 # edx-django-utils pyproject-hooks==1.0.0 # via build -pytest==8.0.0 +pytest==8.0.1 # via # -r requirements/test.txt # pytest-cov @@ -329,6 
+336,10 @@ pyyaml==6.0.1 # code-annotations readme-renderer==42.0 # via twine +redis==5.0.1 + # via + # -r requirements/test.txt + # django-redis requests==2.31.0 # via # -r requirements/test.txt @@ -416,7 +427,7 @@ tzdata==2024.1 # -r requirements/test.txt # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/test.txt # requests diff --git a/requirements/quality.txt b/requirements/quality.txt index f665ed61..b4c331a4 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -22,6 +22,10 @@ astroid==3.0.3 # via # pylint # pylint-celery +async-timeout==4.0.3 + # via + # -r requirements/test.txt + # redis attrs==23.2.0 # via # -r requirements/test.txt @@ -83,11 +87,11 @@ code-annotations==1.6.0 # -r requirements/test.txt # edx-lint # edx-toggles -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via # -r requirements/test.txt # pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/test.txt # django-fernet-fields-v2 @@ -103,6 +107,7 @@ django==3.2.24 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -125,6 +130,8 @@ django-model-utils==4.4.0 # via # -r requirements/test.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/test.txt django-waffle==4.1.0 # via # -r requirements/test.txt @@ -209,7 +216,7 @@ newrelic==9.6.0 # via # -r requirements/test.txt # edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/test.txt # event-tracking @@ -268,7 +275,7 @@ pynacl==1.5.0 # via # -r requirements/test.txt # edx-django-utils -pytest==8.0.0 +pytest==8.0.1 # via # -r requirements/test.txt # pytest-cov @@ -297,6 +304,10 @@ pyyaml==6.0.1 # via # -r requirements/test.txt # code-annotations +redis==5.0.1 + # via + # -r requirements/test.txt + # django-redis requests==2.31.0 # via # -r requirements/test.txt @@ -348,7 +359,7 @@ tzdata==2024.1 # -r requirements/test.txt # 
backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/test.txt # requests diff --git a/requirements/test.txt b/requirements/test.txt index d87bcc3f..c6e0bed6 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -18,6 +18,10 @@ asgiref==3.7.2 # via # -r requirements/base.txt # django +async-timeout==4.0.3 + # via + # -r requirements/base.txt + # redis attrs==23.2.0 # via # -r requirements/base.txt @@ -75,9 +79,9 @@ code-annotations==1.6.0 # -r requirements/base.txt # -r requirements/test.in # edx-toggles -coverage[toml]==7.4.1 +coverage[toml]==7.4.2 # via pytest-cov -cryptography==42.0.3 +cryptography==42.0.4 # via # -r requirements/base.txt # django-fernet-fields-v2 @@ -90,6 +94,7 @@ ddt==1.7.1 # django-crum # django-fernet-fields-v2 # django-model-utils + # django-redis # django-waffle # djangorestframework # edx-celeryutils @@ -112,6 +117,8 @@ django-model-utils==4.4.0 # via # -r requirements/base.txt # edx-celeryutils +django-redis==5.4.0 + # via -r requirements/base.txt django-waffle==4.1.0 # via # -r requirements/base.txt @@ -182,7 +189,7 @@ newrelic==9.6.0 # via # -r requirements/base.txt # edx-django-utils -openedx-events==9.5.1 +openedx-events==9.5.2 # via # -r requirements/base.txt # event-tracking @@ -217,7 +224,7 @@ pynacl==1.5.0 # via # -r requirements/base.txt # edx-django-utils -pytest==8.0.0 +pytest==8.0.1 # via # pytest-cov # pytest-django @@ -245,6 +252,10 @@ pyyaml==6.0.1 # via # -r requirements/base.txt # code-annotations +redis==5.0.1 + # via + # -r requirements/base.txt + # django-redis requests==2.31.0 # via # -r requirements/base.txt @@ -287,7 +298,7 @@ tzdata==2024.1 # -r requirements/base.txt # backports-zoneinfo # celery -urllib3==2.2.0 +urllib3==2.2.1 # via # -r requirements/base.txt # requests diff --git a/test_settings.py b/test_settings.py index 23efece7..56baf298 100644 --- a/test_settings.py +++ b/test_settings.py @@ -47,5 +47,8 @@ def root(*args): RUNNING_WITH_TEST_SETTINGS = True 
EVENT_TRACKING_BACKENDS = {} XAPI_AGENT_IFI_TYPE = 'external_id' +EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False +EVENT_ROUTING_BACKEND_BATCH_SIZE = 1 +EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 100 _mock_third_party_modules() From cfee7da9d3f5dcbd032d6d9ca20a5d882baa9e24 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 1 Mar 2024 10:54:52 -0500 Subject: [PATCH 2/8] chore: bump version to 8.2.0 --- CHANGELOG.rst | 4 ++++ event_routing_backends/__init__.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 420c4ec8..ca71c17f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,10 @@ Change Log Unreleased ~~~~~~~~~~ +[8.2.0] + +* Add support for batching for EventsRouter. + [8.1.2] * Add grade.now_* events to the xAPI supported events list. diff --git a/event_routing_backends/__init__.py b/event_routing_backends/__init__.py index 882bd4cb..f2df879a 100644 --- a/event_routing_backends/__init__.py +++ b/event_routing_backends/__init__.py @@ -2,4 +2,4 @@ Various backends for receiving edX LMS events.. """ -__version__ = '8.1.2' +__version__ = '8.2.0' From b5c2c5cc35e5f5a94fcd7dbea851744b0d9ee654 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Fri, 1 Mar 2024 12:56:05 -0500 Subject: [PATCH 3/8] docs: add batching and event bus configuration --- docs/getting_started.rst | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 6b78e47d..fabaad5f 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -220,6 +220,45 @@ A sample override for ``xapi`` backend is presented below. Here we are allowing } } +Batching Configuration +---------------------- + +Batching of events can be configured using the following settings: + +#. ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED``: If set to ``True``, events will be batched before being routed. Default is ``False``. +#. 
``EVENT_ROUTING_BACKEND_BATCH_SIZE``: Maximum number of events to be batched together. Default is 100. +#. ``EVENT_ROUTING_BACKEND_BATCH_INTERVAL``: Time interval (in seconds) after which batched events will be sent. Default is 60 seconds. + +Batching is done in the ``EventsRouter`` backend. If ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED`` is set to ``True``, then events will be batched together and routed to the configured routers after the specified interval or when the batch size is reached, whichever happens first. + +In case of downtimes or network issues, events will be queued again to avoid data loss. However, there is no guarantee that the events will be routed in the same order as they were received. + +Event bus configuration +----------------------- + +The event bus backend can be configured as the producer of the events in which case, the events will be consumed from the event bus and routed to the configured routers. The event bus backend can be configured as follows with python: + +.. code-block:: python + + EVENT_TRACKING_BACKENDS["xapi"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend" + EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"]["backends"]["xapi"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter" + EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"].pop("backend_name") + INSTALLED_APPS.append("openedx_events") + SEND_TRACKING_EVENT_EMITTED_SIGNAL = True + EVENT_BUS_PRODUCER_CONFIG = { + "org.openedx.analytics.tracking.event.emitted.v1": { + "analytics": { + "event_key_field": "tracking_log.name", "enabled": True + } + } + } + +Once the event bus producer has been configured, the event bus producer can be started using the following command: + +.. 
code-block:: bash + + ./manage.py lms consume_events -t analytics -g event_routing_backends --extra '{"consumer_name": "event_routing_backends"}' + OpenEdx Filters =============== From 94c9f1c25b3945234222a2742c546fdd02c5374a Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Tue, 19 Mar 2024 15:50:16 -0500 Subject: [PATCH 4/8] chore: address PR suggestions --- event_routing_backends/settings/common.py | 19 ++++++++++++++++++- pylintrc | 3 +++ test_settings.py | 1 - 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index 424bd427..9dbdf3c6 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -15,8 +15,25 @@ def plugin_settings(settings): settings.EVENT_ROUTING_BACKEND_COUNTDOWN = 30 settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_MAX_RETRIES = 3 settings.EVENT_ROUTING_BACKEND_BULK_DOWNLOAD_COUNTDOWN = 1 - settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = True + # .. toggle_name: EVENT_ROUTING_BACKEND_BATCHING_ENABLED + # .. toggle_implementation: DjangoSetting + # .. toggle_default: False + # .. toggle_use_cases: opt_in + # .. toggle_creation_date: 2024-04-19 + # .. toggle_description: This setting can be used to enable or disable batching of events + # to be sent to the event routing backend. If enabled, events will be batched and sent + # to the event routing backend in batches of EVENT_ROUTING_BACKEND_BATCH_SIZE + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False + # .. setting_name: EVENT_ROUTING_BACKEND_BATCH_SIZE + # .. setting_default: 100 + # .. setting_description: This setting can be used to specify the size of the batch of events + # to be sent to the event routing backend. This setting is only used if EVENT_ROUTING_BACKEND_BATCHING_ENABLED settings.EVENT_ROUTING_BACKEND_BATCH_SIZE = 100 + # .. setting_name: EVENT_ROUTING_BACKEND_BATCH_INTERVAL + # .. setting_default: 60 + # .. 
setting_description: This setting can be used to specify the interval in seconds after which + # the batch of events will be sent to the event routing backend. This setting is only used if + # EVENT_ROUTING_BACKEND_BATCHING_ENABLED. settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 60 # .. setting_name: XAPI_AGENT_IFI_TYPE # .. setting_default: 'external_id' diff --git a/pylintrc b/pylintrc index 5e90ef14..d1c6b7ee 100644 --- a/pylintrc +++ b/pylintrc @@ -381,4 +381,7 @@ import-graph = ext-import-graph = int-import-graph = +[EXCEPTIONS] +overgeneral-exceptions = builtins.Exception + # 056eb70bf90e13f94ca9ae8ac1fa84836c7139ba diff --git a/test_settings.py b/test_settings.py index 56baf298..0bc4d976 100644 --- a/test_settings.py +++ b/test_settings.py @@ -48,7 +48,6 @@ def root(*args): EVENT_TRACKING_BACKENDS = {} XAPI_AGENT_IFI_TYPE = 'external_id' EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False -EVENT_ROUTING_BACKEND_BATCH_SIZE = 1 EVENT_ROUTING_BACKEND_BATCH_INTERVAL = 100 _mock_third_party_modules() From 983996c5ef72f2344da894491f1b3ea030ad10b5 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Tue, 19 Mar 2024 16:42:11 -0500 Subject: [PATCH 5/8] feat: add dead letter queue for event bus backend --- .../backends/events_router.py | 20 ++- .../backends/tests/test_events_router.py | 30 +++- .../commands/recover_failed_events.py | 65 ++++++++ .../tests/test_recover_failed_events.py | 141 ++++++++++++++++++ 4 files changed, 251 insertions(+), 5 deletions(-) create mode 100644 event_routing_backends/management/commands/recover_failed_events.py create mode 100644 event_routing_backends/management/commands/tests/test_recover_failed_events.py diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 546e2152..a1be4ed2 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -7,6 +7,7 @@ from django.conf import settings from django_redis import 
get_redis_connection +from eventtracking.backends.logger import DateTimeJSONEncoder from eventtracking.processors.exceptions import EventEmissionExit from event_routing_backends.helpers import get_business_critical_events @@ -15,6 +16,7 @@ logger = logging.getLogger(__name__) EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}' +EVENTS_ROUTER_DEAD_QUEUE_FORMAT = 'dead_queue_{}' EVENTS_ROUTER_LAST_SENT_FORMAT = 'last_sent_{}' @@ -34,6 +36,7 @@ def __init__(self, processors=None, backend_name=None): self.processors = processors if processors else [] self.backend_name = backend_name self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name) + self.dead_queue = EVENTS_ROUTER_DEAD_QUEUE_FORMAT.format(self.backend_name) self.last_sent_key = EVENTS_ROUTER_LAST_SENT_FORMAT.format(self.backend_name) def configure_host(self, host, router): @@ -126,6 +129,17 @@ def prepare_to_send(self, events): return route_events + def get_failed_events(self): + """ + Get failed events from the dead queue. + """ + redis = get_redis_connection() + n = redis.llen(self.dead_queue) + if not n: + return [] + failed_events = redis.rpop(self.dead_queue, n) + return [json.loads(event.decode('utf-8')) for event in failed_events] + def bulk_send(self, events): """ Send the event to configured routers after processing it. @@ -179,8 +193,8 @@ def send(self, event): ), exc_info=True ) - logger.info('Re sending the batched events to the queue.') - redis.lpush(self.queue_name, *batch) + logger.info(f'Pushing failed events to the dead queue: {self.dead_queue}') + redis.lpush(self.dead_queue, *batch) return event_routes = self.prepare_to_send([event]) @@ -207,7 +221,7 @@ def queue_event(self, redis, event): """ event["timestamp"] = event["timestamp"].isoformat() - queue_size = redis.lpush(self.queue_name, json.dumps(event)) + queue_size = redis.lpush(self.queue_name, json.dumps(event, cls=DateTimeJSONEncoder)) logger.info(f'Event {event["name"]} has been queued for batching. 
Queue size: {queue_size}') if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis): diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index 8b9c5a44..264d19a2 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -328,9 +328,9 @@ def test_send_event_with_bulk_exception( exc_info=True ) mock_logger.info.assert_called_once_with( - 'Re sending the batched events to the queue.' + f'Pushing failed events to the dead queue: {router.dead_queue}' ) - redis_mock.lpush.assert_called_once_with(router.queue_name, *[1]) + redis_mock.lpush.assert_called_once_with(router.dead_queue, *[1]) @override_settings( EVENT_ROUTING_BACKEND_BATCH_INTERVAL=1, @@ -1230,3 +1230,29 @@ def test_failed_routing(self, mocked_remote_lrs): router = SyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND) with self.assertRaises(EventNotDispatched): router.send(self.transformed_event) + + @patch('event_routing_backends.backends.events_router.get_redis_connection') + def test_get_failed_events(self, mock_get_redis_connection): + redis_mock = MagicMock() + mock_get_redis_connection.return_value = redis_mock + redis_mock.llen.return_value = 1 + redis_mock.rpop.return_value = [json.dumps({'name': 'test', 'data': {'key': 'value'}}).encode('utf-8')] + + router = SyncEventsRouter(processors=[], backend_name='test') + router.get_failed_events() + + redis_mock.llen.assert_called_once_with(router.dead_queue) + redis_mock.rpop.assert_called_once_with(router.dead_queue, 1) + + + @patch('event_routing_backends.backends.events_router.get_redis_connection') + def test_get_failed_events_empty(self, mock_get_redis_connection): + redis_mock = MagicMock() + mock_get_redis_connection.return_value = redis_mock + redis_mock.llen.return_value = 0 + + router = SyncEventsRouter(processors=[], 
backend_name='test') + events = router.get_failed_events() + + redis_mock.llen.assert_called_once_with(router.dead_queue) + self.assertEqual(events, []) diff --git a/event_routing_backends/management/commands/recover_failed_events.py b/event_routing_backends/management/commands/recover_failed_events.py new file mode 100644 index 00000000..faba8f57 --- /dev/null +++ b/event_routing_backends/management/commands/recover_failed_events.py @@ -0,0 +1,65 @@ +""" +Management command for resending events when a failure occurs. +""" + +import logging +from textwrap import dedent + +from django.conf import settings +from django.core.management.base import BaseCommand +from eventtracking.backends.async_routing import AsyncRoutingBackend +from eventtracking.backends.event_bus import EventBusRoutingBackend +from eventtracking.tracker import get_tracker + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + """ + Management command for resending events when a failure occurs + in the event routing backend. + """ + + help = dedent(__doc__).strip() + + def add_arguments(self, parser): + parser.add_argument( + "--transformer_type", + choices=["xapi", "caliper", "all"], + required=True, + help="The type of transformation to do, only one can be done at a time.", + ) + + def handle(self, *args, **options): + """ + Configure the command and start the transform process. 
+ """ + logger.info("Recovering failed events") + transformer_type = options["transformer_type"] + tracker = get_tracker() + + engines = { + name: engine + for name, engine in tracker.backends.items() + if isinstance(engine, EventBusRoutingBackend) + } + + if not engines: + logger.info("No compatible backend found.") + return + + settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False + + for name, engine in engines.items(): + if transformer_type not in ("all", name): + logger.info("Skipping backend: {}".format(name)) + continue + for backend_name, backend in engine.backends.items(): + failed_events = backend.get_failed_events() + if not failed_events: + logger.info( + "No failed events found for backend: {}".format(backend_name) + ) + continue + for event in failed_events: + backend.send(event) diff --git a/event_routing_backends/management/commands/tests/test_recover_failed_events.py b/event_routing_backends/management/commands/tests/test_recover_failed_events.py new file mode 100644 index 00000000..3c98b8ca --- /dev/null +++ b/event_routing_backends/management/commands/tests/test_recover_failed_events.py @@ -0,0 +1,141 @@ +""" +Tests for the transform_tracking_logs management command. 
+""" + +from unittest.mock import Mock, patch + +from django.core.management import call_command +from django.test import TestCase +from django.test.utils import override_settings +from eventtracking.django.django_tracker import DjangoTracker + +from event_routing_backends.management.commands.recover_failed_events import Command as RecoverFailedEventsCommand + + +class TestRecoverFailedEvents(TestCase): + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "backends": { + "xapi": { + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", + "OPTIONS": { + "processors": [ + { + "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", + "OPTIONS": {}, + } + ], + "backend_name": "xapi", + }, + } + }, + }, + }, + } + ) + @patch("event_routing_backends.management.commands.recover_failed_events.get_tracker") + def test_send_tracking_log_to_backends(self, mock_get_tracker): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_backend = Mock() + tracker.backends["event_bus"].backends["xapi"] = mock_backend + mock_backend.get_failed_events.return_value = [{"event": "event"}] + + call_command( + 'recover_failed_events', + transformer_type="all" + ) + + mock_backend.send.assert_called_once_with({"event": "event"}) + + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "backends": { + "xapi": { + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", + "OPTIONS": { + "processors": [ + { + "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", + "OPTIONS": {}, + } + ], + "backend_name": "xapi", + }, + } + }, + }, + }, + "xapi": { + "ENGINE": 
"eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "backends": { + "xapi": { + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", + "OPTIONS": { + "processors": [ + { + "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", + "OPTIONS": {}, + } + ], + "backend_name": "xapi", + }, + } + }, + }, + }, + } + ) + @patch("event_routing_backends.management.commands.recover_failed_events.get_tracker") + def test_send_tracking_log_to_backends_no_failed_events(self, mock_get_tracker): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_backend = Mock() + tracker.backends["xapi"].backends["xapi"] = mock_backend + mock_backend.get_failed_events.return_value = [] + + call_command( + 'recover_failed_events', + transformer_type="xapi" + ) + + mock_backend.send.assert_not_called() + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.logger.LoggerBackend", + "OPTIONS": {}, + }, + } + ) + @patch("event_routing_backends.management.commands.recover_failed_events.get_tracker") + @patch("event_routing_backends.management.commands.recover_failed_events.logger") + def test_send_tracking_log_to_backends_no_engines(self, mock_logger, mock_get_tracker): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + + call_command( + 'recover_failed_events', + transformer_type="all" + ) + + mock_logger.info.assert_any_call("Recovering failed events") + mock_logger.info.assert_any_call("No compatible backend found.") From e871e3c8d5b64696221bc5551539f1a08fa75d37 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Wed, 20 Mar 2024 13:42:53 -0500 Subject: [PATCH 6/8] chore: quality fixes --- .../backends/tests/test_events_router.py | 1 - .../commands/recover_failed_events.py | 1 - 
.../tests/test_recover_failed_events.py | 90 +++++++------------ 3 files changed, 30 insertions(+), 62 deletions(-) diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index 264d19a2..6831f98f 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -1244,7 +1244,6 @@ def test_get_failed_events(self, mock_get_redis_connection): redis_mock.llen.assert_called_once_with(router.dead_queue) redis_mock.rpop.assert_called_once_with(router.dead_queue, 1) - @patch('event_routing_backends.backends.events_router.get_redis_connection') def test_get_failed_events_empty(self, mock_get_redis_connection): redis_mock = MagicMock() diff --git a/event_routing_backends/management/commands/recover_failed_events.py b/event_routing_backends/management/commands/recover_failed_events.py index faba8f57..25e12caa 100644 --- a/event_routing_backends/management/commands/recover_failed_events.py +++ b/event_routing_backends/management/commands/recover_failed_events.py @@ -7,7 +7,6 @@ from django.conf import settings from django.core.management.base import BaseCommand -from eventtracking.backends.async_routing import AsyncRoutingBackend from eventtracking.backends.event_bus import EventBusRoutingBackend from eventtracking.tracker import get_tracker diff --git a/event_routing_backends/management/commands/tests/test_recover_failed_events.py b/event_routing_backends/management/commands/tests/test_recover_failed_events.py index 3c98b8ca..d2aecfa5 100644 --- a/event_routing_backends/management/commands/tests/test_recover_failed_events.py +++ b/event_routing_backends/management/commands/tests/test_recover_failed_events.py @@ -9,7 +9,18 @@ from django.test.utils import override_settings from eventtracking.django.django_tracker import DjangoTracker -from event_routing_backends.management.commands.recover_failed_events import Command as 
RecoverFailedEventsCommand +XAPI_PROCESSOR = { + "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", + "OPTIONS": { + "processors": [ + { + "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", + "OPTIONS": {}, + } + ], + "backend_name": "xapi", + }, +} class TestRecoverFailedEvents(TestCase): @@ -18,25 +29,14 @@ class TestRecoverFailedEvents(TestCase): "event_bus": { "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", "OPTIONS": { - "backends": { - "xapi": { - "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", - "OPTIONS": { - "processors": [ - { - "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", - "OPTIONS": {}, - } - ], - "backend_name": "xapi", - }, - } - }, + "backends": {"xapi": XAPI_PROCESSOR}, }, }, } ) - @patch("event_routing_backends.management.commands.recover_failed_events.get_tracker") + @patch( + "event_routing_backends.management.commands.recover_failed_events.get_tracker" + ) def test_send_tracking_log_to_backends(self, mock_get_tracker): """ Test for send_tracking_log_to_backends @@ -47,57 +47,29 @@ def test_send_tracking_log_to_backends(self, mock_get_tracker): tracker.backends["event_bus"].backends["xapi"] = mock_backend mock_backend.get_failed_events.return_value = [{"event": "event"}] - call_command( - 'recover_failed_events', - transformer_type="all" - ) + call_command("recover_failed_events", transformer_type="all") mock_backend.send.assert_called_once_with({"event": "event"}) - @override_settings( EVENT_TRACKING_BACKENDS={ "event_bus": { "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", "OPTIONS": { - "backends": { - "xapi": { - "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", - "OPTIONS": { - "processors": [ - { - "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", - "OPTIONS": {}, - } - ], - 
"backend_name": "xapi", - }, - } - }, + "backends": {"xapi": XAPI_PROCESSOR}, }, }, "xapi": { "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", "OPTIONS": { - "backends": { - "xapi": { - "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", - "OPTIONS": { - "processors": [ - { - "ENGINE": "event_routing_backends.processors.xapi.transformer_processor.XApiProcessor", - "OPTIONS": {}, - } - ], - "backend_name": "xapi", - }, - } - }, + "backends": {"xapi": XAPI_PROCESSOR}, }, }, } ) - @patch("event_routing_backends.management.commands.recover_failed_events.get_tracker") + @patch( + "event_routing_backends.management.commands.recover_failed_events.get_tracker" + ) def test_send_tracking_log_to_backends_no_failed_events(self, mock_get_tracker): """ Test for send_tracking_log_to_backends @@ -108,10 +80,7 @@ def test_send_tracking_log_to_backends_no_failed_events(self, mock_get_tracker): tracker.backends["xapi"].backends["xapi"] = mock_backend mock_backend.get_failed_events.return_value = [] - call_command( - 'recover_failed_events', - transformer_type="xapi" - ) + call_command("recover_failed_events", transformer_type="xapi") mock_backend.send.assert_not_called() @@ -123,19 +92,20 @@ def test_send_tracking_log_to_backends_no_failed_events(self, mock_get_tracker): }, } ) - @patch("event_routing_backends.management.commands.recover_failed_events.get_tracker") + @patch( + "event_routing_backends.management.commands.recover_failed_events.get_tracker" + ) @patch("event_routing_backends.management.commands.recover_failed_events.logger") - def test_send_tracking_log_to_backends_no_engines(self, mock_logger, mock_get_tracker): + def test_send_tracking_log_to_backends_no_engines( + self, mock_logger, mock_get_tracker + ): """ Test for send_tracking_log_to_backends """ tracker = DjangoTracker() mock_get_tracker.return_value = tracker - call_command( - 'recover_failed_events', - transformer_type="all" - ) + 
call_command("recover_failed_events", transformer_type="all") mock_logger.info.assert_any_call("Recovering failed events") mock_logger.info.assert_any_call("No compatible backend found.") From d64d1da764f1afefa795a920d6a9428380f20f2d Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Thu, 21 Mar 2024 15:39:49 -0500 Subject: [PATCH 7/8] chore: address PR suggestions --- docs/getting_started.rst | 9 ++- .../backends/events_router.py | 7 +- .../backends/tests/test_events_router.py | 9 +-- .../commands/recover_failed_events.py | 41 +++++++--- .../tests/test_recover_failed_events.py | 79 ++++++++++++++++++- 5 files changed, 120 insertions(+), 25 deletions(-) diff --git a/docs/getting_started.rst b/docs/getting_started.rst index fabaad5f..e6f66dfc 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -227,7 +227,7 @@ Batching of events can be configured using the following settings: #. ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED``: If set to ``True``, events will be batched before being routed. Default is ``False``. #. ``EVENT_ROUTING_BACKEND_BATCH_SIZE``: Maximum number of events to be batched together. Default is 100. -#. ``EVENT_ROUTING_BACKEND_BATCHING_INTERVAL``: Time interval (in seconds) after which events will be batched. Default is 60 seconds. +#. ``EVENT_ROUTING_BACKEND_BATCH_INTERVAL``: Time interval (in seconds) after which events will be sent, whether or not the batch size criteria is met. Default is 60 seconds. Batching is done in the ``EventsRouter`` backend. If ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED`` is set to ``True``, then events will be batched together and routed to the configured routers after the specified interval or when the batch size is reached, whichever happens first. 
@@ -236,14 +236,15 @@ In case of downtimes or network issues, events will be queued again to avoid dat Event bus configuration ----------------------- -The event bus backend can be configured as the producer of the events in which case, the events will be consumed from the event bus and routed to the configured routers. The event bus backend can be configured as follows with python: +The event bus backend can be configured as the producer of the events. In that case, the events will be consumed from the event bus and routed to the configured routers via event bus consumers. The event bus backend can be configured in your edx-platform settings as follows: .. code-block:: python EVENT_TRACKING_BACKENDS["xapi"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend" EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"]["backends"]["xapi"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter" EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"].pop("backend_name") - INSTALLED_APPS.append("openedx_events") + if "openedx_events" not in INSTALLED_APPS: + INSTALLED_APPS.append("openedx_events") SEND_TRACKING_EVENT_EMITTED_SIGNAL = True EVENT_BUS_PRODUCER_CONFIG = { "org.openedx.analytics.tracking.event.emitted.v1": { @@ -253,7 +254,7 @@ The event bus backend can be configured as the producer of the events in which c } } -Once the event bus producer has been configured, the event bus producer can be started using the following command: +Once the event bus producer has been configured, the event bus consumer can be started using the following command: .. 
code-block:: bash diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index a1be4ed2..1917adc5 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -129,15 +129,14 @@ def prepare_to_send(self, events): return route_events - def get_failed_events(self): + def get_failed_events(self, batch_size): """ Get failed events from the dead queue. """ redis = get_redis_connection() - n = redis.llen(self.dead_queue) - if not n: + failed_events = redis.rpop(self.dead_queue, batch_size) + if not failed_events: return [] - failed_events = redis.rpop(self.dead_queue, n) return [json.loads(event.decode('utf-8')) for event in failed_events] def bulk_send(self, events): diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index 6831f98f..2e5f5033 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -1235,23 +1235,20 @@ def test_failed_routing(self, mocked_remote_lrs): def test_get_failed_events(self, mock_get_redis_connection): redis_mock = MagicMock() mock_get_redis_connection.return_value = redis_mock - redis_mock.llen.return_value = 1 redis_mock.rpop.return_value = [json.dumps({'name': 'test', 'data': {'key': 'value'}}).encode('utf-8')] router = SyncEventsRouter(processors=[], backend_name='test') - router.get_failed_events() + router.get_failed_events(1) - redis_mock.llen.assert_called_once_with(router.dead_queue) redis_mock.rpop.assert_called_once_with(router.dead_queue, 1) @patch('event_routing_backends.backends.events_router.get_redis_connection') def test_get_failed_events_empty(self, mock_get_redis_connection): redis_mock = MagicMock() mock_get_redis_connection.return_value = redis_mock - redis_mock.llen.return_value = 0 + redis_mock.rpop.return_value = None router = 
SyncEventsRouter(processors=[], backend_name='test') - events = router.get_failed_events() + events = router.get_failed_events(1) - redis_mock.llen.assert_called_once_with(router.dead_queue) self.assertEqual(events, []) diff --git a/event_routing_backends/management/commands/recover_failed_events.py b/event_routing_backends/management/commands/recover_failed_events.py index 25e12caa..8f9a1090 100644 --- a/event_routing_backends/management/commands/recover_failed_events.py +++ b/event_routing_backends/management/commands/recover_failed_events.py @@ -9,6 +9,7 @@ from django.core.management.base import BaseCommand from eventtracking.backends.event_bus import EventBusRoutingBackend from eventtracking.tracker import get_tracker +from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched logger = logging.getLogger(__name__) @@ -26,7 +27,12 @@ def add_arguments(self, parser): "--transformer_type", choices=["xapi", "caliper", "all"], required=True, - help="The type of transformation to do, only one can be done at a time.", + help="The type of transformed events to recover.", + ) + parser.add_argument( + "--batch_size", + default=100, + help="The number of events to recover at a time. Default is 100.", ) def handle(self, *args, **options): @@ -34,7 +40,9 @@ def handle(self, *args, **options): Configure the command and start the transform process. """ logger.info("Recovering failed events") + logger.warning("This command is intended for use in recovery situations only.") transformer_type = options["transformer_type"] + batch_size = options["batch_size"] tracker = get_tracker() engines = { @@ -47,18 +55,33 @@ def handle(self, *args, **options): logger.info("No compatible backend found.") return + # In the recovery process we are disabling batching to prevent + # single event failures from blocking the recovery process. 
settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED = False + success = 0 + malformed = 0 + failed = 0 for name, engine in engines.items(): if transformer_type not in ("all", name): logger.info("Skipping backend: {}".format(name)) continue for backend_name, backend in engine.backends.items(): - failed_events = backend.get_failed_events() - if not failed_events: - logger.info( - "No failed events found for backend: {}".format(backend_name) - ) - continue - for event in failed_events: - backend.send(event) + while failed_events:=backend.get_failed_events(batch_size): + logger.info("Recovering {} failed events for backend {}".format(len(failed_events), backend_name)) + for event in failed_events: + try: + backend.send(event) + success += 1 + except EventNotDispatched as e: + logger.error("Malformed event: {}".format(event["name"])) + malformed += 1 + except Exception as e: + # Backend can still be in a bad state, so we need to catch all exceptions + logger.error("Failed to send event: {}".format(e)) + failed += 1 + + logger.info("Recovery process completed.") + logger.info("Recovered events : {}".format(success)) + logger.info("Failed to recover : {}".format(failed)) + logger.info("Malformed events : {} ".format(malformed)) diff --git a/event_routing_backends/management/commands/tests/test_recover_failed_events.py b/event_routing_backends/management/commands/tests/test_recover_failed_events.py index d2aecfa5..68eafc57 100644 --- a/event_routing_backends/management/commands/tests/test_recover_failed_events.py +++ b/event_routing_backends/management/commands/tests/test_recover_failed_events.py @@ -8,6 +8,7 @@ from django.test import TestCase from django.test.utils import override_settings from eventtracking.django.django_tracker import DjangoTracker +from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched XAPI_PROCESSOR = { "ENGINE": "event_routing_backends.backends.async_events_router.AsyncEventsRouter", @@ -22,6 +23,21 @@ }, } +called 
= False + + +class Mocker: + """Custom class that returns a list of failed events in the first call and an empty list in following calls""" + + def __init__(self): + self.called = False + + def custom_get_failed_events(self, batch_size): + if not self.called: + self.called = True + return [{"name": "test"}] + return [] + class TestRecoverFailedEvents(TestCase): @override_settings( @@ -45,11 +61,70 @@ def test_send_tracking_log_to_backends(self, mock_get_tracker): mock_get_tracker.return_value = tracker mock_backend = Mock() tracker.backends["event_bus"].backends["xapi"] = mock_backend - mock_backend.get_failed_events.return_value = [{"event": "event"}] + mock_backend.get_failed_events = Mocker().custom_get_failed_events + + call_command("recover_failed_events", transformer_type="all") + + mock_backend.send.assert_called_once_with({"name": "test"}) + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": "eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "backends": {"xapi": XAPI_PROCESSOR}, + }, + }, + } + ) + @patch( + "event_routing_backends.management.commands.recover_failed_events.get_tracker" + ) + @patch("event_routing_backends.management.commands.recover_failed_events.logger") + def test_send_tracking_log_to_backends_with_exception(self, mock_logger, mock_get_tracker): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_backend = Mock() + tracker.backends["event_bus"].backends["xapi"] = mock_backend + mock_backend.get_failed_events = Mocker().custom_get_failed_events + mock_backend.send.side_effect = Exception("Error") + + call_command("recover_failed_events", transformer_type="all") + + #mock_logger.error.assert_called_once_with("Malformed event: {}".format("test")) + mock_logger.error.assert_called_once_with("Failed to send event: Error") + + @override_settings( + EVENT_TRACKING_BACKENDS={ + "event_bus": { + "ENGINE": 
"eventtracking.backends.event_bus.EventBusRoutingBackend", + "OPTIONS": { + "backends": {"xapi": XAPI_PROCESSOR}, + }, + }, + } + ) + @patch( + "event_routing_backends.management.commands.recover_failed_events.get_tracker" + ) + @patch("event_routing_backends.management.commands.recover_failed_events.logger") + def test_send_tracking_log_to_backends_with_event_exception(self, mock_logger, mock_get_tracker): + """ + Test for send_tracking_log_to_backends + """ + tracker = DjangoTracker() + mock_get_tracker.return_value = tracker + mock_backend = Mock() + tracker.backends["event_bus"].backends["xapi"] = mock_backend + mock_backend.get_failed_events = Mocker().custom_get_failed_events + mock_backend.send.side_effect = EventNotDispatched("Error") call_command("recover_failed_events", transformer_type="all") - mock_backend.send.assert_called_once_with({"event": "event"}) + mock_logger.error.assert_called_once_with("Malformed event: {}".format("test")) @override_settings( EVENT_TRACKING_BACKENDS={ From f8a14157664850fdf7d4898336450b2b086a0cf1 Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Thu, 21 Mar 2024 16:18:40 -0500 Subject: [PATCH 8/8] chore: quality fixes --- .../management/commands/recover_failed_events.py | 13 +++++++++---- .../commands/tests/test_recover_failed_events.py | 10 +++++++--- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/event_routing_backends/management/commands/recover_failed_events.py b/event_routing_backends/management/commands/recover_failed_events.py index 8f9a1090..c6806e4d 100644 --- a/event_routing_backends/management/commands/recover_failed_events.py +++ b/event_routing_backends/management/commands/recover_failed_events.py @@ -9,6 +9,7 @@ from django.core.management.base import BaseCommand from eventtracking.backends.event_bus import EventBusRoutingBackend from eventtracking.tracker import get_tracker + from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched logger = 
logging.getLogger(__name__) @@ -67,16 +68,20 @@ def handle(self, *args, **options): logger.info("Skipping backend: {}".format(name)) continue for backend_name, backend in engine.backends.items(): - while failed_events:=backend.get_failed_events(batch_size): - logger.info("Recovering {} failed events for backend {}".format(len(failed_events), backend_name)) + while failed_events := backend.get_failed_events(batch_size): + logger.info( + "Recovering {} failed events for backend {}".format( + len(failed_events), backend_name + ) + ) for event in failed_events: try: backend.send(event) success += 1 - except EventNotDispatched as e: + except EventNotDispatched: logger.error("Malformed event: {}".format(event["name"])) malformed += 1 - except Exception as e: + except Exception as e: # pylint: disable=broad-except # Backend can still be in a bad state, so we need to catch all exceptions logger.error("Failed to send event: {}".format(e)) failed += 1 diff --git a/event_routing_backends/management/commands/tests/test_recover_failed_events.py b/event_routing_backends/management/commands/tests/test_recover_failed_events.py index 68eafc57..73666441 100644 --- a/event_routing_backends/management/commands/tests/test_recover_failed_events.py +++ b/event_routing_backends/management/commands/tests/test_recover_failed_events.py @@ -8,6 +8,7 @@ from django.test import TestCase from django.test.utils import override_settings from eventtracking.django.django_tracker import DjangoTracker + from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched XAPI_PROCESSOR = { @@ -81,7 +82,9 @@ def test_send_tracking_log_to_backends(self, mock_get_tracker): "event_routing_backends.management.commands.recover_failed_events.get_tracker" ) @patch("event_routing_backends.management.commands.recover_failed_events.logger") - def test_send_tracking_log_to_backends_with_exception(self, mock_logger, mock_get_tracker): + def 
test_send_tracking_log_to_backends_with_exception( + self, mock_logger, mock_get_tracker + ): """ Test for send_tracking_log_to_backends """ @@ -94,7 +97,6 @@ def test_send_tracking_log_to_backends_with_exception(self, mock_logger, mock_ge call_command("recover_failed_events", transformer_type="all") - #mock_logger.error.assert_called_once_with("Malformed event: {}".format("test")) mock_logger.error.assert_called_once_with("Failed to send event: Error") @override_settings( @@ -111,7 +113,9 @@ def test_send_tracking_log_to_backends_with_exception(self, mock_logger, mock_ge "event_routing_backends.management.commands.recover_failed_events.get_tracker" ) @patch("event_routing_backends.management.commands.recover_failed_events.logger") - def test_send_tracking_log_to_backends_with_event_exception(self, mock_logger, mock_get_tracker): + def test_send_tracking_log_to_backends_with_event_exception( + self, mock_logger, mock_get_tracker + ): """ Test for send_tracking_log_to_backends """