diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 6df1dcdb..9e3a1c08 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -140,7 +140,8 @@ def bulk_send(self, events): prepared_events.append(updated_event) if prepared_events: # pragma: no cover - dispatch_bulk_events.delay( + func = dispatch_bulk_events if self.sync else dispatch_bulk_events.delay + func( prepared_events, host['router_type'], host['host_configurations'] diff --git a/event_routing_backends/management/commands/helpers/queued_sender.py b/event_routing_backends/management/commands/helpers/queued_sender.py index d6ddf5c0..57cd0334 100644 --- a/event_routing_backends/management/commands/helpers/queued_sender.py +++ b/event_routing_backends/management/commands/helpers/queued_sender.py @@ -7,12 +7,11 @@ from io import BytesIO from time import sleep -from event_routing_backends.backends.events_router import EventsRouter from event_routing_backends.management.commands.helpers.event_log_parser import parse_json_event -from event_routing_backends.models import RouterConfiguration -from event_routing_backends.processors.caliper.transformer_processor import CaliperProcessor -from event_routing_backends.processors.xapi.transformer_processor import XApiProcessor - +from eventtracking.processors.exceptions import EventEmissionExit +from eventtracking.processors.whitelist import NameWhitelistProcessor +from eventtracking.tracker import get_tracker +from eventtracking.backends.event_bus import EventBusRoutingBackend class QueuedSender: """ @@ -23,6 +22,7 @@ def __init__( destination, destination_container, destination_prefix, + backend, transformer_type, max_queue_size=10000, sleep_between_batches_secs=1.0, @@ -31,6 +31,7 @@ def __init__( self.destination = destination self.destination_container = destination_container self.destination_prefix = destination_prefix + self.backend = backend self.transformer_type = transformer_type self.event_queue = [] self.max_queue_size = max_queue_size @@ -43,16 +44,12 @@ def __init__( self.unparsable_lines = 0 self.batches_sent = 0 - if self.transformer_type == "xapi": - self.router = EventsRouter( - backend_name=RouterConfiguration.XAPI_BACKEND, - processors=[XApiProcessor()] - ) - else: - self.router = EventsRouter( - backend_name=RouterConfiguration.CALIPER_BACKEND, - processors=[CaliperProcessor()] - ) + self.tracker = get_tracker() + self.router = self.tracker.backends[self.backend] + if transformer_type != "all": + self.router.backends = { + self.backend: self.router.backends[self.transformer_type] + } def is_known_event(self, event): """ @@ -60,6 +57,11 @@ def is_known_event(self, event): """ if "name" in event: for processor in self.router.processors: + if isinstance(processor, NameWhitelistProcessor): + try: + return processor(event) + except EventEmissionExit: + continue if event["name"] in processor.registry.mapping: return True return False @@ -108,7 +110,12 @@ def send(self): """ if self.destination == "LRS": print(f"Sending {len(self.event_queue)} events to LRS...") - self.router.bulk_send(self.event_queue) + if isinstance(self.router, EventBusRoutingBackend): + for event in self.event_queue: + self.router.send(event) + else: + for backend in self.router.backends.values(): + backend.bulk_send(self.event_queue) else: print("Skipping send, we're storing with libcloud instead of an LRS.") diff --git a/event_routing_backends/management/commands/transform_tracking_logs.py b/event_routing_backends/management/commands/transform_tracking_logs.py index b11df861..7ba277fe 100644 --- a/event_routing_backends/management/commands/transform_tracking_logs.py +++ b/event_routing_backends/management/commands/transform_tracking_logs.py @@ -237,9 +237,14 @@ def add_arguments(self, parser): "key 'container'. If your destination needs a prefix (ex: directory path), add it here under the key " "'prefix'. If no prefix is given, the output file(s) will be written to the base path.", ) + parser.add_argument( + '--backend', + type=str, + help="The name of the tracking backend to use. This is only used if the destination_provider is 'LRS'." + ) parser.add_argument( '--transformer_type', - choices=["xapi", "caliper"], + choices=["xapi", "caliper", "all"], required=True, help="The type of transformation to do, only one can be done at a time.", ) @@ -291,6 +296,7 @@ def handle(self, *args, **options): dest_driver, dest_container, dest_prefix, + options["backend"], options["transformer_type"], max_queue_size=options["batch_size"], sleep_between_batches_secs=options["sleep_between_batches_secs"],