From 5e3a41179dbdf68a2e14048574b313948b5f1785 Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 13 Jun 2024 01:10:41 +0200 Subject: [PATCH] [Python] Use thread-safe futures for concurrent operations Instead of using quasi-global variables in the ChipStack singleton use device controller local futures to store results from callbacks. This has several advantages, namely: - Avoid unnecessary shared state between device controllers - Avoid unnecessary shared state between various operations within a device controller (those who don't share callbacks could be called from different threads now) - Explicitly set Futures to None to detect spurious/unexpected callbacks - Better code readability - concurrent.futures are thread-safe - Will make asyncio transition easier This change shouldn't change the external API. --- src/controller/python/chip/ChipDeviceCtrl.py | 280 +++++++++++-------- src/controller/python/chip/ChipStack.py | 25 -- 2 files changed, 161 insertions(+), 144 deletions(-) diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index e9ff7280d9a31d..38e594f3b59434 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -30,6 +30,7 @@ import asyncio import builtins +import concurrent.futures import copy import ctypes import enum @@ -330,7 +331,6 @@ class ChipDeviceControllerBase(): activeList = set() def __init__(self, name: str = ''): - self.state = DCState.NOT_INITIALIZED self.devCtrl = None self._ChipStack = builtins.chipStack self._dmLib = None @@ -348,22 +348,28 @@ def __init__(self, name: str = ''): self._Cluster = ChipClusters(builtins.chipStack) self._Cluster.InitLib(self._dmLib) + self._commissioning_complete_future: typing.Optional[concurrent.futures.Future] = None + self._open_window_complete_future: typing.Optional[concurrent.futures.Future] = None + self._unpair_device_complete_future: typing.Optional[concurrent.futures.Future] = None + self._pase_establishment_complete_future: typing.Optional[concurrent.futures.Future] = None def _set_dev_ctrl(self, devCtrl, pairingDelegate): - def HandleCommissioningComplete(nodeid, err): + def HandleCommissioningComplete(nodeId: int, err: PyChipError): if err.is_success: logging.info("Commissioning complete") else: logging.warning("Failed to commission: {}".format(err)) self._dmLib.pychip_DeviceController_SetIcdRegistrationParameters(False, None) - self.state = DCState.IDLE - self._ChipStack.callbackRes = err - self._ChipStack.commissioningEventRes = err + if self._dmLib.pychip_TestCommissionerUsed(): - self._ChipStack.commissioningEventRes = self._dmLib.pychip_GetCompletionError() - self._ChipStack.commissioningCompleteEvent.set() - self._ChipStack.completeEvent.set() + err = self._dmLib.pychip_GetCompletionError() + + if self._commissioning_complete_future is None: + logging.exception("HandleCommissioningComplete called unexpectedly") + return + + self._commissioning_complete_future.set_result(err) def HandleFabricCheck(nodeId): self.fabricCheckNodeId = nodeId @@ -372,13 +378,19 @@ def HandleOpenWindowComplete(nodeid: int, setupPinCode: int, setupManualCode: st setupQRCode: str, err: PyChipError) -> None: if err.is_success: logging.info("Open Commissioning Window complete setting nodeid {} pincode to {}".format(nodeid, setupPinCode)) - self._ChipStack.openCommissioningWindowPincode[nodeid] = CommissioningParameters( + commissioningParameters = CommissioningParameters( setupPinCode=setupPinCode, setupManualCode=setupManualCode.decode(), setupQRCode=setupQRCode.decode()) else: logging.warning("Failed to open commissioning window: {}".format(err)) - self._ChipStack.callbackRes = err - self._ChipStack.completeEvent.set() + if self._open_window_complete_future is None: + logging.exception("HandleOpenWindowComplete called unexpectedly") + return + + if err.is_success: + self._open_window_complete_future.set_result(commissioningParameters) + else: + self._open_window_complete_future.set_exception(err.to_exception()) def HandleUnpairDeviceComplete(nodeid: int, err: PyChipError): if err.is_success: @@ -386,27 +398,33 @@ def HandleUnpairDeviceComplete(nodeid: int, err: PyChipError): else: logging.warning("Failed to unpair device: {}".format(err)) - self._ChipStack.callbackRes = err - self._ChipStack.completeEvent.set() + if self._unpair_device_complete_future is None: + logging.exception("HandleUnpairDeviceComplete called unexpectedly") + return + + if err.is_success: + self._unpair_device_complete_future.set_result() + else: + self._unpair_device_complete_future.set_exception(err.to_exception()) def HandlePASEEstablishmentComplete(err: PyChipError): if not err.is_success: logging.warning("Failed to establish secure session to device: {}".format(err)) - self._ChipStack.callbackRes = err.to_exception() else: logging.info("Established secure session with Device") - if self.state != DCState.COMMISSIONING: - # During Commissioning, HandlePASEEstablishmentComplete will also be called, - # in this case the async operation should be marked as finished by - # HandleCommissioningComplete instead this function. - self.state = DCState.IDLE - self._ChipStack.completeEvent.set() - else: - # When commissioning, getting an error during key exhange - # needs to unblock the entire commissioning flow. + if self._commissioning_complete_future is not None: + # During Commissioning, HandlePASEEstablishmentComplete will also be called. + # Only complete the future if PASE session establishment failed. if not err.is_success: - HandleCommissioningComplete(0, err) + self._commissioning_complete_future.set_result(err) + return + + if self._pase_establishment_complete_future is None: + logging.exception("HandlePASEEstablishmentComplete called unexpectedly") + return + + self._pase_establishment_complete_future.set_result(err) self.pairingDelegate = pairingDelegate self.devCtrl = devCtrl @@ -529,26 +547,31 @@ def IsConnected(self): def ConnectBLE(self, discriminator, setupPinCode, nodeid) -> PyChipError: self.CheckIsActive() - self._ChipStack.commissioningCompleteEvent.clear() + self._commissioning_complete_future = concurrent.futures.Future() - self.state = DCState.COMMISSIONING - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_ConnectBLE( - self.devCtrl, discriminator, setupPinCode, nodeid) - ).raise_on_error() - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_ConnectBLE( + self.devCtrl, discriminator, setupPinCode, nodeid) + ).raise_on_error() + + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None - def UnpairDevice(self, nodeid: int): + def UnpairDevice(self, nodeid: int) -> None: self.CheckIsActive() - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_UnpairDevice( - self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct) - ).raise_on_error() + self._unpair_device_complete_future = concurrent.futures.Future() + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_UnpairDevice( + self.devCtrl, nodeid, self.cbHandleDeviceUnpairCompleteFunct) + ).raise_on_error() + self._unpair_device_complete_future.result() + finally: + self._unpair_device_complete_future = None def CloseBLEConnection(self): self.CheckIsActive() @@ -582,32 +605,44 @@ def CloseSession(self, nodeid): def EstablishPASESessionBLE(self, setupPinCode: int, discriminator: int, nodeid: int): self.CheckIsActive() - self.state = DCState.RENDEZVOUS_ONGOING - self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE( - self.devCtrl, setupPinCode, discriminator, nodeid) - ) + self._pase_establishment_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionBLE( + self.devCtrl, setupPinCode, discriminator, nodeid) + ).raise_on_error() + return self._pase_establishment_complete_future.result() + finally: + self._pase_establishment_complete_future = None def EstablishPASESessionIP(self, ipaddr: str, setupPinCode: int, nodeid: int, port: int = 0): self.CheckIsActive() - self.state = DCState.RENDEZVOUS_ONGOING - self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP( - self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port) - ) + self._pase_establishment_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_EstablishPASESessionIP( + self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid, port) + ).raise_on_error() + return self._pase_establishment_complete_future.result() + finally: + self._pase_establishment_complete_future = None def EstablishPASESession(self, setUpCode: str, nodeid: int): self.CheckIsActive() - self.state = DCState.RENDEZVOUS_ONGOING - self._enablePairingCompeleteCallback(True) - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_EstablishPASESession( - self.devCtrl, setUpCode.encode("utf-8"), nodeid) - ) + self._pase_establishment_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_EstablishPASESession( + self.devCtrl, setUpCode.encode("utf-8"), nodeid) + ).raise_on_error() + return self._pase_establishment_complete_future.result() + finally: + self._pase_establishment_complete_future = None def GetTestCommissionerUsed(self): return self._ChipStack.Call( @@ -642,11 +677,6 @@ def CheckStageSuccessful(self, stage: int): def CheckTestCommissionerPaseConnection(self, nodeid): return self._dmLib.pychip_TestPaseConnection(nodeid) - def NOCChainCallback(self, nocChain): - self._ChipStack.callbackRes = nocChain - self._ChipStack.completeEvent.set() - return - def ResolveNode(self, nodeid): self.CheckIsActive() @@ -753,12 +783,16 @@ def OpenCommissioningWindow(self, nodeid: int, timeout: int, iteration: int, Returns CommissioningParameters ''' self.CheckIsActive() - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow( - self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option) - ).raise_on_error() - self._ChipStack.callbackRes.raise_on_error() - return self._ChipStack.openCommissioningWindowPincode[nodeid] + self._open_window_complete_future = concurrent.futures.Future() + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_OpenCommissioningWindow( + self.devCtrl, self.pairingDelegate, nodeid, timeout, iteration, discriminator, option) + ).raise_on_error() + + return self._open_window_complete_future.result() + finally: + self._open_window_complete_future = None def GetCompressedFabricId(self): self.CheckIsActive() @@ -1773,6 +1807,7 @@ def __init__(self, opCredsContext: ctypes.c_void_p, fabricId: int, nodeId: int, f"caIndex({fabricAdmin.caIndex:x})/fabricId(0x{fabricId:016X})/nodeId(0x{nodeId:016X})" ) + self._issue_node_chain_complete: typing.Optional[concurrent.futures.Future] = None self._dmLib.pychip_DeviceController_SetIssueNOCChainCallbackPythonCallback(_IssueNOCChainCallbackPythonCallback) pairingDelegate = c_void_p(None) @@ -1823,17 +1858,18 @@ def Commission(self, nodeid) -> PyChipError: bool: True if successful, False otherwise. ''' self.CheckIsActive() - self._ChipStack.commissioningCompleteEvent.clear() - self.state = DCState.COMMISSIONING - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_Commission( - self.devCtrl, nodeid) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + self._commissioning_complete_future = concurrent.futures.Future() + + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_Commission( + self.devCtrl, nodeid) + ).raise_on_error() + + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None def CommissionThread(self, discriminator, setupPinCode, nodeId, threadOperationalDataset: bytes) -> PyChipError: ''' Commissions a Thread device over BLE @@ -1971,17 +2007,17 @@ def CommissionOnNetwork(self, nodeId: int, setupPinCode: int, if isinstance(filter, int): filter = str(filter) - self._ChipStack.commissioningCompleteEvent.clear() + self._commissioning_complete_future = concurrent.futures.Future() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( + self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec) + ).raise_on_error() - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_OnNetworkCommission( - self.devCtrl, self.pairingDelegate, nodeId, setupPinCode, int(filterType), str(filter).encode("utf-8") + b"\x00" if filter is not None else None, discoveryTimeoutMsec) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: DiscoveryType = DiscoveryType.DISCOVERY_ALL) -> PyChipError: ''' Commission with the given nodeid from the setupPayload. @@ -1991,51 +2027,57 @@ def CommissionWithCode(self, setupPayload: str, nodeid: int, discoveryType: Disc setupPayload = setupPayload.encode() + b'\0' - # IP connection will run through full commissioning, so we need to wait - # for the commissioning complete event, not just any callback. - self.state = DCState.COMMISSIONING + self._commissioning_complete_future = concurrent.futures.Future() - self._ChipStack.commissioningCompleteEvent.clear() + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_ConnectWithCode( + self.devCtrl, setupPayload, nodeid, discoveryType.value) + ).raise_on_error() - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_ConnectWithCode( - self.devCtrl, setupPayload, nodeid, discoveryType.value) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None def CommissionIP(self, ipaddr: str, setupPinCode: int, nodeid: int) -> PyChipError: """ DEPRECATED, DO NOT USE! Use `CommissionOnNetwork` or `CommissionWithCode` """ self.CheckIsActive() - # IP connection will run through full commissioning, so we need to wait - # for the commissioning complete event, not just any callback. - self.state = DCState.COMMISSIONING + self._commissioning_complete_future = concurrent.futures.Future() + + try: + self._enablePairingCompeleteCallback(True) + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_ConnectIP( + self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid) + ).raise_on_error() - self._ChipStack.commissioningCompleteEvent.clear() + return self._commissioning_complete_future.result() + finally: + self._commissioning_complete_future = None - self._enablePairingCompeleteCallback(True) - self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_ConnectIP( - self.devCtrl, ipaddr.encode("utf-8"), setupPinCode, nodeid) - ) - if not self._ChipStack.commissioningCompleteEvent.isSet(): - # Error 50 is a timeout - return PyChipError(CHIP_ERROR_TIMEOUT) - return self._ChipStack.commissioningEventRes + def NOCChainCallback(self, nocChain): + if self._issue_node_chain_complete is None: + logging.exception("NOCChainCallback while not expecting a callback") + return + self._issue_node_chain_complete.set_result(nocChain) + return def IssueNOCChain(self, csr: Clusters.OperationalCredentials.Commands.CSRResponse, nodeId: int): """Issue an NOC chain using the associated OperationalCredentialsDelegate. The NOC chain will be provided in TLV cert format.""" self.CheckIsActive() - return self._ChipStack.CallAsyncWithCompleteCallback( - lambda: self._dmLib.pychip_DeviceController_IssueNOCChain( - self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId) - ) + self._issue_node_chain_complete = concurrent.futures.Future() + try: + self._ChipStack.Call( + lambda: self._dmLib.pychip_DeviceController_IssueNOCChain( + self.devCtrl, py_object(self), csr.NOCSRElements, len(csr.NOCSRElements), nodeId) + ).raise_on_error() + return self._issue_node_chain_complete.result() + finally: + self._issue_node_chain_complete = None class BareChipDeviceController(ChipDeviceControllerBase): diff --git a/src/controller/python/chip/ChipStack.py b/src/controller/python/chip/ChipStack.py index 5fd0601ba204e7..3d61eac2ac192a 100644 --- a/src/controller/python/chip/ChipStack.py +++ b/src/controller/python/chip/ChipStack.py @@ -144,14 +144,9 @@ class ChipStack(object): def __init__(self, persistentStoragePath: str, enableServerInteractions=True): builtins.enableDebugMode = False - self.completeEvent = Event() - self.commissioningCompleteEvent = Event() self._ChipStackLib = None self._chipDLLPath = None self.devMgr = None - self.callbackRes = None - self.commissioningEventRes = None - self.openCommissioningWindowPincode = {} self._enableServerInteractions = enableServerInteractions # @@ -212,7 +207,6 @@ def Shutdown(self): self._ChipStackLib = None self._chipDLLPath = None self.devMgr = None - self.callbackRes = None delattr(builtins, "chipStack") @@ -239,25 +233,6 @@ async def CallAsync(self, callFunct, timeoutMs: int = None): return await asyncio.wait_for(callObj.future, timeoutMs / 1000 if timeoutMs else None) - def CallAsyncWithCompleteCallback(self, callFunct): - '''Run a Python function on CHIP stack, and wait for the application specific response. - This function is a wrapper of PostTaskOnChipThread, which includes some handling of application specific logics. - Calling this function on CHIP on CHIP mainloop thread will cause deadlock. - Make sure to register the necessary callbacks which release the function by setting the completeEvent. - ''' - # throw error if op in progress - self.callbackRes = None - self.completeEvent.clear() - res = self.PostTaskOnChipThread(callFunct).Wait() - - if not res.is_success: - self.completeEvent.set() - raise res.to_exception() - self.completeEvent.wait() - if isinstance(self.callbackRes, ChipStackException): - raise self.callbackRes - return self.callbackRes - def PostTaskOnChipThread(self, callFunct) -> AsyncCallableHandle: '''Run a Python function on CHIP stack, and wait for the response. This function will post a task on CHIP mainloop, and return an object with Wait() method for getting the result.