Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Obelisk Message Type Restrictions (Python) #130

Merged
merged 3 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions obelisk/python/obelisk_py/core/control.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Type

from obelisk_py.core.node import ObeliskNode
from obelisk_py.core.obelisk_typing import ObeliskControlMsg, ObeliskEstimatorMsg


class ObeliskController(ABC, ObeliskNode):
Expand All @@ -17,7 +17,7 @@ class ObeliskController(ABC, ObeliskNode):
the control message should be of type ObeliskControlMsg to be compatible with the Obelisk ecosystem.
"""

def __init__(self, node_name: str) -> None:
def __init__(self, node_name: str, ctrl_msg_type: Type, est_msg_type: Type) -> None:
"""Initialize the Obelisk controller."""
super().__init__(node_name)
self.register_obk_timer(
Expand All @@ -27,26 +27,26 @@ def __init__(self, node_name: str) -> None:
)
self.register_obk_publisher(
"pub_ctrl_setting",
msg_type=ctrl_msg_type,
key="pub_ctrl",
msg_type=None, # generic, specified in config file
)
self.register_obk_subscription(
"sub_est_setting",
self.update_x_hat,
msg_type=est_msg_type,
key="sub_est",
msg_type=None, # generic, specified in config file
)

@abstractmethod
def update_x_hat(self, x_hat_msg: ObeliskEstimatorMsg) -> None:
def update_x_hat(self, x_hat_msg: Type) -> None:
"""Update the state estimate.

Parameters:
x_hat_msg: The Obelisk message containing the state estimate.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the word Obelisk

"""

@abstractmethod
def compute_control(self) -> ObeliskControlMsg:
def compute_control(self) -> Type:
"""Compute the control signal.

This is the control timer callback and is expected to call 'publisher_ctrl' internally. Note that the control
Expand Down
28 changes: 4 additions & 24 deletions obelisk/python/obelisk_py/core/estimation.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from abc import ABC, abstractmethod
from typing import Union

import obelisk_sensor_msgs.msg as osm
from rclpy.lifecycle.node import LifecycleState, TransitionCallbackReturn
from typing import Type

from obelisk_py.core.node import ObeliskNode
from obelisk_py.core.obelisk_typing import ObeliskEstimatorMsg, ObeliskSensorMsg
from obelisk_py.core.utils.internal import get_classes_in_module


class ObeliskEstimator(ABC, ObeliskNode):
Expand All @@ -32,40 +27,25 @@ def update_X(self, X_msg: ObeliskSensorMsg) -> None:
```
"""

def __init__(self, node_name: str) -> None:
def __init__(self, node_name: str, est_msg_type: Type) -> None:
"""Initialize the Obelisk estimator.

[NOTE] In derived classes, you should declare settings for sensor subscribers.
"""
super().__init__(node_name)
self._has_sensor_subscriber = False
self.register_obk_timer(
"timer_est_setting",
self.compute_state_estimate,
key="timer_est",
)
self.register_obk_publisher(
"pub_est_setting",
msg_type=est_msg_type,
key="pub_est",
msg_type=None, # generic, specified in config file
)

def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn:
"""Configure the estimator."""
super().on_configure(state)

# ensure there is at least one sensor subscriber
for sub_dict in self._obk_sub_settings:
msg_type = sub_dict["msg_type"]
if msg_type in get_classes_in_module(osm):
self._has_sensor_subscriber = True
break
assert self._has_sensor_subscriber, "At least one sensor subscriber is required in an ObeliskEstimator!"

return TransitionCallbackReturn.SUCCESS

@abstractmethod
def compute_state_estimate(self) -> Union[ObeliskEstimatorMsg, ObeliskSensorMsg]:
def compute_state_estimate(self) -> Type:
"""Compute the state estimate.

This is the state estimate timer callback and is expected to call 'publisher_est' internally. Note that the
Expand Down
Loading