diff --git a/data_model/1.6/device_types/AirPurifier.xml b/data_model/1.6/device_types/AirPurifier.xml
index 2b9b3f7b2532bd..a31e5388d8c797 100644
--- a/data_model/1.6/device_types/AirPurifier.xml
+++ b/data_model/1.6/device_types/AirPurifier.xml
@@ -83,4 +83,18 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/BatteryStorage.xml b/data_model/1.6/device_types/BatteryStorage.xml
index 0bcb110f5f8a59..31a41ab618182e 100644
--- a/data_model/1.6/device_types/BatteryStorage.xml
+++ b/data_model/1.6/device_types/BatteryStorage.xml
@@ -69,26 +69,180 @@ Davis, CA 95616, USA
-
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/BridgedNode.xml b/data_model/1.6/device_types/BridgedNode.xml
index c1c1770ca9e0e4..2fe80eb464bb4c 100644
--- a/data_model/1.6/device_types/BridgedNode.xml
+++ b/data_model/1.6/device_types/BridgedNode.xml
@@ -98,4 +98,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/Camera.xml b/data_model/1.6/device_types/Camera.xml
index efed03dc6bf281..3f3f49343dfe5e 100644
--- a/data_model/1.6/device_types/Camera.xml
+++ b/data_model/1.6/device_types/Camera.xml
@@ -132,4 +132,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/Closure.xml b/data_model/1.6/device_types/Closure.xml
index 89b88a69295842..3a0729f073fb19 100644
--- a/data_model/1.6/device_types/Closure.xml
+++ b/data_model/1.6/device_types/Closure.xml
@@ -76,4 +76,15 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/Cooktop.xml b/data_model/1.6/device_types/Cooktop.xml
index 09f014328d67f2..38ae89b5970463 100644
--- a/data_model/1.6/device_types/Cooktop.xml
+++ b/data_model/1.6/device_types/Cooktop.xml
@@ -75,4 +75,12 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/EVSE.xml b/data_model/1.6/device_types/EVSE.xml
index 4d440608f7667f..c49c8992fdd008 100644
--- a/data_model/1.6/device_types/EVSE.xml
+++ b/data_model/1.6/device_types/EVSE.xml
@@ -78,7 +78,29 @@ Davis, CA 95616, USA
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -88,19 +110,11 @@ Davis, CA 95616, USA
-
-
-
-
-
-
-
-
-
+
-
-
-
-
+
+
+
+
diff --git a/data_model/1.6/device_types/ElectricalMeter.xml b/data_model/1.6/device_types/ElectricalMeter.xml
index ee31dc4c9f8307..b07524013efdd6 100644
--- a/data_model/1.6/device_types/ElectricalMeter.xml
+++ b/data_model/1.6/device_types/ElectricalMeter.xml
@@ -76,4 +76,12 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/ExtractorHood.xml b/data_model/1.6/device_types/ExtractorHood.xml
index c55bc9abd16cfc..d8c6906ec30f5f 100644
--- a/data_model/1.6/device_types/ExtractorHood.xml
+++ b/data_model/1.6/device_types/ExtractorHood.xml
@@ -87,4 +87,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/Fan.xml b/data_model/1.6/device_types/Fan.xml
index 4e9ee06dc4f668..b5ee7a1fe296a2 100644
--- a/data_model/1.6/device_types/Fan.xml
+++ b/data_model/1.6/device_types/Fan.xml
@@ -79,4 +79,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/FloodlightCamera.xml b/data_model/1.6/device_types/FloodlightCamera.xml
index feb4927739df1c..91e1226f017724 100644
--- a/data_model/1.6/device_types/FloodlightCamera.xml
+++ b/data_model/1.6/device_types/FloodlightCamera.xml
@@ -62,4 +62,18 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/HeatPump.xml b/data_model/1.6/device_types/HeatPump.xml
index 778d66f2e4a299..901c80b6abeeb8 100644
--- a/data_model/1.6/device_types/HeatPump.xml
+++ b/data_model/1.6/device_types/HeatPump.xml
@@ -71,30 +71,93 @@ Davis, CA 95616, USA
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
+
-
+
diff --git a/data_model/1.6/device_types/Intercom.xml b/data_model/1.6/device_types/Intercom.xml
index 85411d329d871c..89bda06b09814c 100644
--- a/data_model/1.6/device_types/Intercom.xml
+++ b/data_model/1.6/device_types/Intercom.xml
@@ -119,4 +119,21 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/IrrigationSystem.xml b/data_model/1.6/device_types/IrrigationSystem.xml
index f7de739e013bc6..06a73b5d396196 100644
--- a/data_model/1.6/device_types/IrrigationSystem.xml
+++ b/data_model/1.6/device_types/IrrigationSystem.xml
@@ -76,4 +76,12 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/MeterReferencePoint.xml b/data_model/1.6/device_types/MeterReferencePoint.xml
index 6cce132b79f13f..dc97eaaadcbd7f 100644
--- a/data_model/1.6/device_types/MeterReferencePoint.xml
+++ b/data_model/1.6/device_types/MeterReferencePoint.xml
@@ -77,4 +77,36 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/MicrowaveOven.xml b/data_model/1.6/device_types/MicrowaveOven.xml
index 1ce8b1e0ac985a..33deabb38f3d15 100644
--- a/data_model/1.6/device_types/MicrowaveOven.xml
+++ b/data_model/1.6/device_types/MicrowaveOven.xml
@@ -98,4 +98,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/Oven.xml b/data_model/1.6/device_types/Oven.xml
index 992083b572af97..40a5d2f449b939 100644
--- a/data_model/1.6/device_types/Oven.xml
+++ b/data_model/1.6/device_types/Oven.xml
@@ -76,4 +76,18 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/Refrigerator.xml b/data_model/1.6/device_types/Refrigerator.xml
index 82be6378774aeb..c968ed41fde007 100644
--- a/data_model/1.6/device_types/Refrigerator.xml
+++ b/data_model/1.6/device_types/Refrigerator.xml
@@ -100,4 +100,12 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/RoomAirConditioner.xml b/data_model/1.6/device_types/RoomAirConditioner.xml
index 0cb9a3998a177e..1722ecde3b8a77 100644
--- a/data_model/1.6/device_types/RoomAirConditioner.xml
+++ b/data_model/1.6/device_types/RoomAirConditioner.xml
@@ -109,4 +109,12 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/RootNodeDeviceType.xml b/data_model/1.6/device_types/RootNodeDeviceType.xml
index e9db0c1cf17a1d..1731fcbbd74066 100644
--- a/data_model/1.6/device_types/RootNodeDeviceType.xml
+++ b/data_model/1.6/device_types/RootNodeDeviceType.xml
@@ -319,4 +319,14 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/SmokeCOAlarm.xml b/data_model/1.6/device_types/SmokeCOAlarm.xml
index a0e0766163b199..4c4af4557cedbe 100644
--- a/data_model/1.6/device_types/SmokeCOAlarm.xml
+++ b/data_model/1.6/device_types/SmokeCOAlarm.xml
@@ -82,4 +82,12 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/SnapshotCamera.xml b/data_model/1.6/device_types/SnapshotCamera.xml
index f39717fcff010f..a5d2ebd4003f33 100644
--- a/data_model/1.6/device_types/SnapshotCamera.xml
+++ b/data_model/1.6/device_types/SnapshotCamera.xml
@@ -105,4 +105,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/SolarPower.xml b/data_model/1.6/device_types/SolarPower.xml
index d0875d1f86470e..c03cb84a98f309 100644
--- a/data_model/1.6/device_types/SolarPower.xml
+++ b/data_model/1.6/device_types/SolarPower.xml
@@ -68,23 +68,81 @@ Davis, CA 95616, USA
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/ThreadBorderRouter.xml b/data_model/1.6/device_types/ThreadBorderRouter.xml
index 724a314684541d..9d14fc58a82f68 100644
--- a/data_model/1.6/device_types/ThreadBorderRouter.xml
+++ b/data_model/1.6/device_types/ThreadBorderRouter.xml
@@ -74,4 +74,9 @@ Davis, CA 95616, USA
+
+
+
+
+
diff --git a/data_model/1.6/device_types/VideoDoorbell.xml b/data_model/1.6/device_types/VideoDoorbell.xml
index 37e4693966b435..0073611aa47fff 100644
--- a/data_model/1.6/device_types/VideoDoorbell.xml
+++ b/data_model/1.6/device_types/VideoDoorbell.xml
@@ -62,4 +62,18 @@ Davis, CA 95616, USA
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/data_model/1.6/device_types/WaterHeater.xml b/data_model/1.6/device_types/WaterHeater.xml
index 5f04901b9e3d96..6646b0c04c823b 100644
--- a/data_model/1.6/device_types/WaterHeater.xml
+++ b/data_model/1.6/device_types/WaterHeater.xml
@@ -82,7 +82,20 @@ Davis, CA 95616, USA
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -92,24 +105,11 @@ Davis, CA 95616, USA
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
diff --git a/src/python_testing/TC_DeviceConformance.py b/src/python_testing/TC_DeviceConformance.py
index df64784d535009..984041eda69356 100644
--- a/src/python_testing/TC_DeviceConformance.py
+++ b/src/python_testing/TC_DeviceConformance.py
@@ -95,7 +95,12 @@ def test_TC_IDM_10_5(self):
success, problems = self.check_device_type(fail_on_extra_clusters,
allow_provisional_test_event_only_disallowed_for_certification)
self.problems.extend(problems)
- if not success:
+
+ composed_dt_req_success, composed_dt_req_problems = self.check_composed_device_type_requirements(
+ allow_provisional_test_event_only_disallowed_for_certification)
+ self.problems.extend(composed_dt_req_problems)
+
+ if not success or not composed_dt_req_success:
self.fail_current_test("Problems with Device type conformance on one or more endpoints")
def test_TC_IDM_10_6(self):
diff --git a/src/python_testing/matter_testing_infrastructure/matter/testing/device_conformance_tests.py b/src/python_testing/matter_testing_infrastructure/matter/testing/device_conformance_tests.py
index 28336a5aa9957e..b9f0e6dffe81f0 100644
--- a/src/python_testing/matter_testing_infrastructure/matter/testing/device_conformance_tests.py
+++ b/src/python_testing/matter_testing_infrastructure/matter/testing/device_conformance_tests.py
@@ -16,21 +16,25 @@
#
+import logging
from typing import Callable, Optional
import matter.clusters as Clusters
from matter.testing.basic_composition import BasicCompositionTests
from matter.testing.choice_conformance import (evaluate_attribute_choice_conformance, evaluate_command_choice_conformance,
evaluate_feature_choice_conformance)
-from matter.testing.conformance import EMPTY_CLUSTER_GLOBAL_ATTRIBUTES, ConformanceAssessmentData, conformance_allowed
+from matter.testing.conformance import (EMPTY_CLUSTER_GLOBAL_ATTRIBUTES, ConformanceAssessmentData, ConformanceDecision,
+ conformance_allowed)
from matter.testing.global_attribute_ids import (ClusterIdType, DeviceTypeIdType, GlobalAttributeIds, cluster_id_type,
device_type_id_type, is_valid_device_type_id)
from matter.testing.problem_notices import (AttributePathLocation, ClusterPathLocation, CommandPathLocation, DeviceTypePathLocation,
ProblemNotice, ProblemSeverity)
-from matter.testing.spec_parsing import (CommandType, PrebuiltDataModelDirectory, XmlDeviceType, XmlDeviceTypeClusterRequirements,
- build_xml_device_types, build_xml_namespaces)
+from matter.testing.spec_parsing import (CommandType, PrebuiltDataModelDirectory, XmlDeviceType, build_xml_device_types,
+ build_xml_namespaces)
from matter.tlv import uint
+logger = logging.getLogger(__name__)
+
def get_supersets(xml_device_types: dict[int, XmlDeviceType]) -> list[set[int]]:
''' Returns a list of the sets of device type id that each constitute a single superset.
@@ -344,18 +348,44 @@ def record_error(location, problem):
location, f"Expected Device type revision for device type {device_type_id} {self.xml_device_types[device_type_id].name} on endpoint {endpoint_id} does not match revision on DUT. Expected: {expected_revision} DUT: {actual_revision}")
return success, problems
+ def _check_feature_overrides(self, cluster_requirement, cluster_info, feature_map, record_error, location, device_type_desc, allow_provisional):
+ for mask, conformance in cluster_requirement.feature_overrides.items():
+ conformance_decision_with_choice = conformance(cluster_info)
+ if conformance_decision_with_choice.is_mandatory() and ((feature_map & mask) == 0):
+ record_error(
+ location=location, problem=f"Feature bit {mask.bit_length() - 1} in cluster {cluster_requirement.name} is required by element override for {device_type_desc}, but is not present in the feature map")
+ if not conformance_allowed(conformance_decision_with_choice, allow_provisional) and ((feature_map & mask) != 0):
+ record_error(
+ location=location, problem=f"Feature bit {mask.bit_length() - 1} in cluster {cluster_requirement.name} is disallowed by element override for {device_type_desc}, but is present in the feature map")
+
+ def _check_attribute_overrides(self, cluster_requirement, cluster_info, attribute_list, record_error, location, device_type_desc, allow_provisional, device_type_id, cluster_id):
+ for _id, conformance in cluster_requirement.attribute_overrides.items():
+ conformance_decision_with_choice = conformance(cluster_info)
+ if conformance_decision_with_choice.is_mandatory() and _id not in attribute_list:
+ record_error(
+ location=location, problem=f"Attribute {_id} in cluster {cluster_requirement.name} is required by element override for {device_type_desc}, but is not present in the attribute list")
+ if not conformance_allowed(conformance_decision_with_choice, allow_provisional) and _id in attribute_list:
+ if device_type_id == 0x050F and cluster_id == Clusters.Thermostat.id and _id == Clusters.Thermostat.Attributes.SystemMode.attribute_id:
+ # This is a specific problem in the water heater device type where it is specifically disallowing a thing that shouldn't be disallowed
+ # For now, ignore this requirement until the spec is fixed
+ continue
+ record_error(
+ location=location, problem=f"Attribute {_id} in cluster {cluster_requirement.name} is disallowed by element override for {device_type_desc}, but is present in the attribute list")
+
+ def _check_command_overrides(self, cluster_requirement, cluster_info, cmd_list, record_error, location, device_type_desc, allow_provisional):
+ for _id, conformance in cluster_requirement.command_overrides.items():
+ conformance_decision_with_choice = conformance(cluster_info)
+ if conformance_decision_with_choice.is_mandatory() and _id not in cmd_list:
+ record_error(
+ location=location, problem=f"Command {_id} in cluster {cluster_requirement.name} is required by element override for {device_type_desc}, but is not present in the cmd list")
+ if not conformance_allowed(conformance_decision_with_choice, allow_provisional) and _id in cmd_list:
+ record_error(
+ location=location, problem=f"Command {_id} in cluster {cluster_requirement.name} is disallowed by element override for {device_type_desc}, but is present in the cmd list")
+
def check_device_type(self, fail_on_extra_clusters: bool = True, allow_provisional_test_event_only_disallowed_for_certification: bool = False) -> tuple[bool, list[ProblemNotice]]:
success = True
problems = []
- # This is a specific problem in the 1.5 specification for water heater. For now this requirement is being removed as it is
- # disallowed to overwrite a mandatory cluster requirement to disallowed in the device type
- try:
- water_heater_id = self._get_device_type_id('Water Heater')
- except KeyError:
- # water heater isn't in the spec, so just set it to an unused ID for checks
- water_heater_id = 0
-
def record_problem(location, problem, severity):
problems.append(ProblemNotice("IDM-10.5", location, severity, problem, ""))
@@ -416,40 +446,6 @@ def record_warning(location, problem):
# Optional cluster not on this endpoint
continue
- def check_feature_overrides(cluster_requirement: XmlDeviceTypeClusterRequirements, cluster_info: ConformanceAssessmentData):
- for mask, conformance in cluster_requirement.feature_overrides.items():
- conformance_decision_with_choice = conformance(cluster_info)
- if conformance_decision_with_choice.is_mandatory() and ((feature_map & mask) == 0):
- record_error(
- location=location, problem=f"Feature bit {mask.bit_length() - 1} in cluster {cluster_requirement.name} is required by element override for device type {xml_device.name}, but is not present in the feature map")
- if not conformance_allowed(conformance_decision_with_choice, allow_provisional_test_event_only_disallowed_for_certification) and ((feature_map & mask) != 0):
- record_error(
- location=location, problem=f"Feature bit {mask.bit_length() - 1} in cluster {cluster_requirement.name} is disallowed by element override for device type {xml_device.name}, but is present in the feature map")
-
- def check_attribute_overrides(cluster_requirement: XmlDeviceTypeClusterRequirements, cluster_info: ConformanceAssessmentData) -> None:
- for _id, conformance in cluster_requirement.attribute_overrides.items():
- conformance_decision_with_choice = conformance(cluster_info)
- if conformance_decision_with_choice.is_mandatory() and _id not in attribute_list:
- record_error(
- location=location, problem=f"Attribute {_id} in cluster {cluster_requirement.name} is required by element override for device type {xml_device.name}, but is not present in the attribute list")
- if not conformance_allowed(conformance_decision_with_choice, allow_provisional_test_event_only_disallowed_for_certification) and _id in attribute_list:
- if device_type_id == water_heater_id and cluster_id == Clusters.Thermostat.id and _id == Clusters.Thermostat.Attributes.SystemMode.attribute_id:
- # This is a specific problem in the water heater device type where it is specifically disallowing a thing that shouldn't be disallowed
- # For now, ignore this requirement until the spec is fixed
- continue
- record_error(
- location=location, problem=f"Attribute {_id} in cluster {cluster_requirement.name} is disallowed by element override for device type {xml_device.name}, but is present in the attribute list")
-
- def check_command_overrides(cluster_requirement: XmlDeviceTypeClusterRequirements, cluster_info: ConformanceAssessmentData):
- for _id, conformance in cluster_requirement.command_overrides.items():
- conformance_decision_with_choice = conformance(cluster_info)
- if conformance_decision_with_choice.is_mandatory() and _id not in cmd_list:
- record_error(
- location=location, problem=f"Command {_id} in cluster {cluster_requirement.name} is required by element override for device type {xml_device.name}, but is not present in the cmd list")
- if not conformance_allowed(conformance_decision_with_choice, allow_provisional_test_event_only_disallowed_for_certification) and _id in cmd_list:
- record_error(
- location=location, problem=f"Command {_id} in cluster {cluster_requirement.name} is disallowed by element override for device type {xml_device.name}, but is present in the cmd list")
-
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
feature_map = endpoint[cluster][cluster.Attributes.FeatureMap]
attribute_list = endpoint[cluster][cluster.Attributes.AttributeList]
@@ -457,9 +453,12 @@ def check_command_overrides(cluster_requirement: XmlDeviceTypeClusterRequirement
revision = endpoint[cluster][cluster.Attributes.ClusterRevision]
cluster_info = ConformanceAssessmentData(feature_map, attribute_list, cmd_list, revision)
- check_feature_overrides(cluster_requirement, cluster_info)
- check_attribute_overrides(cluster_requirement, cluster_info)
- check_command_overrides(cluster_requirement, cluster_info)
+ self._check_feature_overrides(cluster_requirement, cluster_info, feature_map, record_error, location,
+ f"device type {xml_device.name}", allow_provisional_test_event_only_disallowed_for_certification)
+ self._check_attribute_overrides(cluster_requirement, cluster_info, attribute_list, record_error, location,
+ f"device type {xml_device.name}", allow_provisional_test_event_only_disallowed_for_certification, device_type_id, cluster_id)
+ self._check_command_overrides(cluster_requirement, cluster_info, cmd_list, record_error, location,
+ f"device type {xml_device.name}", allow_provisional_test_event_only_disallowed_for_certification)
# If we want to check for extra clusters on the endpoint, we need to know the entire set of clusters in all the device type
# lists across all the device types on the endpoint.
@@ -475,6 +474,176 @@ def check_command_overrides(cluster_requirement: XmlDeviceTypeClusterRequirement
return success, problems
+ def _get_candidate_endpoints(self, endpoint_id: int, location: str, parts_list: list[int]) -> list[int]:
+ if location == 'deviceEndpoint':
+ return [endpoint_id]
+ if location == 'rootEndpoint':
+ return [0]
+ if location == 'anyEndpoint':
+ return list(self.endpoints.keys())
+ if location == 'descendantEndpoint':
+ descendants = []
+
+ def add_descendants(ep):
+ if ep in self.endpoints and Clusters.Descriptor in self.endpoints[ep]:
+ pl = self.endpoints[ep][Clusters.Descriptor].get(Clusters.Descriptor.Attributes.PartsList, [])
+ for child in pl:
+ descendants.append(child)
+ add_descendants(child)
+ add_descendants(endpoint_id)
+ return descendants
+ # childEndpoint or fallback
+ return parts_list
+
+ def check_composed_device_type_requirements(self, allow_provisional: bool = False) -> tuple[bool, list[ProblemNotice]]:
+ success = True
+ problems = []
+
+ try:
+ spec_version = self.endpoints[0][Clusters.BasicInformation][Clusters.BasicInformation.Attributes.SpecificationVersion]
+ except KeyError:
+ spec_version = 0
+
+ if spec_version < 0x01060000:
+ logger.info("Skipping flat model device type requirements check: this test is not enabled for versions below 1.6")
+ return success, problems
+
+ def record_error(location, problem):
+ nonlocal success
+ problems.append(ProblemNotice("IDM-10.5", location, ProblemSeverity.ERROR, problem, ""))
+ success = False
+
+ # Evaluate composed device type requirements for each device type found
+ for endpoint_id, endpoint in self.endpoints.items():
+ if Clusters.Descriptor not in endpoint:
+ continue
+
+ device_types = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]
+ for dt in device_types:
+ device_type_id = dt.deviceType
+ if device_type_id not in self.xml_device_types:
+ continue
+
+ xml_device = self.xml_device_types[device_type_id]
+ from collections import defaultdict
+ reqs_by_dt = defaultdict(list)
+ for req in xml_device.composed_device_types:
+ reqs_by_dt[req.device_type_id].append(req)
+
+ parts_list = []
+ if Clusters.Descriptor.Attributes.PartsList in endpoint[Clusters.Descriptor]:
+ parts_list = endpoint[Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList]
+
+ for req_dt_id, req_list in reqs_by_dt.items():
+ req_matches = defaultdict(list)
+ for req_idx, req in enumerate(req_list):
+ candidate_eps = self._get_candidate_endpoints(endpoint_id, req.device_type_location, parts_list)
+
+ matching_eps = []
+ for candidate_ep_id in candidate_eps:
+ if candidate_ep_id in self.endpoints:
+ candidate_ep = self.endpoints[candidate_ep_id]
+ if Clusters.Descriptor in candidate_ep:
+ dt_list = candidate_ep[Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]
+ if any(dt.deviceType == req_dt_id for dt in dt_list):
+ matching_eps.append(candidate_ep_id)
+
+ for ep_id in matching_eps:
+ child_ep = self.endpoints[ep_id]
+ server_list = child_ep[Clusters.Descriptor].get(Clusters.Descriptor.Attributes.ServerList, [])
+
+ matches = True
+ for cid, cr in req.cluster_requirements.items():
+ cconformance = cr.conformance(EMPTY_CLUSTER_GLOBAL_ATTRIBUTES)
+ if cconformance.is_mandatory() and cid not in server_list:
+ matches = False
+ break
+ if cconformance.decision == ConformanceDecision.DISALLOWED and cid in server_list:
+ matches = False
+ break
+
+ if matches:
+ # Also check element overrides!
+ failed_override = False
+
+ def dummy_record_error(location, problem):
+ nonlocal failed_override
+ failed_override = True
+
+ override_location = DeviceTypePathLocation(endpoint_id=ep_id, device_type_id=req.device_type_id)
+
+ for cid, cr in req.cluster_requirements.items():
+ if cid not in server_list:
+ continue
+ cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cid]
+ feature_map = child_ep[cluster][cluster.Attributes.FeatureMap]
+ attribute_list = child_ep[cluster][cluster.Attributes.AttributeList]
+ cmd_list = child_ep[cluster][cluster.Attributes.AcceptedCommandList]
+ revision = child_ep[cluster][cluster.Attributes.ClusterRevision]
+ cluster_info = ConformanceAssessmentData(feature_map, attribute_list, cmd_list, revision)
+
+ self._check_feature_overrides(
+ cr, cluster_info, feature_map, dummy_record_error, override_location, f"composed device type {req.device_type_name}", allow_provisional)
+ self._check_attribute_overrides(cr, cluster_info, attribute_list, dummy_record_error, override_location,
+ f"composed device type {req.device_type_name}", allow_provisional, req.device_type_id, cid)
+ self._check_command_overrides(
+ cr, cluster_info, cmd_list, dummy_record_error, override_location, f"composed device type {req.device_type_name}", allow_provisional)
+
+ if not failed_override:
+ req_matches[req_idx].append(ep_id)
+
+ # Try to satisfy all reqs with overrides first!
+ reqs_with_overrides = []
+ for r in req_list:
+ if r.cluster_requirements:
+ reqs_with_overrides.append(r)
+
+ reqs_without_overrides = [r for r in req_list if r not in reqs_with_overrides]
+
+ def satisfy_overrides(idx, assigned):
+ if idx == len(reqs_with_overrides):
+ return assigned
+ req = reqs_with_overrides[idx]
+ req_idx = req_list.index(req)
+ for ep_id in req_matches[req_idx]:
+ if ep_id not in assigned:
+ res = satisfy_overrides(idx + 1, assigned | {ep_id})
+ if res is not None:
+ return res
+ return None
+
+ assigned_for_overrides = satisfy_overrides(0, set())
+
+ location = DeviceTypePathLocation(endpoint_id=endpoint_id, device_type_id=device_type_id)
+
+ if reqs_with_overrides and assigned_for_overrides is None:
+ record_error(
+ location, f"Could not find distinct child endpoints satisfying all labeled instances of composed device type {req_list[0].device_type_name}")
+ continue
+
+ # Now check constraints on base requirements
+ total_matching = len(matching_eps)
+
+ for req in reqs_without_overrides:
+ conformance_decision = req.conformance(EMPTY_CLUSTER_GLOBAL_ATTRIBUTES)
+
+ if conformance_decision.is_mandatory() and total_matching == 0:
+ record_error(
+ location, f"Mandatory composed device type {req.device_type_name} ({req.device_type_id}) is missing in child endpoints")
+ elif not conformance_allowed(conformance_decision, allow_provisional) and total_matching > 0:
+ record_error(
+ location, f"Disallowed composed device type {req.device_type_name} ({req.device_type_id}) is present in child endpoints")
+
+ if conformance_allowed(conformance_decision, allow_provisional):
+ if req.min_instances is not None and total_matching < req.min_instances:
+ record_error(
+ location, f"Composed device type {req.device_type_name} ({req.device_type_id}) expects at least {req.min_instances} instances in child endpoints, but found {total_matching}")
+ if req.max_instances is not None and total_matching > req.max_instances:
+ record_error(
+ location, f"Composed device type {req.device_type_name} ({req.device_type_id}) expects at most {req.max_instances} instances in child endpoints, but found {total_matching}")
+
+ return success, problems
+
def check_root_endpoint_for_application_device_types(self) -> list[ProblemNotice]:
problems = []
device_types = [d.deviceType for d in self.endpoints[0][Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList]]
diff --git a/src/python_testing/matter_testing_infrastructure/matter/testing/spec_parsing.py b/src/python_testing/matter_testing_infrastructure/matter/testing/spec_parsing.py
index 4adc4528cc0ce7..1d3415d61511c6 100644
--- a/src/python_testing/matter_testing_infrastructure/matter/testing/spec_parsing.py
+++ b/src/python_testing/matter_testing_infrastructure/matter/testing/spec_parsing.py
@@ -246,6 +246,17 @@ def __str__(self) -> str:
return f"{self.name}{desc}"
+@dataclass
+class XmlComposedDeviceTypeRequirement:
+ device_type_id: int
+ device_type_name: str
+ conformance: ConformanceCallable
+ min_instances: Optional[int] = None
+ max_instances: Optional[int] = None
+ device_type_location: str = 'childEndpoint'
+ cluster_requirements: dict[uint, XmlDeviceTypeClusterRequirements] = field(default_factory=dict)
+
+
@dataclass
class XmlDeviceType:
name: str
@@ -258,6 +269,7 @@ class XmlDeviceType:
revision_desc: dict[int, str]
superset_of_device_type_name: Optional[str] = None
superset_of_device_type_id: int = 0
+ composed_device_types: list['XmlComposedDeviceTypeRequirement'] = field(default_factory=list)
def __str__(self):
msg = f'{self.name} - Revision {self.revision}, Class {self.classification_class}, Scope {self.classification_scope}\n'
@@ -1356,6 +1368,67 @@ def build_xml_namespaces(data_model_directory: typing.Union[PrebuiltDataModelDir
def parse_single_device_type(root: ElementTree.Element, cluster_definition_xml: dict[uint, XmlCluster]) -> tuple[dict[int, XmlDeviceType], list[ProblemNotice]]:
problems: list[ProblemNotice] = []
device_types: dict[int, XmlDeviceType] = {}
+
+ def append_overrides(override_element_type: str, c: ElementTree.Element, cluster: XmlDeviceTypeClusterRequirements, c_id: uint, cluster_conformance_params: ConformanceParseParameters, location, problems: list[ProblemNotice]):
+ if override_element_type == 'feature':
+ name_to_id_map = {f.name: _id for _id, f in cluster_definition_xml[c_id].features.items()}
+ name_to_id_map.update(cluster_definition_xml[c_id].feature_map)
+ override = cluster.feature_overrides
+ elif override_element_type == 'attribute':
+ name_to_id_map = cluster_definition_xml[c_id].attribute_map
+ override = cluster.attribute_overrides
+ elif override_element_type == 'command':
+ name_to_id_map = cluster_definition_xml[c_id].command_map
+ override = cluster.command_overrides
+ else:
+ problems.append(ProblemNotice("Parse Device Type XML", location=location,
+ severity=ProblemSeverity.WARNING, problem=f"Request for unknown element override type {override_element_type} - this is a script error"))
+ return
+
+ container = c.find(f'{override_element_type}s')
+ if container is None:
+ return
+
+ elements = container.iter(override_element_type)
+ for e in elements:
+ try:
+ element_name = e.attrib['name']
+ except KeyError:
+ if override_element_type == 'feature':
+ try:
+ element_name = e.attrib['code']
+ except KeyError:
+ element_name = None
+ else:
+ element_name = None
+ if element_name is None:
+ problems.append(ProblemNotice("Parse Device Type XML", location=location,
+ severity=ProblemSeverity.WARNING, problem=f"Missing {override_element_type} name for override in cluster 0x{c_id:04X}, e={str(e.attrib)}"))
+ continue
+
+ try:
+ conformance_xml, tmp_problem = get_conformance(e, c_id)
+ if tmp_problem:
+ continue
+ conformance_override = parse_callable_from_xml(conformance_xml, cluster_conformance_params)
+
+ from matter.testing.spec_parsing import _fuzzy_name, is_disallowed
+ map_id = [name_to_id_map[n] for n in name_to_id_map if _fuzzy_name(n) == _fuzzy_name(element_name)]
+ if len(map_id) == 0:
+ if is_disallowed(conformance_override):
+ import logging
+ logging.getLogger(__name__).info(
+ f"Ignoring unknown {override_element_type} {element_name} in cluster {c_id} because the conformance is disallowed")
+ continue
+ problems.append(ProblemNotice("Parse Device Type XML", location=location,
+ severity=ProblemSeverity.WARNING, problem=f"Unknown {override_element_type} {element_name} in cluster 0x{c_id:04X} - map = {map_id}"))
+ else:
+ override[map_id[0]] = conformance_override
+
+ except ConformanceException as ex:
+ problems.append(ProblemNotice("Parse Device Type XML", location=location,
+ severity=ProblemSeverity.WARNING, problem=f"Unable to parse {override_element_type} conformance for {element_name} in cluster 0x{c_id:04X} - {ex}"))
+
d = root
if d.tag == 'deviceType':
device_name = d.attrib['name']
@@ -1412,7 +1485,7 @@ def parse_single_device_type(root: ElementTree.Element, cluster_definition_xml:
for c in clusters:
try:
try:
- cid = uint(int(c.attrib['id'], 0))
+ c_id = uint(int(c.attrib['id'], 0))
except ValueError:
location = DeviceTypePathLocation(device_type_id=tid)
problems.append(ProblemNotice("Parse Device Type XML", location=location,
@@ -1420,102 +1493,100 @@ def parse_single_device_type(root: ElementTree.Element, cluster_definition_xml:
continue
# Workaround for 1.3 device types with zigbee clusters and old scenes
# This is OK because there are other tests that ensure that unknown clusters do not appear on the device
- if cid not in cluster_definition_xml:
- LOGGER.info(f"Skipping unknown cluster {cid:04X}")
+ if c_id not in cluster_definition_xml:
+ LOGGER.info(f"Skipping unknown cluster {c_id:04X}")
continue
- conformance_xml, tmp_problem = get_conformance(c, cid)
+ conformance_xml, tmp_problem = get_conformance(c, c_id)
if tmp_problem:
problems.append(tmp_problem)
continue
cluster_conformance_params = ConformanceParseParameters(
- feature_map=cluster_definition_xml[cid].feature_map, attribute_map=cluster_definition_xml[cid].attribute_map, command_map=cluster_definition_xml[cid].command_map)
+ feature_map=cluster_definition_xml[c_id].feature_map, attribute_map=cluster_definition_xml[c_id].attribute_map, command_map=cluster_definition_xml[c_id].command_map)
conformance = parse_callable_from_xml(conformance_xml, cluster_conformance_params)
side_dict = {'server': ClusterSide.SERVER, 'client': ClusterSide.CLIENT}
side = side_dict[c.attrib['side']]
cluster_name = c.attrib['name']
- if cid in CLUSTER_NAME_FIXES:
- cluster_name = CLUSTER_NAME_FIXES[cid]
+ if c_id in CLUSTER_NAME_FIXES:
+ cluster_name = CLUSTER_NAME_FIXES[c_id]
cluster = XmlDeviceTypeClusterRequirements(name=cluster_name, side=side, conformance=conformance)
- def append_overrides(override_element_type: str):
- if override_element_type == 'feature':
- # The device types use feature name rather than feature code. So we need to build a new map.
- name_to_id_map = {f.name: _id for _id, f in cluster_definition_xml[cid].features.items()}
- # But also...that's not universal, so let's be tolerant to using the code too.
- name_to_id_map.update(cluster_definition_xml[cid].feature_map)
- override = cluster.feature_overrides
- elif override_element_type == 'attribute':
- name_to_id_map = cluster_definition_xml[cid].attribute_map
- override = cluster.attribute_overrides
- elif override_element_type == 'command':
- name_to_id_map = cluster_definition_xml[cid].command_map
- override = cluster.command_overrides
- else:
- problems.append(ProblemNotice("Parse Device Type XML", location=location,
- severity=ProblemSeverity.WARNING, problem=f"Request for unknown element override type {override_element_type} - this is a script error"))
- return
-
- container = c.find(f'{override_element_type}s')
- if container is None:
- return
-
- elements = container.iter(override_element_type)
- for e in elements:
- try:
- element_name = e.attrib['name']
- except KeyError:
- if override_element_type == 'feature':
- try:
- element_name = e.attrib['code']
- except KeyError:
- element_name = None
- else:
- element_name = None
- if element_name is None:
- problems.append(ProblemNotice("Parse Device Type XML", location=location,
- severity=ProblemSeverity.WARNING, problem=f"Missing {override_element_type} name for override in cluster 0x{cid:04X}, e={str(e.attrib)}"))
- continue
-
- try:
- conformance_xml, tmp_problem = get_conformance(e, cid)
- if tmp_problem:
- # It's not actually a problem if there is no conformance override - it might be a constraint override. Just continue
- continue
- conformance_override = parse_callable_from_xml(conformance_xml, cluster_conformance_params)
-
- map_id = [name_to_id_map[n] for n in name_to_id_map if _fuzzy_name(n) ==
- _fuzzy_name(element_name)]
- if len(map_id) == 0:
- # The thermostat in particular explicitly disallows some zigbee things that don't appear in the spec due to
- # ifdefs. We can ignore problems if the device type spec disallows things that don't exist.
- if is_disallowed(conformance_override):
- LOGGER.info(
- f"Ignoring unknown {override_element_type} {element_name} in cluster {cid} because the conformance is disallowed")
- continue
- problems.append(ProblemNotice("Parse Device Type XML", location=location,
- severity=ProblemSeverity.WARNING, problem=f"Unknown {override_element_type} {element_name} in cluster 0x{cid:04X} - map = {map_id}"))
- else:
- override[map_id[0]] = conformance_override
-
- except ConformanceException as ex:
- problems.append(ProblemNotice("Parse Device Type XML", location=location,
- severity=ProblemSeverity.WARNING, problem=f"Unable to parse {override_element_type} conformance for {element_name} in cluster 0x{cid:04X} - {ex}"))
-
- append_overrides('feature')
- append_overrides('attribute')
- append_overrides('command')
+ append_overrides('feature', c, cluster, c_id, cluster_conformance_params, location, problems)
+ append_overrides('attribute', c, cluster, c_id, cluster_conformance_params, location, problems)
+ append_overrides('command', c, cluster, c_id, cluster_conformance_params, location, problems)
if side == ClusterSide.SERVER:
- device_types[tid].server_clusters[cid] = cluster
+ device_types[tid].server_clusters[c_id] = cluster
else:
- device_types[tid].client_clusters[cid] = cluster
+ device_types[tid].client_clusters[c_id] = cluster
except ConformanceException as ex:
- location = DeviceTypePathLocation(device_type_id=tid, cluster_id=cid)
+ location = DeviceTypePathLocation(device_type_id=tid, cluster_id=c_id)
problems.append(ProblemNotice("Parse Device Type XML", location=location,
severity=ProblemSeverity.WARNING, problem=f"Unable to parse conformance for cluster - {ex}"))
# NOTE: Spec currently does a bad job of matching these exactly to the names and codes
# so this will need a bit of fancy handling here to get this right.
+
+ try:
+ composed_dts = d.findall('composedDeviceTypes/deviceType')
+ for composed_dt in composed_dts:
+ composed_id = int(composed_dt.attrib['deviceTypeId'], 0)
+ composed_name = composed_dt.attrib['deviceTypeName']
+ location = DeviceTypePathLocation(device_type_id=tid)
+
+ # Conformance
+ conformance_xml, _ = get_conformance(composed_dt, uint(tid))
+
+ conformance = parse_callable_from_xml(conformance_xml, ConformanceParseParameters(
+ feature_map={}, attribute_map={}, command_map={}))
+
+ min_instances = None
+ max_instances = None
+
+ if (constraint := composed_dt.find('constraint')) is not None:
+ if (allowed := constraint.find('allowed')) is not None:
+ min_instances = max_instances = int(allowed.attrib.get('value', "0"), 0)
+ else:
+ min_el = constraint.find('min')
+ max_el = constraint.find('max')
+
+ min_instances = int(min_el.attrib.get('value', "0"), 0) if min_el is not None else None
+ max_instances = int(max_el.attrib.get('value', "0"), 0) if max_el is not None else None
+
+ cluster_requirements = {}
+ cr_el = composed_dt.find('clusterRequirements')
+ if cr_el is not None:
+ for c in cr_el.findall('cluster'):
+ c_id = uint(int(c.attrib['id'], 0))
+ c_name = c.attrib['name']
+ c_conformance_xml, _ = get_conformance(c, c_id)
+ c_conformance = parse_callable_from_xml(c_conformance_xml, ConformanceParseParameters(
+ feature_map={}, attribute_map={}, command_map={}))
+ cluster = XmlDeviceTypeClusterRequirements(name=c_name, side=ClusterSide.SERVER, conformance=c_conformance)
+ cluster_requirements[c_id] = cluster
+
+ cluster_conformance_params = ConformanceParseParameters(
+ feature_map=cluster_definition_xml[c_id].feature_map,
+ attribute_map=cluster_definition_xml[c_id].attribute_map,
+ command_map=cluster_definition_xml[c_id].command_map
+ )
+ append_overrides('feature', c, cluster, c_id, cluster_conformance_params, location, problems)
+ append_overrides('attribute', c, cluster, c_id, cluster_conformance_params, location, problems)
+ append_overrides('command', c, cluster, c_id, cluster_conformance_params, location, problems)
+
+ device_type_location = composed_dt.attrib.get('deviceTypeLocation', 'childEndpoint')
+ device_types[tid].composed_device_types.append(XmlComposedDeviceTypeRequirement(
+ device_type_id=composed_id,
+ device_type_name=composed_name,
+ conformance=conformance,
+ min_instances=min_instances,
+ max_instances=max_instances,
+ device_type_location=device_type_location,
+ cluster_requirements=cluster_requirements
+ ))
+ except Exception as e:
+ problems.append(ProblemNotice("Parse Device Type XML", location=location,
+ severity=ProblemSeverity.WARNING, problem=f"Error parsing composedDeviceTypes: {e}"))
+
return device_types, problems
diff --git a/src/python_testing/test_testing/TestComposedDeviceTypeMatching.py b/src/python_testing/test_testing/TestComposedDeviceTypeMatching.py
new file mode 100644
index 00000000000000..c8f88d9063c7be
--- /dev/null
+++ b/src/python_testing/test_testing/TestComposedDeviceTypeMatching.py
@@ -0,0 +1,954 @@
+#
+# Copyright (c) 2026 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+from xml.etree import ElementTree
+
+from mobly import asserts
+
+import matter.clusters as Clusters
+from matter.testing.device_conformance_tests import DeviceConformanceTests
+from matter.testing.runner import default_matter_test_main
+from matter.testing.spec_parsing import (ClusterSide, ConformanceParseParameters, XmlComposedDeviceTypeRequirement, XmlDeviceType,
+ XmlDeviceTypeClusterRequirements, parse_callable_from_xml)
+
+log = logging.getLogger(__name__)
+
+
+def get_mandatory_conformance():
+ return parse_callable_from_xml(ElementTree.Element('mandatoryConform'), ConformanceParseParameters({}, {}, {}))
+
+
+def get_optional_conformance():
+ return parse_callable_from_xml(ElementTree.Element('optionalConform'), ConformanceParseParameters({}, {}, {}))
+
+
+class TestComposedDeviceTypeMatching(DeviceConformanceTests):
+ def setup_class(self):
+ # We don't load real XMLs here to isolate the tests
+ self.xml_device_types = {}
+ self.xml_clusters = {}
+ self.problems = []
+
+ def _create_mock_composed_req(self, dt_id, name, conformance, min_instances=None, max_instances=None, cluster_requirements=None, device_type_location='childEndpoint'):
+ return XmlComposedDeviceTypeRequirement(
+ device_type_id=dt_id,
+ device_type_name=name,
+ conformance=conformance,
+ min_instances=min_instances,
+ max_instances=max_instances,
+ device_type_location=device_type_location,
+ cluster_requirements=cluster_requirements or {}
+ )
+
+ # ==========================================================================
+ # Scenario 1: Simple 1-to-1 Match
+ # ==========================================================================
+ # Requirement: Parent device type requires exactly 1 instance of Child DT.
+ # Topology: Endpoint 1 has Parent DT. Endpoint 2 has Child DT listed in PartsList.
+ # Expected: PASS.
+ # ==========================================================================
+ def test_scenario_simple_match(self):
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+
+ # Mock spec data
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[
+ self._create_mock_composed_req(dt_child_id, "Child Device",
+ get_mandatory_conformance(), min_instances=1, max_instances=1)
+ ]
+ ),
+ dt_child_id: XmlDeviceType(
+ name="Child Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={}
+ )
+ }
+
+ # Mock device endpoints
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)]
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure on simple 1-to-1 match scenario")
+
+ # ==========================================================================
+ # Scenario 2: Missing Mandatory Instance
+ # ==========================================================================
+ # Requirement: Parent device type requires exactly 1 instance of Child DT.
+ # Topology: Endpoint 1 has Parent DT. PartsList is empty.
+ # Expected: FAIL (Missing mandatory composed device type).
+ # ==========================================================================
+ def test_scenario_missing_mandatory(self):
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[
+ self._create_mock_composed_req(dt_child_id, "Child Device",
+ get_mandatory_conformance(), min_instances=1, max_instances=1)
+ ]
+ ),
+ dt_child_id: XmlDeviceType(
+ name="Child Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={}
+ )
+ }
+
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ asserts.assert_false(success, "Unexpected success when mandatory composed device type is missing")
+
+ # ==========================================================================
+ # Scenario 3: Bipartite Matching (Distinct Overrides)
+ # ==========================================================================
+ # Requirement: Parent requires TWO instances of Child DT.
+ # Instance #1 requires Cluster X to be present.
+ # Instance #2 requires Cluster Y to be present.
+ # Topology: Endpoint 1 has Parent DT. PartsList = [2, 3].
+ # Endpoint 2 has Child DT and Cluster X.
+ # Endpoint 3 has Child DT and Cluster Y.
+ # Expected: PASS (The test should find a valid 1-to-1 assignment).
+ # ==========================================================================
+ def test_scenario_bipartite_matching(self):
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ cluster_x_id = 0x0090
+ cluster_y_id = 0x0091
+
+ # Mock spec data
+ req_base = self._create_mock_composed_req(dt_child_id, "Child Device Base", get_mandatory_conformance(), min_instances=2)
+
+ req1 = self._create_mock_composed_req(dt_child_id, "Child Device #1", get_mandatory_conformance())
+ req1.cluster_requirements = {
+ cluster_x_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster X", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ req2 = self._create_mock_composed_req(dt_child_id, "Child Device #2", get_mandatory_conformance())
+ req2.cluster_requirements = {
+ cluster_y_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster Y", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[req_base, req1, req2]
+ ),
+ dt_child_id: XmlDeviceType(
+ name="Child Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={}
+ )
+ }
+
+ # Mock device endpoints
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2, 3]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id]
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01,
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ },
+ 3: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_y_id]
+ },
+ Clusters.ElectricalEnergyMeasurement: {
+ Clusters.ElectricalEnergyMeasurement.Attributes.FeatureMap: 0x02,
+ Clusters.ElectricalEnergyMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.ClusterRevision: 1
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure on bipartite matching scenario")
+
+ # ==========================================================================
+ # Scenario 3b: Bipartite Matching (Swapped Requirements)
+ # ==========================================================================
+ # Requirement: Parent requires TWO instances of Child DT.
+ # Instance #1 requires Cluster Y to be present.
+ # Instance #2 requires Cluster X to be present.
+ # (Swapped order in composed_device_types list compared to Scenario 3)
+ # Topology: Endpoint 1 has Parent DT. PartsList = [2, 3].
+ # Endpoint 2 has Child DT and Cluster X.
+ # Endpoint 3 has Child DT and Cluster Y.
+ # Expected: PASS (The test should find a valid 1-to-1 assignment regardless of order).
+ # ==========================================================================
+ def test_scenario_bipartite_matching_swapped(self):
+ log.info("Running Scenario 3b: Bipartite Matching (Swapped Requirements)")
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ cluster_x_id = 0x0090
+ cluster_y_id = 0x0091
+
+ # Mock spec data
+ req_base = self._create_mock_composed_req(dt_child_id, "Child Device Base", get_mandatory_conformance(), min_instances=2)
+
+ req1 = self._create_mock_composed_req(dt_child_id, "Child Device #1", get_mandatory_conformance())
+ req1.cluster_requirements = {
+ cluster_x_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster X", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ req2 = self._create_mock_composed_req(dt_child_id, "Child Device #2", get_mandatory_conformance())
+ req2.cluster_requirements = {
+ cluster_y_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster Y", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[req_base, req2, req1] # Swapped!
+ ),
+ dt_child_id: XmlDeviceType(
+ name="Child Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={}
+ )
+ }
+
+ # Mock device endpoints
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2, 3]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id]
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01,
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ },
+ 3: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_y_id]
+ },
+ Clusters.ElectricalEnergyMeasurement: {
+ Clusters.ElectricalEnergyMeasurement.Attributes.FeatureMap: 0x02,
+ Clusters.ElectricalEnergyMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.ClusterRevision: 1
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure on bipartite matching swapped scenario")
+
+ # ==========================================================================
+ # Scenario 4: Bipartite Matching Conflict
+ # ==========================================================================
+ # Requirement: Parent requires TWO instances of Child DT.
+ # Instance #1 requires Cluster X.
+ # Instance #2 requires Cluster Y.
+ # Topology: Endpoint 1 has Parent DT. PartsList = [2, 3].
+ # Endpoint 2 has Child DT and Cluster X.
+ # Endpoint 3 has Child DT and Cluster X (Conflict!).
+ # Expected: FAIL (Could not find distinct child endpoints satisfying all labeled instances).
+ # ==========================================================================
+ def test_scenario_bipartite_conflict(self):
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ cluster_x_id = 0x0090
+ cluster_y_id = 0x0091
+
+ # Mock spec data
+ req1 = self._create_mock_composed_req(dt_child_id, "Child Device #1", get_mandatory_conformance())
+ req1.cluster_requirements = {
+ cluster_x_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster X", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ req2 = self._create_mock_composed_req(dt_child_id, "Child Device #2", get_mandatory_conformance())
+ req2.cluster_requirements = {
+ cluster_y_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster Y", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[req1, req2]
+ ),
+ dt_child_id: XmlDeviceType(
+ name="Child Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={}
+ )
+ }
+
+ # Mock device endpoints
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2, 3]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id]
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01,
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ },
+ 3: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id] # Conflict!
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01,
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ asserts.assert_false(success, "Unexpected success in bipartite conflict scenario")
+
+ # ==========================================================================
+ # Scenario 5: Anywhere Location Match
+ # ==========================================================================
+
+ def test_scenario_anywhere_location(self):
+ log.info("Running Scenario 5: Anywhere Location Match")
+
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ dt_power_source_id = 0x0011
+
+ # Mock spec
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="Simple",
+ classification_scope="Endpoint",
+ revision_desc={},
+ composed_device_types=[
+ self._create_mock_composed_req(dt_child_id, "Child Device", get_mandatory_conformance()),
+ self._create_mock_composed_req(dt_power_source_id, "Power Source",
+ get_mandatory_conformance(), device_type_location='anyEndpoint')
+ ]
+ ),
+ dt_child_id: XmlDeviceType(name="Child Device", revision=1, server_clusters={}, client_clusters={}, classification_class="Simple", classification_scope="Endpoint", revision_desc={}),
+ dt_power_source_id: XmlDeviceType(name="Power Source", revision=1, server_clusters={}, client_clusters={
+ }, classification_class="Utility", classification_scope="Endpoint", revision_desc={})
+ }
+
+ # Mock endpoints
+ # EP1: Parent Device
+ # EP2: Child Device (direct child of EP1)
+ # EP3: Power Source (NOT in EP1 parts list!)
+
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2] # Only EP2 is a child!
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ },
+ 3: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_power_source_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure in anywhere location scenario")
+
+ # ==========================================================================
+ # Scenario 6: Self Location Match
+ # ==========================================================================
+
+ def test_scenario_self_location(self):
+ log.info("Running Scenario 6: Self Location Match")
+
+ dt_parent_id = 0x0001
+ dt_power_source_id = 0x0011
+
+ # Mock spec
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="Simple",
+ classification_scope="Endpoint",
+ revision_desc={},
+ composed_device_types=[
+ self._create_mock_composed_req(dt_power_source_id, "Power Source",
+ get_mandatory_conformance(), device_type_location='deviceEndpoint')
+ ]
+ ),
+ dt_power_source_id: XmlDeviceType(name="Power Source", revision=1, server_clusters={}, client_clusters={
+ }, classification_class="Utility", classification_scope="Endpoint", revision_desc={})
+ }
+
+ # Mock endpoints
+ # EP1: Parent Device AND Power Source!
+
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1),
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_power_source_id, revision=1)
+ ],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure in self location scenario (positive case)")
+
+ # Now make it fail by removing Power Source from EP1 and adding to EP2 (child)
+ self.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList] = [
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)
+ ]
+ self.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] = [2]
+
+ self.endpoints[2] = {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_power_source_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ asserts.assert_false(success, "Unexpected success in self location scenario (negative case)")
+
+ # ==========================================================================
+ # Scenario 7: Scattered Matching (Cyclic)
+ # ==========================================================================
+ # Requirement: Parent requires THREE instances of Child DT.
+ # Instance #1 requires Cluster X.
+ # Instance #2 requires Cluster Y.
+ # Instance #3 requires Cluster Z.
+ # Topology: Endpoint 1 has Parent DT. PartsList = [2, 3, 4, 5, 6].
+ # Endpoint 2 has Child DT and Clusters X, Y.
+ # Endpoint 3 has Child DT and Clusters Y, Z.
+ # Endpoint 4 has Child DT and Clusters Z, X.
+ # Endpoint 5 has Child DT (Base).
+ # Endpoint 6 has Child DT (Base).
+ # Expected: PASS (The test should find a valid assignment resolving overlapping capabilities).
+ # ==========================================================================
+
+ def test_scenario_scattered_matching(self):
+ log.info("Running Scenario 7: Scattered Matching (Cyclic)")
+
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ cluster_x_id = 0x0090
+ cluster_y_id = 0x0091
+ cluster_z_id = 0x0201 # Thermostat
+
+ # Mock spec data
+ req_base = self._create_mock_composed_req(dt_child_id, "Child Device Base", get_mandatory_conformance(), min_instances=3)
+
+ req1 = self._create_mock_composed_req(dt_child_id, "Child Device #1", get_mandatory_conformance())
+ req1.cluster_requirements = {
+ cluster_x_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster X", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ req2 = self._create_mock_composed_req(dt_child_id, "Child Device #2", get_mandatory_conformance())
+ req2.cluster_requirements = {
+ cluster_y_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster Y", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ req3 = self._create_mock_composed_req(dt_child_id, "Child Device #3", get_mandatory_conformance())
+ req3.cluster_requirements = {
+ cluster_z_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster Z", side=ClusterSide.SERVER, conformance=get_mandatory_conformance())
+ }
+
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[req_base, req1, req2, req3]
+ ),
+ dt_child_id: XmlDeviceType(name="Child Device", revision=1, server_clusters={}, client_clusters={},
+ classification_class="simple", classification_scope="endpoint", revision_desc={})
+ }
+
+ # Mock device endpoints
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2, 3, 4, 5, 6]
+ }
+ },
+ 2: { # Has X and Y
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id, cluster_y_id]
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01,
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ },
+ Clusters.ElectricalEnergyMeasurement: {
+ Clusters.ElectricalEnergyMeasurement.Attributes.FeatureMap: 0x02,
+ Clusters.ElectricalEnergyMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.ClusterRevision: 1
+ }
+ },
+ 3: { # Has Y and Z
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_y_id, cluster_z_id]
+ },
+ Clusters.ElectricalEnergyMeasurement: {
+ Clusters.ElectricalEnergyMeasurement.Attributes.FeatureMap: 0x02,
+ Clusters.ElectricalEnergyMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalEnergyMeasurement.Attributes.ClusterRevision: 1
+ },
+ Clusters.Thermostat: {
+ Clusters.Thermostat.Attributes.FeatureMap: 0x03,
+ Clusters.Thermostat.Attributes.AttributeList: [],
+ Clusters.Thermostat.Attributes.AcceptedCommandList: [],
+ Clusters.Thermostat.Attributes.ClusterRevision: 1
+ }
+ },
+ 4: { # Has Z and X
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_z_id, cluster_x_id]
+ },
+ Clusters.Thermostat: {
+ Clusters.Thermostat.Attributes.FeatureMap: 0x03,
+ Clusters.Thermostat.Attributes.AttributeList: [],
+ Clusters.Thermostat.Attributes.AcceptedCommandList: [],
+ Clusters.Thermostat.Attributes.ClusterRevision: 1
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01,
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ },
+ 5: { # Base
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: []
+ }
+ },
+ 6: { # Base
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: []
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure on scattered matching scenario")
+
+ # ==========================================================================
+ # Scenario 8: Complex Location Matching
+ # ==========================================================================
+ # Requirement: Parent requires:
+ # - 1 instance of Child DT on childEndpoint (direct child).
+ # - 1 instance of Power Source on anywhere (any endpoint).
+ # - 1 instance of Thermostat on deviceEndpoint (same endpoint).
+ # Topology (Positive):
+ # Endpoint 1 has Parent DT and Thermostat. PartsList = [2].
+ # Endpoint 2 has Child DT.
+ # Endpoint 3 has Power Source (Standalone).
+ # Expected: PASS
+ # Topology (Negative):
+ # Move Thermostat to Endpoint 2 (Child).
+ # Expected: FAIL (Thermostat is not on Self).
+ # ==========================================================================
+
+ def test_scenario_complex_location_matching(self):
+ log.info("Running Scenario 8: Complex Location Matching")
+
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ dt_power_source_id = 0x0011
+ dt_thermostat_id = 0x0301
+
+ # Mock spec
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[
+ self._create_mock_composed_req(dt_child_id, "Child Device", get_mandatory_conformance()),
+ self._create_mock_composed_req(dt_power_source_id, "Power Source",
+ get_mandatory_conformance(), device_type_location='anyEndpoint'),
+ self._create_mock_composed_req(dt_thermostat_id, "Thermostat",
+ get_mandatory_conformance(), device_type_location='deviceEndpoint')
+ ]
+ ),
+ dt_child_id: XmlDeviceType(name="Child Device", revision=1, server_clusters={}, client_clusters={}, classification_class="simple", classification_scope="endpoint", revision_desc={}),
+ dt_power_source_id: XmlDeviceType(name="Power Source", revision=1, server_clusters={}, client_clusters={}, classification_class="Utility", classification_scope="endpoint", revision_desc={}),
+ dt_thermostat_id: XmlDeviceType(name="Thermostat", revision=1, server_clusters={}, client_clusters={
+ }, classification_class="Simple", classification_scope="endpoint", revision_desc={})
+ }
+
+ # Mock endpoints
+ # EP1: Parent Device AND Thermostat!
+ # EP2: Child Device (direct child of EP1)
+ # EP3: Power Source (Standalone)
+
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1),
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_thermostat_id, revision=1)
+ ],
+ Clusters.Descriptor.Attributes.PartsList: [2]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ },
+ 3: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_power_source_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: []
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure in complex location scenario (positive case)")
+
+ # Break it by moving Thermostat to EP2
+ self.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList] = [
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)
+ ]
+ self.endpoints[2][Clusters.Descriptor][Clusters.Descriptor.Attributes.DeviceTypeList] = [
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1),
+ Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_thermostat_id, revision=1)
+ ]
+
+ success, problems = self.check_composed_device_type_requirements()
+ asserts.assert_false(success, "Unexpected success in complex location scenario (negative case)")
+
+ # ==========================================================================
+ # Scenario 9: Element Overrides (Feature)
+ # ==========================================================================
+ # Requirement: Parent requires 1 instance of Child DT.
+ # That instance requires Feature bit 0 to be enabled on Cluster X.
+ # Topology (Negative):
+ # Endpoint 1 has Parent DT. PartsList = [2].
+ # Endpoint 2 has Child DT and Cluster X (FeatureMap = 0).
+ # Expected: FAIL (Feature requirement not met).
+ # Topology (Positive):
+ # Endpoint 1 has Parent DT. PartsList = [3].
+ # Endpoint 3 has Child DT and Cluster X (FeatureMap = 1).
+ # Expected: PASS
+ # ==========================================================================
+
+ def test_scenario_element_overrides(self):
+ log.info("Running Scenario 9: Element Overrides (Feature)")
+
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ cluster_x_id = 0x0090 # ElectricalPowerMeasurement
+
+ # Mock spec data
+ req = self._create_mock_composed_req(dt_child_id, "Child Device", get_mandatory_conformance())
+ req.cluster_requirements = {
+ cluster_x_id: XmlDeviceTypeClusterRequirements(
+ name="Cluster X",
+ side=ClusterSide.SERVER,
+ conformance=get_mandatory_conformance(),
+ feature_overrides={0x01: get_mandatory_conformance()} # Require bit 0!
+ )
+ }
+
+ self.xml_device_types = {
+ dt_parent_id: XmlDeviceType(
+ name="Parent Device",
+ revision=1,
+ server_clusters={},
+ client_clusters={},
+ classification_class="simple",
+ classification_scope="endpoint",
+ revision_desc={},
+ composed_device_types=[req]
+ ),
+ dt_child_id: XmlDeviceType(name="Child Device", revision=1, server_clusters={}, client_clusters={},
+ classification_class="simple", classification_scope="endpoint", revision_desc={})
+ }
+
+ # Mock device endpoints
+ # EP1: Parent Device
+ # EP2: Child Device (has cluster X, but FeatureMap = 0!) -> Should fail!
+
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id]
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x00, # No features!
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ asserts.assert_false(success, "Unexpected success when feature override is not met")
+
+ # Now add EP3 which meets the requirement
+ self.endpoints[1][Clusters.Descriptor][Clusters.Descriptor.Attributes.PartsList] = [3]
+ self.endpoints[3] = {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id]
+ },
+ Clusters.ElectricalPowerMeasurement: {
+ Clusters.ElectricalPowerMeasurement.Attributes.FeatureMap: 0x01, # Has feature bit 0!
+ Clusters.ElectricalPowerMeasurement.Attributes.AttributeList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.AcceptedCommandList: [],
+ Clusters.ElectricalPowerMeasurement.Attributes.ClusterRevision: 1
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure when feature override is met")
+
+
+if __name__ == "__main__":
+ default_matter_test_main()
diff --git a/src/python_testing/test_testing/TestSpecParsingComposedDeviceTypes.py b/src/python_testing/test_testing/TestSpecParsingComposedDeviceTypes.py
new file mode 100644
index 00000000000000..8dfbe80c8b3e32
--- /dev/null
+++ b/src/python_testing/test_testing/TestSpecParsingComposedDeviceTypes.py
@@ -0,0 +1,177 @@
+#
+# Copyright (c) 2024 Project CHIP Authors
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+from xml.etree import ElementTree
+
+from mobly import asserts
+
+import matter.clusters as Clusters
+from matter.testing.device_conformance_tests import DeviceConformanceTests
+from matter.testing.runner import default_matter_test_main
+from matter.testing.spec_parsing import (ConformanceParseParameters, XmlCluster, XmlFeature, parse_callable_from_xml,
+ parse_single_device_type)
+
+log = logging.getLogger(__name__)
+
+# A mock XML representing a device type with composedDeviceTypes
+MOCK_DEVICE_TYPE_XML = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+class TestSpecParsingComposedDeviceTypes(DeviceConformanceTests):
+ def setup_class(self):
+ self.xml_device_types = {}
+ self.xml_clusters = {}
+ self.problems = []
+
+ # Parse the mock XML
+ root = ElementTree.fromstring(MOCK_DEVICE_TYPE_XML)
+
+ # We need to mock the cluster definition for the parser to resolve features
+ mock_cluster = XmlCluster(
+ name="Cluster X",
+ revision=1,
+ derived=None,
+ features={0x01: XmlFeature(code="ALTC", name="Alternate Cluster", conformance=parse_callable_from_xml(
+ ElementTree.Element('mandatoryConform'), ConformanceParseParameters({}, {}, {})))},
+ feature_map={"ALTC": 0x01}, # Map code to mask
+ attribute_map={},
+ command_map={},
+ attributes={},
+ accepted_commands={},
+ generated_commands={},
+ unknown_commands=[],
+ events={},
+ structs={},
+ enums={},
+ bitmaps={},
+ pics="",
+ is_provisional=False,
+ revision_desc={}
+ )
+ mock_clusters = {0x0090: mock_cluster}
+
+ device_types, problems = parse_single_device_type(root, mock_clusters)
+ self.problems.extend(problems)
+ self.xml_device_types.update(device_types)
+ self.xml_clusters.update(mock_clusters)
+
+ def test_parsing_correctness(self):
+ asserts.assert_equal(len(self.problems), 0, f"Unexpected problems during parsing: {self.problems}")
+
+ dt_parent_id = 0x0001
+ asserts.assert_in(dt_parent_id, self.xml_device_types, "Parent device type not found in parsed output")
+
+ xml_device = self.xml_device_types[dt_parent_id]
+ asserts.assert_equal(len(xml_device.composed_device_types), 2, "Expected 2 composed device type requirements")
+
+ # Check base requirement
+ req_base = xml_device.composed_device_types[0]
+ asserts.assert_equal(req_base.min_instances, 2, "Expected min_instances = 2 for base requirement")
+
+ # Check override requirement
+ req_override = xml_device.composed_device_types[1]
+ asserts.assert_in(0x0090, req_override.cluster_requirements, "Expected cluster requirement for 0x0090")
+
+ cr = req_override.cluster_requirements[0x0090]
+ asserts.assert_in(0x01, cr.feature_overrides, "Expected feature override for mask 0x01")
+
+ def test_enforcement_with_fake_device(self):
+ dt_parent_id = 0x0001
+ dt_child_id = 0x0002
+ cluster_x_id = 0x0090
+
+ # Scenario: We need at least 2 child devices.
+ # One of them must have Cluster X with feature ALTC (mask 0x01).
+
+ # Create mock endpoints
+ self.endpoints = {
+ 0: {
+ Clusters.BasicInformation: {
+ Clusters.BasicInformation.Attributes.SpecificationVersion: 0x01060000
+ }
+ },
+ 1: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_parent_id, revision=1)],
+ Clusters.Descriptor.Attributes.PartsList: [2, 3]
+ }
+ },
+ 2: {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: [cluster_x_id]
+ },
+
+ }
+ }
+
+ # Let's use the cluster class as key!
+ cluster_class = Clusters.ClustersObjects.ALL_CLUSTERS[cluster_x_id] if hasattr(Clusters, 'ClustersObjects') else None
+ # Wait, let's check how to get cluster class in TestConformanceTest!
+ # It used `Clusters.ClusterObjects.ALL_CLUSTERS[cid]`!
+
+ cluster_class = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_x_id]
+
+ self.endpoints[2][cluster_class] = {
+ cluster_class.Attributes.FeatureMap: 0x01, # Has ALTC!
+ cluster_class.Attributes.AttributeList: [],
+ cluster_class.Attributes.AcceptedCommandList: [],
+ cluster_class.Attributes.ClusterRevision: 1
+ }
+
+ # EP3: Child device without Cluster X (to satisfy min 2 constraint)
+ self.endpoints[3] = {
+ Clusters.Descriptor: {
+ Clusters.Descriptor.Attributes.DeviceTypeList: [Clusters.Descriptor.Structs.DeviceTypeStruct(deviceType=dt_child_id, revision=1)],
+ Clusters.Descriptor.Attributes.ServerList: []
+ }
+ }
+
+ success, problems = self.check_composed_device_type_requirements()
+ for p in problems:
+ log.info(p)
+ asserts.assert_true(success, "Failure on enforcement with fake device")
+
+
+if __name__ == "__main__":
+ default_matter_test_main()