Skip to content

Commit

Permalink
Merge pull request #112 from sunava/failure_handling
Browse files Browse the repository at this point in the history
[failure_handling] rewrote failure handling, added test cases
  • Loading branch information
Tigul authored Jan 3, 2024
2 parents 6639625 + 7031edb commit 7c5a676
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/pycram/designator.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,14 @@ def ground(self) -> Action:
"""Fill all missing parameters and chose plan to execute. """
raise NotImplementedError(f"{type(self)}.ground() is not implemented.")

def __iter__(self):
"""
Iterate through all possible actions fitting this description
:yield: A resolved action designator
"""
yield self.ground()


class LocationDesignatorDescription(DesignatorDescription):
"""
Expand Down
93 changes: 93 additions & 0 deletions src/pycram/failure_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from .designator import DesignatorDescription
from .plan_failures import PlanFailure

class FailureHandling:
"""
Base class for failure handling mechanisms in automated systems or workflows.
This class provides a structure for implementing different strategies to handle
failures that may occur during the execution of a plan or process. It is designed
to be extended by subclasses that implement specific failure handling behaviors.
Attributes:
designator_description (DesignatorDescription): An instance of DesignatorDescription.
Methods:
perform(): Abstract method
"""

def __init__(self, designator_description:DesignatorDescription):
"""
Initializes a new instance of the FailureHandling class.
Args:
designator_description (DesignatorDescription): The description or context
of the task or process for which the failure handling is being set up.
"""
self.designator_description = designator_description

def perform(self):
"""
Abstract method to perform the failure handling mechanism.
This method should be overridden in subclasses to implement the specific
behavior for handling failures.
Raises:
NotImplementedError: If the method is not implemented in a subclass.
"""
raise NotImplementedError()

class Retry(FailureHandling):
"""
A subclass of FailureHandling that implements a retry mechanism.
This class represents a specific failure handling strategy where the system
attempts to retry a failed action a certain number of times before giving up.
Attributes:
max_tries (int): The maximum number of attempts to retry the action.
Inherits:
All attributes and methods from the FailureHandling class.
Overrides:
perform(): Implements the retry logic.
"""

def __init__(self, designator_description:DesignatorDescription, max_tries:int=3):
"""
Initializes a new instance of the Retry class.
Args:
designator_description (DesignatorDescription): The description or context
of the task or process for which the retry mechanism is being set up.
max_tries (int, optional): The maximum number of attempts to retry. Defaults to 3.
"""
super().__init__(designator_description)
self.max_tries = max_tries

def perform(self):
"""
Implementation of the retry mechanism.
This method attempts to perform the action specified in the designator_description.
If the action fails, it is retried up to max_tries times. If all attempts fail,
the last exception is raised.
Raises:
PlanFailure: If all retry attempts fail.
"""
tries = 0
for action in iter(self.designator_description):
tries += 1
try:
action.perform()
break
except PlanFailure as e:
if tries >= self.max_tries:
raise e




57 changes: 57 additions & 0 deletions test/test_failure_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest

import roslaunch

from pycram.bullet_world import BulletWorld, Object
from pycram.designator import ActionDesignatorDescription
from pycram.designators.action_designator import ParkArmsAction
from pycram.enums import ObjectType, Arms
from pycram.failure_handling import Retry
from pycram.plan_failures import PlanFailure
from pycram.process_module import ProcessModule, simulated_robot
from pycram.robot_descriptions import robot_description


# start ik_and_description.launch
class DummyActionDesignator(ActionDesignatorDescription):
class Action(ActionDesignatorDescription.Action):
def perform(self):
raise PlanFailure("Dummy action failed")

def __iter__(self):
for _ in range(100):
yield self.Action()


class FailureHandlingTest(unittest.TestCase):
world: BulletWorld
process: roslaunch.scriptapi.ROSLaunch

@classmethod
def setUpClass(cls):
cls.world = BulletWorld("DIRECT")
cls.robot = Object(robot_description.name, ObjectType.ROBOT, robot_description.name + ".urdf")
ProcessModule.execution_delay = True

def setUp(self):
self.world.reset_bullet_world()

def test_retry_with_success(self):
with simulated_robot:
Retry(ParkArmsAction([Arms.LEFT]), max_tries=5).perform()

def test_retry_with_failure(self):
with simulated_robot:
with self.assertRaises(PlanFailure):
Retry(DummyActionDesignator(), max_tries=5).perform()

def tearDown(self):
self.world.reset_bullet_world()

@classmethod
def tearDownClass(cls):
cls.world.exit()


if __name__ == '__main__':
unittest.main()

0 comments on commit 7c5a676

Please sign in to comment.