Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 60 additions & 25 deletions rclcpp/test/rclcpp/test_parameter_event_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,14 +471,38 @@ TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterEventCallback)
std::atomic_bool received_from_remote_node1{false};
std::atomic_bool received_from_remote_node2{false};

// The ParameterEventHandler creates its /parameter_events subscription (with no
// content filter) at construction time. When declare_parameter is later called on
// remote nodes, it internally triggers set_parameter which publishes a ParameterEvent
// with the parameter in the new_parameters field. That event arrives on the
// already-existing, unfiltered subscription and is stored in its queue.
// When configure_nodes_filter is subsequently called, it applies a content filter to
// the subscription, but events already sitting in the subscriber's queue are not
// affected. Per the DDS specification, a content-filtered topic filter applies at the
// point of data delivery/insertion into the reader's cache. Once data is in the cache,
// it has been "received." Retroactively purging it when the filter changes is not
// standard behavior.
// rmw_connextdds correctly honors this: only future incoming samples are filtered, so
// queued declaration events would still be delivered. rmw_fastrtps_cpp may retroactively
// apply filters to already-queued messages or deliver them lazily, causing those
// events to be silently discarded - which is arguably incorrect but happens to make
// naive tests pass.
// Only track changed_parameters to ignore queued declare_parameter events.
auto cb =
[&remote_node_name1, &remote_node_name2, &received_from_remote_node1,
&received_from_remote_node2]
[&remote_node_name1, &remote_node_name2,
&remote_node1_param_name, &remote_node2_param_name,
&received_from_remote_node1, &received_from_remote_node2]
(const rcl_interfaces::msg::ParameterEvent & parm) {
if (parm.node == "/" + remote_node_name1) {
received_from_remote_node1 = true;
} else if (parm.node == "/" + remote_node_name2) {
received_from_remote_node2 = true;
for (const auto & changed_parameter : parm.changed_parameters) {
if (parm.node == "/" + remote_node_name1 &&
changed_parameter.name == remote_node1_param_name)
{
received_from_remote_node1 = true;
} else if (parm.node == "/" + remote_node_name2 && // NOLINT(readability/braces)
changed_parameter.name == remote_node2_param_name)
{
received_from_remote_node2 = true;
}
}
};

Expand All @@ -505,7 +529,12 @@ TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterEventCallback)
};

{
std::thread thread(wait_param_event, 1s, nullptr);
std::thread thread(
wait_param_event,
3s,
[&received_from_remote_node2]() {
return received_from_remote_node2.load();
});
std::this_thread::sleep_for(100ms);
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20));
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc"));
Expand Down Expand Up @@ -562,32 +591,33 @@ TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterCallback)
remote_node1->declare_parameter(remote_node1_param_name, 10);
remote_node2->declare_parameter(remote_node2_param_name, "Default");

// The ParameterEventHandler creates its /parameter_events subscription (with no
// content filter) at construction time. When declare_parameter is later called on
// remote nodes, it internally triggers set_parameter which publishes a ParameterEvent
// with the parameter in the new_parameters field. That event arrives on the
// already-existing, unfiltered subscription and is stored in its queue.
// Recreate the param_handler so its new subscription starts with an empty queue,
// avoiding interference from queued declare_parameter events.
param_handler.reset();
param_handler = std::make_shared<TestParameterEventHandler>(node);

std::atomic_bool received_from_remote_node1{false};
std::atomic_bool received_from_remote_node2{false};

auto cb_remote_node1 =
[&remote_node1_param_name, &received_from_remote_node1]
(const rclcpp::Parameter & parm) {
if (parm.get_name() == remote_node1_param_name) {
received_from_remote_node1 = true;
}
auto cb1 = [&received_from_remote_node1](const rclcpp::Parameter &) {
received_from_remote_node1 = true;
};

auto cb_remote_node2 =
[&remote_node2_param_name, &received_from_remote_node2]
(const rclcpp::Parameter & parm) {
if (parm.get_name() == remote_node2_param_name) {
received_from_remote_node2 = true;
}
auto cb2 = [&received_from_remote_node2](const rclcpp::Parameter &) {
received_from_remote_node2 = true;
};

// Configure to only receive parameter events from remote_node_name2
ASSERT_TRUE(param_handler->configure_nodes_filter({remote_node_name2}));

auto handler1 = param_handler->add_parameter_callback(
remote_node1_param_name, cb_remote_node1, remote_node_name1);
auto handler2 = param_handler->add_parameter_callback(
remote_node2_param_name, cb_remote_node2, remote_node_name2);
auto h1 = param_handler->add_parameter_callback(
remote_node1_param_name, cb1, "/" + remote_node_name1);
auto h2 = param_handler->add_parameter_callback(
remote_node2_param_name, cb2, "/" + remote_node_name2);

rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
Expand All @@ -607,7 +637,12 @@ TEST_F(TestNode, ConfigureNodesFilterAndCheckAddParameterCallback)
};

{
std::thread thread(wait_param_event, 1s, nullptr);
std::thread thread(
wait_param_event,
3s,
[&received_from_remote_node2]() {
return received_from_remote_node2.load();
});
std::this_thread::sleep_for(100ms);
remote_node1->set_parameter(rclcpp::Parameter(remote_node1_param_name, 20));
remote_node2->set_parameter(rclcpp::Parameter(remote_node2_param_name, "abc"));
Expand Down