From c16063df1df89cce9eab0bb513b09151e5824727 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 25 Jun 2026 11:38:46 +1200 Subject: [PATCH 1/3] Add redrive --- crates/store/src/state/subscription.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 6cb3d4c41..7451081fb 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -209,6 +209,9 @@ impl SubscriptionSource for ProofSource { /// subscriber with [`SubscriptionStreamError::TooSlow`] if it falls too far behind the tip or if a /// single send blocks for longer than [`SEND_TIMEOUT`], which may occur only after the buffer has /// [`SUBSCRIBER_CHANNEL_CAPACITY`] blocks queued. +/// +/// Sources are allowed to re-drive earlier tips so consumers of the stream should allow for +/// overwrites so as to be idempotent. pub fn run_stream( from: BlockNumber, tip_rx: watch::Receiver, @@ -235,6 +238,11 @@ async fn run_stream_inner( loop { let mut tip = *tip_rx.borrow_and_update(); + // Allow for re-drive of the stream to earlier tips. + if tip < next { + next = tip; + } + let current_gap = tip.saturating_sub(next.as_u32()).as_u32(); (previous_gap, running_gap) = check_growing_gap(current_gap, previous_gap, running_gap, MAX_RUNNING_GAP) From 3015c7f6122ba9867597522c5984905564e89986 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 25 Jun 2026 12:05:27 +1200 Subject: [PATCH 2/3] Add redrive and test --- crates/store/src/state/subscription.rs | 73 +++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 7451081fb..fd082f3ee 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -238,8 +238,13 @@ async fn run_stream_inner( loop { let mut tip = *tip_rx.borrow_and_update(); - // Allow for re-drive of the stream to earlier tips. - if tip < next { + // Support redrives: a producer may overwrite the current tip block in place (e.g. re-sign + // and re-commit it) without advancing the chain. Since the tip otherwise only moves + // forward, observing it at or below an already-emitted block (`tip < next`) means that + // block was rewritten, so rewind to re-emit it. The `tip >= from` guard preserves + // subscriptions that start from a future block, where `tip < next` instead means the + // requested start has not been reached yet. + if tip < next && tip >= from { next = tip; } @@ -294,8 +299,72 @@ fn check_growing_gap( #[cfg(test)] mod tests { + use tokio_stream::StreamExt; + use super::*; + /// Test [`SubscriptionSource`] that emits the requested block number, so emissions can be + /// asserted directly against the driving tips. + struct NumberSource; + + impl SubscriptionSource for NumberSource { + type Event = BlockNumber; + type Error = std::convert::Infallible; + + async fn fetch(&self, block_num: BlockNumber) -> Result, Self::Error> { + Ok(block_num.as_u32().to_le_bytes().to_vec()) + } + + fn build_event( + &self, + block_num: BlockNumber, + _data: Vec, + _tip: BlockNumber, + ) -> BlockNumber { + block_num + } + } + + /// A redrive overwrites the current tip block in place, and the subscriber re-emits it. + /// + /// Each emission is awaited before driving the next tip so the spawned stream task observes + /// every tip update individually rather than coalescing them. + #[tokio::test] + async fn redrive_re_emits_the_current_tip_block() { + let (tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS); + let mut stream = Box::pin(run_stream(BlockNumber::GENESIS, tip_rx, NumberSource)); + + // Genesis is emitted from the initial tip. + assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::GENESIS); + + // Drive block 1. + tip_tx.send_replace(BlockNumber::from(1u32)); + assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::from(1u32)); + + // Redrive block 1: the tip does not advance, but block 1 is emitted again. + tip_tx.send_replace(BlockNumber::from(1u32)); + assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::from(1u32)); + + // Drive block 2: the stream resumes forward progress without re-emitting block 1. + tip_tx.send_replace(BlockNumber::from(2u32)); + assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::from(2u32)); + } + + /// A subscription that starts ahead of the tip waits for the tip to reach `from` rather than + /// rewinding to re-emit earlier blocks. + #[tokio::test] + async fn subscription_from_future_block_does_not_rewind() { + let from = BlockNumber::from(10u32); + let (tip_tx, tip_rx) = watch::channel(BlockNumber::from(5u32)); + let mut stream = Box::pin(run_stream(from, tip_rx, NumberSource)); + + // Advance the tip up to `from`; no earlier block should be emitted while behind it. + for tip in 6..=10 { + tip_tx.send_replace(BlockNumber::from(tip)); + } + assert_eq!(stream.next().await.unwrap().unwrap(), from); + } + fn run(gaps: &[u32]) -> Result<(), ()> { let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX); let mut growth_run = 0u32; From 3f0af730612fda069dc854db11dec4ec42326a68 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 25 Jun 2026 12:09:19 +1200 Subject: [PATCH 3/3] Comment wrap --- crates/store/src/state/subscription.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index fd082f3ee..20d9d2592 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -238,12 +238,11 @@ async fn run_stream_inner( loop { let mut tip = *tip_rx.borrow_and_update(); - // Support redrives: a producer may overwrite the current tip block in place (e.g. re-sign - // and re-commit it) without advancing the chain. Since the tip otherwise only moves - // forward, observing it at or below an already-emitted block (`tip < next`) means that - // block was rewritten, so rewind to re-emit it. The `tip >= from` guard preserves - // subscriptions that start from a future block, where `tip < next` instead means the - // requested start has not been reached yet. + // Support redrives: a producer may overwrite the current tip block in place. Since the tip + // otherwise only moves forward, observing it at or below an already-emitted block (`tip < + // next`) means that block was rewritten, so rewind to re-emit it. The `tip >= from` guard + // preserves subscriptions that start from a future block, where `tip < next` instead means + // the requested start has not been reached yet. if tip < next && tip >= from { next = tip; }