Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ fn subscription_error_to_status(err: SubscriptionStreamError<BlockSubscriptionEr
SubscriptionStreamError::TooSlow => {
Status::resource_exhausted("subscriber is too slow to keep up with the chain")
},
SubscriptionStreamError::TooFarAhead => Status::out_of_range(
"subscriber's requested starting block is too far ahead of the chain tip",
),
SubscriptionStreamError::Source(BlockSubscriptionError::NotFound(block_num)) => {
Status::not_found(format!("block {block_num} not found"))
},
Expand Down
6 changes: 6 additions & 0 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ fn block_subscription_error_to_status(
SubscriptionStreamError::TooSlow => {
Status::resource_exhausted("subscriber is too slow to keep up with the chain")
},
SubscriptionStreamError::TooFarAhead => Status::out_of_range(
"subscriber's requested starting block is too far ahead of the chain tip",
),
SubscriptionStreamError::Source(BlockSubscriptionError::NotFound(block_num)) => {
Status::not_found(format!("block {block_num} not found"))
},
Expand All @@ -332,6 +335,9 @@ fn proof_subscription_error_to_status(
SubscriptionStreamError::TooSlow => {
Status::resource_exhausted("subscriber is too slow to keep up with the chain")
},
SubscriptionStreamError::TooFarAhead => Status::out_of_range(
"subscriber's requested starting block is too far ahead of the chain tip",
),
SubscriptionStreamError::Source(ProofSubscriptionError::NotFound(block_num)) => {
Status::not_found(format!("proof for block {block_num} not found"))
},
Expand Down
97 changes: 94 additions & 3 deletions crates/store/src/state/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32;
const SEND_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum running block-gap allowed before a subscriber is disconnected.
const MAX_RUNNING_GAP: u32 = 100u32;
/// Maximum gap between tip and subscriber's requested starting block where the starting block is
/// greater than the tip.
const MAX_FUTURE_GAP: u32 = 100u32;

// SUBSCRIPTION EVENTS
// ================================================================================================
Expand All @@ -44,6 +47,8 @@ pub struct ProofSubscriptionEvent {
pub enum SubscriptionStreamError<E> {
#[error("subscriber is too slow to keep up with the chain")]
TooSlow,
#[error("subscriber's requested starting block is too far ahead of the chain tip")]
TooFarAhead,
#[error(transparent)]
Source(#[from] E),
}
Expand Down Expand Up @@ -230,7 +235,12 @@ async fn run_stream_inner<S: SubscriptionSource>(
source: S,
) -> Result<(), SubscriptionStreamError<S::Error>> {
let mut next = from;
let mut previous_gap = tip_rx.borrow().as_u32();
let tip = tip_rx.borrow().as_u32();
if next.as_u32() > tip.saturating_add(MAX_FUTURE_GAP) {
return Err(SubscriptionStreamError::TooFarAhead);
}

let mut previous_gap = tip;
let mut running_gap = 0u32;
loop {
let mut tip = *tip_rx.borrow_and_update();
Expand All @@ -251,8 +261,15 @@ async fn run_stream_inner<S: SubscriptionSource>(
permit.send(Ok(source.build_event(next, data, tip)));
next = next.child();
}
if tip_rx.changed().await.is_err() {
return Ok(());
// Wait for the tip to advance, but also terminate promptly if the subscriber has
// disconnected (in case the tip is less than `next`).
tokio::select! {
changed = tip_rx.changed() => {
if changed.is_err() {
return Ok(());
}
},
() = tx.closed() => return Ok(()),
Comment on lines +264 to +272

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this is just adding noise and is easier to understand without the branch. Waiting a few seconds for a block is fine before finding out if a subscriber is disconnected.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would prefer to keep it in here in case we ever need to increase the future block range allowed for some reason. @kkovaacs can tie break during his review

}
}
}
Expand Down Expand Up @@ -286,8 +303,82 @@ fn check_growing_gap(

#[cfg(test)]
mod tests {
use tokio_stream::StreamExt;

use super::*;

/// Minimal [`SubscriptionSource`] for lifetime tests. `fetch` is never reached when `from` is
/// ahead of the tip, so it just yields empty data.
struct MockSource;

impl SubscriptionSource for MockSource {
type Event = ();
type Error = BlockSubscriptionError;

async fn fetch(&self, _block_num: BlockNumber) -> Result<Vec<u8>, Self::Error> {
Ok(Vec::new())
}

fn build_event(&self, _block_num: BlockNumber, _data: Vec<u8>, _tip: BlockNumber) {}
}

/// A subscription whose `from` is ahead of the tip parks waiting for the tip to advance. When
/// the subscriber disconnects (the returned stream is dropped), the detached task must
/// terminate rather than leaking until the chain reaches `from`.
#[tokio::test]
async fn future_subscription_task_terminates_on_disconnect() {
let (tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS);

// `from` is far ahead of the tip, so the task never enters the send loop and parks on the
// tip. The spawned task holds the only watch receiver.
let stream = run_stream(BlockNumber::from(MAX_FUTURE_GAP - 1), tip_rx, MockSource);
assert_eq!(tip_tx.receiver_count(), 1, "the spawned task should hold the tip receiver");

// The client disconnects.
drop(stream);

// The task must observe the closed channel and drop its watch receiver. The tip never
// advances, so a task that only waited on tip changes would hang here.
tokio::time::timeout(Duration::from_secs(5), async {
while tip_tx.receiver_count() > 0 {
tokio::task::yield_now().await;
}
})
.await
.expect("subscription task must terminate after the subscriber disconnects");
}

#[tokio::test]
async fn starting_block_exceeds_future_gap_returns_too_far_ahead() {
let (_tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS);
let from = BlockNumber::from(MAX_FUTURE_GAP + 1);
let mut stream = run_stream(from, tip_rx, MockSource);

let item = tokio::time::timeout(Duration::from_secs(5), stream.next())
.await
.expect("stream must yield promptly")
.expect("stream must not end without an item");
assert!(matches!(item, Err(SubscriptionStreamError::TooFarAhead)));
}

#[tokio::test]
async fn starting_block_at_exact_future_gap_boundary_is_accepted() {
let (tip_tx, tip_rx) = watch::channel(BlockNumber::GENESIS);
// Exactly at the boundary: `from == tip + MAX_FUTURE_GAP` is NOT > tip + MAX_FUTURE_GAP, so
// the subscription must be accepted.
let from = BlockNumber::from(MAX_FUTURE_GAP);
let mut stream = run_stream(from, tip_rx, MockSource);

// Advance the tip to `from` so the task can produce an event rather than parking forever.
tip_tx.send(from).unwrap();

let item = tokio::time::timeout(Duration::from_secs(5), stream.next())
.await
.expect("stream must yield promptly")
.expect("stream must not end without an item");
assert!(matches!(item, Ok(())), "expected an event, not TooFarAhead: {item:?}");
}

fn run(gaps: &[u32]) -> Result<(), ()> {
let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX);
let mut growth_run = 0u32;
Expand Down
Loading