Skip to content

Commit

Permalink
.brokers._daemon: add notes around needed brokerd respawn tech
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Jun 27, 2023
1 parent a44bc4a commit 3535986
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
58 changes: 40 additions & 18 deletions piker/brokers/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
``brokerd``.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import TYPE_CHECKING
import exceptiongroup as eg

import tractor
import trio

from . import _util
from . import get_brokermod

if TYPE_CHECKING:
from ..data import _FeedsBus

# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
Expand Down Expand Up @@ -69,24 +75,40 @@ async def _setup_persistent_brokerd(
# set global for this actor to this new process-wide instance B)
_util.log = log

from piker.data.feed import (
_bus,
get_feed_bus,
)
global _bus
assert not _bus

async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)

# unblock caller
await ctx.started()

# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
from piker.data import feed
assert not feed._bus

# allocate a nursery to the bus for spawning background
# tasks to service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `brokerd` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with trio.open_nursery() as service_nursery:
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus

# unblock caller
await ctx.started()

# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()

except eg.ExceptionGroup:
# TODO: likely some underlying `brokerd` IPC connection
# broke so here we handle a respawn and re-connect attempt!
# This likely should pair with development of the OCO task
# nusery in dev over @ `tractor` B)
# https://github.com/goodboy/tractor/pull/363
raise


async def spawn_brokerd(
Expand Down
2 changes: 1 addition & 1 deletion piker/data/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def remove_subs(

def get_feed_bus(
brokername: str,
nursery: Optional[trio.Nursery] = None,
nursery: trio.Nursery | None = None,

) -> _FeedsBus:
'''
Expand Down

0 comments on commit 3535986

Please sign in to comment.