Skip to content

Commit

Permalink
Implement existing scan commands on top of callback_for_commands
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Aug 19, 2024
1 parent ccb3afd commit 41684b9
Showing 1 changed file with 34 additions and 14 deletions.
48 changes: 34 additions & 14 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,28 +224,44 @@ async def _list_command(
self, name, item_frames, completion_frame, spos, *args, **kwargs
):
"""Run a command, returning result callbacks as a list"""
fut = asyncio.Future()
queue = asyncio.Queue()
results = []

def cb(frame_name, response):
if frame_name in item_frames:
with self.callback_for_commands(
commands=set(item_frames) | {completion_frame},
callback=lambda command, response: queue.put_nowait((command, response)),
):
v = await self._command(name, *args, **kwargs)
if t.sl_Status.from_ember_status(v[0]) != t.sl_Status.OK:
raise Exception(v)

while True:
command, response = await queue.get()
if command == completion_frame:
if t.sl_Status.from_ember_status(response[spos]) != t.sl_Status.OK:
raise Exception(response)

break

results.append(response)
elif frame_name == completion_frame:
fut.set_result(response)

return results

@contextlib.contextmanager
def callback_for_commands(
self, commands: set[str], callback: Callable
) -> Generator[None]:
def cb(frame_name, response):
if frame_name in commands:
callback(frame_name, response)

cbid = self.add_callback(cb)

try:
v = await self._command(name, *args, **kwargs)
if t.sl_Status.from_ember_status(v[0]) != t.sl_Status.OK:
raise Exception(v)
v = await fut
if t.sl_Status.from_ember_status(v[spos]) != t.sl_Status.OK:
raise Exception(v)
yield
finally:
self.remove_callback(cbid)

return results

startScan = functools.partialmethod(
_list_command,
"startScan",
Expand All @@ -254,7 +270,11 @@ def cb(frame_name, response):
1,
)
pollForData = functools.partialmethod(
_list_command, "pollForData", ["pollHandler"], "pollCompleteHandler", 0
_list_command,
"pollForData",
["pollHandler"],
"pollCompleteHandler",
0,
)
zllStartScan = functools.partialmethod(
_list_command,
Expand Down

0 comments on commit 41684b9

Please sign in to comment.