Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ repos:
rev: 1.6.1
hooks:
- id: copyright-required
stages: [pre-commit]
exclude: '(^\.git/|(\.ini|\.json|\.service|__init__\.py|\.md|\.gitkeep|\.conf|LICENSE|\.toml|\.template|\.style\..*|\.gitattributes|\.gitignore|\.editorconfig|\.bash-completion|\.install|\.links|changelog|debian/source/format|.codespellrc|copyright|ament_code_style\.cfg|test_pep257\.py|test_flake8\.py|test_copyright\.py)$)'
- repo: local
hooks:
Expand Down
11 changes: 11 additions & 0 deletions greenwave_monitor/include/greenwave_monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -48,6 +50,9 @@ class GreenwaveMonitor : public rclcpp::Node
if (init_timer_) {
init_timer_->cancel();
}
// Reset diagnostics subscription before clearing internal state to prevent
// callbacks from firing after greenwave_diagnostics_ is destroyed
diagnostics_subscription_.reset();
// Clear diagnostics before base Node destructor runs to avoid accessing invalid node state
greenwave_diagnostics_.clear();
subscriptions_.clear();
Expand Down Expand Up @@ -88,6 +93,8 @@ class GreenwaveMonitor : public rclcpp::Node

void add_topics_from_parameters();

void diagnostics_callback(const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg);

std::map<std::string,
std::unique_ptr<greenwave_diagnostics::GreenwaveDiagnostics>> greenwave_diagnostics_;
std::vector<std::shared_ptr<rclcpp::GenericSubscription>> subscriptions_;
Expand All @@ -98,4 +105,8 @@ class GreenwaveMonitor : public rclcpp::Node
rclcpp::Service<greenwave_monitor_interfaces::srv::SetExpectedFrequency>::SharedPtr
set_expected_frequency_service_;
greenwave_diagnostics::TimeCheckPreset time_check_preset_;
rclcpp::Subscription<diagnostic_msgs::msg::DiagnosticArray>::SharedPtr
diagnostics_subscription_;
std::set<std::string> externally_diagnosed_topics_;
std::mutex externally_diagnosed_topics_mutex_;
};
36 changes: 36 additions & 0 deletions greenwave_monitor/src/greenwave_monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ GreenwaveMonitor::GreenwaveMonitor(const rclcpp::NodeOptions & options)
timer_ = this->create_wall_timer(
1s, std::bind(&GreenwaveMonitor::timer_callback, this));

// Subscribe to /diagnostics early so we can detect external publishers before
// deferred_init() adds topics. This gives us the best chance of catching
// externally-published diagnostics before add_topic() is called.
diagnostics_subscription_ =
this->create_subscription<diagnostic_msgs::msg::DiagnosticArray>(
"/diagnostics", 10,
std::bind(&GreenwaveMonitor::diagnostics_callback, this, std::placeholders::_1));

// Defer topic discovery to allow the ROS graph to settle before querying other nodes
init_timer_ = this->create_wall_timer(
100ms, [this]() {
Expand Down Expand Up @@ -170,6 +178,19 @@ void GreenwaveMonitor::timer_callback()
RCLCPP_INFO(this->get_logger(), "====================================================");
}

void GreenwaveMonitor::diagnostics_callback(
const diagnostic_msgs::msg::DiagnosticArray::SharedPtr msg)
{
std::lock_guard<std::mutex> lock(externally_diagnosed_topics_mutex_);
for (const auto & status : msg->status) {
// Only track topic names that are not already monitored by us. This prevents
// our own published diagnostics from blocking re-adds after a remove_topic().
if (greenwave_diagnostics_.find(status.name) == greenwave_diagnostics_.end()) {
externally_diagnosed_topics_.insert(status.name);
}
}
}

void GreenwaveMonitor::handle_manage_topic(
const std::shared_ptr<greenwave_monitor_interfaces::srv::ManageTopic::Request> request,
std::shared_ptr<greenwave_monitor_interfaces::srv::ManageTopic::Response> response)
Expand Down Expand Up @@ -325,6 +346,21 @@ bool GreenwaveMonitor::add_topic(
return false;
}

// Check if an external node is already publishing diagnostics for this topic.
// Adding a duplicate would create redundant and potentially conflicting diagnostics.
{
std::lock_guard<std::mutex> lock(externally_diagnosed_topics_mutex_);
if (externally_diagnosed_topics_.count(topic) > 0) {
message = "Topic '" + topic +
"' already has diagnostics published externally on /diagnostics";
RCLCPP_ERROR(
this->get_logger(),
"Refusing to add topic '%s': diagnostics already published externally on /diagnostics",
topic.c_str());
return false;
}
}

RCLCPP_INFO(this->get_logger(), "Adding subscription for topic '%s'", topic.c_str());

auto maybe_type = find_topic_type(topic, max_retries, retry_wait_s);
Expand Down
47 changes: 47 additions & 0 deletions greenwave_monitor/test/test_greenwave_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import time
import unittest

from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus

from greenwave_monitor.test_utils import (
call_manage_topic_service,
collect_diagnostics_for_topic,
Expand Down Expand Up @@ -415,6 +417,51 @@ def test_yaml_parameter_loading(self, expected_frequency, message_type, toleranc
f'Topic {NONEXISTENT_TOPIC} should not be monitored'
)

def test_reject_externally_diagnosed_topic(
self, expected_frequency, message_type, tolerance_hz):
"""Test that add_topic() fails when the topic already has external diagnostics.

Verifies the fix for issue #23: Greenwave Monitor should not create a duplicate
diagnostics object when an external node is already publishing diagnostics for
the same topic on /diagnostics.
"""
if (message_type, expected_frequency, tolerance_hz) != MANAGE_TOPIC_TEST_CONFIG:
self.skipTest('Only running external diagnostics test once')

service_client = self.check_node_launches_successfully()

external_topic = '/external_diag_topic'

# Publish several DiagnosticArray messages for the external topic so the
# Greenwave Monitor's /diagnostics subscriber can detect them.
diag_pub = self.test_node.create_publisher(DiagnosticArray, '/diagnostics', 10)
try:
end_time = time.time() + 3.0
while time.time() < end_time:
msg = DiagnosticArray()
status = DiagnosticStatus()
status.name = external_topic
status.level = DiagnosticStatus.OK
status.message = 'External publisher'
msg.status = [status]
diag_pub.publish(msg)
rclpy.spin_once(self.test_node, timeout_sec=0.1)

# Greenwave Monitor should refuse to add the topic because an external
# node is already publishing diagnostics for it.
response = self.call_manage_topic(
add=True, topic=external_topic, service_client=service_client)
self.assertFalse(
response.success,
'add_topic() should fail when external diagnostics already exist for the topic'
)
self.assertIn(
'external', response.message.lower(),
'Error message should mention external diagnostics'
)
finally:
self.test_node.destroy_publisher(diag_pub)


if __name__ == '__main__':
unittest.main()
Loading