diff --git a/src/python_testing/TC_CADMIN_1_3_4.py b/src/python_testing/TC_CADMIN_1_3_4.py index 029a27af346e23..674e595fef3320 100644 --- a/src/python_testing/TC_CADMIN_1_3_4.py +++ b/src/python_testing/TC_CADMIN_1_3_4.py @@ -27,31 +27,33 @@ # --trace-to json:${TRACE_TEST_JSON}.json # --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto # factory-reset: true -# quiet: true +# quiet: false # run2: # app: ${ALL_CLUSTERS_APP} # app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json # script-args: > +# --tests test_TC_CADMIN_1_4 # --storage-path admin_storage.json # --discriminator 1234 # --passcode 20202021 # --trace-to json:${TRACE_TEST_JSON}.json # --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto # factory-reset: false -# quiet: true +# quiet: false # === END CI TEST ARGUMENTS === import asyncio import random from time import sleep +import base64 import chip.clusters as Clusters from chip import ChipDeviceCtrl from chip.exceptions import ChipStackError from chip.interaction_model import Status from chip.tlv import TLVReader from matter_testing_infrastructure.chip.testing.matter_testing import (MatterBaseTest, TestStep, async_test_body, - default_matter_test_main) + default_matter_test_main, MatterStackState) from mdns_discovery import mdns_discovery from mobly import asserts @@ -65,12 +67,6 @@ async def get_fabrics(self, th: ChipDeviceCtrl) -> int: fabric_info = await self.read_single_attribute_check_success(dev_ctrl=th, fabric_filtered=True, cluster=OC_cluster, attribute=OC_cluster.Attributes.Fabrics) return fabric_info - async def get_rcac_decoded(self, th: str) -> int: - csrResponse = await self.send_single_cmd(dev_ctrl=th, node_id=self.dut_node_id, cmd=opcreds.Commands.CSRRequest(CSRNonce=nonce, isForUpdateNOC=False)) - TH_certs_real = await th.IssueNOCChain(csrResponse, self.dut_node_id) - th_rcac_decoded = TLVReader(TH_certs_real.rcacBytes).get()["Any"] - return th_rcac_decoded - async def get_txt_record(self): discovery = mdns_discovery.MdnsDiscovery(verbose_logging=True) comm_service = await discovery.get_commissionable_service( @@ -172,8 +168,8 @@ async def test_TC_CADMIN_1_3(self): # Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device. await self.send_single_cmd(dev_ctrl=self.th1, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10)) - th1_rcac_decoded = await self.get_rcac_decoded(th=self.th1) - if th1_fabric_info[0].rootPublicKey != th1_rcac_decoded[9]: + th1_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[0]._persistentStorage._jsonData["sdk-config"]["f/1/r"])).get()["Any"][9] + if th1_fabric_info[0].rootPublicKey != th1_cam_rcac: asserts.fail("public keys from fabric and certs for TH1 are not the same") if th1_fabric_info[0].nodeID != self.dut_node_id: asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning") @@ -187,8 +183,8 @@ async def test_TC_CADMIN_1_3(self): # Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device. await self.send_single_cmd(dev_ctrl=self.th2, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10)) - th2_rcac_decoded = await self.get_rcac_decoded(th=self.th2) - if th2_fabric_info[0].rootPublicKey != th2_rcac_decoded[9]: + th2_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[1]._persistentStorage._jsonData["sdk-config"]["f/2/r"])).get()["Any"][9] + if th2_fabric_info[0].rootPublicKey != th2_cam_rcac: asserts.fail("public keys from fabric and certs for TH1 are not the same") if th2_fabric_info[0].nodeID != self.dut_node_id: asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning") @@ -332,8 +328,8 @@ async def test_TC_CADMIN_1_4(self): # Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device. await self.send_single_cmd(dev_ctrl=self.th1, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10)) - th1_rcac_decoded = await self.get_rcac_decoded(th=self.th1) - if th1_fabric_info[0].rootPublicKey != th1_rcac_decoded[9]: + th1_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[0]._persistentStorage._jsonData["sdk-config"]["f/1/r"])).get()["Any"][9] + if th1_fabric_info[0].rootPublicKey != th1_cam_rcac: asserts.fail("public keys from fabric and certs for TH1 are not the same") if th1_fabric_info[0].nodeID != self.dut_node_id: asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning") @@ -347,8 +343,14 @@ async def test_TC_CADMIN_1_4(self): # Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device. await self.send_single_cmd(dev_ctrl=self.th2, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10)) - th2_rcac_decoded = await self.get_rcac_decoded(th=self.th2) - if th2_fabric_info[0].rootPublicKey != th2_rcac_decoded[9]: + + # Following try and except needed in order for this test to pass in CI since we are running this test back to back without factory-reset + try: + th2_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[3]._persistentStorage._jsonData["sdk-config"]["f/4/r"])).get()["Any"][9] + except: + th2_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[2]._persistentStorage._jsonData["sdk-config"]["f/3/r"])).get()["Any"][9] + + if th2_fabric_info[0].rootPublicKey != th2_cam_rcac: asserts.fail("public keys from fabric and certs for TH1 are not the same") if th2_fabric_info[0].nodeID != self.dut_node_id: asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning") @@ -364,6 +366,5 @@ async def test_TC_CADMIN_1_4(self): removeFabricCmd = Clusters.OperationalCredentials.Commands.RemoveFabric(th2_idx[outer_key][inner_key][attribute_key]) await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd) - if __name__ == "__main__": default_matter_test_main()