diff --git a/event_routing_backends/backends/events_router.py b/event_routing_backends/backends/events_router.py index 1e12a33b..8a8b774f 100644 --- a/event_routing_backends/backends/events_router.py +++ b/event_routing_backends/backends/events_router.py @@ -79,7 +79,7 @@ def prepare_to_send(self, events): 'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name) ) - processed_event = self.process_event(event) + processed_events = self.process_event(event) except (EventEmissionExit, ValueError): logger.error( 'Could not process edx event "%s" for backend %s\'s router', @@ -90,10 +90,10 @@ def prepare_to_send(self, events): continue logger.debug( - 'Successfully processed edx event "%s" for router with backend %s. Processed event: %s', + 'Successfully processed edx event "%s" for router with backend %s. Processed events: %s', event_name, self.backend_name, - processed_event + processed_events ) for router in routers: @@ -107,11 +107,13 @@ def prepare_to_send(self, events): ) else: host = self.configure_host(host, router) - updated_event = self.overwrite_event_data(processed_event, host, event_name) - is_business_critical = event_name in business_critical_events - if router_pk not in route_events: - route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),] - else: + + if processed_events and router_pk not in route_events: + route_events[router_pk] = [] + + for processed_event in processed_events: + updated_event = self.overwrite_event_data(processed_event, host, event_name) + is_business_critical = event_name in business_critical_events route_events[router_pk].append((event_name, updated_event, host, is_business_critical)) return route_events @@ -176,17 +178,19 @@ def process_event(self, event): """ Process the event through this router's processors. + This single event may produce multiple processed events, and so we return a list of events here. + Arguments: event (dict): Event to be processed Returns - dict + list of ANY """ - event = event.copy() + events = [event.copy()] for processor in self.processors: - event = processor(event) + events = processor(events) - return event + return events def overwrite_event_data(self, event, host, event_name): """ diff --git a/event_routing_backends/backends/tests/test_events_router.py b/event_routing_backends/backends/tests/test_events_router.py index 098a6270..a8bc1ccc 100644 --- a/event_routing_backends/backends/tests/test_events_router.py +++ b/event_routing_backends/backends/tests/test_events_router.py @@ -153,9 +153,9 @@ def setUp(self): @patch('event_routing_backends.models.RouterConfiguration.get_enabled_routers') def test_with_processor_exception(self, mocked_get_enabled_routers, mocked_logger, mocked_post): processors = [ - MagicMock(return_value=self.transformed_event), - MagicMock(side_effect=EventEmissionExit, return_value=self.transformed_event), - MagicMock(return_value=self.transformed_event), + MagicMock(return_value=[self.transformed_event]), + MagicMock(side_effect=EventEmissionExit, return_value=[self.transformed_event]), + MagicMock(return_value=[self.transformed_event]), ] processors[1].side_effect = EventEmissionExit @@ -164,8 +164,8 @@ def test_with_processor_exception(self, mocked_get_enabled_routers, mocked_logge router = EventsRouter(processors=processors, backend_name='test') router.send(self.transformed_event) - processors[0].assert_called_once_with(self.transformed_event) - processors[1].assert_called_once_with(self.transformed_event) + processors[0].assert_called_once_with([self.transformed_event]) + processors[1].assert_called_once_with([self.transformed_event]) processors[2].assert_not_called() mocked_post.assert_not_called() diff --git a/event_routing_backends/processors/caliper/envelope_processor.py b/event_routing_backends/processors/caliper/envelope_processor.py index 27f5d59d..4d1f2819 100644 --- a/event_routing_backends/processors/caliper/envelope_processor.py +++ b/event_routing_backends/processors/caliper/envelope_processor.py @@ -19,19 +19,22 @@ def __init__(self, sensor_id): """ self.sensor_id = sensor_id - def __call__(self, event): + def __call__(self, events): """ - Envelope the caliper transformed event. + Envelope the caliper transformed events. Arguments: - event (dict): IMS Caliper compliant event dict + events (list of dicts): List of IMS Caliper compliant event dicts Returns: - dict + list of dicts """ - return { - 'sensor': self.sensor_id, - 'sendTime': convert_datetime_to_iso(datetime.now(UTC)), - 'data': [event], - 'dataVersion': CALIPER_EVENT_CONTEXT - } + enveloped_events = [] + for event in events: + enveloped_events.append({ + 'sensor': self.sensor_id, + 'sendTime': convert_datetime_to_iso(datetime.now(UTC)), + 'data': [event], + 'dataVersion': CALIPER_EVENT_CONTEXT + }) + return enveloped_events diff --git a/event_routing_backends/processors/caliper/tests/test_caliper.py b/event_routing_backends/processors/caliper/tests/test_caliper.py index 3857da23..0c6a4a9b 100644 --- a/event_routing_backends/processors/caliper/tests/test_caliper.py +++ b/event_routing_backends/processors/caliper/tests/test_caliper.py @@ -35,7 +35,7 @@ def test_skip_event_when_disabled(self): @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_send_method_with_no_transformer_implemented(self, mocked_logger): with self.assertRaises(EventEmissionExit): - self.processor(self.sample_event) + self.processor([self.sample_event]) mocked_logger.error.assert_called_once_with( 'Could not get transformer for %s event.', @@ -49,7 +49,7 @@ def test_send_method_with_no_transformer_implemented(self, mocked_logger): @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_send_method_with_unknown_exception(self, mocked_logger, _): with self.assertRaises(ValueError): - self.processor(self.sample_event) + self.processor([self.sample_event]) mocked_logger.exception.assert_called_once_with( 'There was an error while trying to transform event "sentinel.name" using CaliperProcessor' @@ -66,14 +66,14 @@ def test_send_method_with_successfull_flow( mocked_logger, mocked_get_transformer ): - transformed_event = { + transformed_event = [{ 'transformed_key': 'transformed_value' - } + }] mocked_transformer = MagicMock() mocked_transformer.transform.return_value = transformed_event mocked_get_transformer.return_value = mocked_transformer - self.processor(self.sample_event) + self.processor([self.sample_event]) self.assertIn( call( @@ -102,14 +102,14 @@ def test_send_method_with_successfull_flow_logging_disabled( mocked_logger, mocked_get_transformer ): - transformed_event = { + transformed_event = [{ 'transformed_key': 'transformed_value' - } + }] mocked_transformer = MagicMock() mocked_transformer.transform.return_value = transformed_event mocked_get_transformer.return_value = mocked_transformer - self.processor(self.sample_event) + self.processor([self.sample_event]) self.assertIn( call( @@ -131,5 +131,5 @@ def test_with_no_registry(self, mocked_logger): backend = CaliperProcessor() backend.registry = None with self.assertRaises(EventEmissionExit): - self.assertIsNone(backend(self.sample_event)) + self.assertIsNone(backend([self.sample_event])) mocked_logger.exception.assert_called_once() diff --git a/event_routing_backends/processors/caliper/tests/test_envelope_processor.py b/event_routing_backends/processors/caliper/tests/test_envelope_processor.py index fe19abcf..9876b80f 100644 --- a/event_routing_backends/processors/caliper/tests/test_envelope_processor.py +++ b/event_routing_backends/processors/caliper/tests/test_envelope_processor.py @@ -30,10 +30,10 @@ def setUp(self): def test_caliper_envelope_processor(self, mocked_datetime): mocked_datetime.now.return_value = FROZEN_TIME - result = CaliperEnvelopeProcessor(sensor_id=self.sensor_id)(self.sample_event) - self.assertEqual(result, { + result = CaliperEnvelopeProcessor(sensor_id=self.sensor_id)([self.sample_event]) + self.assertEqual(result, [{ 'sensor': self.sensor_id, 'sendTime': convert_datetime_to_iso(str(FROZEN_TIME)), 'data': [self.sample_event], 'dataVersion': CALIPER_EVENT_CONTEXT - }) + }]) diff --git a/event_routing_backends/processors/mixins/base_transformer_processor.py b/event_routing_backends/processors/mixins/base_transformer_processor.py index f1d2110f..fa558771 100644 --- a/event_routing_backends/processors/mixins/base_transformer_processor.py +++ b/event_routing_backends/processors/mixins/base_transformer_processor.py @@ -18,33 +18,37 @@ class BaseTransformerProcessorMixin: registry = None - def __call__(self, event): + def __call__(self, events): """ - Transform and then route the event to the configured routers. + Transform the given events, and return the transformed events. Arguments: - event (dict): Event to be transformed and delivered. + event (list of dicts): Events to be transformed. Returns: - transformed event (dict) + transformed events (list of ANY) Raises: EventEmissionExit except.on: if no transformer is registered for an event. ANY exception: if raised. """ - transformed_event = self.transform_event(event) - - if transformed_event: - return transformed_event - - raise EventEmissionExit + returned_events = [] + for event in events: + transformed_event = self.transform_event(event) + if not transformed_event: + raise EventEmissionExit + if isinstance(transformed_event, list): + returned_events += transformed_event + else: + returned_events.append(transformed_event) + return returned_events def transform_event(self, event): """ Transform the event. - Transformer method can return transformed event in any data structure format - (dict or any custom class) but the configured router(s) MUST support it. + Transformer method can return transformed events in any data structure format + (dict, list, or any custom class) but the configured router(s) MUST support it. Arguments: event (dict): Event to be transformed. diff --git a/event_routing_backends/processors/xapi/tests/test_xapi.py b/event_routing_backends/processors/xapi/tests/test_xapi.py index 9bb044ba..6e5e5f75 100644 --- a/event_routing_backends/processors/xapi/tests/test_xapi.py +++ b/event_routing_backends/processors/xapi/tests/test_xapi.py @@ -31,7 +31,7 @@ def test_skip_event_when_disabled(self): @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_send_method_with_no_transformer_implemented(self, mocked_logger): with self.assertRaises(EventEmissionExit): - self.processor(self.sample_event) + self.processor([self.sample_event]) mocked_logger.error.assert_called_once_with( 'Could not get transformer for %s event.', @@ -45,7 +45,7 @@ def test_send_method_with_no_transformer_implemented(self, mocked_logger): @patch('event_routing_backends.processors.mixins.base_transformer_processor.logger') def test_send_method_with_unknown_exception(self, mocked_logger, _): with self.assertRaises(ValueError): - self.processor(self.sample_event) + self.processor([self.sample_event]) mocked_logger.exception.assert_called_once_with( 'There was an error while trying to transform event "sentinel.name" using XApiProcessor' @@ -62,7 +62,7 @@ def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_trans mocked_transformer.transform.return_value = transformed_event mocked_get_transformer.return_value = mocked_transformer - self.processor(self.sample_event) + self.processor([self.sample_event]) self.assertIn(call(transformed_event.to_json()), mocked_logger.mock_calls) @@ -77,7 +77,7 @@ def test_send_method_with_invalid_object(self, mocked_logger, mocked_get_transfo mocked_get_transformer.return_value = mocked_transformer with self.assertRaises(EventEmissionExit): - self.processor(self.sample_event) + self.processor([self.sample_event]) self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls) @@ -93,7 +93,7 @@ def test_send_method_with_successfull_flow_no_logger(self, mocked_logger, mocked mocked_transformer.transform.return_value = transformed_event mocked_get_transformer.return_value = mocked_transformer - self.processor(self.sample_event) + self.processor([self.sample_event]) self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls) @@ -102,5 +102,5 @@ def test_with_no_registry(self, mocked_logger): backend = XApiProcessor() backend.registry = None with self.assertRaises(EventEmissionExit): - self.assertIsNone(backend(self.sample_event)) + self.assertIsNone(backend([self.sample_event])) mocked_logger.exception.assert_called_once() diff --git a/event_routing_backends/processors/xapi/transformer_processor.py b/event_routing_backends/processors/xapi/transformer_processor.py index 71ac712d..418e204c 100644 --- a/event_routing_backends/processors/xapi/transformer_processor.py +++ b/event_routing_backends/processors/xapi/transformer_processor.py @@ -34,7 +34,7 @@ def transform_event(self, event): event (dict): Event to be transformed. Returns: - dict: transformed event + ANY: transformed event Raises: Any Exception