Skip to content

Commit

Permalink
feat: allow processor chains to handle multiple events
Browse files Browse the repository at this point in the history
Updates the event processor change to allow for the possibility that
some event transformers may generate multiple events from a single
event, and these events need to be carried down the processor chain.
  • Loading branch information
pomegranited committed Aug 7, 2023
1 parent 31d24d6 commit 896981d
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 58 deletions.
28 changes: 16 additions & 12 deletions event_routing_backends/backends/events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def prepare_to_send(self, events):
'Processing edx event "{}" for router with backend {}'.format(event_name, self.backend_name)
)

processed_event = self.process_event(event)
processed_events = self.process_event(event)
except (EventEmissionExit, ValueError):
logger.error(
'Could not process edx event "%s" for backend %s\'s router',
Expand All @@ -90,10 +90,10 @@ def prepare_to_send(self, events):
continue

logger.debug(
'Successfully processed edx event "%s" for router with backend %s. Processed event: %s',
'Successfully processed edx event "%s" for router with backend %s. Processed events: %s',
event_name,
self.backend_name,
processed_event
processed_events
)

for router in routers:
Expand All @@ -107,11 +107,13 @@ def prepare_to_send(self, events):
)
else:
host = self.configure_host(host, router)
updated_event = self.overwrite_event_data(processed_event, host, event_name)
is_business_critical = event_name in business_critical_events
if router_pk not in route_events:
route_events[router_pk] = [(event_name, updated_event, host, is_business_critical),]
else:

if processed_events and router_pk not in route_events:
route_events[router_pk] = []

for processed_event in processed_events:
updated_event = self.overwrite_event_data(processed_event, host, event_name)
is_business_critical = event_name in business_critical_events
route_events[router_pk].append((event_name, updated_event, host, is_business_critical))

return route_events
Expand Down Expand Up @@ -176,17 +178,19 @@ def process_event(self, event):
"""
Process the event through this router's processors.
This single event may produce multiple processed events, and so we return a list of events here.
Arguments:
event (dict): Event to be processed
Returns
dict
list of ANY
"""
event = event.copy()
events = [event.copy()]
for processor in self.processors:
event = processor(event)
events = processor(events)

return event
return events

def overwrite_event_data(self, event, host, event_name):
"""
Expand Down
10 changes: 5 additions & 5 deletions event_routing_backends/backends/tests/test_events_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ def setUp(self):
@patch('event_routing_backends.models.RouterConfiguration.get_enabled_routers')
def test_with_processor_exception(self, mocked_get_enabled_routers, mocked_logger, mocked_post):
processors = [
MagicMock(return_value=self.transformed_event),
MagicMock(side_effect=EventEmissionExit, return_value=self.transformed_event),
MagicMock(return_value=self.transformed_event),
MagicMock(return_value=[self.transformed_event]),
MagicMock(side_effect=EventEmissionExit, return_value=[self.transformed_event]),
MagicMock(return_value=[self.transformed_event]),
]
processors[1].side_effect = EventEmissionExit

Expand All @@ -164,8 +164,8 @@ def test_with_processor_exception(self, mocked_get_enabled_routers, mocked_logge
router = EventsRouter(processors=processors, backend_name='test')
router.send(self.transformed_event)

processors[0].assert_called_once_with(self.transformed_event)
processors[1].assert_called_once_with(self.transformed_event)
processors[0].assert_called_once_with([self.transformed_event])
processors[1].assert_called_once_with([self.transformed_event])
processors[2].assert_not_called()

mocked_post.assert_not_called()
Expand Down
23 changes: 13 additions & 10 deletions event_routing_backends/processors/caliper/envelope_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ def __init__(self, sensor_id):
"""
self.sensor_id = sensor_id

def __call__(self, event):
def __call__(self, events):
"""
Envelope the caliper transformed event.
Envelope the caliper transformed events.
Arguments:
event (dict): IMS Caliper compliant event dict
events (list of dicts): List of IMS Caliper compliant event dicts
Returns:
dict
list of dicts
"""
return {
'sensor': self.sensor_id,
'sendTime': convert_datetime_to_iso(datetime.now(UTC)),
'data': [event],
'dataVersion': CALIPER_EVENT_CONTEXT
}
enveloped_events = []
for event in events:
enveloped_events.append({
'sensor': self.sensor_id,
'sendTime': convert_datetime_to_iso(datetime.now(UTC)),
'data': [event],
'dataVersion': CALIPER_EVENT_CONTEXT
})
return enveloped_events
18 changes: 9 additions & 9 deletions event_routing_backends/processors/caliper/tests/test_caliper.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_skip_event_when_disabled(self):
@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_no_transformer_implemented(self, mocked_logger):
with self.assertRaises(EventEmissionExit):
self.processor(self.sample_event)
self.processor([self.sample_event])

mocked_logger.error.assert_called_once_with(
'Could not get transformer for %s event.',
Expand All @@ -49,7 +49,7 @@ def test_send_method_with_no_transformer_implemented(self, mocked_logger):
@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_unknown_exception(self, mocked_logger, _):
with self.assertRaises(ValueError):
self.processor(self.sample_event)
self.processor([self.sample_event])

mocked_logger.exception.assert_called_once_with(
'There was an error while trying to transform event "sentinel.name" using CaliperProcessor'
Expand All @@ -66,14 +66,14 @@ def test_send_method_with_successfull_flow(
mocked_logger,
mocked_get_transformer
):
transformed_event = {
transformed_event = [{
'transformed_key': 'transformed_value'
}
}]
mocked_transformer = MagicMock()
mocked_transformer.transform.return_value = transformed_event
mocked_get_transformer.return_value = mocked_transformer

self.processor(self.sample_event)
self.processor([self.sample_event])

self.assertIn(
call(
Expand Down Expand Up @@ -102,14 +102,14 @@ def test_send_method_with_successfull_flow_logging_disabled(
mocked_logger,
mocked_get_transformer
):
transformed_event = {
transformed_event = [{
'transformed_key': 'transformed_value'
}
}]
mocked_transformer = MagicMock()
mocked_transformer.transform.return_value = transformed_event
mocked_get_transformer.return_value = mocked_transformer

self.processor(self.sample_event)
self.processor([self.sample_event])

self.assertIn(
call(
Expand All @@ -131,5 +131,5 @@ def test_with_no_registry(self, mocked_logger):
backend = CaliperProcessor()
backend.registry = None
with self.assertRaises(EventEmissionExit):
self.assertIsNone(backend(self.sample_event))
self.assertIsNone(backend([self.sample_event]))
mocked_logger.exception.assert_called_once()
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ def setUp(self):
def test_caliper_envelope_processor(self, mocked_datetime):
mocked_datetime.now.return_value = FROZEN_TIME

result = CaliperEnvelopeProcessor(sensor_id=self.sensor_id)(self.sample_event)
self.assertEqual(result, {
result = CaliperEnvelopeProcessor(sensor_id=self.sensor_id)([self.sample_event])
self.assertEqual(result, [{
'sensor': self.sensor_id,
'sendTime': convert_datetime_to_iso(str(FROZEN_TIME)),
'data': [self.sample_event],
'dataVersion': CALIPER_EVENT_CONTEXT
})
}])
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,37 @@ class BaseTransformerProcessorMixin:

registry = None

def __call__(self, event):
def __call__(self, events):
"""
Transform and then route the event to the configured routers.
Transform the given events, and return the transformed events.
Arguments:
event (dict): Event to be transformed and delivered.
event (list of dicts): Events to be transformed.
Returns:
transformed event (dict)
transformed events (list of ANY)
Raises:
EventEmissionExit except.on: if no transformer is registered for an event.
ANY exception: if raised.
"""
transformed_event = self.transform_event(event)

if transformed_event:
return transformed_event

raise EventEmissionExit
returned_events = []
for event in events:
transformed_event = self.transform_event(event)
if not transformed_event:
raise EventEmissionExit
if isinstance(transformed_event, list):
returned_events += transformed_event
else:
returned_events.append(transformed_event)
return returned_events

def transform_event(self, event):
"""
Transform the event.
Transformer method can return transformed event in any data structure format
(dict or any custom class) but the configured router(s) MUST support it.
Transformer method can return transformed events in any data structure format
(dict, list, or any custom class) but the configured router(s) MUST support it.
Arguments:
event (dict): Event to be transformed.
Expand Down
12 changes: 6 additions & 6 deletions event_routing_backends/processors/xapi/tests/test_xapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_skip_event_when_disabled(self):
@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_no_transformer_implemented(self, mocked_logger):
with self.assertRaises(EventEmissionExit):
self.processor(self.sample_event)
self.processor([self.sample_event])

mocked_logger.error.assert_called_once_with(
'Could not get transformer for %s event.',
Expand All @@ -45,7 +45,7 @@ def test_send_method_with_no_transformer_implemented(self, mocked_logger):
@patch('event_routing_backends.processors.mixins.base_transformer_processor.logger')
def test_send_method_with_unknown_exception(self, mocked_logger, _):
with self.assertRaises(ValueError):
self.processor(self.sample_event)
self.processor([self.sample_event])

mocked_logger.exception.assert_called_once_with(
'There was an error while trying to transform event "sentinel.name" using XApiProcessor'
Expand All @@ -62,7 +62,7 @@ def test_send_method_with_successfull_flow(self, mocked_logger, mocked_get_trans
mocked_transformer.transform.return_value = transformed_event
mocked_get_transformer.return_value = mocked_transformer

self.processor(self.sample_event)
self.processor([self.sample_event])

self.assertIn(call(transformed_event.to_json()), mocked_logger.mock_calls)

Expand All @@ -77,7 +77,7 @@ def test_send_method_with_invalid_object(self, mocked_logger, mocked_get_transfo
mocked_get_transformer.return_value = mocked_transformer

with self.assertRaises(EventEmissionExit):
self.processor(self.sample_event)
self.processor([self.sample_event])

self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls)

Expand All @@ -93,7 +93,7 @@ def test_send_method_with_successfull_flow_no_logger(self, mocked_logger, mocked
mocked_transformer.transform.return_value = transformed_event
mocked_get_transformer.return_value = mocked_transformer

self.processor(self.sample_event)
self.processor([self.sample_event])

self.assertNotIn(call(transformed_event.to_json()), mocked_logger.mock_calls)

Expand All @@ -102,5 +102,5 @@ def test_with_no_registry(self, mocked_logger):
backend = XApiProcessor()
backend.registry = None
with self.assertRaises(EventEmissionExit):
self.assertIsNone(backend(self.sample_event))
self.assertIsNone(backend([self.sample_event]))
mocked_logger.exception.assert_called_once()
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def transform_event(self, event):
event (dict): Event to be transformed.
Returns:
dict: transformed event
ANY: transformed event
Raises:
Any Exception
Expand Down

0 comments on commit 896981d

Please sign in to comment.