Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions zenoh-plugin-ros2dds/src/dds_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//
use std::{
collections::{BTreeSet, HashSet},
ffi::{CStr, CString},
mem::MaybeUninit,
sync::{atomic::AtomicI32, Arc},
Expand All @@ -22,7 +23,7 @@ use cyclors::{
qos::{History, HistoryKind, Qos},
*,
};
use serde::Serializer;
use serde::{ser::SerializeSeq, Serializer};
use tokio::task;

use crate::{
Expand All @@ -31,6 +32,23 @@ use crate::{
vec_into_raw_parts,
};

/// Serialize a `HashSet<(Gid, String)>` as a deduplicated array of the node fullnames,
/// dropping the participant GID. Keeps the admin-space JSON format unchanged after #702 fix.
pub fn serialize_local_nodes<S>(
set: &HashSet<(Gid, String)>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let dedup: BTreeSet<&str> = set.iter().map(|(_, name)| name.as_str()).collect();
let mut seq = serializer.serialize_seq(Some(dedup.len()))?;
for name in &dedup {
seq.serialize_element(name)?;
}
seq.end()
}

// An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities
pub type AtomicDDSEntity = AtomicI32;

Expand Down Expand Up @@ -137,7 +155,7 @@ pub unsafe fn create_topic(
let cton = CString::new(topic_name.to_owned()).unwrap().into_raw();
let ctyn = CString::new(type_name.to_owned()).unwrap().into_raw();

match type_info {
let topic = match type_info {
None => cdds_create_blob_topic(dp, cton, ctyn, keyless),
Some(type_info) => {
let mut descriptor: *mut dds_topic_descriptor_t = std::ptr::null_mut();
Expand All @@ -157,7 +175,13 @@ pub unsafe fn create_topic(
}
topic
}
}
};

// Reclaim the CStrings: cyclonedds copies them internally, so it is safe to free now.
drop(CString::from_raw(cton));
drop(CString::from_raw(ctyn));

topic
}

pub fn create_dds_writer(
Expand All @@ -175,6 +199,8 @@ pub fn create_dds_writer(
let qos_native = qos.to_qos_native();
let writer: i32 = dds_create_writer(dp, t, qos_native, std::ptr::null_mut());
Qos::delete_qos_native(qos_native);
drop(CString::from_raw(cton));
drop(CString::from_raw(ctyn));
if writer >= 0 {
Ok(writer)
} else {
Expand Down
53 changes: 27 additions & 26 deletions zenoh-plugin-ros2dds/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,42 @@ use std::fmt::Display;
use cyclors::qos::Qos;
use zenoh::key_expr::OwnedKeyExpr;

use crate::node_info::*;
use crate::{gid::Gid, node_info::*};

/// A (local) discovery event of a ROS2 interface
/// A (local) discovery event of a ROS2 interface.
/// First Gid is the participant GID owning the node — used to disambiguate same-named nodes across restarts (#702).
#[derive(Debug)]
pub enum ROS2DiscoveryEvent {
DiscoveredMsgPub(String, MsgPub),
UndiscoveredMsgPub(String, MsgPub),
DiscoveredMsgSub(String, MsgSub),
UndiscoveredMsgSub(String, MsgSub),
DiscoveredServiceSrv(String, ServiceSrv),
UndiscoveredServiceSrv(String, ServiceSrv),
DiscoveredServiceCli(String, ServiceCli),
UndiscoveredServiceCli(String, ServiceCli),
DiscoveredActionSrv(String, ActionSrv),
UndiscoveredActionSrv(String, ActionSrv),
DiscoveredActionCli(String, ActionCli),
UndiscoveredActionCli(String, ActionCli),
DiscoveredMsgPub(Gid, String, MsgPub),
UndiscoveredMsgPub(Gid, String, MsgPub),
DiscoveredMsgSub(Gid, String, MsgSub),
UndiscoveredMsgSub(Gid, String, MsgSub),
DiscoveredServiceSrv(Gid, String, ServiceSrv),
UndiscoveredServiceSrv(Gid, String, ServiceSrv),
DiscoveredServiceCli(Gid, String, ServiceCli),
UndiscoveredServiceCli(Gid, String, ServiceCli),
DiscoveredActionSrv(Gid, String, ActionSrv),
UndiscoveredActionSrv(Gid, String, ActionSrv),
DiscoveredActionCli(Gid, String, ActionCli),
UndiscoveredActionCli(Gid, String, ActionCli),
}

impl std::fmt::Display for ROS2DiscoveryEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use ROS2DiscoveryEvent::*;
match self {
DiscoveredMsgPub(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredMsgSub(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredServiceSrv(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredServiceCli(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredActionSrv(node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredActionCli(node, iface) => write!(f, "Node {node} declares {iface}"),
UndiscoveredMsgPub(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredMsgSub(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredServiceSrv(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredServiceCli(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredActionSrv(node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredActionCli(node, iface) => write!(f, "Node {node} undeclares {iface}"),
DiscoveredMsgPub(_, node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredMsgSub(_, node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredServiceSrv(_, node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredServiceCli(_, node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredActionSrv(_, node, iface) => write!(f, "Node {node} declares {iface}"),
DiscoveredActionCli(_, node, iface) => write!(f, "Node {node} declares {iface}"),
UndiscoveredMsgPub(_, node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredMsgSub(_, node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredServiceSrv(_, node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredServiceCli(_, node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredActionSrv(_, node, iface) => write!(f, "Node {node} undeclares {iface}"),
UndiscoveredActionCli(_, node, iface) => write!(f, "Node {node} undeclares {iface}"),
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions zenoh-plugin-ros2dds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,22 +666,22 @@ impl ROS2PluginRuntime {
if let Some(allowance) = &self.config.allowance {
use ROS2DiscoveryEvent::*;
match evt {
DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => {
DiscoveredMsgPub(_, _, iface) | UndiscoveredMsgPub(_, _, iface) => {
allowance.is_publisher_allowed(&iface.name)
}
DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => {
DiscoveredMsgSub(_, _, iface) | UndiscoveredMsgSub(_, _, iface) => {
allowance.is_subscriber_allowed(&iface.name)
}
DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => {
DiscoveredServiceSrv(_, _, iface) | UndiscoveredServiceSrv(_, _, iface) => {
allowance.is_service_srv_allowed(&iface.name)
}
DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => {
DiscoveredServiceCli(_, _, iface) | UndiscoveredServiceCli(_, _, iface) => {
allowance.is_service_cli_allowed(&iface.name)
}
DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => {
DiscoveredActionSrv(_, _, iface) | UndiscoveredActionSrv(_, _, iface) => {
allowance.is_action_srv_allowed(&iface.name)
}
DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => {
DiscoveredActionCli(_, _, iface) | UndiscoveredActionCli(_, _, iface) => {
allowance.is_action_cli_allowed(&iface.name)
}
}
Expand Down
Loading