Skip to content

Commit

Permalink
feat: allow to use any configured engine to replay tracking logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Oct 27, 2023
1 parent 6b9530b commit b138dde
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 18 deletions.
3 changes: 2 additions & 1 deletion event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ def bulk_send(self, events):
prepared_events.append(updated_event)

if prepared_events: # pragma: no cover
dispatch_bulk_events.delay(
func = dispatch_bulk_events if self.sync else dispatch_bulk_events.delay
func(
prepared_events,
host['router_type'],
host['host_configurations']
Expand Down
39 changes: 23 additions & 16 deletions event_routing_backends/management/commands/helpers/queued_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
from io import BytesIO
from time import sleep

from event_routing_backends.backends.events_router import EventsRouter
from event_routing_backends.management.commands.helpers.event_log_parser import parse_json_event
from event_routing_backends.models import RouterConfiguration
from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor
from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor

from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.processors.whitelist import NameWhitelistProcessor
from eventtracking.tracker import get_tracker
from eventtracking.backends.event_bus import EventBusRoutingBackend

class QueuedSender:
"""
Expand All @@ -23,6 +22,7 @@ def __init__(
destination,
destination_container,
destination_prefix,
backend,
transformer_type,
max_queue_size=10000,
sleep_between_batches_secs=1.0,
Expand All @@ -31,6 +31,7 @@ def __init__(
self.destination = destination
self.destination_container = destination_container
self.destination_prefix = destination_prefix
self.backend = backend
self.transformer_type = transformer_type
self.event_queue = []
self.max_queue_size = max_queue_size
Expand All @@ -43,23 +44,24 @@ def __init__(
self.unparsable_lines = 0
self.batches_sent = 0

if self.transformer_type == "xapi":
self.router = EventsRouter(
backend_name=RouterConfiguration.XAPI_BACKEND,
processors=[XApiProcessor()]
)
else:
self.router = EventsRouter(
backend_name=RouterConfiguration.CALIPER_BACKEND,
processors=[CaliperProcessor()]
)
self.tracker = get_tracker()
self.router = self.tracker.backends[self.backend]
if transformer_type != "all":
self.router.backends = {
self.backend: self.router.backends[self.transformer_type]
}

def is_known_event(self, event):
"""
Check whether any processor cares about this event.
"""
if "name" in event:
for processor in self.router.processors:
if isinstance(processor, NameWhitelistProcessor):
try:
return processor(event)
except EventEmissionExit:
continue
if event["name"] in processor.registry.mapping:
return True
return False
Expand Down Expand Up @@ -108,7 +110,12 @@ def send(self):
"""
if self.destination == "LRS":
print(f"Sending {len(self.event_queue)} events to LRS...")
self.router.bulk_send(self.event_queue)
if isinstance(self.router, EventBusRoutingBackend):
for event in self.event_queue:
self.router.send(event)
else:
for backend in self.router.backends.values():
backend.bulk_send(self.event_queue)
else:
print("Skipping send, we're storing with libcloud instead of an LRS.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,14 @@ def add_arguments(self, parser):
"key 'container'. If your destination needs a prefix (ex: directory path), add it here under the key "
"'prefix'. If no prefix is given, the output file(s) will be written to the base path.",
)
parser.add_argument(
'--backend',
type=str,
help="The name of the tracking backend to use. This is only used if the destination_provider is 'LRS'."
)
parser.add_argument(
'--transformer_type',
choices=["xapi", "caliper"],
choices=["xapi", "caliper", "all"],
required=True,
help="The type of transformation to do, only one can be done at a time.",
)
Expand Down Expand Up @@ -291,6 +296,7 @@ def handle(self, *args, **options):
dest_driver,
dest_container,
dest_prefix,
options["backend"],
options["transformer_type"],
max_queue_size=options["batch_size"],
sleep_between_batches_secs=options["sleep_between_batches_secs"],
Expand Down

0 comments on commit b138dde

Please sign in to comment.