Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Akshanjay/optimization objective skeleton code (#37)
Browse files Browse the repository at this point in the history
* Skeleton code for the path_objective file

* Added the skeleton code for ompl_path file

* Added the base test cases

* Reduced the number of inputs to the updated objective functions

* Linted the path_objective

* Fixed a linting error in ompl_path and test_path_objective

* Add file docstring

* Rename path_objective to objectives

* Fix docstring

* Cleanup

* Remove unnecessary global variables

---------

Co-authored-by: Patrick Creighton <[email protected]>
  • Loading branch information
FireBoyAJ24 and patrick-5546 authored Oct 21, 2023
1 parent f66766d commit 110a103
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 4 deletions.
2 changes: 1 addition & 1 deletion launch/main_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def setup_launch(context: LaunchContext) -> List[Node]:
context (LaunchContext): The current launch context.
Returns:
List[LaunchDescriptionEntity]: Launch descriptions.
List[Node]: Nodes to launch.
"""
launch_description_entities = list()
launch_description_entities.append(get_navigate_node_description(context))
Expand Down
93 changes: 93 additions & 0 deletions local_pathfinding/objectives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Our custom OMPL optimization objectives."""

from ompl import base as ob


class Objective(ob.StateCostIntegralObjective):
"""All of our optimization objectives inherit from this class.
Notes:
- This class inherits from the OMPL class StateCostIntegralObjective:
https://ompl.kavrakilab.org/classompl_1_1base_1_1StateCostIntegralObjective.html
- Camelcase is used for functions that override OMPL functions, as that is their convention.
Attributes:
space_information: https://ompl.kavrakilab.org/classompl_1_1base_1_1SpaceInformation.html
"""

def __init__(self, space_information):
super().__init__(si=space_information, enableMotionCostInterpolation=True)
self.space_information = space_information

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
raise NotImplementedError


class DistanceObjective(Objective):
def __init__(self, space_information):
super().__init__(space_information)

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
return ob.Cost()

def _get_path_length_objective(self):
raise NotImplementedError

def _get_euclidean_path_length_objective(self, s1, s2):
raise NotImplementedError

def _get_latlon_path_length_objective(self, s1, s2):
raise NotImplementedError


class MinimumTurningObjective(Objective):
def __init__(self, space_information, simple_setup, heading_degrees: float):
super().__init__(space_information)
self.simple_setup = simple_setup
self.heading_degrees = heading_degrees

def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
return ob.Cost()

def _goal_path_turn_cost(self, s1, s2):
raise NotImplementedError

def _goal_heading_turn_cost(self, s1):
raise NotImplementedError

def _heading_path_turn_cost(self, s1, s2):
raise NotImplementedError


class WindObjective(Objective):
def __init__(self, space_information, wind_direction_degrees: float):
super().__init__(space_information)
self.wind_direction_degrees = wind_direction_degrees

# This objective function punishes the boat for going up/downwind
def motionCost(self, s1: ob.SE2StateSpace, s2: ob.SE2StateSpace) -> ob.Cost:
return ob.Cost()

@staticmethod
def _is_upwind(wind_direction_radians: float, boat_direction_radians: float) -> bool:
raise NotImplementedError

@staticmethod
def _is_downwind(wind_direction_radians: float, boat_direction_radians: float) -> bool:
raise NotImplementedError


def get_sailing_objective(
space_information, simple_setup, heading_degrees: float, wind_direction_degrees: float
) -> ob.OptimizationObjective:
objective = ob.MultiOptimizationObjective(si=space_information)
objective.addObjective(objective=DistanceObjective(space_information), weight=1.0)
objective.addObjective(
objective=MinimumTurningObjective(space_information, simple_setup, heading_degrees),
weight=100.0,
)
objective.addObjective(
objective=WindObjective(space_information, wind_direction_degrees), weight=1.0
)

return objective
18 changes: 15 additions & 3 deletions local_pathfinding/ompl_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from ompl import util as ou
from rclpy.impl.rcutils_logger import RcutilsLogger

from local_pathfinding.objectives import get_sailing_objective

if TYPE_CHECKING:
from local_pathfinding.local_path import LocalPathState

Expand All @@ -24,6 +26,9 @@
class OMPLPathState:
def __init__(self, local_path_state: LocalPathState):
# TODO: derive OMPLPathState attributes from local_path_state
self.headingDirection = 45
self.windDirection = 10

self.state_domain = (-1, 1)
self.state_range = (-1, 1)
self.start_state = (0.5, 0.4)
Expand Down Expand Up @@ -87,7 +92,6 @@ def get_waypoints(self) -> List[Tuple[float, float]]:

def update_objectives(self):
"""Update the objectives on the basis of which the path is optimized.
Raises:
NotImplementedError: Method or function hasn't been implemented yet.
"""
Expand Down Expand Up @@ -137,13 +141,21 @@ def _init_simple_setup(self) -> og.SimpleSetup:
)
simple_setup.setStartAndGoalStates(start, goal)

# Constructs a space information instance for this simple setup
space_information = simple_setup.getSpaceInformation()

# set the optimization objective of the simple setup object
# TODO: implement and add optimization objective here
# simple_setup.setOptimizationObjective(objective)

objective = get_sailing_objective(
space_information, simple_setup, self.state.headingDirection, self.state.windDirection
)
simple_setup.setOptimizationObjective(objective)

# set the planner of the simple setup object
# TODO: implement and add planner here
# simple_setup.setPlanner(planner)
planner = og.RRTstar(space_information)
simple_setup.setPlanner(planner)

return simple_setup

Expand Down
58 changes: 58 additions & 0 deletions test/test_objectives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import pytest
from rclpy.impl.rcutils_logger import RcutilsLogger

import local_pathfinding.objectives as objectives
import local_pathfinding.ompl_path as ompl_path

PATH = ompl_path.OMPLPath(
parent_logger=RcutilsLogger(),
max_runtime=1,
local_path_state=None, # type: ignore[arg-type] # None is placeholder
)


def test_distance_objective():
distance_objective = objectives.DistanceObjective(PATH._simple_setup.getSpaceInformation())
assert distance_objective is not None


def test_get_path_length_objective():
with pytest.raises(NotImplementedError):
raise NotImplementedError


def test_get_euclidean_path_length_objective():
with pytest.raises(NotImplementedError):
raise NotImplementedError


def test_get_latlon_path_length_objective():
with pytest.raises(NotImplementedError):
raise NotImplementedError


def test_minimum_turning_objective():
minimum_turning_objective = objectives.MinimumTurningObjective(
PATH._simple_setup.getSpaceInformation(), None, 0
)
assert minimum_turning_objective is not None


def test_goalPathTurnCost():
with pytest.raises(NotImplementedError):
raise NotImplementedError


def test_goalHeadingTurnCost():
with pytest.raises(NotImplementedError):
raise NotImplementedError


def test_headingPathTurnCost():
with pytest.raises(NotImplementedError):
raise NotImplementedError


def test_wind_objective():
wind_objective = objectives.WindObjective(PATH._simple_setup.getSpaceInformation(), 0)
assert wind_objective is not None

0 comments on commit 110a103

Please sign in to comment.