Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions zenoh-plugin-ros2dds/src/qos_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,46 @@ pub fn is_transient_local(qos: &Qos) -> bool {
.is_some_and(|durability| durability.kind == DurabilityKind::TRANSIENT_LOCAL)
}

/// Returns true if `incoming` QoS is compatible with the `existing` route QoS.
///
/// Per the DDS specification (§7.1.3):
/// - Reliability: BEST_EFFORT writer can match a BEST_EFFORT reader;
/// RELIABLE writer can match any reader; BEST_EFFORT writer cannot
/// match a RELIABLE reader. In the bridge context both sides must agree.
/// - Durability: VOLATILE does not deliver historical data; TRANSIENT_LOCAL does.
/// If the route was created as TRANSIENT_LOCAL but the new entity is VOLATILE
/// (or vice versa) the DDS endpoint matching will silently fail.
///
/// Callers that detect incompatibility should log a WARN so the failure is
/// observable (D-VGC134-5 silent-failure class — convert silent drop to log
/// entry). Route teardown/recreation is left to future work.
pub fn qos_is_compatible(existing: &Qos, incoming: &Qos) -> bool {
// Check Reliability compatibility.
let existing_reliable = is_reliable(existing);
let incoming_reliable = is_reliable(incoming);
if existing_reliable != incoming_reliable {
return false;
}
// Check Durability (TRANSIENT_LOCAL vs. VOLATILE) compatibility.
let existing_transient = is_transient_local(existing);
let incoming_transient = is_transient_local(incoming);
if existing_transient != incoming_transient {
return false;
}
true
}

/// Returns a human-readable summary of Reliability + Durability for log output.
pub fn qos_summary(qos: &Qos) -> String {
let rel = if is_reliable(qos) { "RELIABLE" } else { "BEST_EFFORT" };
let dur = if is_transient_local(qos) {
"TRANSIENT_LOCAL"
} else {
"VOLATILE"
};
format!("{rel}/{dur}")
}

// Copy and adapt Writer's QoS for creation of a matching Reader
pub fn adapt_writer_qos_for_reader(qos: &Qos) -> Qos {
let mut reader_qos = qos.clone();
Expand Down
8 changes: 8 additions & 0 deletions zenoh-plugin-ros2dds/src/route_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,14 @@ impl RoutePublisher {
pub fn is_unused(&self) -> bool {
!self.is_serving_local_node() && !self.is_serving_remote_route()
}

/// Returns the QoS this route's DDS Reader was created with.
/// Used to detect QoS mismatches when a new entity is discovered for an
/// existing route (issue #435 — silent DDS endpoint matching failure).
#[inline]
pub fn reader_qos(&self) -> &Qos {
&self._reader_qos
}
}

pub fn serialize_pub_cache<S>(zpub: &ZPublisher, s: S) -> Result<S::Ok, S::Error>
Expand Down
16 changes: 16 additions & 0 deletions zenoh-plugin-ros2dds/src/route_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ pub struct RouteSubscriber {
// a liveliness token associated to this route, for announcement to other plugins
#[serde(skip)]
liveliness_token: Option<LivelinessToken>,
// the QoS for the DDS Writer created for this route.
// Stored to enable QoS-mismatch detection when a new entity is discovered
// for an already-existing route (issue #435 — silent DDS endpoint failure).
#[serde(skip)]
_writer_qos: Qos,
// the list of remote routes served by this route ("<zenoh_id>:<zenoh_key_expr>"")
remote_routes: HashSet<String>,
// the list of nodes served by this route
Expand Down Expand Up @@ -138,6 +143,8 @@ impl RouteSubscriber {
tracing::debug!(
"Route Subscriber ({zenoh_key_expr} -> {ros2_name}): create Writer with {writer_qos:?}"
);
// Capture writer_qos before moving it into create_dds_writer
let stored_writer_qos = writer_qos.clone();
let dds_writer = create_dds_writer(
context.participant,
topic_name,
Expand All @@ -161,6 +168,7 @@ impl RouteSubscriber {
queries_timeout,
keyless,
liveliness_token: None,
_writer_qos: stored_writer_qos,
remote_routes: HashSet::new(),
local_nodes: HashSet::new(),
})
Expand Down Expand Up @@ -311,6 +319,14 @@ impl RouteSubscriber {
pub fn is_unused(&self) -> bool {
!self.is_serving_local_node() && !self.is_serving_remote_route()
}

/// Returns the QoS this route's DDS Writer was created with.
/// Used to detect QoS mismatches when a new entity is discovered for an
/// existing route (issue #435 — silent DDS endpoint matching failure).
#[inline]
pub fn writer_qos(&self) -> &Qos {
&self._writer_qos
}
}

fn route_zenoh_message_to_dds(s: Sample, ros2_name: &str, data_writer: dds_entity_t) {
Expand Down
33 changes: 30 additions & 3 deletions zenoh-plugin-ros2dds/src/routes_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
config::Config,
discovered_entities::DiscoveredEntities,
events::{ROS2AnnouncementEvent, ROS2DiscoveryEvent},
qos_helpers::{adapt_reader_qos_for_writer, adapt_writer_qos_for_reader},
qos_helpers::{adapt_reader_qos_for_writer, adapt_writer_qos_for_reader, qos_is_compatible, qos_summary},
ros2_utils::{key_expr_to_ros2_name, ros2_name_to_key_expr},
ros_discovery::RosDiscoveryInfoMgr,
route_action_cli::RouteActionCli,
Expand Down Expand Up @@ -588,7 +588,22 @@ impl RoutesMgr {

Ok(entry.insert(route))
}
Entry::Occupied(entry) => Ok(entry.into_mut()),
Entry::Occupied(entry) => {
// Route already exists: check that the incoming QoS is compatible with
// the QoS the route's DDS Reader was created with. A mismatch causes DDS
// to silently refuse endpoint matching — no error is surfaced to the caller
// (issue #435). Log a WARN so the failure is observable.
let route = entry.into_mut();
if !qos_is_compatible(route.reader_qos(), &reader_qos) {
tracing::warn!(
"{route} QoS mismatch: route was created with {} but incoming entity \
has {}. DDS endpoint matching may silently fail.",
qos_summary(route.reader_qos()),
qos_summary(&reader_qos),
);
}
Ok(route)
}
}
}

Expand Down Expand Up @@ -625,7 +640,19 @@ impl RoutesMgr {

Ok(entry.insert(route))
}
Entry::Occupied(entry) => Ok(entry.into_mut()),
Entry::Occupied(entry) => {
// Route already exists: check QoS compatibility (issue #435 — silent failure).
let route = entry.into_mut();
if !qos_is_compatible(route.writer_qos(), &writer_qos) {
tracing::warn!(
"{route} QoS mismatch: route was created with {} but incoming entity \
has {}. DDS endpoint matching may silently fail.",
qos_summary(route.writer_qos()),
qos_summary(&writer_qos),
);
}
Ok(route)
}
}
}

Expand Down