Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[failure handling] A subclass of FailureHandling #179

Merged
merged 9 commits into from
Aug 27, 2024
114 changes: 101 additions & 13 deletions src/pycram/failure_handling.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from .datastructures.enums import State
from .designator import DesignatorDescription
from .plan_failures import PlanFailure
from threading import Lock
from typing_extensions import Union, Tuple, Any, List
from .language import Language, Monitor


class FailureHandling:
class FailureHandling(Language):
"""
Base class for failure handling mechanisms in automated systems or workflows.

Expand All @@ -11,11 +15,12 @@ class FailureHandling:
to be extended by subclasses that implement specific failure handling behaviors.
"""

def __init__(self, designator_description: DesignatorDescription):
def __init__(self, designator_description: Union[DesignatorDescription, Monitor]):
"""
Initializes a new instance of the FailureHandling class.

:param designator_description: The description or context of the task or process for which the failure handling is being set up.
:param Union[DesignatorDescription, Monitor] designator_description: The description or context of the task
or process for which the failure handling is being set up.
"""
self.designator_description = designator_description

Expand All @@ -37,15 +42,10 @@ class Retry(FailureHandling):

This class represents a specific failure handling strategy where the system
attempts to retry a failed action a certain number of times before giving up.

Attributes:
max_tries (int): The maximum number of attempts to retry the action.

Inherits:
All attributes and methods from the FailureHandling class.

Overrides:
perform(): Implements the retry logic.
"""
max_tries: int
"""
The maximum number of attempts to retry the action.
"""

def __init__(self, designator_description: DesignatorDescription, max_tries: int = 3):
Expand All @@ -58,7 +58,7 @@ def __init__(self, designator_description: DesignatorDescription, max_tries: int
super().__init__(designator_description)
self.max_tries = max_tries

def perform(self):
def perform(self) -> Tuple[State, List[Any]]:
"""
Implementation of the retry mechanism.

Expand All @@ -79,5 +79,93 @@ def perform(self):
raise e


class RetryMonitor(FailureHandling):
"""
A subclass of FailureHandling that implements a retry mechanism that works with a Monitor.

This class represents a specific failure handling strategy that allows us to retry a demo that is
being monitored, in case that monitoring condition is triggered.
"""
max_tries: int
"""
The maximum number of attempts to retry the action.
"""
recovery: dict
"""
A dictionary that maps exception types to recovery actions
"""

def __init__(self, designator_description: Monitor, max_tries: int = 3, recovery: dict = None):
"""
Initializes a new instance of the RetryMonitor class.
:param Monitor designator_description: The Monitor instance to be used.
:param int max_tries: The maximum number of attempts to retry. Defaults to 3.
:param dict recovery: A dictionary that maps exception types to recovery actions. Defaults to None.
"""
super().__init__(designator_description)
self.max_tries = max_tries
self.lock = Lock()
if recovery is None:
self.recovery = {}
else:
if not isinstance(recovery, dict):
raise ValueError(
"Recovery must be a dictionary with exception types as keys and Language instances as values.")
for key, value in recovery.items():
if not issubclass(key, BaseException):
raise TypeError("Keys in the recovery dictionary must be exception types.")
if not isinstance(value, Language):
raise TypeError("Values in the recovery dictionary must be instances of the Language class.")
self.recovery = recovery

def perform(self) -> Tuple[State, List[Any]]:
"""
This method attempts to perform the Monitor + plan specified in the designator_description. If the action
fails, it is retried up to max_tries times. If all attempts fail, the last exception is raised. In every
loop, we need to clear the kill_event, and set all relevant 'interrupted' variables to False, to make sure
the Monitor and plan are executed properly again.

:raises PlanFailure: If all retry attempts fail.

:return: The state of the execution performed, as well as a flattened list of the
results, in the correct order
"""

def reset_interrupted(child):
child.interrupted = False
try:
for sub_child in child.children:
reset_interrupted(sub_child)
except AttributeError:
pass

def flatten(result):
flattened_list = []
if result:
for item in result:
if isinstance(item, list):
flattened_list.extend(item)
else:
flattened_list.append(item)
return flattened_list
return None

status, res = None, None
with self.lock:
tries = 0
while True:
self.designator_description.kill_event.clear()
self.designator_description.interrupted = False
for child in self.designator_description.children:
reset_interrupted(child)
try:
status, res = self.designator_description.perform()
break
except PlanFailure as e:
tries += 1
if tries >= self.max_tries:
raise e
exception_type = type(e)
if exception_type in self.recovery:
self.recovery[exception_type].perform()
tomsch420 marked this conversation as resolved.
Show resolved Hide resolved
return status, flatten(res)
Loading
Loading