Skip to content

Commit

Permalink
Merge pull request #179 from sunava/failure_handling
Browse files Browse the repository at this point in the history
[failure handling]  A subclass of FailureHandling
  • Loading branch information
tomsch420 authored Aug 27, 2024
2 parents 3e05312 + 11d7ba9 commit b020755
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 78 deletions.
114 changes: 101 additions & 13 deletions src/pycram/failure_handling.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from .datastructures.enums import State
from .designator import DesignatorDescription
from .plan_failures import PlanFailure
from threading import Lock
from typing_extensions import Union, Tuple, Any, List
from .language import Language, Monitor


class FailureHandling:
class FailureHandling(Language):
"""
Base class for failure handling mechanisms in automated systems or workflows.
Expand All @@ -11,11 +15,12 @@ class FailureHandling:
to be extended by subclasses that implement specific failure handling behaviors.
"""

def __init__(self, designator_description: DesignatorDescription):
def __init__(self, designator_description: Union[DesignatorDescription, Monitor]):
"""
Initializes a new instance of the FailureHandling class.
:param designator_description: The description or context of the task or process for which the failure handling is being set up.
:param Union[DesignatorDescription, Monitor] designator_description: The description or context of the task
or process for which the failure handling is being set up.
"""
self.designator_description = designator_description

Expand All @@ -37,15 +42,10 @@ class Retry(FailureHandling):
This class represents a specific failure handling strategy where the system
attempts to retry a failed action a certain number of times before giving up.
Attributes:
max_tries (int): The maximum number of attempts to retry the action.
Inherits:
All attributes and methods from the FailureHandling class.
Overrides:
perform(): Implements the retry logic.
"""
max_tries: int
"""
The maximum number of attempts to retry the action.
"""

def __init__(self, designator_description: DesignatorDescription, max_tries: int = 3):
Expand All @@ -58,7 +58,7 @@ def __init__(self, designator_description: DesignatorDescription, max_tries: int
super().__init__(designator_description)
self.max_tries = max_tries

def perform(self):
def perform(self) -> Tuple[State, List[Any]]:
"""
Implementation of the retry mechanism.
Expand All @@ -79,5 +79,93 @@ def perform(self):
raise e


class RetryMonitor(FailureHandling):
"""
A subclass of FailureHandling that implements a retry mechanism that works with a Monitor.
This class represents a specific failure handling strategy that allows us to retry a demo that is
being monitored, in case that monitoring condition is triggered.
"""
max_tries: int
"""
The maximum number of attempts to retry the action.
"""
recovery: dict
"""
A dictionary that maps exception types to recovery actions
"""

def __init__(self, designator_description: Monitor, max_tries: int = 3, recovery: dict = None):
"""
Initializes a new instance of the RetryMonitor class.
:param Monitor designator_description: The Monitor instance to be used.
:param int max_tries: The maximum number of attempts to retry. Defaults to 3.
:param dict recovery: A dictionary that maps exception types to recovery actions. Defaults to None.
"""
super().__init__(designator_description)
self.max_tries = max_tries
self.lock = Lock()
if recovery is None:
self.recovery = {}
else:
if not isinstance(recovery, dict):
raise ValueError(
"Recovery must be a dictionary with exception types as keys and Language instances as values.")
for key, value in recovery.items():
if not issubclass(key, BaseException):
raise TypeError("Keys in the recovery dictionary must be exception types.")
if not isinstance(value, Language):
raise TypeError("Values in the recovery dictionary must be instances of the Language class.")
self.recovery = recovery

def perform(self) -> Tuple[State, List[Any]]:
"""
This method attempts to perform the Monitor + plan specified in the designator_description. If the action
fails, it is retried up to max_tries times. If all attempts fail, the last exception is raised. In every
loop, we need to clear the kill_event, and set all relevant 'interrupted' variables to False, to make sure
the Monitor and plan are executed properly again.
:raises PlanFailure: If all retry attempts fail.
:return: The state of the execution performed, as well as a flattened list of the
results, in the correct order
"""

def reset_interrupted(child):
child.interrupted = False
try:
for sub_child in child.children:
reset_interrupted(sub_child)
except AttributeError:
pass

def flatten(result):
flattened_list = []
if result:
for item in result:
if isinstance(item, list):
flattened_list.extend(item)
else:
flattened_list.append(item)
return flattened_list
return None

status, res = None, None
with self.lock:
tries = 0
while True:
self.designator_description.kill_event.clear()
self.designator_description.interrupted = False
for child in self.designator_description.children:
reset_interrupted(child)
try:
status, res = self.designator_description.perform()
break
except PlanFailure as e:
tries += 1
if tries >= self.max_tries:
raise e
exception_type = type(e)
if exception_type in self.recovery:
self.recovery[exception_type].perform()
return status, flatten(res)
Loading

0 comments on commit b020755

Please sign in to comment.