From 3a925ad77e5f36a607725c5224679c0ccd136ee6 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 23 Jun 2026 10:28:24 +1200 Subject: [PATCH 1/4] Check proof block numbers re committed and proven tips --- crates/block-producer/src/rpc_sync.rs | 30 +++++++++++++++++---------- crates/store/src/state/apply_proof.rs | 20 +++++++++++++++++- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/crates/block-producer/src/rpc_sync.rs b/crates/block-producer/src/rpc_sync.rs index 4c913c2f7..2062ca30f 100644 --- a/crates/block-producer/src/rpc_sync.rs +++ b/crates/block-producer/src/rpc_sync.rs @@ -87,11 +87,6 @@ struct BlockSync { readiness: RpcReadiness, } -struct ProofSync { - state: Arc, - source_rpc: RpcClient, -} - impl BlockSync { async fn run(self) -> anyhow::Result<()> { (|| async { @@ -135,6 +130,11 @@ impl BlockSync { } } +struct ProofSync { + state: Arc, + source_rpc: RpcClient, +} + impl ProofSync { /// Synchronizes block proofs from an upstream RPC service. /// @@ -161,27 +161,35 @@ impl ProofSync { async fn sync(&self) -> anyhow::Result<()> { // Subscribe from next proven tip. - let starting_block = self.state.chain_tip(Finality::Proven).await.child().as_u32(); - info!(starting_block, "Connecting to upstream RPC for proofs"); + let starting_block = self.state.chain_tip(Finality::Proven).await.child(); + info!("Connecting to upstream RPC for proofs from {starting_block}"); let mut client = self.source_rpc.clone(); let mut stream = client - .proof_subscription(ProofSubscriptionRequest { block_from: starting_block }) + .proof_subscription(ProofSubscriptionRequest { block_from: starting_block.as_u32() }) .await? .into_inner(); + let mut expected = starting_block; let mut committed_tip_rx = self.state.subscribe_committed_tip(); while let Some(result) = stream.next().await { let event = result?; - let proven_tip = BlockNumber::from(event.block_num); + let block_num = BlockNumber::from(event.block_num); + + anyhow::ensure!( + block_num == expected, + "upstream sent out-of-sequence proof: expected block {expected}, got {block_num}", + ); // Ensure the block is committed before applying its proof so that proven tip never // exceeds committed tip. committed_tip_rx - .wait_for(|committed_tip| *committed_tip >= proven_tip) + .wait_for(|committed_tip| *committed_tip >= block_num) .await .context("committed tip channel closed")?; - self.state.apply_proof(proven_tip, event.proof).await?; + self.state.apply_proof(block_num, event.proof).await?; + + expected = expected.child(); } Ok(()) diff --git a/crates/store/src/state/apply_proof.rs b/crates/store/src/state/apply_proof.rs index 4fdcb24f5..6e5791cf6 100644 --- a/crates/store/src/state/apply_proof.rs +++ b/crates/store/src/state/apply_proof.rs @@ -1,17 +1,35 @@ +use anyhow::ensure; use miden_protocol::block::BlockNumber; use tracing::instrument; use crate::COMPONENT; -use crate::state::{ProofNotification, State}; +use crate::state::{Finality, ProofNotification, State}; impl State { /// Saves a block proof, advances the proven-in-sequence tip, and notifies replica subscribers. + /// + /// # Errors + /// + /// - If proofs are not applied in strict ascending order (exactly one block past the proven tip) + /// - If the proof's corresponding block was not already committed #[instrument(target = COMPONENT, skip_all, err, fields(block.number = block_num.as_u32()))] pub async fn apply_proof( &self, block_num: BlockNumber, proof_bytes: Vec, ) -> anyhow::Result<()> { + let expected = self.proven_tip.read().child(); + ensure!( + block_num == expected, + "out-of-sequence proof: expected block {expected}, got {block_num}", + ); + + let committed_tip = self.chain_tip(Finality::Committed).await; + ensure!( + block_num <= committed_tip, + "proof for uncommitted block {block_num} exceeds committed tip {committed_tip}", + ); + self.block_store.commit_proof(block_num, &proof_bytes).await?; self.proof_cache .push(block_num, ProofNotification::new(block_num, proof_bytes)) From fbf4a82e5dec1eaee4a7e30a7d29909fbfadb67d Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 23 Jun 2026 11:54:19 +1200 Subject: [PATCH 2/4] Add assert for advance() --- crates/store/src/proven_tip.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/store/src/proven_tip.rs b/crates/store/src/proven_tip.rs index bde5bdbb9..73f3227f8 100644 --- a/crates/store/src/proven_tip.rs +++ b/crates/store/src/proven_tip.rs @@ -3,8 +3,8 @@ use tokio::sync::watch; /// Cloneable handle that can advance the proven chain tip. /// -/// All clones share the same underlying watch channel, so any `advance()` call is immediately -/// visible to all receivers returned by `subscribe()`. +/// All clones share the same underlying watch channel, so any [`ProvenTipWriter::advance()`] call is immediately +/// visible to all receivers returned by [`ProvenTipWriter::subscribe()`]. #[derive(Clone)] pub struct ProvenTipWriter(watch::Sender); @@ -23,9 +23,14 @@ impl ProvenTipWriter { /// Advances the tip to `new_tip` if it is greater than the current value. /// /// Notifies all subscribers only when the tip actually increases. + /// + /// # Panics + /// + /// Panics if `new_tip` is greater than the current tip's child. pub fn advance(&self, new_tip: BlockNumber) { self.0.send_if_modified(|current| { if new_tip > *current { + assert_eq!(new_tip, current.child()); *current = new_tip; true } else { From 980ec8cd19c9d58ebbfb85f4bf0e653195d97154 Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 23 Jun 2026 12:11:55 +1200 Subject: [PATCH 3/4] Verify block proof stub and fix test --- crates/store/src/proven_tip.rs | 16 ++++++++-------- crates/store/src/state/apply_proof.rs | 16 ++++++++++++++-- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/crates/store/src/proven_tip.rs b/crates/store/src/proven_tip.rs index 73f3227f8..8256fbbd0 100644 --- a/crates/store/src/proven_tip.rs +++ b/crates/store/src/proven_tip.rs @@ -55,19 +55,19 @@ mod tests { assert_eq!(writer.read(), BlockNumber::from(5u32)); // Advancing to a higher value updates the tip. - writer.advance(BlockNumber::from(10u32)); - assert_eq!(writer.read(), BlockNumber::from(10u32)); + writer.advance(BlockNumber::from(6u32)); + assert_eq!(writer.read(), BlockNumber::from(6u32)); // Advancing to a lower value is a no-op. - writer.advance(BlockNumber::from(7u32)); - assert_eq!(writer.read(), BlockNumber::from(10u32)); + writer.advance(BlockNumber::from(3u32)); + assert_eq!(writer.read(), BlockNumber::from(6u32)); // Advancing to the same value is a no-op. - writer.advance(BlockNumber::from(10u32)); - assert_eq!(writer.read(), BlockNumber::from(10u32)); + writer.advance(BlockNumber::from(6u32)); + assert_eq!(writer.read(), BlockNumber::from(6u32)); // Advancing to a higher value again works. - writer.advance(BlockNumber::from(15u32)); - assert_eq!(writer.read(), BlockNumber::from(15u32)); + writer.advance(BlockNumber::from(7u32)); + assert_eq!(writer.read(), BlockNumber::from(7u32)); } } diff --git a/crates/store/src/state/apply_proof.rs b/crates/store/src/state/apply_proof.rs index 6e5791cf6..71df527bd 100644 --- a/crates/store/src/state/apply_proof.rs +++ b/crates/store/src/state/apply_proof.rs @@ -1,5 +1,6 @@ -use anyhow::ensure; -use miden_protocol::block::BlockNumber; +use anyhow::{Context, ensure}; +use miden_protocol::block::{BlockNumber, BlockProof}; +use miden_protocol::utils::serde::Deserializable; use tracing::instrument; use crate::COMPONENT; @@ -30,6 +31,8 @@ impl State { "proof for uncommitted block {block_num} exceeds committed tip {committed_tip}", ); + verify_block_proof(block_num, &proof_bytes)?; + self.block_store.commit_proof(block_num, &proof_bytes).await?; self.proof_cache .push(block_num, ProofNotification::new(block_num, proof_bytes)) @@ -38,3 +41,12 @@ impl State { Ok(()) } } + +/// Verifies that `proof_bytes` is a valid [`BlockProof`] for the block at `block_num`. +fn verify_block_proof(_block_num: BlockNumber, proof_bytes: &[u8]) -> anyhow::Result<()> { + let _proof = + BlockProof::read_from_bytes(proof_bytes).context("failed to deserialize block proof")?; + + // TODO: perform verification. + Ok(()) +} From 807140e13c438e7befedb73a2f78e86e5a71a51e Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 23 Jun 2026 14:18:44 +1200 Subject: [PATCH 4/4] Check for validated tx in SubmitProvenTransaction --- bin/validator/src/db/mod.rs | 68 +++++++++++++++++++ .../submit_proven_transaction.rs | 17 ++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/bin/validator/src/db/mod.rs b/bin/validator/src/db/mod.rs index a2246cfed..7668b06cf 100644 --- a/bin/validator/src/db/mod.rs +++ b/bin/validator/src/db/mod.rs @@ -89,6 +89,30 @@ pub(crate) fn insert_transaction( Ok(count) } +/// Returns whether a transaction with the given id has already been validated. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT EXISTS( +/// SELECT 1 +/// FROM validated_transactions +/// WHERE id = ? +/// ); +/// ``` +#[instrument(target = COMPONENT, skip(conn), err)] +pub(crate) fn transaction_exists( + conn: &mut SqliteConnection, + tx_id: TransactionId, +) -> Result { + let exists = diesel::select(exists( + schema::validated_transactions::table + .filter(schema::validated_transactions::id.eq(tx_id.to_bytes())), + )) + .get_result::(conn)?; + Ok(exists) +} + /// Scans the database for transaction Ids that do not exist. /// /// If the resulting vector is empty, all supplied transaction ids have been validated in the past. @@ -216,4 +240,48 @@ mod tests { setup(db_path.clone()).await.expect("setup should bootstrap the database"); load(db_path).await.expect("load should accept a bootstrapped database"); } + + #[tokio::test] + async fn transaction_exists_detects_validated_transactions() { + use miden_protocol::Word; + + let temp_dir = tempfile::tempdir().expect("failed to create temp directory"); + let db = setup(temp_dir.path().join("validator.sqlite3")).await.unwrap(); + + let validated_id = TransactionId::from_raw(Word::try_from([1u64, 2, 3, 4]).unwrap()); + let unknown_id = TransactionId::from_raw(Word::try_from([5u64, 6, 7, 8]).unwrap()); + + // Insert a row keyed by `validated_id`. Only the primary key matters for this query, so the + // remaining columns are filled with placeholder bytes. + let row = ValidatedTransactionRowInsert { + id: validated_id.to_bytes(), + block_num: 0, + account_id: vec![], + account_delta: vec![], + input_notes: vec![], + output_notes: vec![], + initial_account_hash: vec![], + final_account_hash: vec![], + fee: vec![], + }; + db.transact("insert_row", move |conn| -> Result { + Ok(diesel::insert_into(schema::validated_transactions::table) + .values(row) + .execute(conn)?) + }) + .await + .unwrap(); + + let validated_exists = db + .query("transaction_exists", move |conn| transaction_exists(conn, validated_id)) + .await + .unwrap(); + assert!(validated_exists, "an inserted transaction id should be reported as existing"); + + let unknown_exists = db + .query("transaction_exists", move |conn| transaction_exists(conn, unknown_id)) + .await + .unwrap(); + assert!(!unknown_exists, "an unknown transaction id should not be reported as existing"); + } } diff --git a/bin/validator/src/server/validator_service/submit_proven_transaction.rs b/bin/validator/src/server/validator_service/submit_proven_transaction.rs index d981fd90a..66133c245 100644 --- a/bin/validator/src/server/validator_service/submit_proven_transaction.rs +++ b/bin/validator/src/server/validator_service/submit_proven_transaction.rs @@ -8,7 +8,7 @@ use miden_tx::utils::serde::Deserializable; use tonic::Status; use super::ValidatorService; -use crate::db::insert_transaction; +use crate::db::{insert_transaction, transaction_exists}; use crate::tx_validation::validate_transaction; #[tonic::async_trait] @@ -17,7 +17,20 @@ impl grpc::server::validator_api::SubmitProvenTransaction for ValidatorService { type Output = (); async fn handle(&self, input: Self::Input) -> tonic::Result { - tracing::Span::current().set_attribute("transaction.id", input.tx.id()); + let tx_id = input.tx.id(); + tracing::Span::current().set_attribute("transaction.id", tx_id); + + // Short-circuit transactions that have already been validated. + let already_validated = self + .db + .query("transaction_exists", move |conn| transaction_exists(conn, tx_id)) + .await + .map_err(|err| { + Status::internal(err.as_report_context("Failed to query transaction")) + })?; + if already_validated { + return Ok(()); + } // Validate the transaction. let tx_info = validate_transaction(input.tx, input.inputs).await.map_err(|err| {