Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ jobs:
matrix:
ros_distro: [kilted]
component:
# Add more entries here when enabling coverage for more modules.
# Example:
# - name: mavros
# packages: mavros
# source_filter: ".*/mavros/.*"
- name: libmavconn
packages: libmavconn
source_filter: ".*/libmavconn/.*"
- name: mavros
packages: mavros
source_filter: ".*/mavros/.*"

env:
ROS_DISTRO: ${{ matrix.ros_distro }}
Expand Down
43 changes: 35 additions & 8 deletions mavros/include/mavros/mavros_router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <Eigen/Eigen> // NOLINT

#include "mavconn/interface.hpp"
#include "mavconn/io_context_runner.hpp"
#include "mavconn/mavlink_dialect.hpp"
#include "mavros/utils.hpp"
#include "rclcpp/macros.hpp"
Expand All @@ -53,7 +54,6 @@ using mavconn::Framing;
using ::mavlink::mavlink_message_t;
using ::mavlink::msgid_t;

using namespace std::placeholders; // NOLINT
using namespace std::chrono_literals; // NOLINT

class Router;
Expand Down Expand Up @@ -146,6 +146,7 @@ class Router : public rclcpp::Node
const std::string & node_name = "mavros_router")
: rclcpp::Node(node_name,
options /* rclcpp::NodeOptions(options).use_intra_process_comms(true) */),
router_io_runner(),
endpoints{}, stat_msg_routed(0), stat_msg_sent(0), stat_msg_dropped(0),
diagnostic_updater(this, 1.0)
{
Expand All @@ -157,18 +158,28 @@ class Router : public rclcpp::Node

add_service = this->create_service<mavros_msgs::srv::EndpointAdd>(
"~/add_endpoint",
std::bind(&Router::add_endpoint, this, _1, _2));
[this](
const mavros_msgs::srv::EndpointAdd::Request::SharedPtr request,
mavros_msgs::srv::EndpointAdd::Response::SharedPtr response)
{
this->add_endpoint(request, response);
});
del_service = this->create_service<mavros_msgs::srv::EndpointDel>(
"~/del_endpoint",
std::bind(&Router::del_endpoint, this, _1, _2));
[this](
const mavros_msgs::srv::EndpointDel::Request::SharedPtr request,
mavros_msgs::srv::EndpointDel::Response::SharedPtr response)
{
this->del_endpoint(request, response);
});

// try to reconnect endpoint each 30 seconds
reconnect_timer =
this->create_wall_timer(30s, std::bind(&Router::periodic_reconnect_endpoints, this));
this->create_wall_timer(30s, [this]() {this->periodic_reconnect_endpoints();});

// collect garbage addrs each minute
stale_addrs_timer =
this->create_wall_timer(60s, std::bind(&Router::periodic_clear_stale_remote_addrs, this));
this->create_wall_timer(60s, [this]() {this->periodic_clear_stale_remote_addrs();});

diagnostic_updater.setHardwareID("none"); // NOTE: router connects several hardwares
diagnostic_updater.add("MAVROS Router", this, &Router::diag_run);
Expand All @@ -183,6 +194,8 @@ class Router : public rclcpp::Node
RCLCPP_INFO(get_logger(), "Known MAVLink dialects:%s", ss.str().c_str());
RCLCPP_INFO(get_logger(), "MAVROS Router started");

router_io_runner.start([this]() {this->router_io_runner.io().run();});

// Delay parameter callback initialization because
// add/del endpoints calls have to use shared_from_this(),
// which cannot be used before we leave the constructor.
Expand All @@ -193,15 +206,26 @@ class Router : public rclcpp::Node
});
}

~Router() override
{
router_io_runner.shutdown_owned();
}

void route_message(Endpoint::SharedPtr src, const mavlink_message_t * msg, const Framing framing);

[[nodiscard]] asio::io_service * get_shared_io()
{
return &router_io_runner.io();
}

private:
friend class Endpoint;
friend class TestRouter;

static std::atomic<id_t> id_counter;
mavconn::IoContextRunner router_io_runner;

std::shared_timed_mutex mu;
std::shared_mutex mu;

// map stores all routing endpoints
std::unordered_map<id_t, Endpoint::SharedPtr> endpoints;
Expand Down Expand Up @@ -232,15 +256,18 @@ class Router : public rclcpp::Node
void param_init()
{
set_parameters_handle_ptr =
this->add_on_set_parameters_callback(std::bind(&Router::on_set_parameters_cb, this, _1));
this->add_on_set_parameters_callback(
[this](const std::vector<rclcpp::Parameter> & parameters) {
return this->on_set_parameters_cb(parameters);
});

auto params = get_parameters({"fcu_urls", "gcs_urls", "uas_urls"});
on_set_parameters_cb(params);
}

void param_init_once()
{
std::call_once(param_init_flag, std::bind(&Router::param_init, this));
std::call_once(param_init_flag, &Router::param_init, this);
}

rcl_interfaces::msg::SetParametersResult on_set_parameters_cb(
Expand Down
137 changes: 89 additions & 48 deletions mavros/src/lib/mavros_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
using namespace mavros::router; // NOLINT
using rclcpp::QoS;

using unique_lock = std::unique_lock<std::shared_timed_mutex>;
using shared_lock = std::shared_lock<std::shared_timed_mutex>;
using unique_lock = std::unique_lock<std::shared_mutex>;
using shared_lock = std::shared_lock<std::shared_mutex>;

std::atomic<id_t> Router::id_counter {1000};

Expand All @@ -37,7 +37,6 @@ void Router::route_message(
Endpoint::SharedPtr src, const mavlink_message_t * msg,
const Framing framing)
{
shared_lock lock(mu);
this->stat_msg_routed++;

// find message destination target
Expand All @@ -52,35 +51,43 @@ void Router::route_message(
}
}

size_t sent_cnt = 0, retry_cnt = 0;
retry:
for (auto & kv : this->endpoints) {
auto & dest = kv.second;
auto collect_targets = [this, &src](addr_t addr) {
std::vector<Endpoint::SharedPtr> targets;
shared_lock lock(mu);
targets.reserve(this->endpoints.size());

if (src->id == dest->id) {
continue; // do not echo message
}
if (src->link_type == dest->link_type) {
continue; // drop messages between same type FCU/GCS/UAS
}
for (const auto & kv : this->endpoints) {
const auto & dest = kv.second;

if (src->id == dest->id) {
continue; // do not echo message
}
if (src->link_type == dest->link_type) {
continue; // drop messages between same type FCU/GCS/UAS
}

// NOTE(vooon): current router do not allow to speak drone-to-drone.
// if it is needed perhaps better to add mavlink-router in front of mavros-router.
// NOTE(vooon): current router do not allow to speak drone-to-drone.
// if needed, perhaps better to add mavlink-router in front of
// mavros-router.
if (dest->remote_addrs.find(addr) != dest->remote_addrs.end()) {
targets.emplace_back(dest);
}
}

bool has_target = dest->remote_addrs.find(target_addr) != dest->remote_addrs.end();
return targets;
};

if (has_target) {
dest->send_message(msg, framing, src->id);
sent_cnt++;
}
auto targets = collect_targets(target_addr);
if (targets.empty() && target_addr != 0) {
// if targeted message hasn't been sent, retry as broadcast
target_addr = 0;
targets = collect_targets(target_addr);
}

// if message haven't been sent retry broadcast it
if (sent_cnt == 0 && retry_cnt < 2) {
target_addr = 0;
retry_cnt++;
goto retry;
for (const auto & dest : targets) {
dest->send_message(msg, framing, src->id);
}
const auto sent_cnt = targets.size();

// update stats
this->stat_msg_sent.fetch_add(sent_cnt);
Expand All @@ -102,7 +109,6 @@ void Router::add_endpoint(
const mavros_msgs::srv::EndpointAdd::Request::SharedPtr request,
mavros_msgs::srv::EndpointAdd::Response::SharedPtr response)
{
unique_lock lock(mu);
auto lg = get_logger();

RCLCPP_INFO(
Expand Down Expand Up @@ -133,8 +139,15 @@ void Router::add_endpoint(
ep->link_type = static_cast<Endpoint::Type>(request->type);
ep->url = request->url;

this->endpoints[id] = ep;
this->diagnostic_updater.add(ep->diag_name(), std::bind(&Endpoint::diag_run, ep, _1));
{
unique_lock lock(mu);
this->endpoints[id] = ep;
}
this->diagnostic_updater.add(
ep->diag_name(),
[ep](diagnostic_updater::DiagnosticStatusWrapper & stat) {
ep->diag_run(stat);
});
RCLCPP_INFO(lg, "Endpoint link[%d] created", id);

auto result = ep->open();
Expand All @@ -153,35 +166,50 @@ void Router::del_endpoint(
const mavros_msgs::srv::EndpointDel::Request::SharedPtr request,
mavros_msgs::srv::EndpointDel::Response::SharedPtr response)
{
unique_lock lock(mu);
auto lg = get_logger();
Endpoint::SharedPtr endpoint_to_remove;
std::string endpoint_diag_name;

if (request->id != 0) {
RCLCPP_INFO(lg, "Requested to del endpoint id: %d", request->id);
auto it = this->endpoints.find(request->id);
if (it != this->endpoints.end() ) {
it->second->close();
this->diagnostic_updater.removeByName(it->second->diag_name());
this->endpoints.erase(it);
response->successful = true;
{
unique_lock lock(mu);
auto it = this->endpoints.find(request->id);
if (it != this->endpoints.end() ) {
endpoint_to_remove = it->second;
endpoint_diag_name = it->second->diag_name();
this->endpoints.erase(it);
response->successful = true;
}
}
if (endpoint_to_remove) {
endpoint_to_remove->close();
this->diagnostic_updater.removeByName(endpoint_diag_name);
}
return;
}

RCLCPP_INFO(
lg, "Requested to del endpoint type: %d url: %s", request->type,
request->url.c_str());
for (auto it = this->endpoints.cbegin(); it != this->endpoints.cend(); it++) {
if (it->second->url == request->url &&
it->second->link_type == static_cast<Endpoint::Type>( request->type))
{
it->second->close();
this->diagnostic_updater.removeByName(it->second->diag_name());
this->endpoints.erase(it);
response->successful = true;
return;
{
unique_lock lock(mu);
for (auto it = this->endpoints.cbegin(); it != this->endpoints.cend(); it++) {
if (it->second->url == request->url &&
it->second->link_type == static_cast<Endpoint::Type>(request->type))
{
endpoint_to_remove = it->second;
endpoint_diag_name = it->second->diag_name();
this->endpoints.erase(it);
response->successful = true;
break;
}
}
}
if (endpoint_to_remove) {
endpoint_to_remove->close();
this->diagnostic_updater.removeByName(endpoint_diag_name);
}
}

rcl_interfaces::msg::SetParametersResult Router::on_set_parameters_cb(
Expand Down Expand Up @@ -376,11 +404,22 @@ bool MAVConnEndpoint::is_open()

std::pair<bool, std::string> MAVConnEndpoint::open()
{
auto nh = this->parent;
if (!nh) {
return {false, "parent not set"};
}

try {
auto weak_self = weak_from_this();
this->link = mavconn::MAVConnInterface::open_url(
this->url, 1, mavconn::MAV_COMP_ID_UDP_BRIDGE, std::bind(
&MAVConnEndpoint::recv_message,
shared_from_this(), _1, _2));
this->url, 1, mavconn::MAV_COMP_ID_UDP_BRIDGE,
[weak_self](const mavlink_message_t * msg, const Framing framing) {
if (auto self = weak_self.lock()) {
self->recv_message(msg, framing);
}
},
mavconn::MAVConnInterface::ClosedCb(),
nh->get_shared_io());
} catch (mavconn::DeviceError & ex) {
return {false, ex.what()};
}
Expand Down Expand Up @@ -478,7 +517,9 @@ std::pair<bool, std::string> ROSEndpoint::open()
"mavlink_source"), qos);
this->sink = nh->create_subscription<mavros_msgs::msg::Mavlink>(
utils::format("%s/%s", this->url.c_str(), "mavlink_sink"), qos,
std::bind(&ROSEndpoint::ros_recv_message, this, _1));
[this](const mavros_msgs::msg::Mavlink::SharedPtr rmsg) {
this->ros_recv_message(rmsg);
});
} catch (rclcpp::exceptions::InvalidTopicNameError & ex) {
return {false, ex.what()};
}
Expand Down
Loading
Loading