# bootstrap.py (forked from cosmicpython/code)
import inspect
from typing import Callable, Optional

import edgedb
from fastapi import FastAPI, Request

from allocation.adapters import redis_eventpublisher
from allocation.adapters.notifications import AbstractNotifications, EmailNotifications
from allocation.app.settings import settings
from allocation.services import handlers, messagebus, unit_of_work
async def get_messagebus(request: Request) -> messagebus.MessageBus:
    """Return the message bus stored on the application state.

    Reads ``request.app.state.bus``; the attribute must already have been set
    on the application before this is called.
    """
    app_state = request.app.state
    return app_state.bus
def get_pull_connection_edgedb(test_db: bool = False) -> edgedb.AsyncIOClient:
    """Create an async EdgeDB client for the DSN provided by the settings.

    Args:
        test_db: Forwarded to ``settings.get_edgedb_dsn``; presumably selects
            the test database DSN when true (confirm against the settings).

    Returns:
        The client returned by ``edgedb.create_async_client``.
    """
    dsn = settings.get_edgedb_dsn(test_db=test_db)
    # NOTE(review): tls_security="insecure" looks like it relaxes TLS
    # verification; confirm this is intended outside local/dev setups.
    return edgedb.create_async_client(dsn, tls_security="insecure")
def get_uow(
    async_client_db: edgedb.AsyncIOClient = get_pull_connection_edgedb(),
) -> unit_of_work.EdgedbUnitOfWork:
    """Wrap an EdgeDB async client in an ``EdgedbUnitOfWork``.

    Args:
        async_client_db: Client handed to the unit of work. The default value
            is created once, when this function is defined, so every call
            that omits the argument shares that single client.

    Returns:
        A new ``EdgedbUnitOfWork`` built around ``async_client_db``.
    """
    uow = unit_of_work.EdgedbUnitOfWork(async_client_db)
    return uow
def inject_dependencies(handler, dependencies):
    """Bind the dependencies a handler asks for and return a one-argument callable.

    Only entries of ``dependencies`` whose key matches a parameter name in the
    handler's signature are passed along, as keyword arguments.

    Args:
        handler: Callable taking the message first, plus any dependencies by name.
        dependencies: Mapping of dependency name to the object to inject.

    Returns:
        A callable that accepts the message and invokes ``handler`` with the
        message and the matching dependencies.
    """
    accepted = inspect.signature(handler).parameters
    bound = {}
    for key, value in dependencies.items():
        # Skip anything the handler does not declare in its signature.
        if key in accepted:
            bound[key] = value
    return lambda message: handler(message, **bound)
class Bootstrap:
    """Composition root that wires handler dependencies and builds the message bus.

    Every handler registered in ``handlers.EVENT_HANDLERS`` and
    ``handlers.COMMAND_HANDLERS`` is wrapped by ``inject_dependencies`` so it
    can be called with just the message. The resulting bus is available as
    ``self.messagebus`` and is what ``async with Bootstrap(...)`` yields.
    """

    def __init__(
        self,
        uow: Optional[unit_of_work.AbstractUnitOfWork] = None,
        notifications: Optional[AbstractNotifications] = None,
        publish: Callable = redis_eventpublisher.publish,
    ):
        """Resolve the dependencies and assemble the message bus.

        Args:
            uow: Unit of work shared by the bus and its handlers. When omitted,
                ``get_uow()`` is called at construction time.
            notifications: Notification adapter; defaults to a new
                ``EmailNotifications``.
            publish: Callable injected into handlers that declare a
                ``publish`` parameter.
        """
        # Defaults are resolved here rather than in the signature: a call in a
        # default expression runs once, at definition time, and its result is
        # then shared by every instance.
        if uow is None:
            uow = get_uow()
        # Compare against None explicitly so that a caller-supplied adapter
        # which happens to be falsy is not silently replaced.
        if notifications is None:
            notifications = EmailNotifications()

        dependencies = {"uow": uow, "notifications": notifications, "publish": publish}

        # Each event type maps to a list of handlers.
        injected_event_handlers = {
            event_type: [
                inject_dependencies(handler, dependencies) for handler in event_handlers
            ]
            for event_type, event_handlers in handlers.EVENT_HANDLERS.items()
        }
        # Each command type maps to exactly one handler.
        injected_command_handlers = {
            command_type: inject_dependencies(handler, dependencies)
            for command_type, handler in handlers.COMMAND_HANDLERS.items()
        }

        self.messagebus = messagebus.MessageBus(
            uow=uow,
            event_handlers=injected_event_handlers,
            command_handlers=injected_command_handlers,
        )

    async def __aenter__(self):
        """Enter the async context and return the assembled message bus."""
        return self.messagebus

    async def __aexit__(self, exc_type, exc, tb):
        """Close the unit of work's async client; exceptions are not suppressed."""
        # Assumes the unit of work exposes ``async_client`` (the EdgeDB-backed
        # one is built from a client) — confirm for other implementations.
        await self.messagebus.uow.async_client.aclose()
# Module-level Bootstrap instance, created at import time with the default
# dependencies; aenter_lifespan attaches its message bus to the FastAPI app.
bootstrap = Bootstrap()
async def aenter_lifespan(app: FastAPI):
    """Startup hook: attach the module-level message bus to the app and connect.

    Stores ``bootstrap.messagebus`` on ``app.state.bus`` and then awaits
    ``ensure_connected()`` on the bus's unit-of-work client.
    """
    bus = bootstrap.messagebus
    app.state.bus = bus
    client = bus.uow.async_client
    await client.ensure_connected()
async def aexit_lifespan(app: FastAPI):
    """Shutdown hook: detach the message bus from the app and close its client.

    ``app.state.bus`` is reset to ``None`` before the unit-of-work client of
    the previously stored bus is closed.
    """
    bus = app.state.bus
    app.state.bus = None
    client = bus.uow.async_client
    await client.aclose()