Skip to content

Commit

Permalink
update: create a thread for every subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniele Palaia authored and Daniele Palaia committed Oct 9, 2023
1 parent 641c0e8 commit dc88983
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
31 changes: 22 additions & 9 deletions rstream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def __init__(

self._last_heartbeat: float = 0
self._connection_closed_handler = connection_closed_handler
self._frames: asyncio.Queue = asyncio.Queue()
self._frames: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

def start_task(self, name: str, coro: Awaitable[None]) -> None:
assert name not in self._tasks
Expand Down Expand Up @@ -180,21 +180,32 @@ async def start(self) -> None:
self._conn = Connection(self.host, self.port, self._ssl_context)
await self._conn.open()
self.start_task("listener", self._listener())
self.start_task("run_delivery_handlers", self._run_delivery_handlers())

self.add_handler(schema.Heartbeat, self._on_heartbeat)
self.add_handler(schema.Close, self._on_close)

async def _run_delivery_handlers(self):
async def run_queue_listener_task(self, subscriber_name: str):

if subscriber_name not in self._frames:
self.start_task(
"run_delivery_handlers" + subscriber_name, self._run_delivery_handlers(subscriber_name)
)

async def _run_delivery_handlers(self, subscriber_name: str):

while True:
frame_entry = None
try:
frame_entry = await self._frames.get()
frame_entry = await self._frames[subscriber_name].get()
maybe_coro = frame_entry.handler(frame_entry.frame)
if maybe_coro is not None:
await maybe_coro
except Exception:
logger.exception("Error while handling %s frame " + str(frame_entry.frame.__class__))
except Exception as e:
if frame_entry is not None:
logger.exception("Error while handling %s frame ", str(frame_entry.frame.__class__))
else:
logger.exception("Error while handling a frame " + str(e))
break

async def _listener(self) -> None:
assert self._conn
Expand Down Expand Up @@ -225,10 +236,10 @@ async def _listener(self) -> None:
fut = self._waiters[_key].pop()
fut.set_result(frame)

for _, handler in self._handlers.get(frame.__class__, {}).items():
for subscriber_name, handler in self._handlers.get(frame.__class__, {}).items():
try:
if frame.__class__ == schema.Deliver:
await self._frames.put(FrameEntry(handler, frame))
await self._frames[subscriber_name].put(FrameEntry(handler, frame))
else:
maybe_coro = handler(frame)
if maybe_coro is not None:
Expand Down Expand Up @@ -281,7 +292,9 @@ async def close(self) -> None:
resp_schema=schema.CloseResponse,
)

await self.stop_task("run_delivery_handlers")
for subscriber_name in self._frames:
await self.stop_task("run_delivery_handlers" + subscriber_name)

await self.stop_task("listener")

await self._conn.close()
Expand Down
2 changes: 2 additions & 0 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ async def subscribe(
offset=offset_specification.offset,
)

await subscriber.client.run_queue_listener_task(subscriber_name=subscriber.reference)

subscriber.client.add_handler(
schema.Deliver,
partial(self._on_deliver, subscriber=subscriber),
Expand Down

0 comments on commit dc88983

Please sign in to comment.