Skip to content

Commit

Permalink
Merge pull request #388 from openedx/cag/add-batching-for-consumer
Browse files Browse the repository at this point in the history
feat: add batching for event bus consumer
  • Loading branch information
Ian2012 authored Mar 22, 2024
2 parents 3b2b22c + f8a1415 commit 2b31649
Show file tree
Hide file tree
Showing 20 changed files with 618 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Change Log
Unreleased
~~~~~~~~~~

[8.2.0]

* Add support for batching for EventsRouter.

[8.1.2]

* Add grade.now_* events to the xAPI supported events list.
Expand Down
40 changes: 40 additions & 0 deletions docs/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,46 @@ A sample override for ``xapi`` backend is presented below. Here we are allowing
}
}
Batching Configuration
----------------------
Batching of events can be configured using the following settings:
#. ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED``: If set to ``True``, events will be batched before being routed. Default is ``False``.
#. ``EVENT_ROUTING_BACKEND_BATCH_SIZE``: Maximum number of events to be batched together. Default is 100.
#. ``EVENT_ROUTING_BACKEND_BATCH_INTERVAL``: Time interval (in seconds) after which events will be ent, whether or not the batch size criteria is met. Default is 60 seconds.
Batching is done in the ``EventsRouter`` backend. If ``EVENT_ROUTING_BACKEND_BATCHING_ENABLED`` is set to ``True``, then events will be batched together and routed to the configured routers after the specified interval or when the batch size is reached, whichever happens first.
In case of downtimes or network issues, events will be queued again to avoid data loss. However, there is no guarantee that the events will be routed in the same order as they were received.
Event bus configuration
-----------------------
The event bus backend can be configured as the producer of the events. In that case, the events will be consumed from the event bus and routed to the configured routers via event bus consumers. The event bus backend can be configured in your edx-platform settings as follows:
.. code-block:: python
EVENT_TRACKING_BACKENDS["xapi"]["ENGINE"] = "eventtracking.backends.event_bus.EventBusRoutingBackend"
EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"]["backends"]["xapi"]["ENGINE"] = "event_routing_backends.backends.sync_events_router.SyncEventsRouter"
EVENT_TRACKING_BACKENDS["xapi"]["OPTIONS"].pop("backend_name")
if "openedx_events" not in INSTALLED_APPS:
INSTALLED_APPS.append("openedx_events")
SEND_TRACKING_EVENT_EMITTED_SIGNAL = True
EVENT_BUS_PRODUCER_CONFIG = {
"org.openedx.analytics.tracking.event.emitted.v1": {
"analytics": {
"event_key_field": "tracking_log.name", "enabled": True
}
}
}
Once the event bus producer has been configured, the event bus consumer can be started using the following command:
.. code-block:: bash
./manage.py lms consume_events -t analytics -g event_routing_backends --extra '{"consumer_name": "event_routing_backends"}'
OpenEdx Filters
===============
Expand Down
2 changes: 1 addition & 1 deletion event_routing_backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Various backends for receiving edX LMS events..
"""

__version__ = '8.1.2'
__version__ = '8.2.0'
68 changes: 67 additions & 1 deletion event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
"""
Generic router to send events to hosts.
"""
import json
import logging
from datetime import datetime, timedelta

from django.conf import settings
from django_redis import get_redis_connection
from eventtracking.backends.logger import DateTimeJSONEncoder
from eventtracking.processors.exceptions import EventEmissionExit

from event_routing_backends.helpers import get_business_critical_events
from event_routing_backends.models import RouterConfiguration

logger = logging.getLogger(__name__)

EVENTS_ROUTER_QUEUE_FORMAT = 'events_router_queue_{}'
EVENTS_ROUTER_DEAD_QUEUE_FORMAT = 'dead_queue_{}'
EVENTS_ROUTER_LAST_SENT_FORMAT = 'last_sent_{}'


class EventsRouter:
"""
Expand All @@ -26,6 +35,9 @@ def __init__(self, processors=None, backend_name=None):
"""
self.processors = processors if processors else []
self.backend_name = backend_name
self.queue_name = EVENTS_ROUTER_QUEUE_FORMAT.format(self.backend_name)
self.dead_queue = EVENTS_ROUTER_DEAD_QUEUE_FORMAT.format(self.backend_name)
self.last_sent_key = EVENTS_ROUTER_LAST_SENT_FORMAT.format(self.backend_name)

def configure_host(self, host, router):
"""
Expand Down Expand Up @@ -117,6 +129,16 @@ def prepare_to_send(self, events):

return route_events

def get_failed_events(self, batch_size):
"""
Get failed events from the dead queue.
"""
redis = get_redis_connection()
failed_events = redis.rpop(self.dead_queue, batch_size)
if not failed_events:
return []
return [json.loads(event.decode('utf-8')) for event in failed_events]

def bulk_send(self, events):
"""
Send the event to configured routers after processing it.
Expand Down Expand Up @@ -154,8 +176,27 @@ def send(self, event):
Arguments:
event (dict): the original event dictionary
"""
event_routes = self.prepare_to_send([event])
if settings.EVENT_ROUTING_BACKEND_BATCHING_ENABLED:
redis = get_redis_connection()
batch = self.queue_event(redis, event)
if not batch:
return

try:
redis.set(self.last_sent_key, datetime.now().isoformat())
self.bulk_send([json.loads(queued_event.decode('utf-8')) for queued_event in batch])
except Exception: # pylint: disable=broad-except
logger.exception(
'Exception occurred while trying to bulk dispatch {} events.'.format(
len(batch)
),
exc_info=True
)
logger.info(f'Pushing failed events to the dead queue: {self.dead_queue}')
redis.lpush(self.dead_queue, *batch)
return

event_routes = self.prepare_to_send([event])
for events_for_route in event_routes.values():
for event_name, updated_event, host, is_business_critical in events_for_route:
if is_business_critical:
Expand All @@ -173,6 +214,31 @@ def send(self, event):
host['host_configurations'],
)

def queue_event(self, redis, event):
"""
Queue the event to be sent to configured routers.
"""
event["timestamp"] = event["timestamp"].isoformat()
queue_size = redis.lpush(self.queue_name, json.dumps(event, cls=DateTimeJSONEncoder))
logger.info(f'Event {event["name"]} has been queued for batching. Queue size: {queue_size}')

if queue_size >= settings.EVENT_ROUTING_BACKEND_BATCH_SIZE or self.time_to_send(redis):
batch = redis.rpop(self.queue_name, queue_size)
return batch

return None

def time_to_send(self, redis):
"""
Check if it is time to send the batched events.
"""
last_sent = redis.get(self.last_sent_key)
if not last_sent:
return True
time_passed = (datetime.now() - datetime.fromisoformat(last_sent.decode('utf-8')))
return time_passed > timedelta(seconds=settings.EVENT_ROUTING_BACKEND_BATCH_INTERVAL)

def process_event(self, event):
"""
Process the event through this router's processors.
Expand Down
108 changes: 107 additions & 1 deletion event_routing_backends/backends/tests/test_events_router.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
"""
Test the EventsRouter
"""
import datetime
import json
from copy import copy
from unittest.mock import MagicMock, call, patch, sentinel

import ddt
from django.conf import settings
from django.test import TestCase
from django.test import TestCase, override_settings
from edx_django_utils.cache.utils import TieredCache
from eventtracking.processors.exceptions import EventEmissionExit
from tincan.statement import Statement
Expand Down Expand Up @@ -257,6 +260,87 @@ def test_duplicate_xapi_event_id(self, mocked_logger):
mocked_logger.info.mock_calls
)

@override_settings(
EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
EVENT_ROUTING_BACKEND_BATCH_SIZE=2
)
@patch('event_routing_backends.backends.events_router.get_redis_connection')
@patch('event_routing_backends.backends.events_router.logger')
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
def test_queue_event(self, mock_bulk_send, mock_logger, mock_get_redis_connection):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.lpush.return_value = None
event1 = copy(self.transformed_event)
event1["timestamp"] = datetime.datetime.now()
event2 = copy(self.transformed_event)
event2["timestamp"] = datetime.datetime.now()
events = [event1, event2]
formatted_events = []
for event in events:
formatted_event = copy(event)
formatted_event["timestamp"] = formatted_event["timestamp"].isoformat()
formatted_events.append(json.dumps(formatted_event).encode('utf-8'))

redis_mock.rpop.return_value = formatted_events
redis_mock.lpush.return_value = 1
redis_mock.get.return_value.decode.return_value = datetime.datetime.now().isoformat()

router.send(event1)
redis_mock.lpush.return_value = 2
router.send(event2)

redis_mock.lpush.assert_any_call(router.queue_name, json.dumps(event1))
redis_mock.rpop.assert_any_call(router.queue_name, settings.EVENT_ROUTING_BACKEND_BATCH_SIZE)
mock_logger.info.assert_any_call(
f"Event {self.transformed_event['name']} has been queued for batching. Queue size: 1"
)
mock_bulk_send.assert_any_call(events)

@override_settings(
EVENT_ROUTING_BACKEND_BATCHING_ENABLED=True,
EVENT_ROUTING_BACKEND_BATCH_SIZE=2
)
@patch('event_routing_backends.backends.events_router.get_redis_connection')
@patch('event_routing_backends.backends.events_router.logger')
@patch('event_routing_backends.backends.events_router.EventsRouter.bulk_send')
@patch('event_routing_backends.backends.events_router.EventsRouter.queue_event')
def test_send_event_with_bulk_exception(
self,
mock_queue_event,
mock_bulk_send,
mock_logger,
mock_get_redis_connection
):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
mock_queue_event.return_value = [1]
mock_bulk_send.side_effect = EventNotDispatched

router.send(self.transformed_event)

mock_logger.exception.assert_called_once_with(
'Exception occurred while trying to bulk dispatch {} events.'.format(
1
),
exc_info=True
)
mock_logger.info.assert_called_once_with(
f'Pushing failed events to the dead queue: {router.dead_queue}'
)
redis_mock.lpush.assert_called_once_with(router.dead_queue, *[1])

@override_settings(
EVENT_ROUTING_BACKEND_BATCH_INTERVAL=1,
)
def test_time_to_send_no_data(self):
router = EventsRouter(processors=[], backend_name='test')
redis_mock = MagicMock()
redis_mock.get.return_value = None
self.assertTrue(router.time_to_send(redis_mock))


@ddt.ddt
class TestAsyncEventsRouter(TestEventsRouter): # pylint: disable=test-inherits-tests
Expand Down Expand Up @@ -1146,3 +1230,25 @@ def test_failed_routing(self, mocked_remote_lrs):
router = SyncEventsRouter(processors=[], backend_name=RouterConfiguration.XAPI_BACKEND)
with self.assertRaises(EventNotDispatched):
router.send(self.transformed_event)

@patch('event_routing_backends.backends.events_router.get_redis_connection')
def test_get_failed_events(self, mock_get_redis_connection):
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.rpop.return_value = [json.dumps({'name': 'test', 'data': {'key': 'value'}}).encode('utf-8')]

router = SyncEventsRouter(processors=[], backend_name='test')
router.get_failed_events(1)

redis_mock.rpop.assert_called_once_with(router.dead_queue, 1)

@patch('event_routing_backends.backends.events_router.get_redis_connection')
def test_get_failed_events_empty(self, mock_get_redis_connection):
redis_mock = MagicMock()
mock_get_redis_connection.return_value = redis_mock
redis_mock.rpop.return_value = None

router = SyncEventsRouter(processors=[], backend_name='test')
events = router.get_failed_events(1)

self.assertEqual(events, [])
2 changes: 1 addition & 1 deletion event_routing_backends/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def get_user(username_or_id):
if username and not user:
try:
user = get_potentially_retired_user_by_username(username)
except Exception as ex:
except Exception as ex: # pylint: disable=broad-except
logger.info('User with username "%s" does not exist.%s', username, ex)

return user
Expand Down
Loading

0 comments on commit 2b31649

Please sign in to comment.