From aa2804c5743d3807219573ed83b000382d9d3fdd Mon Sep 17 00:00:00 2001 From: sergerad Date: Wed, 24 Jun 2026 13:24:05 +1200 Subject: [PATCH 1/3] Handle disconnected clients --- crates/store/src/state/subscription.rs | 55 +++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 6cb3d4c41..1d64c79e0 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -251,8 +251,18 @@ async fn run_stream_inner( permit.send(Ok(source.build_event(next, data, tip))); next = next.child(); } - if tip_rx.changed().await.is_err() { - return Ok(()); + // Wait for the tip to advance, but also terminate promptly if the subscriber has + // disconnected. A subscription whose `from` is ahead of the tip never enters the send loop + // above, so without also watching `tx.closed()` here the detached task would park on tip + // changes until the chain reaches `from` (or the node shuts down), leaking the task, its + // watch receiver, and its `Arc` long after the client dropped the stream. + tokio::select! { + changed = tip_rx.changed() => { + if changed.is_err() { + return Ok(()); + } + }, + () = tx.closed() => return Ok(()), } } } @@ -288,6 +298,47 @@ fn check_growing_gap( mod tests { use super::*; + /// Minimal [`SubscriptionSource`] for lifetime tests. `fetch` is never reached when `from` is + /// ahead of the tip, so it just yields empty data. + struct MockSource; + + impl SubscriptionSource for MockSource { + type Event = (); + type Error = BlockSubscriptionError; + + async fn fetch(&self, _block_num: BlockNumber) -> Result, Self::Error> { + Ok(Vec::new()) + } + + fn build_event(&self, _block_num: BlockNumber, _data: Vec, _tip: BlockNumber) {} + } + + /// A subscription whose `from` is ahead of the tip parks waiting for the tip to advance. When + /// the subscriber disconnects (the returned stream is dropped), the detached task must + /// terminate rather than leaking until the chain reaches `from`. + #[tokio::test] + async fn future_subscription_task_terminates_on_disconnect() { + let (tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS); + + // `from` is far ahead of the tip, so the task never enters the send loop and parks on the + // tip. The spawned task holds the only watch receiver. + let stream = run_stream(BlockNumber::from(1_000_000), tip_rx, MockSource); + assert_eq!(tip_tx.receiver_count(), 1, "the spawned task should hold the tip receiver"); + + // The client disconnects. + drop(stream); + + // The task must observe the closed channel and drop its watch receiver. The tip never + // advances, so a task that only waited on tip changes would hang here. + tokio::time::timeout(Duration::from_secs(5), async { + while tip_tx.receiver_count() > 0 { + tokio::task::yield_now().await; + } + }) + .await + .expect("subscription task must terminate after the subscriber disconnects"); + } + fn run(gaps: &[u32]) -> Result<(), ()> { let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX); let mut growth_run = 0u32; From 8a063fc39c8bc46061b5f2eeb05274010e74ef2f Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 29 Jun 2026 10:09:41 +1200 Subject: [PATCH 2/3] Disallow too far --- crates/store/src/state/subscription.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 1d64c79e0..6218d43e3 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -18,6 +18,9 @@ pub const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; const SEND_TIMEOUT: Duration = Duration::from_secs(10); /// Maximum running block-gap allowed before a subscriber is disconnected. const MAX_RUNNING_GAP: u32 = 100u32; +/// Maximum gap between tip and subscriber's requested starting block where the starting block is +/// greater than the tip. +const MAX_FUTURE_GAP: u32 = 100u32; // SUBSCRIPTION EVENTS // ================================================================================================ @@ -44,6 +47,8 @@ pub struct ProofSubscriptionEvent { pub enum SubscriptionStreamError { #[error("subscriber is too slow to keep up with the chain")] TooSlow, + #[error("subscriber's requested starting block is too far ahead of the chain tip")] + TooFarAhead, #[error(transparent)] Source(#[from] E), } @@ -230,7 +235,12 @@ async fn run_stream_inner( source: S, ) -> Result<(), SubscriptionStreamError> { let mut next = from; - let mut previous_gap = tip_rx.borrow().as_u32(); + let tip = tip_rx.borrow().as_u32(); + if next.as_u32() > tip.saturating_add(MAX_FUTURE_GAP) { + return Err(SubscriptionStreamError::TooFarAhead); + } + + let mut previous_gap = tip; let mut running_gap = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); @@ -252,10 +262,7 @@ async fn run_stream_inner( next = next.child(); } // Wait for the tip to advance, but also terminate promptly if the subscriber has - // disconnected. A subscription whose `from` is ahead of the tip never enters the send loop - // above, so without also watching `tx.closed()` here the detached task would park on tip - // changes until the chain reaches `from` (or the node shuts down), leaking the task, its - // watch receiver, and its `Arc` long after the client dropped the stream. + // disconnected (in case the tip is less than `next`). tokio::select! { changed = tip_rx.changed() => { if changed.is_err() { From edf8570deabc6ea44ca3e7fdebbe67cb039ebe68 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 29 Jun 2026 10:36:55 +1200 Subject: [PATCH 3/3] Fix tests and error matches --- .../validator_service/block_subscription.rs | 3 ++ crates/rpc/src/server/api.rs | 6 ++++ crates/store/src/state/subscription.rs | 35 ++++++++++++++++++- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/bin/validator/src/server/validator_service/block_subscription.rs b/bin/validator/src/server/validator_service/block_subscription.rs index b5f538971..586f4e4c4 100644 --- a/bin/validator/src/server/validator_service/block_subscription.rs +++ b/bin/validator/src/server/validator_service/block_subscription.rs @@ -98,6 +98,9 @@ fn subscription_error_to_status(err: SubscriptionStreamError { Status::resource_exhausted("subscriber is too slow to keep up with the chain") }, + SubscriptionStreamError::TooFarAhead => Status::out_of_range( + "subscriber's requested starting block is too far ahead of the chain tip", + ), SubscriptionStreamError::Source(BlockSubscriptionError::NotFound(block_num)) => { Status::not_found(format!("block {block_num} not found")) }, diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 6fe2cb411..1ac9afc18 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -316,6 +316,9 @@ fn block_subscription_error_to_status( SubscriptionStreamError::TooSlow => { Status::resource_exhausted("subscriber is too slow to keep up with the chain") }, + SubscriptionStreamError::TooFarAhead => Status::out_of_range( + "subscriber's requested starting block is too far ahead of the chain tip", + ), SubscriptionStreamError::Source(BlockSubscriptionError::NotFound(block_num)) => { Status::not_found(format!("block {block_num} not found")) }, @@ -332,6 +335,9 @@ fn proof_subscription_error_to_status( SubscriptionStreamError::TooSlow => { Status::resource_exhausted("subscriber is too slow to keep up with the chain") }, + SubscriptionStreamError::TooFarAhead => Status::out_of_range( + "subscriber's requested starting block is too far ahead of the chain tip", + ), SubscriptionStreamError::Source(ProofSubscriptionError::NotFound(block_num)) => { Status::not_found(format!("proof for block {block_num} not found")) }, diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 6218d43e3..e156cf4df 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -303,6 +303,8 @@ fn check_growing_gap( #[cfg(test)] mod tests { + use tokio_stream::StreamExt; + use super::*; /// Minimal [`SubscriptionSource`] for lifetime tests. `fetch` is never reached when `from` is @@ -329,7 +331,7 @@ mod tests { // `from` is far ahead of the tip, so the task never enters the send loop and parks on the // tip. The spawned task holds the only watch receiver. - let stream = run_stream(BlockNumber::from(1_000_000), tip_rx, MockSource); + let stream = run_stream(BlockNumber::from(MAX_FUTURE_GAP - 1), tip_rx, MockSource); assert_eq!(tip_tx.receiver_count(), 1, "the spawned task should hold the tip receiver"); // The client disconnects. @@ -346,6 +348,37 @@ mod tests { .expect("subscription task must terminate after the subscriber disconnects"); } + #[tokio::test] + async fn starting_block_exceeds_future_gap_returns_too_far_ahead() { + let (_tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS); + let from = BlockNumber::from(MAX_FUTURE_GAP + 1); + let mut stream = run_stream(from, tip_rx, MockSource); + + let item = tokio::time::timeout(Duration::from_secs(5), stream.next()) + .await + .expect("stream must yield promptly") + .expect("stream must not end without an item"); + assert!(matches!(item, Err(SubscriptionStreamError::TooFarAhead))); + } + + #[tokio::test] + async fn starting_block_at_exact_future_gap_boundary_is_accepted() { + let (tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS); + // Exactly at the boundary: `from == tip + MAX_FUTURE_GAP` is NOT > tip + MAX_FUTURE_GAP, so + // the subscription must be accepted. + let from = BlockNumber::from(MAX_FUTURE_GAP); + let mut stream = run_stream(from, tip_rx, MockSource); + + // Advance the tip to `from` so the task can produce an event rather than parking forever. + tip_tx.send(from).unwrap(); + + let item = tokio::time::timeout(Duration::from_secs(5), stream.next()) + .await + .expect("stream must yield promptly") + .expect("stream must not end without an item"); + assert!(matches!(item, Ok(())), "expected an event, not TooFarAhead: {item:?}"); + } + fn run(gaps: &[u32]) -> Result<(), ()> { let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX); let mut growth_run = 0u32;