Skip to content

Commit

Permalink
feat: send events in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Nov 7, 2023
1 parent 5c99f39 commit cd8ccb0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
36 changes: 29 additions & 7 deletions event_routing_backends/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@
import json
import logging

from django.conf import settings
from django.dispatch import receiver
from eventtracking.backends.event_bus import EventBusRoutingBackend
from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.tracker import get_tracker
from openedx_events.analytics.signals import TRACKING_LOG_EVENT_EMITTED
from django_redis import get_redis_connection
from edx_django_utils.cache.utils import get_cache_key

logger = logging.getLogger(__name__)

redis = get_redis_connection("default")

TRANSFORMED_EVENT_KEY_NAME = "transformed_events"

@receiver(TRACKING_LOG_EVENT_EMITTED)
def send_tracking_log_to_backends(
Expand Down Expand Up @@ -48,10 +54,26 @@ def send_tracking_log_to_backends(
for name, engine in tracker.backends.items()
if isinstance(engine, EventBusRoutingBackend)
}
for name, engine in engines.items():
try:
processed_event = engine.process_event(event)
logger.info('Successfully processed event "{}"'.format(event["name"]))
engine.send_to_backends(processed_event.copy())
except EventEmissionExit:
logger.info("[EventEmissionExit] skipping event {}".format(event["name"]))

queue_size = redis.llen(TRANSFORMED_EVENT_KEY_NAME)
if queue_size >= settings.EVENT_ROUTING_BATCH_SIZE:
queued_events = redis.rpop(TRANSFORMED_EVENT_KEY_NAME, queue_size)
json_events = []
for queued_event in queued_events:
new_event = json.loads(queued_event.decode('utf-8'))
json_events.append(new_event)

queued_events.append(event)

for name, engine in engines.items():
for backend_name, backend in engine.backends.items():
logger.info(f"Sending events to backend [{backend_name}] event bus in batch")
backend.bulk_send(json_events)
else:
redis.lpush(TRANSFORMED_EVENT_KEY_NAME, json.dumps({
"name": tracking_log.name,
"timestamp": tracking_log.timestamp.isoformat(),
"data": json.loads(tracking_log.data),
"context": json.loads(tracking_log.context),
}))
logger.info("Event pushed to the queue, current size: %s", queue_size + 1)
2 changes: 2 additions & 0 deletions event_routing_backends/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,5 @@ def plugin_settings(settings):
}
}
})

settings.EVENT_ROUTING_BATCH_SIZE = 5

0 comments on commit cd8ccb0

Please sign in to comment.