Skip to content

Commit

Permalink
WIP fix: correct bad indentation in business_rule unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Baptiste O'Jeanson committed Oct 19, 2023
1 parent e35da7e commit e0120b1
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def __init__(self, expected_label: str):
self.expected_label = expected_label

def get_camera_decision(self, inference: Dict[str, Union[str, Dict]]) -> Decision:
camera_decision = Decision.NO_DECISION
for inf in inference:
if inf in self.expected_label:
camera_decision = Decision.OK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def __init__(self, class_to_detect: str, max_threshold: int):
def get_camera_decision(self, inference: Dict[str, Union[str, Dict]]) -> Decision:
objects_of_interest = [obj for obj in inference if obj in self.class_to_detect]

if len(objects_of_interest) < self.max_threshold:
if len(inference) == 0:
camera_decision = Decision.NO_DECISION
elif len(objects_of_interest) < self.max_threshold:
camera_decision = Decision.KO
else:
camera_decision = Decision.OK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ def __init__(self, class_to_detect: str, min_threshold: int):
def get_camera_decision(self, inference: Dict[str, Union[str, Dict]]) -> Decision:
objects_of_interest = [obj for obj in inference if obj in self.class_to_detect]

if len(objects_of_interest) < self.min_threshold:
if len(inference) == 0:
camera_decision = Decision.NO_DECISION
elif len(objects_of_interest) < self.min_threshold:
camera_decision = Decision.KO
else:
camera_decision = Decision.OK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ def get_item_decision(self, cameras_decisions: Dict[str, str]) -> Decision:
if decision == Decision.KO.value
]

if len(ko_decisions) >= self.threshold:
if len(cameras_decisions) == 0:
return Decision.NO_DECISION
elif len(ko_decisions) >= self.threshold:
return Decision.KO
else:
return Decision.OK
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ def get_item_decision(self, cameras_decisions: Dict[str, str]) -> Decision:

ratio_ok = len(ok_decisions) / len(cameras_decisions)

if ratio_ok >= self.min_threshold:
if len(cameras_decisions) == 0:
return Decision.NO_DECISION
elif ratio_ok >= self.min_threshold:
return Decision.OK
else:
return Decision.KO
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,7 @@ def apply_business_rules(self, item: Item) -> Decision:
]["camera_rule"]["name"]
camera_rule_parameters = self.station_config.active_config["cameras"][
camera_id
]["camera_rule"][
"parameters"
] # noqa
]["camera_rule"]["parameters"]

last_model_inferences = get_last_inference_by_camera(
item.inferences[camera_id]
Expand All @@ -231,7 +229,7 @@ def apply_business_rules(self, item: Item) -> Decision:
labels_of_last_model_inferences
)

camera_decisions[f"{camera_id}"] = camera_decision.value
camera_decisions[camera_id] = camera_decision.value

item_rule_name = self.station_config.active_config["item_rule"]["name"]
item_rule_parameters = self.station_config.active_config["item_rule"][
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
from edge_orchestrator.domain.models.business_rule.camera_rule.camera_rule_factory import (
get_camera_rule,
)
from edge_orchestrator.domain.models.camera import (
get_last_inference_by_camera,
)
from edge_orchestrator.domain.models.decision import Decision
from edge_orchestrator.domain.use_cases.supervisor import get_labels


class TestCameraBusinessRule:
def test_camera_decision_should_return_KO_when_expected_label_is_OK(self): # noqa
# Given
inferences = {
"camera_id3": {
"model_id4": {"full_image": {"label": "KO", "probability": 0.999930501}}
}
}

# When
camera_decisions = {}
for camera in inferences:
camera_rule_name = "expected_label_rule"
camera_rule_parameters = {"expected_label": ["OK"]}

last_model_inferences = get_last_inference_by_camera(inferences[camera])
labels_of_last_model_inferences = get_labels(last_model_inferences)

item_camera_rule = get_camera_rule(camera_rule_name)(
**camera_rule_parameters
)
camera_decision = item_camera_rule.get_camera_decision(
labels_of_last_model_inferences
)

camera_decisions[f"{camera}"] = camera_decision.value

# Then
assert camera_decisions == {"camera_id3": "KO"}

def test_camera_decision_should_return_OK_when_minimum_one_person_is_detected(
self,
): # noqa
# Given
inferences = {
"camera_id3": {
"model_id5": {
"object_1": {
"location": [155, 413, 381, 709],
"score": 0.773778856,
"label": "person",
},
"object_2": {
"location": [422, 10, 719, 720],
"score": 0.709803939,
"label": "bicycle",
},
"object_3": {
"location": [623, 430, 757, 648],
"score": 0.523171604,
"label": "couch",
},
}
}
}

# When
camera_decisions = {}
for camera in inferences:
camera_rule_name = "min_nb_objects_rule"
camera_rule_parameters = {"class_to_detect": ["person"], "min_threshold": 1}

last_model_inferences = get_last_inference_by_camera(inferences[camera])
labels_of_last_model_inferences = get_labels(last_model_inferences)

item_camera_rule = get_camera_rule(camera_rule_name)(
**camera_rule_parameters
)
camera_decision = item_camera_rule.get_camera_decision(
labels_of_last_model_inferences
)

camera_decisions[f"{camera}"] = camera_decision.value

# Then
assert camera_decisions == {"camera_id3": "OK"}

def test_camera_decision_should_return_OK_when_minimum_one_face_is_detected_with_two_object_detection_models(
self,
): # noqa
# Given
inferences = {
"camera_id3": {
"model_id1": {
"object_1": {
"label": "person",
"location": [351, 110, 508, 361],
"score": 0.98046875,
},
"object_2": {
"label": "person",
"location": [233, 73, 385, 397],
"score": 0.91015625,
},
"object_3": {
"label": "person",
"location": [7, 3, 240, 509],
"score": 0.87890625,
},
"object_4": {
"label": "person",
"location": [493, 93, 678, 389],
"score": 0.87890625,
},
"object_5": {
"label": "person",
"location": [135, 35, 276, 417],
"score": 0.83984375,
},
"object_6": {
"label": "person",
"location": [520, 47, 804, 527],
"score": 0.58203125,
},
},
"model_id6": {
"object_1": {
"label": "face",
"location": [555, 97, 611, 207],
"score": 0.98046875,
},
"object_2": {
"label": "face",
"location": [645, 46, 727, 180],
"score": 0.5,
},
},
}
}

# When
camera_decisions = {}
camera = "camera_id3"
camera_rule_name = "min_nb_objects_rule"
camera_rule_parameters = {"class_to_detect": ["face"], "min_threshold": 1}

item_camera_rule = get_camera_rule(camera_rule_name)(**camera_rule_parameters)
camera_decision = item_camera_rule.get_camera_decision(inferences[camera])

camera_decisions[f"{camera}"] = camera_decision.value

# Then
assert camera_decisions == {"camera_id3": "OK"}

def test_camera_decision_should_return_OK_when_minimum_one_connected_cellphone_is_detected_with_one_object_detection_and_one_classification_model( # noqa
self,
):
# Given
inferences = {
"camera_id1": {
"model_id1": {
"object_3": {
"label": "cell phone",
"location": [427, 227, 467, 278],
"score": 0.41796875,
}
},
"model_id6": {
"object_3": {
"label": "unconnected",
"probability": 0.9975850582122803,
}
},
}
}

# When
camera_decisions = {}
camera = "camera_id1"
camera_rule_name = "min_nb_objects_rule"
camera_rule_parameters = {
"class_to_detect": ["connected"],
"min_threshold": 1,
}

item_camera_rule = get_camera_rule(camera_rule_name)(**camera_rule_parameters)
camera_decision = item_camera_rule.get_camera_decision(inferences[camera])

camera_decisions[f"{camera}"] = camera_decision.value

# Then
assert camera_decisions == {"camera_id1": "KO"}

def test_camera_decision_should_return_no_decision_without_inference_results(self):
# Given
inferences = {}

# When
camera_rule_name = "min_nb_objects_rule"
camera_rule_parameters = {
"class_to_detect": ["connected"],
"min_threshold": 1,
}

item_camera_rule = get_camera_rule(camera_rule_name)(**camera_rule_parameters)
camera_decision = item_camera_rule.get_camera_decision(inferences)

# Then
assert camera_decision == Decision.NO_DECISION
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,19 @@ def test_item_decision_should_return_decision_ok_when_more_than_50_pct_of_camera

# Then
assert item_decision == Decision.OK

def test_item_decision_should_return_no_decision_ko_with_no_camera_decision(
self,
): # noqa
# Given
camera_decisions = {}

# When
item_rule_name = "min_threshold_ko_rule"
item_rule_parameters = {"threshold": 1}

item_rule = get_item_rule(item_rule_name)(**item_rule_parameters)
item_decision = item_rule.get_item_decision(camera_decisions)

# Then
assert item_decision == Decision.NO_DECISION
Loading

0 comments on commit e0120b1

Please sign in to comment.