Skip to content

Commit

Permalink
Updated TC_CADMIN_1_3_4 test module:
Browse files Browse the repository at this point in the history
- Updated method to cross verify root public key, now using self.certificate_authority_manager.activeCaList
  • Loading branch information
j-ororke committed Oct 30, 2024
1 parent 9cbd719 commit 374dc49
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions src/python_testing/TC_CADMIN_1_3_4.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,33 @@
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: true
# quiet: true
# quiet: false
# run2:
# app: ${ALL_CLUSTERS_APP}
# app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json
# script-args: >
# --tests test_TC_CADMIN_1_4
# --storage-path admin_storage.json
# --discriminator 1234
# --passcode 20202021
# --trace-to json:${TRACE_TEST_JSON}.json
# --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto
# factory-reset: false
# quiet: true
# quiet: false
# === END CI TEST ARGUMENTS ===

import asyncio
import random
from time import sleep

import base64
import chip.clusters as Clusters
from chip import ChipDeviceCtrl
from chip.exceptions import ChipStackError
from chip.interaction_model import Status
from chip.tlv import TLVReader
from matter_testing_infrastructure.chip.testing.matter_testing import (MatterBaseTest, TestStep, async_test_body,
default_matter_test_main)
default_matter_test_main, MatterStackState)
from mdns_discovery import mdns_discovery
from mobly import asserts

Expand All @@ -65,12 +67,6 @@ async def get_fabrics(self, th: ChipDeviceCtrl) -> int:
fabric_info = await self.read_single_attribute_check_success(dev_ctrl=th, fabric_filtered=True, cluster=OC_cluster, attribute=OC_cluster.Attributes.Fabrics)
return fabric_info

async def get_rcac_decoded(self, th: str) -> int:
csrResponse = await self.send_single_cmd(dev_ctrl=th, node_id=self.dut_node_id, cmd=opcreds.Commands.CSRRequest(CSRNonce=nonce, isForUpdateNOC=False))
TH_certs_real = await th.IssueNOCChain(csrResponse, self.dut_node_id)
th_rcac_decoded = TLVReader(TH_certs_real.rcacBytes).get()["Any"]
return th_rcac_decoded

async def get_txt_record(self):
discovery = mdns_discovery.MdnsDiscovery(verbose_logging=True)
comm_service = await discovery.get_commissionable_service(
Expand Down Expand Up @@ -172,8 +168,8 @@ async def test_TC_CADMIN_1_3(self):

# Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device.
await self.send_single_cmd(dev_ctrl=self.th1, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10))
th1_rcac_decoded = await self.get_rcac_decoded(th=self.th1)
if th1_fabric_info[0].rootPublicKey != th1_rcac_decoded[9]:
th1_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[0]._persistentStorage._jsonData["sdk-config"]["f/1/r"])).get()["Any"][9]
if th1_fabric_info[0].rootPublicKey != th1_cam_rcac:
asserts.fail("public keys from fabric and certs for TH1 are not the same")
if th1_fabric_info[0].nodeID != self.dut_node_id:
asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning")
Expand All @@ -187,8 +183,8 @@ async def test_TC_CADMIN_1_3(self):

# Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device.
await self.send_single_cmd(dev_ctrl=self.th2, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10))
th2_rcac_decoded = await self.get_rcac_decoded(th=self.th2)
if th2_fabric_info[0].rootPublicKey != th2_rcac_decoded[9]:
th2_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[1]._persistentStorage._jsonData["sdk-config"]["f/2/r"])).get()["Any"][9]
if th2_fabric_info[0].rootPublicKey != th2_cam_rcac:
asserts.fail("public keys from fabric and certs for TH1 are not the same")
if th2_fabric_info[0].nodeID != self.dut_node_id:
asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning")
Expand Down Expand Up @@ -332,8 +328,8 @@ async def test_TC_CADMIN_1_4(self):

# Verify that the RootPublicKey matches the root public key for TH_CR1 and the NodeID matches the node ID used when TH_CR1 commissioned the device.
await self.send_single_cmd(dev_ctrl=self.th1, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10))
th1_rcac_decoded = await self.get_rcac_decoded(th=self.th1)
if th1_fabric_info[0].rootPublicKey != th1_rcac_decoded[9]:
th1_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[0]._persistentStorage._jsonData["sdk-config"]["f/1/r"])).get()["Any"][9]
if th1_fabric_info[0].rootPublicKey != th1_cam_rcac:
asserts.fail("public keys from fabric and certs for TH1 are not the same")
if th1_fabric_info[0].nodeID != self.dut_node_id:
asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning")
Expand All @@ -347,8 +343,14 @@ async def test_TC_CADMIN_1_4(self):

# Verify that the RootPublicKey matches the root public key for TH_CR2 and the NodeID matches the node ID used when TH_CR2 commissioned the device.
await self.send_single_cmd(dev_ctrl=self.th2, node_id=self.dut_node_id, cmd=Clusters.GeneralCommissioning.Commands.ArmFailSafe(10))
th2_rcac_decoded = await self.get_rcac_decoded(th=self.th2)
if th2_fabric_info[0].rootPublicKey != th2_rcac_decoded[9]:

# Following try and except needed in order for this test to pass in CI since we are running this test back to back without factory-reset
try:
th2_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[3]._persistentStorage._jsonData["sdk-config"]["f/4/r"])).get()["Any"][9]
except:
th2_cam_rcac = TLVReader(base64.b64decode(self.certificate_authority_manager.activeCaList[2]._persistentStorage._jsonData["sdk-config"]["f/3/r"])).get()["Any"][9]

if th2_fabric_info[0].rootPublicKey != th2_cam_rcac:
asserts.fail("public keys from fabric and certs for TH1 are not the same")
if th2_fabric_info[0].nodeID != self.dut_node_id:
asserts.fail("DUT node ID from fabric does not equal DUT node ID for TH1 during commissioning")
Expand All @@ -364,6 +366,5 @@ async def test_TC_CADMIN_1_4(self):
removeFabricCmd = Clusters.OperationalCredentials.Commands.RemoveFabric(th2_idx[outer_key][inner_key][attribute_key])
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd)


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit 374dc49

Please sign in to comment.