forked from cosmicpython/code
-
Notifications
You must be signed in to change notification settings - Fork 0
/
messagebus.py
74 lines (62 loc) · 2.44 KB
/
messagebus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import abc
import logging
from typing import Any, Awaitable, Callable
from allocation.domain import commands, events
from . import unit_of_work
logger = logging.getLogger(__name__)
AsyncEventHandler = Callable[..., Awaitable[Any | None]]
Message = commands.Command | events.Event
class AbstractMessageBus(abc.ABC):
def __init__(
self,
uow: unit_of_work.AbstractUnitOfWork,
event_handlers: dict[type[events.Event], list[AsyncEventHandler]],
command_handlers: dict[type[commands.Command], AsyncEventHandler],
) -> None:
self.uow = uow
self.event_handlers = event_handlers
self.command_handlers = command_handlers
async def handle(self, message: Message):
self.queue = [message]
while self.queue:
message = self.queue.pop(0)
match message:
case events.Event():
await self.event_handler(message)
case commands.Command():
await self.command_handler(message)
case _:
raise Exception(f"{message} was not an Event or Command")
@abc.abstractmethod
async def event_handler(self, event: events.Event):
pass
@abc.abstractmethod
async def command_handler(self, command: commands.Command):
pass
class MessageBus(AbstractMessageBus):
async def event_handler(self, event: events.Event):
for handler in self.event_handlers[type(event)]:
try:
logger.debug(f"handling event {event} with handler {handler}")
await handler(event)
events_list = [
event async for event in self.uow.collect_new_events() if event
]
self.queue.extend(events_list)
except Exception:
logger.exception(f"Exception handling event {event}")
continue
async def command_handler(self, command: commands.Command):
handler = self.command_handlers[type(command)]
try:
logger.debug(f"handling command {command}")
await handler(command)
commands_list = [
command async for command in self.uow.collect_new_events() if command
]
self.queue.extend(commands_list)
except Exception:
logger.exception(
f"Exception handling command {command} with handler {handler}"
)
raise