Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions crates/store/src/state/subscription.rs

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is true.

What is true is that the validator allows replacing the latest block it has personally signed. But such a "replaced" block is never committed and therefore should never be considered the chain tip.

As an example:

  1. Block N is proposed and
    1. submitted to the validator for signing
    2. signed by the validator
    3. committed by the sequencer
    4. N is now the new committed chain tip and may be sent to downstream subscribers
  2. Block N+1 is proposed
    1. submitted to the validator for signing
    2. signed by the validator
    3. the sequencer has some problem and discards it
    4. this block is never committed and is never streamed out
  3. A new N+1 is proposed
    1. submitted to the validator for signing
    2. signed by the validator, overwriting its local last signed block
    3. committed by the sequencer
    4. N+1 is now the new committed chain tip and may be sent to downstream

In other words, the validators chain tip is irrelevant. It is never streamed. Only the sequencer's committed chain tip is streamed.


A separate concern is what happens if we discover that some block M proof fails, or is rejected by L1. Then we need some sort of rollback mechanism but that is a different issue.

@sergerad sergerad Jun 25, 2026

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other words, the validators chain tip is irrelevant. It is never streamed. Only the sequencer's committed chain tip is streamed.

We use the same machinery for streaming blocks for the validator backup stream and the RPC replica streams. This PR is trying to solve the problem we have ATM where a redriven Validator block would be ignored. So we would potentially have a bad backup (backup contains the wrong version of a block that was overwritten).

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validator should be used for sequencer recovery. In other words, there cannot be a redriven block because there is no sequencer and therefore no new blocks.

Or if you prefer a stricter model - the validator should reject all new blocks while it has a subscription active. But I don't think we need to go that route because, again, this is intended as a last resort data recovery.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though maybe that isn't such a bad idea. Reject all block signing requests with cannot sign, recovery in progress.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there cannot be a redriven block because there is no sequencer and therefore no new blocks.

I don't understand what this means. The redrive is made by the sequencer in the first place - overwriting the N'th block (next-1).

Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ impl SubscriptionSource for ProofSource {
/// subscriber with [`SubscriptionStreamError::TooSlow`] if it falls too far behind the tip or if a
/// single send blocks for longer than [`SEND_TIMEOUT`], which may occur only after the buffer has
/// [`SUBSCRIBER_CHANNEL_CAPACITY`] blocks queued.
///
/// Sources are allowed to re-drive earlier tips so consumers of the stream should allow for
/// overwrites so as to be idempotent.
pub fn run_stream<S: SubscriptionSource>(
from: BlockNumber,
tip_rx: watch::Receiver<BlockNumber>,
Expand All @@ -235,6 +238,15 @@ async fn run_stream_inner<S: SubscriptionSource>(
loop {
let mut tip = *tip_rx.borrow_and_update();

// Support redrives: a producer may overwrite the current tip block in place. Since the tip
// otherwise only moves forward, observing it at or below an already-emitted block (`tip <
// next`) means that block was rewritten, so rewind to re-emit it. The `tip >= from` guard
// preserves subscriptions that start from a future block, where `tip < next` instead means
// the requested start has not been reached yet.
if tip < next && tip >= from {
next = tip;
}

let current_gap = tip.saturating_sub(next.as_u32()).as_u32();
(previous_gap, running_gap) =
check_growing_gap(current_gap, previous_gap, running_gap, MAX_RUNNING_GAP)
Expand Down Expand Up @@ -286,8 +298,72 @@ fn check_growing_gap(

#[cfg(test)]
mod tests {
use tokio_stream::StreamExt;

use super::*;

/// Test [`SubscriptionSource`] that emits the requested block number, so emissions can be
/// asserted directly against the driving tips.
struct NumberSource;

impl SubscriptionSource for NumberSource {
type Event = BlockNumber;
type Error = std::convert::Infallible;

async fn fetch(&self, block_num: BlockNumber) -> Result<Vec<u8>, Self::Error> {
Ok(block_num.as_u32().to_le_bytes().to_vec())
}

fn build_event(
&self,
block_num: BlockNumber,
_data: Vec<u8>,
_tip: BlockNumber,
) -> BlockNumber {
block_num
}
}

/// A redrive overwrites the current tip block in place, and the subscriber re-emits it.
///
/// Each emission is awaited before driving the next tip so the spawned stream task observes
/// every tip update individually rather than coalescing them.
#[tokio::test]
async fn redrive_re_emits_the_current_tip_block() {
let (tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS);
let mut stream = Box::pin(run_stream(BlockNumber::GENESIS, tip_rx, NumberSource));

// Genesis is emitted from the initial tip.
assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::GENESIS);

// Drive block 1.
tip_tx.send_replace(BlockNumber::from(1u32));
assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::from(1u32));

// Redrive block 1: the tip does not advance, but block 1 is emitted again.
tip_tx.send_replace(BlockNumber::from(1u32));
assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::from(1u32));

// Drive block 2: the stream resumes forward progress without re-emitting block 1.
tip_tx.send_replace(BlockNumber::from(2u32));
assert_eq!(stream.next().await.unwrap().unwrap(), BlockNumber::from(2u32));
}

/// A subscription that starts ahead of the tip waits for the tip to reach `from` rather than
/// rewinding to re-emit earlier blocks.
#[tokio::test]
async fn subscription_from_future_block_does_not_rewind() {
let from = BlockNumber::from(10u32);
let (tip_tx, tip_rx) = watch::channel(BlockNumber::from(5u32));
let mut stream = Box::pin(run_stream(from, tip_rx, NumberSource));

// Advance the tip up to `from`; no earlier block should be emitted while behind it.
for tip in 6..=10 {
tip_tx.send_replace(BlockNumber::from(tip));
}
assert_eq!(stream.next().await.unwrap().unwrap(), from);
}

fn run(gaps: &[u32]) -> Result<(), ()> {
let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX);
let mut growth_run = 0u32;
Expand Down
Loading