From 26a8533c67f9da6995e120f2003064754774e412 Mon Sep 17 00:00:00 2001 From: shikinamiasuka Date: Fri, 29 May 2026 00:54:52 +0800 Subject: [PATCH 1/2] fix(#703): release CString memory after DDS topic and writer creation create_topic and create_dds_writer leak two CString allocations per call: the topic name and the type name are converted via CString::into_raw and handed to cyclonedds (which copies them internally), but never reclaimed. Under workloads that churn DDS readers/writers (e.g. publisher subscribers that re-create their endpoints on a short cycle), this accumulates over hours and eventually exhausts entity slots in the underlying DDS layer ("Error creating DDS Reader: Bad Parameter"). Mirrors the cleanup pattern already used in ros_discovery.rs. Closes part of #703. --- zenoh-plugin-ros2dds/src/dds_utils.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/dds_utils.rs b/zenoh-plugin-ros2dds/src/dds_utils.rs index 10d3f7e7..7691aaca 100644 --- a/zenoh-plugin-ros2dds/src/dds_utils.rs +++ b/zenoh-plugin-ros2dds/src/dds_utils.rs @@ -137,7 +137,7 @@ pub unsafe fn create_topic( let cton = CString::new(topic_name.to_owned()).unwrap().into_raw(); let ctyn = CString::new(type_name.to_owned()).unwrap().into_raw(); - match type_info { + let topic = match type_info { None => cdds_create_blob_topic(dp, cton, ctyn, keyless), Some(type_info) => { let mut descriptor: *mut dds_topic_descriptor_t = std::ptr::null_mut(); @@ -157,7 +157,13 @@ pub unsafe fn create_topic( } topic } - } + }; + + // Reclaim the CStrings: cyclonedds copies them internally, so it is safe to free now. + drop(CString::from_raw(cton)); + drop(CString::from_raw(ctyn)); + + topic } pub fn create_dds_writer( @@ -175,6 +181,8 @@ pub fn create_dds_writer( let qos_native = qos.to_qos_native(); let writer: i32 = dds_create_writer(dp, t, qos_native, std::ptr::null_mut()); Qos::delete_qos_native(qos_native); + drop(CString::from_raw(cton)); + drop(CString::from_raw(ctyn)); if writer >= 0 { Ok(writer) } else { From 33e0078eb42aae13510836947467ecc46a5df9eb Mon Sep 17 00:00:00 2001 From: shikinamiasuka Date: Fri, 29 May 2026 00:56:58 +0800 Subject: [PATCH 2/2] fix(#702): key local_nodes by (participant_gid, node_fullname) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A zenoh<->DDS route can get stuck `is_active: false` with an empty `local_nodes` after a ROS 2 node restarts under the same name with a new participant GID. The local DDS subscriber is matched, the bridge's own ROS graph shows the subscription, but `local_nodes` desyncs and the route never reactivates until a brand-new (differently named) subscriber appears. Root cause: every route's `local_nodes` was a `HashSet` keyed on the node fullname alone. When a node restarted with the same name but a new participant GID, a late-arriving DISPOSE for the old participant's reader/writer would clear the entry that the new participant's ADD had just re-inserted, since both events used the same string key. The admin space at `ros2/node//...` remained consistent because it was already keyed by participant GID; only `local_nodes` lost the race. Fix: - Carry the participant GID through every `ROS2DiscoveryEvent` variant. - Add a `participant: Gid` field on `NodeInfo` and include it in every emitted Discovered*/Undiscovered* event. - Change `local_nodes` to `HashSet<(Gid, String)>` in every route type (subscriber, publisher, service srv/cli, action srv/cli) and update add_local_node/remove_local_node to use the tuple. - Preserve the admin-space JSON format with a custom serializer that emits a deduplicated array of node fullname strings, hiding the internal participant GID. Also fix a secondary issue in `RouteSubscriber::add_local_node`: it ran `len() == 1` after an unconditional `insert`, so re-inserting an existing key (which is common under DISPOSE-then-ADD races) would incorrectly re-run `announce_route`. Now mirrors RoutePublisher's correct pattern (only fire on the 0→1 transition). Side effect: in scenarios where a same-named subscriber on one side created an ADD/DISPOSE race that was previously corrupting cyclonedds state (and triggering a flood of "failed to activate DDS Reader: Bad Parameter" errors on the publisher side), the death loop also disappears once `local_nodes` is keyed correctly. Closes #702. Mitigates the matching-listener Bad-Parameter storm in #703 indirectly. --- zenoh-plugin-ros2dds/src/dds_utils.rs | 20 ++- zenoh-plugin-ros2dds/src/events.rs | 53 ++++---- zenoh-plugin-ros2dds/src/lib.rs | 12 +- zenoh-plugin-ros2dds/src/node_info.rs | 122 +++++++++--------- zenoh-plugin-ros2dds/src/route_action_cli.rs | 54 ++++---- zenoh-plugin-ros2dds/src/route_action_srv.rs | 53 ++++---- zenoh-plugin-ros2dds/src/route_publisher.rs | 21 ++- zenoh-plugin-ros2dds/src/route_service_cli.rs | 32 +++-- zenoh-plugin-ros2dds/src/route_service_srv.rs | 33 ++--- zenoh-plugin-ros2dds/src/route_subscriber.rs | 38 ++++-- zenoh-plugin-ros2dds/src/routes_mgr.rs | 48 +++---- 11 files changed, 264 insertions(+), 222 deletions(-) diff --git a/zenoh-plugin-ros2dds/src/dds_utils.rs b/zenoh-plugin-ros2dds/src/dds_utils.rs index 7691aaca..e988c65d 100644 --- a/zenoh-plugin-ros2dds/src/dds_utils.rs +++ b/zenoh-plugin-ros2dds/src/dds_utils.rs @@ -12,6 +12,7 @@ // ZettaScale Zenoh Team, // use std::{ + collections::{BTreeSet, HashSet}, ffi::{CStr, CString}, mem::MaybeUninit, sync::{atomic::AtomicI32, Arc}, @@ -22,7 +23,7 @@ use cyclors::{ qos::{History, HistoryKind, Qos}, *, }; -use serde::Serializer; +use serde::{ser::SerializeSeq, Serializer}; use tokio::task; use crate::{ @@ -31,6 +32,23 @@ use crate::{ vec_into_raw_parts, }; +/// Serialize a `HashSet<(Gid, String)>` as a deduplicated array of the node fullnames, +/// dropping the participant GID. Keeps the admin-space JSON format unchanged after #702 fix. +pub fn serialize_local_nodes( + set: &HashSet<(Gid, String)>, + serializer: S, +) -> Result +where + S: Serializer, +{ + let dedup: BTreeSet<&str> = set.iter().map(|(_, name)| name.as_str()).collect(); + let mut seq = serializer.serialize_seq(Some(dedup.len()))?; + for name in &dedup { + seq.serialize_element(name)?; + } + seq.end() +} + // An atomic dds_entity_t (=i32), for safe concurrent creation/deletion of DDS entities pub type AtomicDDSEntity = AtomicI32; diff --git a/zenoh-plugin-ros2dds/src/events.rs b/zenoh-plugin-ros2dds/src/events.rs index a2210bce..e0b29b8d 100644 --- a/zenoh-plugin-ros2dds/src/events.rs +++ b/zenoh-plugin-ros2dds/src/events.rs @@ -17,41 +17,42 @@ use std::fmt::Display; use cyclors::qos::Qos; use zenoh::key_expr::OwnedKeyExpr; -use crate::node_info::*; +use crate::{gid::Gid, node_info::*}; -/// A (local) discovery event of a ROS2 interface +/// A (local) discovery event of a ROS2 interface. +/// First Gid is the participant GID owning the node — used to disambiguate same-named nodes across restarts (#702). #[derive(Debug)] pub enum ROS2DiscoveryEvent { - DiscoveredMsgPub(String, MsgPub), - UndiscoveredMsgPub(String, MsgPub), - DiscoveredMsgSub(String, MsgSub), - UndiscoveredMsgSub(String, MsgSub), - DiscoveredServiceSrv(String, ServiceSrv), - UndiscoveredServiceSrv(String, ServiceSrv), - DiscoveredServiceCli(String, ServiceCli), - UndiscoveredServiceCli(String, ServiceCli), - DiscoveredActionSrv(String, ActionSrv), - UndiscoveredActionSrv(String, ActionSrv), - DiscoveredActionCli(String, ActionCli), - UndiscoveredActionCli(String, ActionCli), + DiscoveredMsgPub(Gid, String, MsgPub), + UndiscoveredMsgPub(Gid, String, MsgPub), + DiscoveredMsgSub(Gid, String, MsgSub), + UndiscoveredMsgSub(Gid, String, MsgSub), + DiscoveredServiceSrv(Gid, String, ServiceSrv), + UndiscoveredServiceSrv(Gid, String, ServiceSrv), + DiscoveredServiceCli(Gid, String, ServiceCli), + UndiscoveredServiceCli(Gid, String, ServiceCli), + DiscoveredActionSrv(Gid, String, ActionSrv), + UndiscoveredActionSrv(Gid, String, ActionSrv), + DiscoveredActionCli(Gid, String, ActionCli), + UndiscoveredActionCli(Gid, String, ActionCli), } impl std::fmt::Display for ROS2DiscoveryEvent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { use ROS2DiscoveryEvent::*; match self { - DiscoveredMsgPub(node, iface) => write!(f, "Node {node} declares {iface}"), - DiscoveredMsgSub(node, iface) => write!(f, "Node {node} declares {iface}"), - DiscoveredServiceSrv(node, iface) => write!(f, "Node {node} declares {iface}"), - DiscoveredServiceCli(node, iface) => write!(f, "Node {node} declares {iface}"), - DiscoveredActionSrv(node, iface) => write!(f, "Node {node} declares {iface}"), - DiscoveredActionCli(node, iface) => write!(f, "Node {node} declares {iface}"), - UndiscoveredMsgPub(node, iface) => write!(f, "Node {node} undeclares {iface}"), - UndiscoveredMsgSub(node, iface) => write!(f, "Node {node} undeclares {iface}"), - UndiscoveredServiceSrv(node, iface) => write!(f, "Node {node} undeclares {iface}"), - UndiscoveredServiceCli(node, iface) => write!(f, "Node {node} undeclares {iface}"), - UndiscoveredActionSrv(node, iface) => write!(f, "Node {node} undeclares {iface}"), - UndiscoveredActionCli(node, iface) => write!(f, "Node {node} undeclares {iface}"), + DiscoveredMsgPub(_, node, iface) => write!(f, "Node {node} declares {iface}"), + DiscoveredMsgSub(_, node, iface) => write!(f, "Node {node} declares {iface}"), + DiscoveredServiceSrv(_, node, iface) => write!(f, "Node {node} declares {iface}"), + DiscoveredServiceCli(_, node, iface) => write!(f, "Node {node} declares {iface}"), + DiscoveredActionSrv(_, node, iface) => write!(f, "Node {node} declares {iface}"), + DiscoveredActionCli(_, node, iface) => write!(f, "Node {node} declares {iface}"), + UndiscoveredMsgPub(_, node, iface) => write!(f, "Node {node} undeclares {iface}"), + UndiscoveredMsgSub(_, node, iface) => write!(f, "Node {node} undeclares {iface}"), + UndiscoveredServiceSrv(_, node, iface) => write!(f, "Node {node} undeclares {iface}"), + UndiscoveredServiceCli(_, node, iface) => write!(f, "Node {node} undeclares {iface}"), + UndiscoveredActionSrv(_, node, iface) => write!(f, "Node {node} undeclares {iface}"), + UndiscoveredActionCli(_, node, iface) => write!(f, "Node {node} undeclares {iface}"), } } } diff --git a/zenoh-plugin-ros2dds/src/lib.rs b/zenoh-plugin-ros2dds/src/lib.rs index 8dd30b0e..1690283a 100644 --- a/zenoh-plugin-ros2dds/src/lib.rs +++ b/zenoh-plugin-ros2dds/src/lib.rs @@ -666,22 +666,22 @@ impl ROS2PluginRuntime { if let Some(allowance) = &self.config.allowance { use ROS2DiscoveryEvent::*; match evt { - DiscoveredMsgPub(_, iface) | UndiscoveredMsgPub(_, iface) => { + DiscoveredMsgPub(_, _, iface) | UndiscoveredMsgPub(_, _, iface) => { allowance.is_publisher_allowed(&iface.name) } - DiscoveredMsgSub(_, iface) | UndiscoveredMsgSub(_, iface) => { + DiscoveredMsgSub(_, _, iface) | UndiscoveredMsgSub(_, _, iface) => { allowance.is_subscriber_allowed(&iface.name) } - DiscoveredServiceSrv(_, iface) | UndiscoveredServiceSrv(_, iface) => { + DiscoveredServiceSrv(_, _, iface) | UndiscoveredServiceSrv(_, _, iface) => { allowance.is_service_srv_allowed(&iface.name) } - DiscoveredServiceCli(_, iface) | UndiscoveredServiceCli(_, iface) => { + DiscoveredServiceCli(_, _, iface) | UndiscoveredServiceCli(_, _, iface) => { allowance.is_service_cli_allowed(&iface.name) } - DiscoveredActionSrv(_, iface) | UndiscoveredActionSrv(_, iface) => { + DiscoveredActionSrv(_, _, iface) | UndiscoveredActionSrv(_, _, iface) => { allowance.is_action_srv_allowed(&iface.name) } - DiscoveredActionCli(_, iface) | UndiscoveredActionCli(_, iface) => { + DiscoveredActionCli(_, _, iface) | UndiscoveredActionCli(_, _, iface) => { allowance.is_action_cli_allowed(&iface.name) } } diff --git a/zenoh-plugin-ros2dds/src/node_info.rs b/zenoh-plugin-ros2dds/src/node_info.rs index 6f3ef954..37174a4f 100644 --- a/zenoh-plugin-ros2dds/src/node_info.rs +++ b/zenoh-plugin-ros2dds/src/node_info.rs @@ -362,6 +362,9 @@ pub struct NodeInfo { // The node unique id is: // #[serde(skip)] pub id: String, + // Participant GID that owns this node. Used to disambiguate same-named nodes across restarts (#702). + #[serde(skip)] + pub participant: Gid, #[serde(skip)] fullname: Range, #[serde(skip)] @@ -447,6 +450,7 @@ impl NodeInfo { Ok(NodeInfo { id, + participant, fullname, namespace, name, @@ -642,7 +646,7 @@ impl NodeInfo { Entry::Vacant(e) => match MsgPub::create(name.into(), typ, *writer) { Ok(t) => { e.insert(t.clone()); - Some(DiscoveredMsgPub(node_fullname, t)) + Some(DiscoveredMsgPub(self.participant, node_fullname, t)) } Err(e) => { tracing::error!( @@ -661,7 +665,7 @@ impl NodeInfo { ); } else if v.writers.insert(*writer) && v.writers.len() == 1 { // Send DiscoveredMsgPub event only for the 1st discovered Writer - result = Some(DiscoveredMsgPub(node_fullname, v.clone())); + result = Some(DiscoveredMsgPub(self.participant, node_fullname, v.clone())); } result } @@ -681,7 +685,7 @@ impl NodeInfo { Entry::Vacant(e) => match MsgSub::create(name.into(), typ, *reader) { Ok(t) => { e.insert(t.clone()); - Some(DiscoveredMsgSub(node_fullname, t)) + Some(DiscoveredMsgSub(self.participant, node_fullname, t)) } Err(e) => { tracing::error!( @@ -700,7 +704,7 @@ impl NodeInfo { ); } else if v.readers.insert(*reader) && v.readers.len() == 1 { // Send DiscoveredMsgSub event only for the 1st discovered Reader - result = Some(DiscoveredMsgSub(node_fullname, v.clone())); + result = Some(DiscoveredMsgSub(self.participant, node_fullname, v.clone())); } result } @@ -738,7 +742,7 @@ impl NodeInfo { ); v.typ = typ; if v.is_complete() { - result = Some(DiscoveredServiceSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredServiceSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.req_reader != *reader { @@ -750,7 +754,7 @@ impl NodeInfo { } v.entities.req_reader = *reader; if v.is_complete() { - result = Some(DiscoveredServiceSrv(node_fullname, v.clone())) + result = Some(DiscoveredServiceSrv(self.participant, node_fullname, v.clone())) }; } result @@ -789,7 +793,7 @@ impl NodeInfo { ); v.typ = typ; if v.is_complete() { - result = Some(DiscoveredServiceSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredServiceSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.rep_writer != *writer { @@ -801,7 +805,7 @@ impl NodeInfo { } v.entities.rep_writer = *writer; if v.is_complete() { - result = Some(DiscoveredServiceSrv(node_fullname, v.clone())) + result = Some(DiscoveredServiceSrv(self.participant, node_fullname, v.clone())) }; } result @@ -840,7 +844,7 @@ impl NodeInfo { ); v.typ = typ; if v.is_complete() { - result = Some(DiscoveredServiceCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredServiceCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.rep_reader != *reader { @@ -852,7 +856,7 @@ impl NodeInfo { } v.entities.rep_reader = *reader; if v.is_complete() { - result = Some(DiscoveredServiceCli(node_fullname, v.clone())) + result = Some(DiscoveredServiceCli(self.participant, node_fullname, v.clone())) }; } result @@ -891,7 +895,7 @@ impl NodeInfo { ); v.typ = typ; if v.is_complete() { - result = Some(DiscoveredServiceCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredServiceCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.req_writer != *writer { @@ -903,7 +907,7 @@ impl NodeInfo { } v.entities.req_writer = *writer; if v.is_complete() { - result = Some(DiscoveredServiceCli(node_fullname, v.clone())) + result = Some(DiscoveredServiceCli(self.participant, node_fullname, v.clone())) }; } result @@ -944,7 +948,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.send_goal.req_reader != *reader { @@ -956,7 +960,7 @@ impl NodeInfo { } v.entities.send_goal.req_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -997,7 +1001,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.send_goal.rep_writer != *writer { @@ -1009,7 +1013,7 @@ impl NodeInfo { } v.entities.send_goal.rep_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1052,7 +1056,7 @@ impl NodeInfo { } v.entities.cancel_goal.req_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1095,7 +1099,7 @@ impl NodeInfo { } v.entities.cancel_goal.rep_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1136,7 +1140,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.get_result.req_reader != *reader { @@ -1148,7 +1152,7 @@ impl NodeInfo { } v.entities.get_result.req_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1189,7 +1193,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.get_result.rep_writer != *writer { @@ -1201,7 +1205,7 @@ impl NodeInfo { } v.entities.get_result.rep_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1244,7 +1248,7 @@ impl NodeInfo { } v.entities.status_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1285,7 +1289,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.feedback_writer != *writer { @@ -1297,7 +1301,7 @@ impl NodeInfo { } v.entities.feedback_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionSrv(node_fullname, v.clone())) + result = Some(DiscoveredActionSrv(self.participant, node_fullname, v.clone())) }; } result @@ -1338,7 +1342,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.send_goal.rep_reader != *reader { @@ -1350,7 +1354,7 @@ impl NodeInfo { } v.entities.send_goal.rep_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1391,7 +1395,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.send_goal.req_writer != *writer { @@ -1403,7 +1407,7 @@ impl NodeInfo { } v.entities.send_goal.req_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1446,7 +1450,7 @@ impl NodeInfo { } v.entities.cancel_goal.rep_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1489,7 +1493,7 @@ impl NodeInfo { } v.entities.cancel_goal.req_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1530,7 +1534,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.get_result.rep_reader != *reader { @@ -1542,7 +1546,7 @@ impl NodeInfo { } v.entities.get_result.rep_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1583,7 +1587,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.get_result.req_writer != *writer { @@ -1595,7 +1599,7 @@ impl NodeInfo { } v.entities.get_result.req_writer = *writer; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1638,7 +1642,7 @@ impl NodeInfo { } v.entities.status_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1679,7 +1683,7 @@ impl NodeInfo { } v.typ = typ; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname.clone(), v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname.clone(), v.clone())) }; } if v.entities.feedback_reader != *reader { @@ -1691,7 +1695,7 @@ impl NodeInfo { } v.entities.feedback_reader = *reader; if v.is_complete() { - result = Some(DiscoveredActionCli(node_fullname, v.clone())) + result = Some(DiscoveredActionCli(self.participant, node_fullname, v.clone())) }; } result @@ -1706,22 +1710,22 @@ impl NodeInfo { let mut events = Vec::new(); for (_, v) in self.msg_pub.drain() { - events.push(UndiscoveredMsgPub(node_fullname.clone(), v)) + events.push(UndiscoveredMsgPub(self.participant, node_fullname.clone(), v)) } for (_, v) in self.msg_sub.drain() { - events.push(UndiscoveredMsgSub(node_fullname.clone(), v)) + events.push(UndiscoveredMsgSub(self.participant, node_fullname.clone(), v)) } for (_, v) in self.service_srv.drain() { - events.push(UndiscoveredServiceSrv(node_fullname.clone(), v)) + events.push(UndiscoveredServiceSrv(self.participant, node_fullname.clone(), v)) } for (_, v) in self.service_cli.drain() { - events.push(UndiscoveredServiceCli(node_fullname.clone(), v)) + events.push(UndiscoveredServiceCli(self.participant, node_fullname.clone(), v)) } for (_, v) in self.action_srv.drain() { - events.push(UndiscoveredActionSrv(node_fullname.clone(), v)) + events.push(UndiscoveredActionSrv(self.participant, node_fullname.clone(), v)) } for (_, v) in self.action_cli.drain() { - events.push(UndiscoveredActionCli(node_fullname.clone(), v)) + events.push(UndiscoveredActionCli(self.participant, node_fullname.clone(), v)) } self.undiscovered_reader.resize(0, Gid::NOT_DISCOVERED); self.undiscovered_writer.resize(0, Gid::NOT_DISCOVERED); @@ -1745,8 +1749,7 @@ impl NodeInfo { } }) { // Return undiscovery event for this Subscriber, since all its DDS Writer have been undiscovered - return Some(UndiscoveredMsgSub( - node_fullname, + return Some(UndiscoveredMsgSub(self.participant, node_fullname, self.msg_sub.remove(&name).unwrap(), )); } @@ -1755,8 +1758,7 @@ impl NodeInfo { .iter() .find(|(_, v)| v.entities.req_reader == *reader) { - return Some(UndiscoveredServiceSrv( - node_fullname, + return Some(UndiscoveredServiceSrv(self.participant, node_fullname, self.service_srv.remove(&name.clone()).unwrap(), )); } @@ -1765,8 +1767,7 @@ impl NodeInfo { .iter() .find(|(_, v)| v.entities.rep_reader == *reader) { - return Some(UndiscoveredServiceCli( - node_fullname, + return Some(UndiscoveredServiceCli(self.participant, node_fullname, self.service_cli.remove(&name.clone()).unwrap(), )); } @@ -1775,8 +1776,7 @@ impl NodeInfo { || v.entities.cancel_goal.req_reader == *reader || v.entities.get_result.req_reader == *reader }) { - return Some(UndiscoveredActionSrv( - node_fullname, + return Some(UndiscoveredActionSrv(self.participant, node_fullname, self.action_srv.remove(&name.clone()).unwrap(), )); } @@ -1787,8 +1787,7 @@ impl NodeInfo { || v.entities.status_reader == *reader || v.entities.feedback_reader == *reader }) { - return Some(UndiscoveredActionCli( - node_fullname, + return Some(UndiscoveredActionCli(self.participant, node_fullname, self.action_cli.remove(&name.clone()).unwrap(), )); } @@ -1812,8 +1811,7 @@ impl NodeInfo { } }) { // Return undiscovery event for this Publisher, since all its DDS Writer have been undiscovered - return Some(UndiscoveredMsgPub( - node_fullname, + return Some(UndiscoveredMsgPub(self.participant, node_fullname, self.msg_pub.remove(&name).unwrap(), )); } @@ -1822,8 +1820,7 @@ impl NodeInfo { .iter() .find(|(_, v)| v.entities.rep_writer == *writer) { - return Some(UndiscoveredServiceSrv( - node_fullname, + return Some(UndiscoveredServiceSrv(self.participant, node_fullname, self.service_srv.remove(&name.clone()).unwrap(), )); } @@ -1832,8 +1829,7 @@ impl NodeInfo { .iter() .find(|(_, v)| v.entities.req_writer == *writer) { - return Some(UndiscoveredServiceCli( - node_fullname, + return Some(UndiscoveredServiceCli(self.participant, node_fullname, self.service_cli.remove(&name.clone()).unwrap(), )); } @@ -1844,8 +1840,7 @@ impl NodeInfo { || v.entities.status_writer == *writer || v.entities.feedback_writer == *writer }) { - return Some(UndiscoveredActionSrv( - node_fullname, + return Some(UndiscoveredActionSrv(self.participant, node_fullname, self.action_srv.remove(&name.clone()).unwrap(), )); } @@ -1854,8 +1849,7 @@ impl NodeInfo { || v.entities.cancel_goal.req_writer == *writer || v.entities.get_result.req_writer == *writer }) { - return Some(UndiscoveredActionCli( - node_fullname, + return Some(UndiscoveredActionCli(self.participant, node_fullname, self.action_cli.remove(&name.clone()).unwrap(), )); } diff --git a/zenoh-plugin-ros2dds/src/route_action_cli.rs b/zenoh-plugin-ros2dds/src/route_action_cli.rs index 08cdeeb3..eb67025a 100644 --- a/zenoh-plugin-ros2dds/src/route_action_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_action_cli.rs @@ -20,9 +20,9 @@ use zenoh::{ }; use crate::{ - liveliness_mgt::new_ke_liveliness_action_cli, ros2_utils::*, - route_action_srv::serialize_action_zenoh_key_expr, route_service_cli::RouteServiceCli, - route_subscriber::RouteSubscriber, routes_mgr::Context, + dds_utils::serialize_local_nodes, gid::Gid, liveliness_mgt::new_ke_liveliness_action_cli, + ros2_utils::*, route_action_srv::serialize_action_zenoh_key_expr, + route_service_cli::RouteServiceCli, route_subscriber::RouteSubscriber, routes_mgr::Context, }; #[derive(Serialize)] @@ -56,8 +56,9 @@ pub struct RouteActionCli { liveliness_token: Option, // the list of remote routes served by this route (":"") remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, + // the list of nodes served by this route, keyed by (participant_gid, node_fullname) — #702. + #[serde(serialize_with = "serialize_local_nodes")] + local_nodes: HashSet<(Gid, String)>, } impl fmt::Display for RouteActionCli { @@ -246,40 +247,41 @@ impl RouteActionCli { } #[inline] - pub async fn add_local_node(&mut self, node: String) { + pub async fn add_local_node(&mut self, node_key: (Gid, String)) { futures::join!( - self.route_send_goal.add_local_node(node.clone()), - self.route_cancel_goal.add_local_node(node.clone()), - self.route_get_result.add_local_node(node.clone()), + self.route_send_goal.add_local_node(node_key.clone()), + self.route_cancel_goal.add_local_node(node_key.clone()), + self.route_get_result.add_local_node(node_key.clone()), self.route_feedback - .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_FEEDBACK), + .add_local_node(node_key.clone(), &QOS_DEFAULT_ACTION_FEEDBACK), self.route_status - .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_STATUS), + .add_local_node(node_key.clone(), &QOS_DEFAULT_ACTION_STATUS), ); - self.local_nodes.insert(node); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route - if self.local_nodes.len() == 1 { + if self.local_nodes.insert(node_key) && self.local_nodes.len() == 1 { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); if let Err(e) = self.announce_route().await { tracing::error!("{self} activation failed: {e}"); } + } else { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); } } #[inline] - pub fn remove_local_node(&mut self, node: &str) { - self.route_send_goal.remove_local_node(node); - self.route_cancel_goal.remove_local_node(node); - self.route_get_result.remove_local_node(node); - self.route_feedback.remove_local_node(node); - self.route_status.remove_local_node(node); + pub fn remove_local_node(&mut self, node_key: &(Gid, String)) { + self.route_send_goal.remove_local_node(node_key); + self.route_cancel_goal.remove_local_node(node_key); + self.route_get_result.remove_local_node(node_key); + self.route_feedback.remove_local_node(node_key); + self.route_status.remove_local_node(node_key); - self.local_nodes.remove(node); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route - if self.local_nodes.is_empty() { - self.retire_route(); + if self.local_nodes.remove(node_key) { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, deactivate the route + if self.local_nodes.is_empty() { + self.retire_route(); + } } } diff --git a/zenoh-plugin-ros2dds/src/route_action_srv.rs b/zenoh-plugin-ros2dds/src/route_action_srv.rs index 5eaee02f..0cfaaf3d 100644 --- a/zenoh-plugin-ros2dds/src/route_action_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_action_srv.rs @@ -20,8 +20,9 @@ use zenoh::{ }; use crate::{ - liveliness_mgt::new_ke_liveliness_action_srv, ros2_utils::*, route_publisher::RoutePublisher, - route_service_srv::RouteServiceSrv, routes_mgr::Context, + dds_utils::serialize_local_nodes, gid::Gid, liveliness_mgt::new_ke_liveliness_action_srv, + ros2_utils::*, route_publisher::RoutePublisher, route_service_srv::RouteServiceSrv, + routes_mgr::Context, }; #[derive(Serialize)] @@ -55,8 +56,9 @@ pub struct RouteActionSrv { liveliness_token: Option, // the list of remote routes served by this route (":"") remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, + // the list of nodes served by this route, keyed by (participant_gid, node_fullname) — #702. + #[serde(serialize_with = "serialize_local_nodes")] + local_nodes: HashSet<(Gid, String)>, } impl fmt::Display for RouteActionSrv { @@ -232,40 +234,41 @@ impl RouteActionSrv { } #[inline] - pub async fn add_local_node(&mut self, node: String) { + pub async fn add_local_node(&mut self, node_key: (Gid, String)) { futures::join!( - self.route_send_goal.add_local_node(node.clone()), - self.route_cancel_goal.add_local_node(node.clone()), - self.route_get_result.add_local_node(node.clone()), + self.route_send_goal.add_local_node(node_key.clone()), + self.route_cancel_goal.add_local_node(node_key.clone()), + self.route_get_result.add_local_node(node_key.clone()), self.route_feedback - .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_FEEDBACK), + .add_local_node(node_key.clone(), &QOS_DEFAULT_ACTION_FEEDBACK), self.route_status - .add_local_node(node.clone(), &QOS_DEFAULT_ACTION_STATUS), + .add_local_node(node_key.clone(), &QOS_DEFAULT_ACTION_STATUS), ); - self.local_nodes.insert(node); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route - if self.local_nodes.len() == 1 { + if self.local_nodes.insert(node_key) && self.local_nodes.len() == 1 { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); if let Err(e) = self.announce_route().await { tracing::error!("{self} activation failed: {e}"); } + } else { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); } } #[inline] - pub fn remove_local_node(&mut self, node: &str) { - self.route_send_goal.remove_local_node(node); - self.route_cancel_goal.remove_local_node(node); - self.route_get_result.remove_local_node(node); - self.route_feedback.remove_local_node(node); - self.route_status.remove_local_node(node); + pub fn remove_local_node(&mut self, node_key: &(Gid, String)) { + self.route_send_goal.remove_local_node(node_key); + self.route_cancel_goal.remove_local_node(node_key); + self.route_get_result.remove_local_node(node_key); + self.route_feedback.remove_local_node(node_key); + self.route_status.remove_local_node(node_key); - self.local_nodes.remove(node); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route - if self.local_nodes.is_empty() { - self.retire_route(); + if self.local_nodes.remove(node_key) { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, deactivate the route + if self.local_nodes.is_empty() { + self.retire_route(); + } } } diff --git a/zenoh-plugin-ros2dds/src/route_publisher.rs b/zenoh-plugin-ros2dds/src/route_publisher.rs index 90b7d473..2f98f071 100644 --- a/zenoh-plugin-ros2dds/src/route_publisher.rs +++ b/zenoh-plugin-ros2dds/src/route_publisher.rs @@ -38,8 +38,9 @@ use crate::{ dds_types::{DDSRawSample, TypeInfo}, dds_utils::{ create_dds_reader, delete_dds_entity, get_guid, serialize_atomic_entity_guid, - AtomicDDSEntity, DDS_ENTITY_NULL, + serialize_local_nodes, AtomicDDSEntity, DDS_ENTITY_NULL, }, + gid::Gid, liveliness_mgt::new_ke_liveliness_pub, qos_helpers::*, ros2_utils::{is_message_for_action, ros2_message_type_to_dds_type}, @@ -103,8 +104,10 @@ pub struct RoutePublisher { liveliness_token: Option, // the list of remote routes served by this route (":"") remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, + // the list of nodes served by this route, keyed by (participant_gid, node_fullname) to + // disambiguate same-named nodes across restarts (#702). + #[serde(serialize_with = "serialize_local_nodes")] + local_nodes: HashSet<(Gid, String)>, } impl Drop for RoutePublisher { @@ -348,8 +351,12 @@ impl RoutePublisher { } #[inline] - pub async fn add_local_node(&mut self, node: String, discovered_writer_qos: &Qos) { - if self.local_nodes.insert(node) { + pub async fn add_local_node( + &mut self, + node_key: (Gid, String), + discovered_writer_qos: &Qos, + ) { + if self.local_nodes.insert(node_key) { tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if 1st local node added, announce the route if self.local_nodes.len() == 1 { @@ -361,8 +368,8 @@ impl RoutePublisher { } #[inline] - pub fn remove_local_node(&mut self, node: &str) { - if self.local_nodes.remove(node) { + pub fn remove_local_node(&mut self, node_key: &(Gid, String)) { + if self.local_nodes.remove(node_key) { tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); // if last local node removed, retire the route if self.local_nodes.is_empty() { diff --git a/zenoh-plugin-ros2dds/src/route_service_cli.rs b/zenoh-plugin-ros2dds/src/route_service_cli.rs index 3c1d6b3d..03a0a052 100644 --- a/zenoh-plugin-ros2dds/src/route_service_cli.rs +++ b/zenoh-plugin-ros2dds/src/route_service_cli.rs @@ -39,8 +39,10 @@ use crate::{ dds_types::{DDSRawSample, TypeInfo}, dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, - is_cdr_little_endian, serialize_atomic_entity_guid, AtomicDDSEntity, DDS_ENTITY_NULL, + is_cdr_little_endian, serialize_atomic_entity_guid, serialize_local_nodes, + AtomicDDSEntity, DDS_ENTITY_NULL, }, + gid::Gid, liveliness_mgt::new_ke_liveliness_service_cli, ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, @@ -79,8 +81,9 @@ pub struct RouteServiceCli { liveliness_token: Option, // the list of remote routes served by this route (":"") remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, + // the list of nodes served by this route, keyed by (participant_gid, node_fullname) — #702. + #[serde(serialize_with = "serialize_local_nodes")] + local_nodes: HashSet<(Gid, String)>, } impl Drop for RouteServiceCli { @@ -253,24 +256,25 @@ impl RouteServiceCli { } #[inline] - pub async fn add_local_node(&mut self, node: String) { - self.local_nodes.insert(node); - tracing::debug!("{self}: now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, announce the route - if self.local_nodes.len() == 1 { + pub async fn add_local_node(&mut self, node_key: (Gid, String)) { + if self.local_nodes.insert(node_key) && self.local_nodes.len() == 1 { + tracing::debug!("{self}: now serving local nodes {:?}", self.local_nodes); if let Err(e) = self.announce_route().await { tracing::error!("{self}: announcement failed: {e}"); } + } else { + tracing::debug!("{self}: now serving local nodes {:?}", self.local_nodes); } } #[inline] - pub fn remove_local_node(&mut self, node: &str) { - self.local_nodes.remove(node); - tracing::debug!("{self}: now serving local nodes {:?}", self.local_nodes); - // if last local node removed, retire the route - if self.local_nodes.is_empty() { - self.retire_route(); + pub fn remove_local_node(&mut self, node_key: &(Gid, String)) { + if self.local_nodes.remove(node_key) { + tracing::debug!("{self}: now serving local nodes {:?}", self.local_nodes); + // if last local node removed, retire the route + if self.local_nodes.is_empty() { + self.retire_route(); + } } } diff --git a/zenoh-plugin-ros2dds/src/route_service_srv.rs b/zenoh-plugin-ros2dds/src/route_service_srv.rs index a1b634a5..791af0d4 100644 --- a/zenoh-plugin-ros2dds/src/route_service_srv.rs +++ b/zenoh-plugin-ros2dds/src/route_service_srv.rs @@ -39,9 +39,10 @@ use crate::{ dds_types::{DDSRawSample, TypeInfo}, dds_utils::{ create_dds_reader, create_dds_writer, dds_write, delete_dds_entity, get_guid, - get_instance_handle, is_cdr_little_endian, serialize_entity_guid, CDR_HEADER_BE, - CDR_HEADER_LE, + get_instance_handle, is_cdr_little_endian, serialize_entity_guid, serialize_local_nodes, + CDR_HEADER_BE, CDR_HEADER_LE, }, + gid::Gid, liveliness_mgt::new_ke_liveliness_service_srv, ros2_utils::{ is_service_for_action, new_service_id, ros2_service_type_to_reply_dds_type, @@ -87,8 +88,9 @@ pub struct RouteServiceSrv { liveliness_token: Option, // the list of remote routes served by this route (":"") remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, + // the list of nodes served by this route, keyed by (participant_gid, node_fullname) — #702. + #[serde(serialize_with = "serialize_local_nodes")] + local_nodes: HashSet<(Gid, String)>, } impl Drop for RouteServiceSrv { @@ -312,24 +314,25 @@ impl RouteServiceSrv { } #[inline] - pub async fn add_local_node(&mut self, node: String) { - self.local_nodes.insert(node); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route - if self.local_nodes.len() == 1 { + pub async fn add_local_node(&mut self, node_key: (Gid, String)) { + if self.local_nodes.insert(node_key) && self.local_nodes.len() == 1 { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); if let Err(e) = self.announce_route().await { tracing::error!("{self} activation failed: {e}"); } + } else { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); } } #[inline] - pub fn remove_local_node(&mut self, node: &str) { - self.local_nodes.remove(node); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route - if self.local_nodes.is_empty() { - self.retire_route(); + pub fn remove_local_node(&mut self, node_key: &(Gid, String)) { + if self.local_nodes.remove(node_key) { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, deactivate the route + if self.local_nodes.is_empty() { + self.retire_route(); + } } } diff --git a/zenoh-plugin-ros2dds/src/route_subscriber.rs b/zenoh-plugin-ros2dds/src/route_subscriber.rs index d5e824b8..5ff2e37b 100644 --- a/zenoh-plugin-ros2dds/src/route_subscriber.rs +++ b/zenoh-plugin-ros2dds/src/route_subscriber.rs @@ -31,8 +31,9 @@ use zenoh_ext::{AdvancedSubscriber, AdvancedSubscriberBuilderExt, HistoryConfig} use crate::{ dds_utils::{ create_dds_writer, ddsrt_iov_len_from_usize, delete_dds_entity, get_guid, - serialize_entity_guid, + serialize_entity_guid, serialize_local_nodes, }, + gid::Gid, liveliness_mgt::new_ke_liveliness_sub, qos::{History, Qos}, qos_helpers::is_transient_local, @@ -78,8 +79,10 @@ pub struct RouteSubscriber { liveliness_token: Option, // the list of remote routes served by this route (":"") remote_routes: HashSet, - // the list of nodes served by this route - local_nodes: HashSet, + // the list of nodes served by this route, keyed by (participant_gid, node_fullname) to + // disambiguate same-named nodes across restarts (#702). + #[serde(serialize_with = "serialize_local_nodes")] + local_nodes: HashSet<(Gid, String)>, } impl Drop for RouteSubscriber { @@ -281,24 +284,31 @@ impl RouteSubscriber { } #[inline] - pub async fn add_local_node(&mut self, entity_key: String, discovered_reader_qos: &Qos) { - self.local_nodes.insert(entity_key); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if 1st local node added, activate the route - if self.local_nodes.len() == 1 { + pub async fn add_local_node( + &mut self, + node_key: (Gid, String), + discovered_reader_qos: &Qos, + ) { + // Only activate on the transition from 0 to 1 entries — re-inserting an existing key + // must NOT re-run announce_route (was a subtle bug, mirrored from RoutePublisher's fix). + if self.local_nodes.insert(node_key) && self.local_nodes.len() == 1 { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); if let Err(e) = self.announce_route(discovered_reader_qos).await { tracing::error!("{self} activation failed: {e}"); } + } else { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); } } #[inline] - pub fn remove_local_node(&mut self, entity_key: &str) { - self.local_nodes.remove(entity_key); - tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); - // if last local node removed, deactivate the route - if self.local_nodes.is_empty() { - self.retire_route(); + pub fn remove_local_node(&mut self, node_key: &(Gid, String)) { + if self.local_nodes.remove(node_key) { + tracing::debug!("{self} now serving local nodes {:?}", self.local_nodes); + // if last local node removed, deactivate the route + if self.local_nodes.is_empty() { + self.retire_route(); + } } } diff --git a/zenoh-plugin-ros2dds/src/routes_mgr.rs b/zenoh-plugin-ros2dds/src/routes_mgr.rs index 14b6ff7d..40629185 100644 --- a/zenoh-plugin-ros2dds/src/routes_mgr.rs +++ b/zenoh-plugin-ros2dds/src/routes_mgr.rs @@ -135,7 +135,7 @@ impl RoutesMgr { ) -> Result<(), String> { use ROS2DiscoveryEvent::*; match event { - DiscoveredMsgPub(node, iface) => { + DiscoveredMsgPub(participant, node, iface) => { // Pick 1 discovered Writer amongst the possibly multiple ones listed in MsgPub let entity = { let entities = zread!(self.context.discovered_entities); @@ -157,7 +157,7 @@ impl RoutesMgr { true, ) .await?; - route.add_local_node(node, &entity.qos).await; + route.add_local_node((participant, node), &entity.qos).await; } None => { return Err(format!( @@ -168,11 +168,11 @@ impl RoutesMgr { } } - UndiscoveredMsgPub(node, iface) => { + UndiscoveredMsgPub(participant, node, iface) => { if let Entry::Occupied(mut entry) = self.routes_publishers.entry(iface.name.clone()) { let route = entry.get_mut(); - route.remove_local_node(&node); + route.remove_local_node(&(participant, node)); if route.is_unused() { self.admin_space .remove(&(*KE_PREFIX_ROUTE_PUBLISHER / iface.name_as_keyexpr())); @@ -182,7 +182,7 @@ impl RoutesMgr { } } - DiscoveredMsgSub(node, iface) => { + DiscoveredMsgSub(participant, node, iface) => { // Pick 1 discovered Reader amongst the possibly multiple ones listed in MsgSub let entity = { let entities = zread!(self.context.discovered_entities); @@ -204,7 +204,7 @@ impl RoutesMgr { true, ) .await?; - route.add_local_node(node, &entity.qos).await; + route.add_local_node((participant, node), &entity.qos).await; } None => { return Err(format!( @@ -215,12 +215,12 @@ impl RoutesMgr { } } - UndiscoveredMsgSub(node, iface) => { + UndiscoveredMsgSub(participant, node, iface) => { if let Entry::Occupied(mut entry) = self.routes_subscribers.entry(iface.name.clone()) { let route = entry.get_mut(); - route.remove_local_node(&node); + route.remove_local_node(&(participant, node)); if route.is_unused() { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SUBSCRIBER / iface.name_as_keyexpr())); @@ -229,19 +229,19 @@ impl RoutesMgr { } } } - DiscoveredServiceSrv(node, iface) => { + DiscoveredServiceSrv(participant, node, iface) => { // Get route (create it if not yet exists) let route = self .get_or_create_route_service_srv(iface.name, iface.typ, true) .await?; - route.add_local_node(node).await; + route.add_local_node((participant, node)).await; } - UndiscoveredServiceSrv(node, iface) => { + UndiscoveredServiceSrv(participant, node, iface) => { if let Entry::Occupied(mut entry) = self.routes_service_srv.entry(iface.name.clone()) { let route = entry.get_mut(); - route.remove_local_node(&node); + route.remove_local_node(&(participant, node)); if route.is_unused() { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_SRV / iface.name_as_keyexpr())); @@ -250,19 +250,19 @@ impl RoutesMgr { } } } - DiscoveredServiceCli(node, iface) => { + DiscoveredServiceCli(participant, node, iface) => { // Get route (create it if not yet exists) let route = self .get_or_create_route_service_cli(iface.name, iface.typ, true) .await?; - route.add_local_node(node).await; + route.add_local_node((participant, node)).await; } - UndiscoveredServiceCli(node, iface) => { + UndiscoveredServiceCli(participant, node, iface) => { if let Entry::Occupied(mut entry) = self.routes_service_cli.entry(iface.name.clone()) { let route = entry.get_mut(); - route.remove_local_node(&node); + route.remove_local_node(&(participant, node)); if route.is_unused() { self.admin_space .remove(&(*KE_PREFIX_ROUTE_SERVICE_CLI / iface.name_as_keyexpr())); @@ -271,18 +271,18 @@ impl RoutesMgr { } } } - DiscoveredActionSrv(node, iface) => { + DiscoveredActionSrv(participant, node, iface) => { // Get route (create it if not yet exists) let route = self .get_or_create_route_action_srv(iface.name, iface.typ) .await?; - route.add_local_node(node).await; + route.add_local_node((participant, node)).await; } - UndiscoveredActionSrv(node, iface) => { + UndiscoveredActionSrv(participant, node, iface) => { if let Entry::Occupied(mut entry) = self.routes_action_srv.entry(iface.name.clone()) { let route = entry.get_mut(); - route.remove_local_node(&node); + route.remove_local_node(&(participant, node)); if route.is_unused() { self.admin_space .remove(&(*KE_PREFIX_ROUTE_ACTION_SRV / iface.name_as_keyexpr())); @@ -291,18 +291,18 @@ impl RoutesMgr { } } } - DiscoveredActionCli(node, iface) => { + DiscoveredActionCli(participant, node, iface) => { // Get route (create it if not yet exists) let route = self .get_or_create_route_action_cli(iface.name, iface.typ) .await?; - route.add_local_node(node).await; + route.add_local_node((participant, node)).await; } - UndiscoveredActionCli(node, iface) => { + UndiscoveredActionCli(participant, node, iface) => { if let Entry::Occupied(mut entry) = self.routes_action_cli.entry(iface.name.clone()) { let route = entry.get_mut(); - route.remove_local_node(&node); + route.remove_local_node(&(participant, node)); if route.is_unused() { self.admin_space .remove(&(*KE_PREFIX_ROUTE_ACTION_CLI / iface.name_as_keyexpr()));