Skip to content

Commit

Permalink
Updated TC_DEM_2.10 (Q Quality) Basic structure is there - steps need…
Browse files Browse the repository at this point in the history
… tidying up and fails due to powerAdjustmentCapability not being updated as expected.
  • Loading branch information
jamesharrow committed Aug 28, 2024
1 parent 7012080 commit 58e6bf3
Showing 1 changed file with 199 additions and 58 deletions.
257 changes: 199 additions & 58 deletions src/python_testing/TC_DEM_2_10.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import logging
import time
import sys

import chip.clusters as Clusters
from chip.interaction_model import Status
Expand All @@ -53,107 +54,247 @@ def desc_TC_DEM_2_10(self) -> str:
def pics_TC_DEM_2_10(self):
"""Return the PICS definitions associated with this test."""
pics = [
# Depends on Feature 05 (ForecastAdjustment) & Feature 02 (StateForecastReporting)
"DEM.S.F05", "DEM.S.F02"
"DEM.S"
]
return pics

def steps_TC_DEM_2_10(self) -> list[TestStep]:
"""Execute the test steps."""
steps = [
TestStep("1", "Commission DUT to TH"),
TestStep("2", "TH reads from the DUT the Featuremap attribute",
"Verify that the DUT response contains the Featuremap attribute. Verify ForecastAdjustment and StateForecastReporting is supported. Verify PowerForecastReporting is not supported."),
TestStep("1", "Commission DUT to TH (can be skipped if done in a preceding test)"),
TestStep("2", "TH reads from the DUT the FeatureMap attribute",
"Verify that the DUT response contains the FeatureMap attribute. Store the value as FeatureMap."),
TestStep("3", "TH reads TestEventTriggersEnabled attribute from General Diagnostics Cluster",
"Value has to be 1 (True)"),
TestStep("4", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for Forecast Adjustment Test Event",
TestStep("4", "Set up a subscription to the DeviceEnergyManagement cluster, with MinIntervalFloor set to 0, MaxIntervalCeiling set to 10 and KeepSubscriptions set to false",
"Subscription successfully established"),
TestStep("5", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for User Opt-out Test Event Clear",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("4a", "TH reads from the DUT the ESAState",
"Value has to be 0x01 (Online)"),
TestStep("4b", "TH reads from the DUT the Forecast",
"Value has to include slots[0].MinDurationAdjustment, slots[0].MaxDurationAdjustment"),
TestStep("4c", "TH reads from the DUT the OptOutState",
TestStep("5a", "TH reads from the DUT the OptOutState",
"Value has to be 0x00 (NoOptOut)"),
TestStep("5", "Set up a subscription to the Forecast attribute, with MinIntervalFloor set to 0, MaxIntervalCeiling set to 10 and KeepSubscriptions set to false",
"Subscription successfully established"),
TestStep("6", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for User Opt-out Local Optimization Test Event",
TestStep("6", "If {PICS_S_FA} {featIsNotSupported} skip to step 14",
"Value has to be 0x00 (NoOptOut)"),

TestStep("7", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for Forecast Adjustment Test Event",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("6a", "TH reads from the DUT the ESAState",
TestStep("7a", "TH reads from the DUT the ESAState",
"Value has to be 0x01 (Online)"),
TestStep("6b", "TH reads from the DUT the OptOutState",
"Value has to be 0x02 (LocalOptOut)"),
TestStep("7", "TH sends command ModifyForecastRequest with ForecastID=Forecast.ForecastID, SlotAdjustments[0].{SlotIndex=0, Duration=Forecast.Slots[0].MinDurationAdjustment}, Cause=GridOptimization",
TestStep("8", "Reset all accumulated report counts, then wait 12 seconds"),
TestStep("9", "TH counts all report transactions with an attribute report for the Forecast attribute",
"TH verifies that numberOfReportsReceived \<= 2"),
TestStep("10", "TH reads from the DUT the Forecast",
"Value has to include slots[0].MinDurationAdjustment, slots[0].MaxDurationAdjustment"),
TestStep("11", "TH sends command ModifyForecastRequest... TODO",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("8", "TH counts all report transactions with an attribute report for the Forecast attribute over the next Forecast.Slots[0].MinDurationAdjustment}",
"TH verifies that numberOfReportsReceived <= 2 + Forecast.Slots[0].MinDurationAdjustment}"),
TestStep("9", "Cancel the subscription to the Forecast attribute",
"The subscription is cancelled successfully"),
TestStep("10", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for Forecast Adjustment Test Event Clear",

TestStep("12", "TH resets all accumulated report counts, then TH sends command CancelRequest//// TODO"),

TestStep("13", "Wait 5 seconds"),

TestStep("13a", "TH counts all report transactions with an attribute report for the Forecast attribute_",
"TH verifies that numberOfReportsReceived >= 1 and Value has to include ForecastUpdateReason=InternalOptimization in the last attribute report received."),

TestStep("14", "Clear forecast trigger --- TODO"),
TestStep("15", "If {PICS_S_PA} {featIsNotSupported} skip to step 22 - TODO", ""),
TestStep("16", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for Power Adjustment Test Event",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("16b", "TH reads from the DUT the PowerAdjustmentCapability",
"Value has to include Cause=NoAdjustment."),
TestStep("17", "TH resets all accumulated report counts, then TH sends command PowerAdjustRequest with Power=PowerAdjustmentCapability[0].MaxPower, Duration=20, Cause=LocalOptimization",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("18", "Wait 12 seconds"),
TestStep("18a", "TH counts all report transactions with an attribute report for the PowerAdjustmentCapability attribute",
"TH verifies that numberOfReportsReceived \<= 2"),
TestStep("19", "TH resets all accumulated report counts, then TH sends command CancelPowerAdjustment",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("20", "Wait 5 seconds"),
TestStep("20a", "TH counts all report transactions with an attribute report for the PowerAdjustmentCapability attribute",
"TH verifies that numberOfReportsReceived >=1"),

TestStep("21", "TH sends TestEventTrigger command to General Diagnostics Cluster on Endpoint 0 with EnableKey field set to PIXIT.DEM.TESTEVENT_TRIGGERKEY and EventTrigger field set to PIXIT.DEM.TESTEVENTTRIGGER for Power Adjustment Test Event Clear",
"Verify DUT responds w/ status SUCCESS(0x00)"),
TestStep("22", "Cancel the subscription to the Device Energy Management cluster",
"The subscription is cancelled successfully"),
]

return steps

@async_test_body
@ async_test_body
async def test_TC_DEM_2_10(self):
# pylint: disable=too-many-locals, too-many-statements
"""Run the test steps."""
self.step("1")
# Commission DUT - already done

self.step("2")
await self.validate_feature_map([Clusters.DeviceEnergyManagement.Bitmaps.Feature.kForecastAdjustment,
Clusters.DeviceEnergyManagement.Bitmaps.Feature.kStateForecastReporting],
[Clusters.DeviceEnergyManagement.Bitmaps.Feature.kPowerForecastReporting])
feature_map = await self.read_dem_attribute_expect_success(attribute="FeatureMap")
logger.info(f"FeatureMap: {feature_map}")

has_pfr = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kPowerForecastReporting
has_sfr = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kStateForecastReporting
has_pa = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kPowerAdjustment
has_sta = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kStartTimeAdjustment
has_pau = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kPausable
has_fa = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kForecastAdjustment
has_con = feature_map & Clusters.DeviceEnergyManagement.Bitmaps.Feature.kConstraintBasedAdjustment
has_any_forecast_adjustment = has_sta | has_pau | has_fa | has_con
if has_any_forecast_adjustment:
# check it has pfr or sfr (one not both)
asserts.assert_false(has_pfr and has_sfr, "Not allowed to have both PFR and SFR features enabled!")
asserts.assert_true(has_pfr or has_sfr, "Must have either PFR or SFR with a forecast adjustment feature")

if has_pa:
asserts.assert_true(has_pfr, "Since PowerAdjustment is supported, PFR should be used, not SFR")

self.step("3")
await self.check_test_event_triggers_enabled()

self.step("4")
await self.send_test_event_trigger_forecast_adjustment()
sub_handler = ClusterAttributeChangeAccumulator(Clusters.DeviceEnergyManagement)
await sub_handler.start(self.default_controller, self.dut_node_id,
self.matter_test_config.endpoint,
min_interval_sec=0,
max_interval_sec=10, keepSubscriptions=False)

self.step("4a")
await self.check_dem_attribute("ESAState", Clusters.DeviceEnergyManagement.Enums.ESAStateEnum.kOnline)
def accumulate_reports(wait_time):
logging.info(f"Test will now wait {wait_time} seconds to accumulate reports")
time.sleep(wait_time)

self.step("4b")
forecast = await self.read_dem_attribute_expect_success(attribute="Forecast")
asserts.assert_is_not_none(forecast.slots[0].minDurationAdjustment)
asserts.assert_is_not_none(forecast.slots[0].maxDurationAdjustment)
self.step("5")
await self.send_test_event_trigger_user_opt_out_clear_all()

self.step("4c")
self.step("5a")
await self.check_dem_attribute("OptOutState", Clusters.DeviceEnergyManagement.Enums.OptOutStateEnum.kNoOptOut)

self.step("5")
sub_handler = ClusterAttributeChangeAccumulator(Clusters.DeviceEnergyManagement)
await sub_handler.start(self.default_controller, self.dut_node_id, self.matter_test_config.endpoint, keepSubscriptions=False)
sub_handler.reset()

self.step("6")
await self.send_test_event_trigger_user_opt_out_local()
if not has_fa:
self.skip_step("7")
self.skip_step("7a")
self.skip_step("8")
self.skip_step("9")
self.skip_step("10")
self.skip_step("11")
self.skip_step("12")
self.skip_step("13")
self.skip_step("13a")
self.skip_step("14")
else:
self.step("7")
await self.send_test_event_trigger_forecast_adjustment()

self.step("6a")
await self.check_dem_attribute("ESAState", Clusters.DeviceEnergyManagement.Enums.ESAStateEnum.kOnline)
self.step("7a")
await self.check_dem_attribute("ESAState", Clusters.DeviceEnergyManagement.Enums.ESAStateEnum.kOnline)

self.step("6b")
await self.check_dem_attribute("OptOutState", Clusters.DeviceEnergyManagement.Enums.OptOutStateEnum.kLocalOptOut)
self.step("8")
wait = 12 # Wait 12 seconds - the spec says we should only get reports every 10s at most, unless a command changes it
sub_handler.reset()
accumulate_reports(wait)

self.step("7")
slotAdjustments = [Clusters.DeviceEnergyManagement.Structs.SlotAdjustmentStruct(
slotIndex=0, duration=forecast.slots[0].minDurationAdjustment)]
await self.send_modify_forecast_request_command(forecast.forecastID, slotAdjustments, Clusters.DeviceEnergyManagement.Enums.AdjustmentCauseEnum.kGridOptimization, expected_status=Status.Success)
self.step("9")
count = sub_handler.attribute_report_counts[Clusters.DeviceEnergyManagement.Attributes.Forecast]
logging.info(f"Received {count} Forecast updates in {wait} seconds")
asserts.assert_less_equal(count, 2, f"Expected <= 2 Forecast updates in {wait} seconds")

self.step("8")
time.sleep(forecast.slots[0].minDurationAdjustment)
self.step("10")
forecast = await self.read_dem_attribute_expect_success(attribute="Forecast")
asserts.assert_is_not_none(forecast.slots[0].minDurationAdjustment)
asserts.assert_is_not_none(forecast.slots[0].maxDurationAdjustment)

count = sub_handler.attribute_report_counts[Clusters.DeviceEnergyManagement.Attributes.Forecast]
logging.info(f"Number of Forecast updates {count}")
asserts.assert_less_equal(count, 10, "More than 10 reports received")
self.step("11")
if has_pfr:
# we include nominalPower
slotAdjustments = [Clusters.DeviceEnergyManagement.Structs.SlotAdjustmentStruct(
slotIndex=0, duration=forecast.slots[0].maxDurationAdjustment,
nominalPower=forecast.slots[0].minPowerAdjustment)]
else:
# SFR we don't provide nominalPower
slotAdjustments = [Clusters.DeviceEnergyManagement.Structs.SlotAdjustmentStruct(
slotIndex=0, duration=forecast.slots[0].maxDurationAdjustment)]
await self.send_modify_forecast_request_command(forecast.forecastID, slotAdjustments, Clusters.DeviceEnergyManagement.Enums.AdjustmentCauseEnum.kGridOptimization, expected_status=Status.Success)

self.step("9")
await sub_handler.cancel()
self.step("12")
sub_handler.reset()
await self.send_cancel_request_command()

self.step("13")
wait = 5 # We expect a change to the forecast attribute after the cancel, Wait 5 seconds - allow time for the report to come in
accumulate_reports(wait)

self.step("13a")
count = sub_handler.attribute_report_counts[Clusters.DeviceEnergyManagement.Attributes.Forecast]
logging.info(f"Received {count} Forecast updates in {wait} seconds")
asserts.assert_greater_equal(count, 1, "Expected >= 1 Forecast updates after a cancelled operation")

self.step("14")
await self.send_test_event_trigger_forecast_adjustment_clear()

self.step("15")
if not has_pa:
self.skip_step("16")
self.skip_step("16b")
self.skip_step("17")
self.skip_step("18")
self.skip_step("18a")
self.skip_step("19")
self.skip_step("20")
self.skip_step("20a")
self.skip_step("21")
else:

self.step("16")
await self.send_test_event_trigger_power_adjustment()

self.step("16b")
powerAdjustCapabilityStruct = await self.read_dem_attribute_expect_success(attribute="PowerAdjustmentCapability")
asserts.assert_greater_equal(len(powerAdjustCapabilityStruct.powerAdjustCapability), 1)
logging.info(powerAdjustCapabilityStruct)
asserts.assert_equal(powerAdjustCapabilityStruct.cause,
Clusters.DeviceEnergyManagement.Enums.PowerAdjustReasonEnum.kNoAdjustment)

# we should expect powerAdjustCapabilityStruct to have multiple entries with different max powers, min powers, max and min durations
min_power = sys.maxsize
max_power = 0

self.step("10")
await self.send_test_event_trigger_forecast_adjustment_clear()
for entry in powerAdjustCapabilityStruct.powerAdjustCapability:
min_power = min(min_power, entry.minPower)
max_power = max(max_power, entry.maxPower)

result = f"min_power {min_power} max_power {max_power}"
logging.info(result)

self.step("17")
sub_handler.reset()
await self.send_power_adjustment_command(power=powerAdjustCapabilityStruct.powerAdjustCapability[0].maxPower,
duration=20,
cause=Clusters.DeviceEnergyManagement.Enums.AdjustmentCauseEnum.kLocalOptimization)

self.step("18")
wait = 12 # Wait 12 seconds - the spec says we should only get reports every 10s at most, unless a command changes it
accumulate_reports(wait)

self.step("18a")
count = sub_handler.attribute_report_counts[Clusters.DeviceEnergyManagement.Attributes.PowerAdjustmentCapability]
logging.info(f"Received {count} PowerAdjustmentCapability updates in {wait} seconds")
asserts.assert_less_equal(count, 2, f"Expected <= 2 PowerAdjustmentCapability updates in {wait} seconds")

self.step("19")
sub_handler.reset()
await self.send_cancel_power_adjustment_command()

self.step("20")
wait = 5 # We expect a change to the forecast attribute after the cancel, Wait 5 seconds - allow time for the report to come in
accumulate_reports(wait)

self.step("20a")
count = sub_handler.attribute_report_counts[Clusters.DeviceEnergyManagement.Attributes.PowerAdjustmentCapability]
logging.info(f"Received {count} PowerAdjustmentCapability updates in {wait} seconds")
asserts.assert_greater_equal(count, 1, "Expected >= 1 PowerAdjustmentCapability updates after a cancelled operation")

self.step("21")
await self.send_test_event_trigger_power_adjustment_clear()

self.step("22")
await sub_handler.cancel()


if __name__ == "__main__":
Expand Down

0 comments on commit 58e6bf3

Please sign in to comment.