diff --git a/event_routing_backends/handlers.py b/event_routing_backends/handlers.py index dacf63f3..5a89c286 100644 --- a/event_routing_backends/handlers.py +++ b/event_routing_backends/handlers.py @@ -4,14 +4,20 @@ import json import logging +from django.conf import settings from django.dispatch import receiver from eventtracking.backends.event_bus import EventBusRoutingBackend from eventtracking.processors.exceptions import EventEmissionExit from eventtracking.tracker import get_tracker from openedx_events.analytics.signals import TRACKING_LOG_EVENT_EMITTED +from django_redis import get_redis_connection +from edx_django_utils.cache.utils import get_cache_key logger = logging.getLogger(__name__) +redis = get_redis_connection("default") + +TRANSFORMED_EVENT_KEY_NAME = "transformed_events" @receiver(TRACKING_LOG_EVENT_EMITTED) def send_tracking_log_to_backends( @@ -48,10 +54,26 @@ def send_tracking_log_to_backends( for name, engine in tracker.backends.items() if isinstance(engine, EventBusRoutingBackend) } - for name, engine in engines.items(): - try: - processed_event = engine.process_event(event) - logger.info('Successfully processed event "{}"'.format(event["name"])) - engine.send_to_backends(processed_event.copy()) - except EventEmissionExit: - logger.info("[EventEmissionExit] skipping event {}".format(event["name"])) + + queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME) + if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE: + queued_events = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, queue_size) + json_events = [] + for queued_event in queued_events: + new_event = json.loads(queued_event.decode('utf-8')) + json_events.append(new_event) + + queued_events.append(event) + + for name, engine in engines.items(): + for backend_name, backend in engine.backends.items(): + logger.info(f"Sending events to backend [{backend_name}] event bus in batch") + backend.bulk_send(json_events) + else: + redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps({ + "name": tracking_log.name, + "timestamp": tracking_log.timestamp.isoformat(), + "data": json.loads(tracking_log.data), + "context": json.loads(tracking_log.context), + })) + logger.info("Event pushed to the queue, current size: %s", queue_size + 1) diff --git a/event_routing_backends/settings/common.py b/event_routing_backends/settings/common.py index 153525f1..d8c448aa 100644 --- a/event_routing_backends/settings/common.py +++ b/event_routing_backends/settings/common.py @@ -219,3 +219,5 @@ def plugin_settings(settings): } } }) + + settings.EVENT_ROUTING_BATCH_SIZE = 5