Skip to content

Commit

Permalink
[Python] Avoid RuntimeException if APIs with future raise an error
Browse files Browse the repository at this point in the history
Currently, when calling an API which uses a future causes an error
(e.g. CommissionWithCode with an invalid code), then the API call
already returns an error. In this case the call `raise_on_error()` on
the returned PyChipError object make sure that an exception is raised.
However, this also causes the `CallbackContext` context manager to
exit.

At this point the future is initialized but never completed, which
triggers the previously introduced sanity check in `CallbackContext`:
`RuntimeError("CallbackContext future not completed")`.

It would be safe to just remove that sanity check, at least for this
case. But it seems cleaner to just explicitly cancel the future on
errors and leave the sanity check in place.

Also, since most API calls return PyChipError, this changes
`CallAsync()` to raise an exception by default instead of returning a
PyChipError object. If the PyChipError object is required or an API
returns something else, the CallAsyncWithResult() method can be used.
  • Loading branch information
agners committed Jul 17, 2024
1 parent 6cde0eb commit e6e2312
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 57 deletions.
132 changes: 79 additions & 53 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,23 +603,30 @@ async def ConnectBLE(self, discriminator: int, setupPinCode: int, nodeid: int, i

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, isShortDiscriminator, setupPinCode, nodeid)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectBLE(
self.devCtrl, discriminator, isShortDiscriminator, setupPinCode, nodeid)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

async def UnpairDevice(self, nodeid: int) -> None:
self.CheckIsActive()

async with self._unpair_device_context as ctx:
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_UnpairDevice(
self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

def CloseBLEConnection(self):
Expand Down Expand Up @@ -656,8 +663,11 @@ async def _establishPASESession(self, callFunct):

async with self._pase_establishment_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(callFunct)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(callFunct)
except:
ctx.future.cancel()
raise
await asyncio.futures.wrap_future(ctx.future)

async def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: int) -> None:
Expand Down Expand Up @@ -756,13 +766,12 @@ async def DiscoverCommissionableNodes(self, filterType: discovery.FilterType = d
# Discovery is also used during commissioning. Make sure this manual discovery
# and commissioning attempts do not interfere with each other.
async with self._commissioning_lock:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_DiscoverCommissionableNodes(
self.devCtrl, int(filterType), str(filter).encode("utf-8")))
res.raise_on_error()

async def _wait_discovery():
while not await self._ChipStack.CallAsync(
while not await self._ChipStack.CallAsyncWithResult(
lambda: self._dmLib.pychip_DeviceController_HasDiscoveredCommissionableNode(self.devCtrl)):
await asyncio.sleep(0.1)
return
Expand All @@ -776,9 +785,8 @@ async def _wait_discovery():
# Expected timeout, do nothing
pass
finally:
res = await self._ChipStack.CallAsync(
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_StopCommissionableDiscovery(self.devCtrl))
res.raise_on_error()

return await self.GetDiscoveredDevices()

Expand All @@ -796,7 +804,7 @@ def HandleDevice(deviceJson, deviceJsonLen):
self._dmLib.pychip_DeviceController_IterateDiscoveredCommissionableNodes(devCtrl.devCtrl, HandleDevice)
return devices

return await self._ChipStack.CallAsync(lambda: GetDevices(self))
return await self._ChipStack.CallAsyncWithResult(lambda: GetDevices(self))

def GetIPForDiscoveredDevice(self, idx, addrStr, length):
self.CheckIsActive()
Expand Down Expand Up @@ -828,11 +836,14 @@ async def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: in
self.CheckIsActive()

async with self._open_window_context as ctx:
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow(
self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

Expand Down Expand Up @@ -896,14 +907,14 @@ async def FindOrEstablishPASESession(self, setupCode: str, nodeid: int, timeoutM
''' Returns CommissioneeDeviceProxy if we can find or establish a PASE connection to the specified device'''
self.CheckIsActive()
returnDevice = c_void_p(None)
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)

await self.EstablishPASESession(setupCode, nodeid)

res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
return DeviceProxyWrapper(returnDevice, DeviceProxyWrapper.DeviceProxyType.COMMISSIONEE, self._dmLib)
Expand Down Expand Up @@ -991,7 +1002,7 @@ async def GetConnectedDevice(self, nodeid, allowPASE: bool = True, timeoutMs: in

if allowPASE:
returnDevice = c_void_p(None)
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
res = await self._ChipStack.CallAsyncWithResult(lambda: self._dmLib.pychip_GetDeviceBeingCommissioned(
self.devCtrl, nodeid, byref(returnDevice)), timeoutMs)
if res.is_success:
LOGGER.info('Using PASE connection')
Expand Down Expand Up @@ -1021,10 +1032,9 @@ def deviceAvailable(self, device, err):

closure = DeviceAvailableClosure(eventLoop, future)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(closure))
res = await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
await self._ChipStack.CallAsync(lambda: self._dmLib.pychip_GetConnectedDeviceByNodeId(
self.devCtrl, nodeid, ctypes.py_object(closure), _DeviceAvailableCallback),
timeoutMs)
res.raise_on_error()

# The callback might have been received synchronously (during self._ChipStack.CallAsync()).
# In that case the Future has already been set it will return immediately
Expand Down Expand Up @@ -1917,11 +1927,14 @@ async def Commission(self, nodeid) -> int:

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(False)
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_Commission(
self.devCtrl, nodeid)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

Expand Down Expand Up @@ -2065,11 +2078,14 @@ async def CommissionOnNetwork(self, nodeId: int, setupPinCode: int,

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") if filter is not None else None, discoveryTimeoutMsec)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission(
self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") if filter is not None else None, discoveryTimeoutMsec)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

Expand All @@ -2086,11 +2102,14 @@ async def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload.encode("utf-8"), nodeid, discoveryType.value)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectWithCode(
self.devCtrl, setupPayload.encode("utf-8"), nodeid, discoveryType.value)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

Expand All @@ -2106,11 +2125,14 @@ async def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> int

async with self._commissioning_context as ctx:
self._enablePairingCompleteCallback(True)
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_ConnectIP(
self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)

Expand All @@ -2127,11 +2149,15 @@ async def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRR
self.CheckIsActive()

async with self._issue_node_chain_context as ctx:
res = await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
)
res.raise_on_error()
try:
await self._ChipStack.CallAsync(
lambda: self._dmLib.pychip_DeviceController_IssueNOCChain(
self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId)
)
except:
ctx.future.cancel()
raise

return await asyncio.futures.wrap_future(ctx.future)


Expand Down
7 changes: 6 additions & 1 deletion src/controller/python/chip/ChipStack.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ def Call(self, callFunct, timeoutMs: int = None):
'''
return self.PostTaskOnChipThread(callFunct).Wait(timeoutMs)

async def CallAsync(self, callFunct, timeoutMs: int = None):
async def CallAsyncWithResult(self, callFunct, timeoutMs: int = None):
'''Run a Python function on CHIP stack, and wait for the response.
This function will post a task on CHIP mainloop and waits for the call response in a asyncio friendly manner.
'''
Expand All @@ -232,6 +232,11 @@ async def CallAsync(self, callFunct, timeoutMs: int = None):

return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None)

async def CallAsync(self, callFunct, timeoutMs: int = None) -> None:
'''Run a Python function on CHIP stack, and wait for the response.'''
res: PyChipError = await self.CallAsyncWithResult(callFunct, timeoutMs)
res.raise_on_error()

def PostTaskOnChipThread(self, callFunct) -> AsyncCallableHandle:
'''Run a Python function on CHIP stack, and wait for the response.
This function will post a task on CHIP mainloop, and return an object with Wait() method for getting the result.
Expand Down
2 changes: 1 addition & 1 deletion src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ def OverrideLivenessTimeoutMs(self, timeoutMs: int):

async def TriggerResubscribeIfScheduled(self, reason: str):
handle = chip.native.GetLibraryHandle()
await builtins.chipStack.CallAsync(
await builtins.chipStack.CallAsyncWithResult(
lambda: handle.pychip_ReadClient_TriggerResubscribeIfScheduled(
self._readTransaction._pReadClient, reason.encode("utf-8"))
)
Expand Down
4 changes: 2 additions & 2 deletions src/controller/python/chip/clusters/Command.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ async def SendCommand(future: Future, eventLoop, responseType: Type, device, com

payloadTLV = payload.ToTLV()
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))
return await builtins.chipStack.CallAsync(
return await builtins.chipStack.CallAsyncWithResult(
lambda: handle.pychip_CommandSender_SendCommand(
ctypes.py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs), commandPath.EndpointId,
Expand Down Expand Up @@ -388,7 +388,7 @@ async def SendBatchCommands(future: Future, eventLoop, device, commands: List[In
transaction = AsyncBatchCommandsTransaction(future, eventLoop, responseTypes)
ctypes.pythonapi.Py_IncRef(ctypes.py_object(transaction))

return await builtins.chipStack.CallAsync(
return await builtins.chipStack.CallAsyncWithResult(
lambda: handle.pychip_CommandSender_SendBatchCommands(
py_object(transaction), device,
c_uint16(0 if timedRequestTimeoutMs is None else timedRequestTimeoutMs),
Expand Down

0 comments on commit e6e2312

Please sign in to comment.