Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(components): try catch subscription callbacks #167

Merged
merged 4 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ Release Versions:

## Upcoming changes

- fix(components): remove incorrect log line (#166)
- fix(controllers): move predicate publishing rate parameter to BaseControllerInterface (#168)
- feat(components): get clproto message type from attribute (#175)
- fix(components): add missing test case (#181)
- fix(components): clean up lifecycle nodes properly (#178)
- fix(components): remove incorrect log line (#166)
- fix(controllers): move predicate publishing rate parameter to BaseControllerInterface (#168)
- feat(components): get clproto message type from attribute (#175)
- fix(components): add missing test case (#181)
- fix(components): clean up lifecycle nodes properly (#178)
- fix(components): try catch subscription callbacks (#167)

## 5.0.2

Expand Down
2 changes: 1 addition & 1 deletion aica-package.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#syntax=ghcr.io/aica-technology/package-builder:v1.3.0

[metadata]
version = "5.1.0-rc0002"
version = "5.1.0-rc0003"
description = "Modular ROS 2 extension library for dynamic composition of components and controllers with the AICA robotics framework"

[metadata.collection]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,17 @@ inline void ComponentInterface::add_input(
RCLCPP_DEBUG_STREAM(
this->node_logging_->get_logger(),
"Adding input '" << parsed_signal_name << "' with topic name '" << topic_name << "'.");
auto subscription =
rclcpp::create_subscription<MsgT>(this->node_parameters_, this->node_topics_, topic_name, this->qos_, callback);
auto subscription = rclcpp::create_subscription<MsgT>(
domire8 marked this conversation as resolved.
Show resolved Hide resolved
this->node_parameters_, this->node_topics_, topic_name, this->qos_,
[this, signal_name, callback](const std::shared_ptr<MsgT> message) {
try {
callback(message);
} catch (const std::exception& ex) {
RCLCPP_WARN_STREAM_THROTTLE(
this->node_logging_->get_logger(), *this->node_clock_->get_clock(), 1000,
"Unhandled exception in callback of input '" << signal_name << "': " << ex.what());
}
});
auto subscription_interface =
std::make_shared<SubscriptionHandler<MsgT>>()->create_subscription_interface(subscription);
this->inputs_.insert_or_assign(parsed_signal_name, subscription_interface);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,13 @@ def __subscription_callback(
self.get_logger().error(f"Failed to execute user callback in subscription for attribute"
f" '{attribute_name}': {e}", throttle_duration_sec=1.0)

def __safe_callback(self, message: MsgT, signal_name: str, callback: Callable):
try:
callback(message)
except Exception as e:
self.get_logger().warn(f"Unhandled exception in callback of input '{
signal_name}': {e}", throttle_duration_sec=1.0)

def __declare_signal(self, signal_name: str, signal_type: str, default_topic="", fixed_topic=False):
"""
Declare an input to create the topic parameter without adding it to the map of inputs yet.
Expand Down Expand Up @@ -594,8 +601,8 @@ def add_input(self, signal_name: str, subscription: Union[str, Callable], messag
if user_callback:
self.get_logger().warn("Providing a callable for arguments 'subscription' and 'user_callback' is"
"not supported. The user callback will be ignored.")
self.__inputs[parsed_signal_name] = self.create_subscription(message_type, topic_name, subscription,
self.__qos)
self.__inputs[parsed_signal_name] = self.create_subscription(message_type, topic_name, partial(
self.__safe_callback, signal_name=parsed_signal_name, callback=subscription), self.__qos)
elif isinstance(subscription, str):
if callable(user_callback):
signature = inspect.signature(user_callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,32 @@ class MinimalCartesianInput : public ComponentT {
std::promise<void> received_;
};

template<class ComponentT>
class ExceptionCartesianInput : public ComponentT {
public:
ExceptionCartesianInput(const rclcpp::NodeOptions& node_options, const std::string& topic)
: ComponentT(node_options, "exception_cartesian_input"), thrown_(false) {
this->received_future = this->received_.get_future();
this->template add_input<modulo_core::EncodedState>(
"cartesian_state",
[this](const std::shared_ptr<modulo_core::EncodedState>) {
if (!this->thrown_) {
this->thrown_ = true;
throw std::runtime_error("Error");
} else {
this->received_.set_value();
}
},
topic);
}

std::shared_future<void> received_future;

private:
bool thrown_;
std::promise<void> received_;
};

template<class ComponentT>
class MinimalTwistOutput : public ComponentT {
public:
Expand Down
11 changes: 11 additions & 0 deletions source/modulo_components/test/cpp/test_component_communication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ TEST_F(ComponentCommunicationTest, InputOutput) {
EXPECT_THROW(output_node->publish(), modulo_core::exceptions::CoreException);
}

TEST_F(ComponentCommunicationTest, ExceptionInputOutput) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<ExceptionCartesianInput<Component>>(rclcpp::NodeOptions(), "/topic");
auto output_node =
std::make_shared<MinimalCartesianOutput<Component>>(rclcpp::NodeOptions(), "/topic", cartesian_state, true);
this->exec_->add_node(input_node);
this->exec_->add_node(output_node);
auto return_code = this->exec_->spin_until_future_complete(input_node->received_future, 500ms);
ASSERT_EQ(return_code, rclcpp::FutureReturnCode::SUCCESS);
}

TEST_F(ComponentCommunicationTest, InputOutputManual) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<MinimalCartesianInput<Component>>(rclcpp::NodeOptions(), "/topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ TEST_F(LifecycleComponentCommunicationTest, InputOutput) {
EXPECT_THROW(output_node->publish(), modulo_core::exceptions::CoreException);
}

TEST_F(LifecycleComponentCommunicationTest, ExceptionInputOutput) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<ExceptionCartesianInput<LifecycleComponent>>(rclcpp::NodeOptions(), "/topic");
auto output_node = std::make_shared<MinimalCartesianOutput<LifecycleComponent>>(
rclcpp::NodeOptions(), "/topic", cartesian_state, true);
add_configure_activate(this->exec_, input_node);
add_configure_activate(this->exec_, output_node);
auto return_code = this->exec_->spin_until_future_complete(input_node->received_future, 500ms);
ASSERT_EQ(return_code, rclcpp::FutureReturnCode::SUCCESS);
}

TEST_F(LifecycleComponentCommunicationTest, InputOutputManual) {
auto cartesian_state = state_representation::CartesianState::Random("test");
auto input_node = std::make_shared<MinimalCartesianInput<LifecycleComponent>>(rclcpp::NodeOptions(), "/topic");
Expand Down
21 changes: 21 additions & 0 deletions source/modulo_components/test/python/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import clproto
import pytest
import state_representation as sr
from types import MethodType
from modulo_components.component import Component
from modulo_core import EncodedState
from rclpy.task import Future
Expand Down Expand Up @@ -121,6 +122,26 @@ def _make_minimal_cartesian_input(component_type, topic):
yield _make_minimal_cartesian_input(request.param[0], request.param[1])


@pytest.fixture
def exception_cartesian_input(request):
def _make_exception_cartesian_input(component_type, topic):
def callback(self):
if not self.thrown:
self.thrown = True
raise RuntimeError()
else:
self.received_future.set_result(True)

component = component_type("exception_cartesian_input")
component.received_future = Future()
component.thrown = False
component.callback = MethodType(callback, component)
component.add_input("cartesian_pose", lambda msg: component.callback(), EncodedState, topic)
return component

yield _make_exception_cartesian_input(request.param[0], request.param[1])


@pytest.fixture
def minimal_sensor_input(request):
def _make_minimal_sensor_input(component_type, topic):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ def test_input_output(ros_exec, random_pose, minimal_cartesian_output, minimal_c
minimal_cartesian_output.publish()


@pytest.mark.parametrize("exception_cartesian_input", [[Component, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[Component, "/topic", True]], indirect=True)
def test_exception_input_output(ros_exec, minimal_cartesian_output, exception_cartesian_input):
ros_exec.add_node(exception_cartesian_input)
ros_exec.add_node(minimal_cartesian_output)
ros_exec.spin_until_future_complete(exception_cartesian_input.received_future, timeout_sec=0.5)
assert exception_cartesian_input.received_future.done() and exception_cartesian_input.received_future.result()


@pytest.mark.parametrize("minimal_cartesian_input", [[Component, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[Component, "/topic", False]], indirect=True)
def test_input_output_manual(ros_exec, random_pose, minimal_cartesian_output, minimal_cartesian_input):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@ def test_input_output(ros_exec, make_lifecycle_change_client, random_pose, minim
minimal_cartesian_output.publish()


@pytest.mark.parametrize("exception_cartesian_input", [[LifecycleComponent, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[LifecycleComponent, "/topic", True]], indirect=True)
def test_exception_input_output(
ros_exec, make_lifecycle_change_client, minimal_cartesian_output, exception_cartesian_input):
input_change_client = make_lifecycle_change_client("exception_cartesian_input")
output_change_client = make_lifecycle_change_client("minimal_cartesian_output")
ros_exec.add_node(input_change_client)
ros_exec.add_node(output_change_client)
ros_exec.add_node(exception_cartesian_input)
ros_exec.add_node(minimal_cartesian_output)
input_change_client.configure(ros_exec)
output_change_client.configure(ros_exec)
input_change_client.activate(ros_exec)
output_change_client.activate(ros_exec)
ros_exec.spin_until_future_complete(exception_cartesian_input.received_future, timeout_sec=0.5)
assert exception_cartesian_input.received_future.done() and exception_cartesian_input.received_future.result()


@pytest.mark.parametrize("minimal_cartesian_input", [[LifecycleComponent, "/topic"]], indirect=True)
@pytest.mark.parametrize("minimal_cartesian_output", [[LifecycleComponent, "/topic", False]], indirect=True)
def test_input_output_manual(ros_exec, make_lifecycle_change_client, random_pose, minimal_cartesian_output,
Expand Down
Loading