diff --git a/modules/condition.py b/modules/condition.py new file mode 100644 index 0000000..cf3b92d --- /dev/null +++ b/modules/condition.py @@ -0,0 +1,23 @@ +""" +Abstract class for evaluating drone conditions +""" + +import abc + + +class Condition(abc.ABC): + """ + Wrapper for different classes to check conditions + """ + + @abc.abstractmethod + def evaluate_condition(self) -> bool: + """ + The appropriate evaluate method depending on the child + """ + + @abc.abstractmethod + def message(self) -> None: + """ + A message to output when the condition evalues to true + """ diff --git a/modules/condition_evaluator.py b/modules/condition_evaluator.py new file mode 100644 index 0000000..790996a --- /dev/null +++ b/modules/condition_evaluator.py @@ -0,0 +1,36 @@ +""" +Module to evaluate multiple Evaluator objects +""" + +from . import condition + + +class ConditionEvaluator: + """ + Attributes: + condition_list : A list of EvaluateCondition objects, representing all the + conditions to evaluate + """ + + def __init__(self, condition_list: "list[condition.Condition]") -> None: + """ + Constructor + + Parameters: + condition_list : A list of EvaluateCondition objects, representing conditions + to evaluate + """ + self.evaluate_object_list = condition_list + + def evaluate_all_conditions(self) -> bool: + """ + Runs all the evaluate objects. If any of the evaluations return true, evaluate_all outputs + status messages to the console and returns true. + """ + evaluation = False + for evaluate_object in self.evaluate_object_list: + if evaluate_object.evaluate_condition(): + evaluation = True + evaluate_object.message() + + return evaluation diff --git a/modules/mission_time_condition.py b/modules/mission_time_condition.py new file mode 100644 index 0000000..547fd6a --- /dev/null +++ b/modules/mission_time_condition.py @@ -0,0 +1,69 @@ +""" +Checks whether the drone has reached its max flight time and sends it back to launch. +""" + +import time + +from . import condition + + +class MissionTimeCondition(condition.Condition): + """ + Checks if drone exceeds the maximum flight time limit, inherits from Evaluate class + """ + + __create_key = object() + + @classmethod + def create( + cls, start_time: "float | None", maximum_flight_time: "float | None" + ) -> "tuple[bool, MissionTimeCondition | None]": + """ + start_time: float + The time the drone started the mission in seconds. + maximum_flight_time: float + Max flight time for drone in seconds. + """ + if start_time is None: + return False, None + + if maximum_flight_time is None: + return False, None + + return True, MissionTimeCondition(cls.__create_key, start_time, maximum_flight_time) + + def __init__( + self, class_private_create_key: object, start_time: float, maximum_flight_time: float + ) -> None: + """ + Private constructor, use create() method + """ + assert class_private_create_key is MissionTimeCondition.__create_key + self.start_time = start_time + self.maximum_flight_time = maximum_flight_time + + def evaluate_condition(self) -> bool: + """ + Evaluates whether the drone should land based on time remaining. + """ + current_time = time.time() + if current_time - self.start_time < self.maximum_flight_time: + return False + + return True + + def output_time_elapsed(self) -> None: + """ + Outputs the total time elapsed during the mission. + """ + current_time = time.time() + print(f"Elapsed time (s): {current_time - self.start_time}") + + def message(self) -> None: + """ + Outputs status when the drone has exceeded the time limit. + """ + print("This mission has exceeded the maximum flight time limit.") + print(f"Specified maximum flight time limit: {self.maximum_flight_time}") + print(f"Mission start time: {self.start_time}") + print(f"Time when condition was met: {time.time()}") diff --git a/path_2024_task_1.py b/path_2024_task_1.py index a243225..d20af38 100644 --- a/path_2024_task_1.py +++ b/path_2024_task_1.py @@ -3,16 +3,21 @@ """ import pathlib +import time import dronekit +from modules import condition_evaluator from modules import load_waypoint_name_to_coordinates_map +from modules import mission_time_condition from modules import waypoints_dict_to_list TAKEOFF_WAYPOINT_FILE_PATH = pathlib.Path("2024", "waypoints", "takeoff_waypoint_task_1") LAP_WAYPOINTS_FILE_PATH = pathlib.Path("2024", "waypoints", "lap_waypoints_task_1.csv") CONNECTION_ADDRESS = "tcp:localhost:14550" +DELAY = 0.1 # seconds +MAXIMUM_FLIGHT_TIME = 1800 # in seconds def main() -> int: @@ -64,6 +69,28 @@ def main() -> int: print("ERROR: Convert waypoint dictionary to list") return -1 + start_time = time.time() + result, time_condition = mission_time_condition.MissionTimeCondition.create( + start_time, MAXIMUM_FLIGHT_TIME + ) + + if not result: + print("Error: Mission time condition") + return -1 + + return_to_launch_evaluator = condition_evaluator.ConditionEvaluator([time_condition]) + + while True: + # Send drone back to launch if exceeds time limit + + should_return_to_launch = return_to_launch_evaluator.evaluate_all_conditions() + if should_return_to_launch: + break + + time_condition.output_time_elapsed() + + time.sleep(DELAY) + return 0 diff --git a/tests/integration/test_check_stop_condition.py b/tests/integration/test_check_stop_condition.py index 2fc448b..03b2d67 100644 --- a/tests/integration/test_check_stop_condition.py +++ b/tests/integration/test_check_stop_condition.py @@ -5,7 +5,8 @@ import time from modules import add_takeoff_and_landing_command -from modules import check_stop_condition +from modules import condition_evaluator +from modules import mission_time_condition from modules import upload_commands from modules import waypoints_to_commands from modules.common.kml.modules import location_ground @@ -63,16 +64,22 @@ def main() -> int: # Loop mimics path_2024.py structure start_time = time.time() + success, check_time_condition = mission_time_condition.MissionTimeCondition.create( + start_time, MAXIMUM_FLIGHT_TIME + ) + if not success: + print("Unable to create the MissionTimeCondition object.") + return -1 + + return_to_launch_evaluator = condition_evaluator.ConditionEvaluator([check_time_condition]) + while True: # Check whether drone exceeds max flight time - current_time = time.time() - is_returning_to_launch = check_stop_condition.check_stop_condition( - start_time, current_time, controller.drone, MAXIMUM_FLIGHT_TIME - ) - if is_returning_to_launch: + should_return_to_launch = return_to_launch_evaluator.evaluate_all_conditions() + if should_return_to_launch: break - print(f"Elapsed time (s): {current_time - start_time}") + check_time_condition.output_time_elapsed() time.sleep(DELAY)