Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions mavros/include/mavros/mavros_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <array>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <shared_mutex> // NOLINT
Expand Down Expand Up @@ -94,11 +95,12 @@ class Endpoint : public std::enable_shared_from_this<Endpoint>

std::shared_ptr<Router> parent;

uint32_t id; // id of the endpoint
Type link_type; // class of the endpoint
std::string url; // url to open that endpoint
std::set<addr_t> remote_addrs; // remotes that we heard there
std::set<addr_t> stale_addrs; // temporary storage for stale remote addrs
uint32_t id; // id of the endpoint
Type link_type; // class of the endpoint
std::string url; // url to open that endpoint
std::shared_mutex remote_addrs_mutex; // guards remote_addrs and stale_addrs
std::set<addr_t> remote_addrs; // remotes that we heard there
std::set<addr_t> stale_addrs; // temporary storage for stale remote addrs

virtual bool is_open() = 0;
virtual std::pair<bool, std::string> open() = 0;
Expand Down
49 changes: 37 additions & 12 deletions mavros/src/lib/mavros_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ void Router::route_message(
// NOTE(vooon): current router do not allow to speak drone-to-drone.
// if it is needed perhaps better to add mavlink-router in front of mavros-router.

bool has_target = dest->remote_addrs.find(target_addr) != dest->remote_addrs.end();
bool has_target;
{
std::shared_lock<std::shared_mutex> lock(dest->remote_addrs_mutex);
has_target = dest->remote_addrs.find(target_addr) != dest->remote_addrs.end();
}

if (has_target) {
dest->send_message(msg, framing, src->id);
Expand Down Expand Up @@ -297,6 +301,7 @@ void Router::periodic_clear_stale_remote_addrs()
RCLCPP_DEBUG(lg, "clear stale remotes");
for (auto & kv : this->endpoints) {
auto & p = kv.second;
std::unique_lock<std::shared_mutex> endpoint_lock(p->remote_addrs_mutex);

// Step 1: remove any stale addrs that still there
// (hadn't been removed by Endpoint::recv_message())
Expand Down Expand Up @@ -342,16 +347,22 @@ void Endpoint::recv_message(const mavlink_message_t * msg, const Framing framing
const addr_t sysid_addr = msg->sysid << 8;
const addr_t sysid_compid_addr = (msg->sysid << 8) | msg->compid;

// save source addr to remote_addrs
auto sp = this->remote_addrs.emplace(sysid_addr);
auto scp = this->remote_addrs.emplace(sysid_compid_addr);
bool new_sysid_addr;
bool new_sysid_compid_addr;
{
std::unique_lock<std::shared_mutex> lock(this->remote_addrs_mutex);

// and delete it from stale_addrs
this->stale_addrs.erase(sysid_addr);
this->stale_addrs.erase(sysid_compid_addr);
// save source addr to remote_addrs
new_sysid_addr = this->remote_addrs.emplace(sysid_addr).second;
new_sysid_compid_addr = this->remote_addrs.emplace(sysid_compid_addr).second;

// and delete it from stale_addrs
this->stale_addrs.erase(sysid_addr);
this->stale_addrs.erase(sysid_compid_addr);
}

auto & nh = this->parent;
if (sp.second || scp.second) {
if (new_sysid_addr || new_sysid_compid_addr) {
RCLCPP_INFO(
nh->get_logger(), "link[%d] detected remote address %d.%d", this->id, msg->sysid,
msg->compid);
Expand Down Expand Up @@ -439,9 +450,16 @@ void MAVConnEndpoint::diag_run(diagnostic_updater::DiagnosticStatusWrapper & sta
stat.addf("Rx speed", "%f", iostat.rx_speed);
stat.addf("Tx speed", "%f", iostat.tx_speed);

stat.addf("Remotes count", "%zu", this->remote_addrs.size());
std::set<addr_t> remote_addrs_copy;
{
std::shared_lock<std::shared_mutex> lock(this->remote_addrs_mutex);
remote_addrs_copy = this->remote_addrs;
}

stat.addf("Remotes count", "%zu", remote_addrs_copy.size());

size_t idx = 0;
for (auto addr : this->remote_addrs) {
for (auto addr : remote_addrs_copy) {
stat.addf(utils::format("Remote [%d]", idx++), "%d.%d", addr >> 8, addr & 0xff);
}

Expand Down Expand Up @@ -534,9 +552,16 @@ void ROSEndpoint::diag_run(diagnostic_updater::DiagnosticStatusWrapper & stat)
{
// TODO(vooon): make some diagnostics

stat.addf("Remotes count", "%zu", this->remote_addrs.size());
std::set<addr_t> remote_addrs_copy;
{
std::shared_lock<std::shared_mutex> lock(this->remote_addrs_mutex);
remote_addrs_copy = this->remote_addrs;
}

stat.addf("Remotes count", "%zu", remote_addrs_copy.size());

size_t idx = 0;
for (auto addr : this->remote_addrs) {
for (auto addr : remote_addrs_copy) {
stat.addf(utils::format("Remote [%d]", idx++), "%d.%d", addr >> 8, addr & 0xff);
}

Expand Down
Loading