Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl grpc::server::validator_api::BlockSubscription for ValidatorService {

let from = BlockNumber::from(request.block_from);
let source = BlockStoreSource { block_store: self.block_store.clone() };
let stream = run_stream(from, self.committed_tip.subscribe(), source)
let stream = run_stream(from, self.finalized_tip.subscribe(), source)
.map(|event| event.map_err(subscription_error_to_status));

Ok(Box::pin(stream))
Expand Down
25 changes: 21 additions & 4 deletions bin/validator/src/server/validator_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,14 @@ pub(crate) struct ValidatorService {
/// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
/// ensuring consistent chain tip reads and preventing race conditions.
sign_block_semaphore: Semaphore,
/// In-memory chain tip, updated after each signed block. Block subscriptions follow this to
/// stream live blocks as they are signed.
committed_tip: watch::Sender<BlockNumber>,
/// In-memory signed chain tip, updated after each signed block. The tip is provisional: it can
/// be replaced by a different block at the same height until a child block is signed on top of
/// it (see [`Self::finalized_tip`]).
signed_tip: watch::Sender<BlockNumber>,
/// In-memory finalized chain tip: the highest block that can no longer be replaced (the parent
/// of [`Self::signed_tip`]). Block subscriptions follow this so that only finalized blocks are
/// streamed downstream; the provisional tip is withheld until a child is signed on top of it.
finalized_tip: watch::Sender<BlockNumber>,
/// In-memory count of validated transactions, incremented after each new insert.
validated_transactions_count: AtomicU64,
/// In-memory count of signed blocks, incremented after each signed block.
Expand Down Expand Up @@ -102,12 +107,14 @@ impl ValidatorService {
});
}

let signed_tip = BlockNumber::from(initial_chain_tip);
Ok(Self {
signer,
db: db.into(),
block_store,
sign_block_semaphore: Semaphore::new(1),
committed_tip: watch::Sender::new(BlockNumber::from(initial_chain_tip)),
signed_tip: watch::Sender::new(signed_tip),
finalized_tip: watch::Sender::new(finalized_tip(signed_tip)),
validated_transactions_count: AtomicU64::new(initial_tx_count),
signed_blocks_count: AtomicU64::new(initial_block_count),
})
Expand Down Expand Up @@ -216,3 +223,13 @@ impl ValidatorService {
.map_err(|err| ValidatorError::BlockSigningFailed(err.to_string()))
}
}

/// Returns the finalized chain tip for a given signed chain tip.
///
/// The signed tip is provisional: it can be replaced by a different block at the same height until a
/// child block is signed on top of it. The finalized tip is therefore the parent of the signed tip,
/// since that parent can no longer be replaced. Genesis has no parent and is itself always final
/// (it cannot be replaced), so it maps to itself.
fn finalized_tip(signed_tip: BlockNumber) -> BlockNumber {
signed_tip.parent().unwrap_or(signed_tip)
}
14 changes: 9 additions & 5 deletions bin/validator/src/server/validator_service/sign_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use miden_protocol::block::{BlockNumber, ProposedBlock};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::Signature;
use miden_tx::utils::serde::{Deserializable, Serializable};

use super::ValidatorService;
use super::{ValidatorService, finalized_tip};
use crate::db::{load_chain_tip, upsert_block_header};

#[tonic::async_trait]
Expand Down Expand Up @@ -73,10 +73,14 @@ impl grpc::server::validator_api::SignBlock for ValidatorService {
))
})?;

// Update the in-memory counters after successful persistence. The block has already been
// backed up to the block store by `validate_block`, so it is available to subscribers by
// the time they observe this new tip.
self.committed_tip.send_replace(BlockNumber::from(new_block_num));
// Update the in-memory tips after successful persistence. The newly signed block becomes
// the provisional tip, and its parent becomes finalized: the parent can no longer be
// replaced, so it is now safe to stream downstream. On a replacement (same-height) block
// the finalized tip is unchanged, so subscribers never observe a block that was later
// replaced.
let signed_tip = BlockNumber::from(new_block_num);
self.signed_tip.send_replace(signed_tip);
self.finalized_tip.send_replace(finalized_tip(signed_tip));
self.signed_blocks_count.fetch_add(1, Ordering::Relaxed);

Ok((signature, block_commitment))
Expand Down
2 changes: 1 addition & 1 deletion bin/validator/src/server/validator_service/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl grpc::server::validator_api::Status for ValidatorService {
Ok(grpc::validator::ValidatorStatus {
version: env!("CARGO_PKG_VERSION").to_string(),
status: "OK".to_string(),
chain_tip: self.committed_tip.borrow().as_u32(),
chain_tip: self.signed_tip.borrow().as_u32(),
validated_transactions_count: self.validated_transactions_count.load(Ordering::Relaxed),
signed_blocks_count: self.signed_blocks_count.load(Ordering::Relaxed),
})
Expand Down
46 changes: 41 additions & 5 deletions bin/validator/src/server/validator_service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ async fn validate_block_number_mismatch() {
);
}

/// A block subscription replays the backed-up blocks from the requested height and then streams
/// newly signed blocks as they arrive.
/// A block subscription replays the finalized backed-up blocks from the requested height and then
/// streams blocks live as they are finalized (i.e. once a child block is signed on top of them).
#[tokio::test]
async fn block_subscription_replays_then_follows() {
use std::time::Duration;
Expand All @@ -511,11 +511,14 @@ async fn block_subscription_replays_then_follows() {

let mut tv = TestValidator::new().await;

// Sign blocks 1 and 2 so the validator backs them up to its block store.
// Sign blocks 1, 2 and 3. The signed tip is 3, so blocks 1 and 2 are finalized while block 3
// remains the provisional, replaceable tip and is withheld from subscribers.
tv.apply_empty_block().await;
tv.apply_empty_block().await;
tv.apply_empty_block().await;

// Subscribe from the first signed block and confirm the backed-up blocks are replayed in order.
// Subscribe from the first signed block and confirm only the finalized blocks (1 and 2) are
// replayed; the provisional tip (block 3) is withheld.
let mut stream = tv.call_block_subscription(1).await;
for expected in 1..=2 {
let response = tokio::time::timeout(Duration::from_secs(5), stream.next())
Expand All @@ -528,7 +531,7 @@ async fn block_subscription_replays_then_follows() {
assert_eq!(response.committed_chain_tip, 2);
}

// Sign a new block and confirm it is streamed live to the existing subscriber.
// Sign a new block (4), finalizing block 3, and confirm block 3 is now streamed live.
tv.apply_empty_block().await;
let response = tokio::time::timeout(Duration::from_secs(5), stream.next())
.await
Expand All @@ -539,3 +542,36 @@ async fn block_subscription_replays_then_follows() {
assert_eq!(block.header().block_num().as_u32(), 3);
assert_eq!(response.committed_chain_tip, 3);
}

/// The provisional chain tip (the most recently signed block) must be withheld from subscribers
/// until a child block is signed on top of it, since it can still be replaced.
#[tokio::test]
async fn provisional_tip_is_withheld_from_subscribers() {
use std::time::Duration;

use miden_protocol::block::SignedBlock;
use miden_tx::utils::serde::Deserializable;
use tokio_stream::StreamExt;

let mut tv = TestValidator::new().await;

// Sign blocks 1 and 2. Block 1 is finalized (it has a child); block 2 is the provisional tip.
tv.apply_empty_block().await;
tv.apply_empty_block().await;

let mut stream = tv.call_block_subscription(1).await;

// Block 1 is finalized and streamed.
let response = tokio::time::timeout(Duration::from_secs(5), stream.next())
.await
.expect("finalized block should arrive promptly")
.expect("stream should not end")
.expect("stream item should not be an error");
let block = SignedBlock::read_from_bytes(&response.block).expect("valid signed block");
assert_eq!(block.header().block_num().as_u32(), 1);
assert_eq!(response.committed_chain_tip, 1);

// Block 2 is the provisional tip and must not be streamed while it remains replaceable.
let next = tokio::time::timeout(Duration::from_millis(500), stream.next()).await;
assert!(next.is_err(), "provisional tip (block 2) should be withheld from subscribers");
}
Loading