diff --git a/bin/node/src/commands/mod.rs b/bin/node/src/commands/mod.rs index bb616e1b2..ded156bdf 100644 --- a/bin/node/src/commands/mod.rs +++ b/bin/node/src/commands/mod.rs @@ -16,7 +16,6 @@ pub use recover::RecoverCommand; const ENV_DATA_DIRECTORY: &str = "MIDEN_NODE_DATA_DIRECTORY"; #[derive(Subcommand, Debug)] -#[expect(clippy::large_enum_variant, reason = "cli is a once-off usage")] pub enum Command { /// Start the node in sequencer mode. /// diff --git a/bin/node/src/commands/modes.rs b/bin/node/src/commands/modes.rs index 0381afde3..0f79ad8b8 100644 --- a/bin/node/src/commands/modes.rs +++ b/bin/node/src/commands/modes.rs @@ -1,12 +1,19 @@ +use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use anyhow::Context; use miden_node_block_producer::{DEFAULT_VALIDATOR_TIMEOUT, Sequencer}; -use miden_node_proto::clients::{Builder, NtxBuilderClient, RpcClient, ValidatorClient}; -use miden_node_rpc::{Rpc, RpcMode}; +use miden_node_proto::clients::{ + Builder, + NtxBuilderClient, + RpcClient, + SequencerClient, + ValidatorClient, +}; +use miden_node_rpc::{Rpc, RpcMode, SequencerInternal}; use miden_node_store::State; -use miden_node_utils::clap::duration_to_human_readable_string; +use miden_node_utils::clap::{GrpcOptionsInternal, duration_to_human_readable_string}; use miden_node_utils::tasks::Tasks; use tokio::net::TcpListener; use url::Url; @@ -32,6 +39,14 @@ pub struct SequencerCommand { #[command(flatten)] pub store: StoreOptions, + + /// Socket address at which to serve the internal sequencer API. + #[arg( + long = "internal.listen", + env = "MIDEN_NODE_SEQUENCER_INTERNAL_LISTEN", + value_name = "LISTEN" + )] + pub internal: Option, } impl SequencerCommand { @@ -63,7 +78,10 @@ impl SequencerCommand { let rpc = Rpc { listener: bind_rpc(runtime.rpc_listen).await?, store: state, - mode: RpcMode::sequencer(block_producer, self.external_services.validator_client()?), + mode: RpcMode::sequencer( + block_producer.clone(), + self.external_services.validator_client()?, + ), ntx_builder: Some(self.external_services.ntx_builder_client()?), grpc_options: runtime.external_grpc_options, network_tx_auth, @@ -71,6 +89,14 @@ impl SequencerCommand { let mut tasks = Tasks::new(); tasks.spawn("sequencer", sequencer.wait()); tasks.spawn("RPC server", rpc.serve()); + if let Some(internal_listen) = self.internal { + let sequencer_internal = SequencerInternal { + listener: bind_rpc(internal_listen).await?, + block_producer, + grpc_options: GrpcOptionsInternal::from(runtime.external_grpc_options), + }; + tasks.spawn("sequencer internal server", sequencer_internal.serve()); + } tasks.join_next_as_error().await } @@ -132,12 +158,32 @@ pub struct FullNodeCommand { #[command(flatten)] pub store: StoreOptions, + + /// The validator service gRPC URL. + #[arg( + long = "validator.url", + env = "MIDEN_NODE_VALIDATOR_URL", + value_name = "URL", + requires = "sequencer_url" + )] + pub validator_url: Option, + + /// The sequencer's internal service gRPC URL. + #[arg( + long = "sequencer.internal.url", + env = "MIDEN_NODE_SEQUENCER_INTERNAL_URL", + value_name = "URL", + requires = "validator_url" + )] + pub sequencer_url: Option, } impl FullNodeCommand { pub async fn handle(self) -> anyhow::Result<()> { let runtime = self.runtime.runtime_config(&self.store); let source_rpc = self.sync.source_rpc_client()?; + let validator_client = self.validator_client(); + let sequencer_client = self.sequencer_client(); let network_tx_auth = self.runtime.rpc.network_tx_auth()?; let state = load_state(&runtime).await?; let _disk_monitor = state.spawn_disk_monitor(); @@ -145,7 +191,12 @@ impl FullNodeCommand { let rpc = Rpc { listener: bind_rpc(runtime.rpc_listen).await?, store: state, - mode: RpcMode::full_node(source_rpc, self.sync.readiness_threshold), + mode: RpcMode::full_node( + source_rpc, + self.sync.readiness_threshold, + validator_client, + sequencer_client, + ), ntx_builder: None, grpc_options: runtime.external_grpc_options, network_tx_auth, @@ -155,6 +206,32 @@ impl FullNodeCommand { tasks.join_next_as_error().await } + + fn sequencer_client(&self) -> Option { + self.sequencer_url.as_ref().map(|url| { + Builder::new(url.clone()) + .with_tls() + .expect("TLS is enabled") + .with_timeout(Duration::from_secs(5)) + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + }) + } + + fn validator_client(&self) -> Option { + self.validator_url.as_ref().map(|url| { + Builder::new(url.clone()) + .with_tls() + .expect("TLS is enabled") + .with_timeout(Duration::from_secs(5)) + .without_metadata_version() + .without_metadata_genesis() + .with_otel_context_injection() + .connect_lazy::() + }) + } } impl SyncOptions { @@ -181,7 +258,7 @@ async fn load_state(runtime: &RuntimeConfig) -> anyhow::Result> { Ok(Arc::new(state)) } -async fn bind_rpc(listen: std::net::SocketAddr) -> anyhow::Result { +async fn bind_rpc(listen: SocketAddr) -> anyhow::Result { TcpListener::bind(listen) .await .with_context(|| format!("failed to bind RPC listener to {listen}")) diff --git a/bin/node/src/commands/recover.rs b/bin/node/src/commands/recover.rs index 74af76f56..78def7341 100644 --- a/bin/node/src/commands/recover.rs +++ b/bin/node/src/commands/recover.rs @@ -55,7 +55,7 @@ impl RecoverCommand { fn validator_client(&self) -> anyhow::Result { Ok(Builder::new(self.validator_url.clone()) .with_tls()? - .with_timeout(Duration::from_secs(20)) + .with_timeout(Duration::from_secs(5)) .without_metadata_version() .without_metadata_genesis() .with_otel_context_injection() diff --git a/crates/block-producer/src/domain/transaction.rs b/crates/block-producer/src/domain/transaction.rs index d8f06e17e..a91a7194f 100644 --- a/crates/block-producer/src/domain/transaction.rs +++ b/crates/block-producer/src/domain/transaction.rs @@ -1,11 +1,14 @@ use std::collections::HashSet; use std::sync::Arc; +use miden_node_proto::errors::ConversionError; +use miden_node_proto::generated::sequencer; use miden_protocol::Word; use miden_protocol::account::AccountId; use miden_protocol::block::BlockNumber; use miden_protocol::note::Nullifier; use miden_protocol::transaction::{ProvenTransaction, TransactionId, TxAccountUpdate}; +use miden_protocol::utils::serde::{Deserializable, Serializable}; use crate::errors::StateConflict; use crate::store::TransactionInputs; @@ -132,6 +135,48 @@ impl AuthenticatedTransaction { } } +// PROTO CONVERSIONS +// ================================================================================================ + +impl From for sequencer::AuthenticatedTransaction { + fn from(value: AuthenticatedTransaction) -> Self { + Self { + transaction: value.inner.to_bytes(), + store_account_state: value.store_account_state.map(Into::into), + notes_authenticated_by_store: value + .notes_authenticated_by_store + .into_iter() + .map(Into::into) + .collect(), + authentication_height: value.authentication_height.as_u32(), + } + } +} + +impl TryFrom for AuthenticatedTransaction { + type Error = ConversionError; + + fn try_from(value: sequencer::AuthenticatedTransaction) -> Result { + let inner = ProvenTransaction::read_from_bytes(&value.transaction) + .map_err(|err| ConversionError::deserialization("ProvenTransaction", err))?; + + let store_account_state = value.store_account_state.map(Word::try_from).transpose()?; + + let notes_authenticated_by_store = value + .notes_authenticated_by_store + .into_iter() + .map(Word::try_from) + .collect::, _>>()?; + + Ok(Self { + inner: Arc::new(inner), + store_account_state, + notes_authenticated_by_store, + authentication_height: value.authentication_height.into(), + }) + } +} + #[cfg(test)] impl AuthenticatedTransaction { //! Builder methods intended for easier test setup. @@ -157,18 +202,21 @@ impl AuthenticatedTransaction { } /// Overrides the authentication height with the given value. + #[must_use] pub fn with_authentication_height(mut self, height: BlockNumber) -> Self { self.authentication_height = height; self } /// Overrides the store state with the given value. + #[must_use] pub fn with_store_state(mut self, state: Word) -> Self { self.store_account_state = Some(state); self } /// Unsets the store state. + #[must_use] pub fn with_empty_store_state(mut self) -> Self { self.store_account_state = None; self diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index 21201196e..290be8df7 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -41,6 +41,10 @@ pub enum MempoolSubmissionError { #[grpc(internal)] StoreStateReadFailed(#[source] StoreError), + #[error("failed to authenticate transaction")] + #[grpc(internal)] + AuthenticationFailed(#[source] StateConflict), + #[error( "transaction input data from block {input_block} is rejected as stale because it is older than the limit of {stale_limit}" )] diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index 6559f5f57..c1823173f 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -21,6 +21,7 @@ pub mod errors; mod errors; pub mod server; +pub use domain::transaction::AuthenticatedTransaction; pub use errors::MempoolSubmissionError; pub use proof_scheduler::DEFAULT_MAX_CONCURRENT_PROOFS; pub use rpc_sync::{RpcReadiness, RpcSync}; diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs index 39c827d25..ec3621a44 100644 --- a/crates/block-producer/src/server/mod.rs +++ b/crates/block-producer/src/server/mod.rs @@ -20,6 +20,7 @@ use crate::block_prover::BlockProver; use crate::domain::transaction::AuthenticatedTransaction; use crate::errors::MempoolSubmissionError; use crate::mempool::{BatchBudget, BlockBudget, Mempool, MempoolConfig, SharedMempool}; +use crate::store::{TransactionInputs, get_tx_inputs}; use crate::validator::BlockProducerValidatorClient; use crate::{ CACHED_MEMPOOL_STATS_UPDATE_INTERVAL, @@ -285,16 +286,13 @@ impl BlockProducerApi { skip_all, err )] - #[expect(clippy::let_and_return)] pub async fn submit_proven_tx( &self, tx: ProvenTransaction, ) -> Result { - let tx_id = tx.id(); - debug!( target: COMPONENT, - tx_id = %tx_id.to_hex(), + tx_id = %tx.id().to_hex(), account_id = %tx.account_id().to_hex(), initial_state_commitment = %tx.account_update().initial_state_commitment(), final_state_commitment = %tx.account_update().final_state_commitment(), @@ -305,21 +303,34 @@ impl BlockProducerApi { ); debug!(target: COMPONENT, proof = ?tx.proof()); - let inputs = crate::store::get_tx_inputs(&self.store, &tx) + // Authenticate against the local store, then add to the mempool. + let inputs = get_tx_inputs(&self.store, &tx) .await .map_err(MempoolSubmissionError::StoreStateReadFailed)?; - // SAFETY: we assume that the rpc component has verified the transaction proof already. - let tx = AuthenticatedTransaction::new_unchecked(Arc::new(tx), inputs) - .map(Arc::new) - .map_err(MempoolSubmissionError::StateConflict)?; + let tx = AuthenticatedTransaction::new_unchecked(tx.into(), inputs) + .map_err(MempoolSubmissionError::AuthenticationFailed)?; + self.submit_authenticated_tx(tx).await + } + /// Adds a transaction that has already been authenticated. + #[instrument( + target = COMPONENT, + name = "block_producer.api.submit_authenticated_tx", + skip_all, + err + )] + #[expect(clippy::let_and_return, reason = "required to lengthen arc lifetime")] + pub async fn submit_authenticated_tx( + &self, + tx: AuthenticatedTransaction, + ) -> Result { let shared_mempool = self.mempool.lock().await; // We need the let binding here to avoid E0597 `shared_mempool` does not live long enough let result = shared_mempool .lock() .map_err(MempoolSubmissionError::MempoolPoisoned)? - .add_transaction(tx); + .add_transaction(tx.into()); result } @@ -329,7 +340,6 @@ impl BlockProducerApi { skip_all, err )] - #[expect(clippy::let_and_return)] pub async fn submit_proven_tx_batch( &self, batch: ProposedBatch, @@ -337,12 +347,46 @@ impl BlockProducerApi { // We assume that the rpc component has verified everything, including the transaction // proofs. - let mut txs = Vec::with_capacity(batch.transactions().len()); + // Authenticate each transaction against the local store, then add the batch to the mempool. + let mut inputs = Vec::with_capacity(batch.transactions().len()); for tx in batch.transactions() { - let inputs = crate::store::get_tx_inputs(&self.store, tx) - .await - .map_err(MempoolSubmissionError::StoreStateReadFailed)?; + inputs.push( + get_tx_inputs(&self.store, tx) + .await + .map_err(MempoolSubmissionError::StoreStateReadFailed)?, + ); + } + self.submit_authenticated_tx_batch(batch, inputs).await + } + + /// Adds a batch whose transactions have already been authenticated against the store to the + /// mempool. + /// + /// # Panics + /// + /// Panics if the number of transactions in `batch` does not match the number of inputs in + /// `inputs`. + #[instrument( + target = COMPONENT, + name = "block_producer.api.submit_authenticated_tx_batch", + skip_all, + err + )] + #[expect(clippy::let_and_return)] + pub async fn submit_authenticated_tx_batch( + &self, + batch: ProposedBatch, + inputs: Vec, + ) -> Result { + assert_eq!( + batch.transactions().len(), + inputs.len(), + "transaction inputs must match the batch's transactions" + ); + + let mut txs = Vec::with_capacity(batch.transactions().len()); + for (tx, inputs) in batch.transactions().iter().zip(inputs) { // SAFETY: We assume that the rpc component has verified the transaction proofs, as well // as the batch integrity itself. let tx = AuthenticatedTransaction::new_unchecked(Arc::clone(tx), inputs) diff --git a/crates/block-producer/src/store/mod.rs b/crates/block-producer/src/store/mod.rs index dbd896e8e..df3bdb94e 100644 --- a/crates/block-producer/src/store/mod.rs +++ b/crates/block-producer/src/store/mod.rs @@ -3,6 +3,8 @@ use std::fmt::{Display, Formatter}; use std::num::NonZeroU32; use itertools::Itertools; +use miden_node_proto::errors::ConversionError; +use miden_node_proto::generated::sequencer; use miden_node_store::state::{Finality, State, TransactionInputs as StoreTransactionInputs}; use miden_node_utils::formatting::format_opt; use miden_protocol::Word; @@ -65,6 +67,73 @@ impl TransactionInputs { } } +// PROTO CONVERSIONS +// ------------------------------------------------------------------------------------------------ + +impl From for sequencer::AuthInputs { + fn from(value: TransactionInputs) -> Self { + Self { + account_id: Some(value.account_id.into()), + account_commitment: value.account_commitment.map(Into::into), + nullifiers: value + .nullifiers + .into_iter() + .map(|(nullifier, block_num)| sequencer::NullifierRecord { + nullifier: Some(nullifier.into()), + block_num: block_num.map_or(0, NonZeroU32::get), + }) + .collect(), + found_unauthenticated_notes: value + .found_unauthenticated_notes + .into_iter() + .map(Into::into) + .collect(), + current_block_height: value.current_block_height.as_u32(), + } + } +} + +impl TryFrom for TransactionInputs { + type Error = ConversionError; + + fn try_from(value: sequencer::AuthInputs) -> Result { + let account_id = value + .account_id + .ok_or_else(|| ConversionError::missing_field::("account_id"))? + .try_into()?; + + let account_commitment = value.account_commitment.map(Word::try_from).transpose()?; + + let nullifiers = value + .nullifiers + .into_iter() + .map(|record| { + let nullifier = record + .nullifier + .ok_or_else(|| { + ConversionError::missing_field::("nullifier") + })? + .try_into()?; + Ok((nullifier, NonZeroU32::new(record.block_num))) + }) + .collect::>()?; + + let found_unauthenticated_notes = value + .found_unauthenticated_notes + .into_iter() + .map(Word::try_from) + .collect::>()?; + + Ok(Self { + account_id, + account_commitment, + nullifiers, + found_unauthenticated_notes, + current_block_height: value.current_block_height.into(), + }) + } +} + impl Display for TransactionInputs { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let nullifiers = self @@ -91,6 +160,18 @@ impl Display for TransactionInputs { // STORE STATE // ================================================================================================ +/// Authenticates a proven transaction against the store, returning the [`TransactionInputs`] +/// needed to admit it to the mempool. +/// +/// This reads the committed state relevant to the transaction: the account's current commitment, +/// the consumption status of each of the transaction's nullifiers, and which of its unauthenticated +/// input notes have since been committed. The result is captured at the store's current committed +/// chain tip. +/// +/// # Errors +/// +/// Returns an error if the store query fails, or if the transaction creates a new account whose ID +/// prefix already exists in the store. #[instrument(target = COMPONENT, name = "store.state.get_tx_inputs", skip_all, err)] pub async fn get_tx_inputs( state: &State, diff --git a/crates/proto/build.rs b/crates/proto/build.rs index bf72f20c4..08b6aa5cb 100644 --- a/crates/proto/build.rs +++ b/crates/proto/build.rs @@ -8,6 +8,7 @@ use miden_node_proto_build::{ ntx_builder_api_descriptor, remote_prover_api_descriptor, rpc_api_descriptor, + sequencer_api_descriptor, validator_api_descriptor, }; use miette::{Context, IntoDiagnostic}; @@ -29,6 +30,7 @@ fn main() -> miette::Result<()> { remote_prover_api_descriptor(), validator_api_descriptor(), ntx_builder_api_descriptor(), + sequencer_api_descriptor(), ]; for file_descriptors in &descriptor_sets { diff --git a/crates/proto/src/clients/mod.rs b/crates/proto/src/clients/mod.rs index b88939db4..4ac8a2605 100644 --- a/crates/proto/src/clients/mod.rs +++ b/crates/proto/src/clients/mod.rs @@ -32,6 +32,8 @@ use std::time::Duration; use http::header::ACCEPT; use miden_node_utils::tracing::grpc::OtelInterceptor; +use miden_protocol::batch::ProposedBatch; +use miden_protocol::utils::serde::Serializable; use tonic::metadata::AsciiMetadataValue; use tonic::service::interceptor::InterceptedService; use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Error as TransportError}; @@ -161,6 +163,7 @@ type GeneratedProxyStatusClient = type GeneratedProverClient = generated::remote_prover::api_client::ApiClient; type GeneratedValidatorClient = generated::validator::api_client::ApiClient; type GeneratedNtxBuilderClient = generated::ntx_builder::api_client::ApiClient; +type GeneratedSequencerClient = generated::sequencer::api_client::ApiClient; // gRPC CLIENTS // ================================================================================================ @@ -175,6 +178,8 @@ pub struct RemoteProverClient(GeneratedProverClient); pub struct ValidatorClient(GeneratedValidatorClient); #[derive(Debug, Clone)] pub struct NtxBuilderClient(GeneratedNtxBuilderClient); +#[derive(Debug, Clone)] +pub struct SequencerClient(GeneratedSequencerClient); impl DerefMut for RpcClient { fn deref_mut(&mut self) -> &mut Self::Target { @@ -246,6 +251,20 @@ impl Deref for NtxBuilderClient { } } +impl DerefMut for SequencerClient { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Deref for SequencerClient { + type Target = GeneratedSequencerClient; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + // GRPC CLIENT BUILDER TRAIT // ================================================================================================ @@ -284,6 +303,12 @@ impl GrpcClient for NtxBuilderClient { } } +impl GrpcClient for SequencerClient { + fn with_interceptor(channel: Channel, interceptor: Interceptor) -> Self { + Self(GeneratedSequencerClient::new(InterceptedService::new(channel, interceptor))) + } +} + // STRICT TYPE-SAFE BUILDER (NO DEFAULTS) // ================================================================================================ @@ -486,3 +511,30 @@ impl Builder { T::with_interceptor(channel, interceptor) } } + +impl ValidatorClient { + /// Submits each transaction in the batch to the validator for re-execution. + /// + /// # Errors + /// + /// - If `transaction_inputs` does not match the batch's transactions in length + pub async fn submit_batch( + &mut self, + proposed_batch: &ProposedBatch, + transaction_inputs: &[Vec], + ) -> Result<(), Status> { + if proposed_batch.transactions().len() != transaction_inputs.len() { + return Err(Status::invalid_argument( + "transaction inputs do not match the batch's transactions", + )); + } + for (tx, inputs) in proposed_batch.transactions().iter().zip(transaction_inputs) { + let proven_tx = generated::transaction::ProvenTransaction { + transaction: tx.to_bytes(), + transaction_inputs: Some(inputs.clone()), + }; + self.submit_proven_transaction(proven_tx).await?; + } + Ok(()) + } +} diff --git a/crates/rpc/src/lib.rs b/crates/rpc/src/lib.rs index 4254d631e..0c53705a9 100644 --- a/crates/rpc/src/lib.rs +++ b/crates/rpc/src/lib.rs @@ -2,7 +2,7 @@ mod server; #[cfg(test)] mod tests; -pub use server::{Rpc, RpcMode}; +pub use server::{Rpc, RpcMode, SequencerInternal}; // CONSTANTS // ================================================================================================= diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 50c44f6de..f4a4393bf 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -6,6 +6,7 @@ use std::task::{Context as TaskContext, Poll}; use std::time::{Duration, Instant}; use anyhow::Context as AnyhowContext; +use miden_node_block_producer::BlockProducerApi; use miden_node_proto::clients::NtxBuilderClient; use miden_node_proto::domain::block::InvalidBlockRange; use miden_node_proto::generated::rpc::MempoolStats as ProtoMempoolStats; @@ -267,6 +268,13 @@ impl RpcService { } } +// INTERNAL SEQUENCER SERVICE +// ================================================================================================ + +pub(crate) struct SequencerInternalService { + pub(crate) block_producer: BlockProducerApi, +} + // API IMPLEMENTATION // ================================================================================================ @@ -280,6 +288,8 @@ mod get_note_script_by_root; mod get_notes_by_id; mod proof_subscription; mod status; +mod submit_auth_tx; +mod submit_auth_tx_batch; mod submit_proven_tx; mod submit_proven_tx_batch; mod subscription_ban; diff --git a/crates/rpc/src/server/api/submit_auth_tx.rs b/crates/rpc/src/server/api/submit_auth_tx.rs new file mode 100644 index 000000000..4f994129f --- /dev/null +++ b/crates/rpc/src/server/api/submit_auth_tx.rs @@ -0,0 +1,31 @@ +use miden_node_block_producer::AuthenticatedTransaction; +use miden_node_proto::generated as proto; +use miden_node_proto::generated::server::sequencer_api; +use miden_node_utils::ErrorReport; +use tonic::Status; + +use super::SequencerInternalService; + +#[tonic::async_trait] +impl sequencer_api::SubmitAuthenticatedTx for SequencerInternalService { + type Input = AuthenticatedTransaction; + type Output = proto::blockchain::BlockNumber; + + fn decode(request: proto::sequencer::AuthenticatedTransaction) -> tonic::Result { + AuthenticatedTransaction::try_from(request).map_err(|err| { + Status::invalid_argument(err.as_report_context("invalid authenticated transaction")) + }) + } + + fn encode(output: Self::Output) -> tonic::Result { + Ok(output) + } + + async fn handle(&self, tx: Self::Input) -> tonic::Result { + self.block_producer + .submit_authenticated_tx(tx) + .await + .map(Into::into) + .map_err(Into::into) + } +} diff --git a/crates/rpc/src/server/api/submit_auth_tx_batch.rs b/crates/rpc/src/server/api/submit_auth_tx_batch.rs new file mode 100644 index 000000000..4e63a7677 --- /dev/null +++ b/crates/rpc/src/server/api/submit_auth_tx_batch.rs @@ -0,0 +1,54 @@ +use miden_node_block_producer::store::TransactionInputs; +use miden_node_proto::generated as proto; +use miden_node_proto::generated::server::sequencer_api; +use miden_node_utils::ErrorReport; +use miden_protocol::batch::ProposedBatch; +use miden_protocol::utils::serde::Deserializable; +use tonic::Status; + +use super::SequencerInternalService; + +#[tonic::async_trait] +impl sequencer_api::SubmitAuthenticatedTxBatch for SequencerInternalService { + type Input = (ProposedBatch, Vec); + type Output = proto::blockchain::BlockNumber; + + fn decode( + request: proto::sequencer::AuthenticatedTransactionBatch, + ) -> tonic::Result { + let batch = ProposedBatch::read_from_bytes(&request.proposed_batch).map_err(|err| { + Status::invalid_argument(err.as_report_context("invalid proposed_batch")) + })?; + + if batch.transactions().len() != request.auth_inputs.len() { + return Err(Status::invalid_argument(format!( + "Number of inputs {} does not match number of transactions {} in batch", + request.auth_inputs.len(), + batch.transactions().len() + ))); + } + + let inputs = request + .auth_inputs + .into_iter() + .map(TransactionInputs::try_from) + .collect::, _>>() + .map_err(|err| { + Status::invalid_argument(err.as_report_context("invalid auth_inputs")) + })?; + + Ok((batch, inputs)) + } + + fn encode(output: Self::Output) -> tonic::Result { + Ok(output) + } + + async fn handle(&self, (batch, inputs): Self::Input) -> tonic::Result { + self.block_producer + .submit_authenticated_tx_batch(batch, inputs) + .await + .map(Into::into) + .map_err(Into::into) + } +} diff --git a/crates/rpc/src/server/api/submit_proven_tx.rs b/crates/rpc/src/server/api/submit_proven_tx.rs index b672f3dbc..26d5fb9c0 100644 --- a/crates/rpc/src/server/api/submit_proven_tx.rs +++ b/crates/rpc/src/server/api/submit_proven_tx.rs @@ -1,3 +1,6 @@ +use miden_node_block_producer::AuthenticatedTransaction; +use miden_node_block_producer::store::get_tx_inputs; +use miden_node_proto::clients::{SequencerClient, ValidatorClient}; use miden_node_proto::generated as proto; use miden_node_utils::ErrorReport; use miden_node_utils::spawn::spawn_blocking_in_current_span; @@ -130,38 +133,87 @@ impl proto::server::rpc_api::SubmitProvenTx for RpcService { Status::internal(format!("transaction proof verification task failed: {err}")) })??; - // In full node mode we forward the request to the source. - let (block_producer, validator) = match &self.mode { + match &self.mode { RpcMode::Sequencer { block_producer, validator } => { - (block_producer.as_ref(), validator.as_ref()) + validator.clone().submit_proven_transaction(request.clone()).await?; + block_producer + .submit_proven_tx(rebuilt_tx) + .await + .map(Into::into) + .map_err(Into::into) }, - RpcMode::FullNode { source_rpc, .. } => { - let mut forwarded_request = Request::new(request); - if let Some(accept) = original_accept_header { - forwarded_request.metadata_mut().insert(http::header::ACCEPT.as_str(), accept); + RpcMode::FullNode { source_rpc, validator, sequencer, .. } => { + match (validator, sequencer) { + (Some(validator), Some(sequencer)) => { + // Pre-authenticated transactions: validate and authenticate locally, then + // submit the authenticated transaction to the sequencer's pre-authenticated + // API. + self.submit_authenticated_to_sequencer( + *validator.clone(), + *sequencer.clone(), + request, + rebuilt_tx, + ) + .await + }, + (None, None) => { + // Unauthenticated transactions: forward the request to the source verbatim. + let mut forwarded_request = Request::new(request); + if let Some(accept) = original_accept_header { + forwarded_request + .metadata_mut() + .insert(http::header::ACCEPT.as_str(), accept); + } + source_rpc + .as_ref() + .clone() + .submit_proven_tx(forwarded_request) + .await + .map(tonic::Response::into_inner) + }, + (Some(_), None) | (None, Some(_)) => { + Err(Status::internal("one of validator or sequencer are not configured")) + }, } - return source_rpc - .as_ref() - .clone() - .submit_proven_tx(forwarded_request) - .await - .map(tonic::Response::into_inner); }, - }; - - // Transaction inputs must be provided in order to allow for transaction re-execution via - // the Validator. - if request.transaction_inputs.is_some() { - validator.clone().submit_proven_transaction(request.clone()).await?; - } else { - return Err(Status::invalid_argument("Transaction inputs must be provided")); } + } +} + +impl RpcService { + /// Pre-authenticated transaction submission path for a single transaction. + /// + /// Re-executes the transaction via the validator, authenticates it against the local + /// (replica) store, then submits the authenticated transaction to the sequencer's + /// pre-authenticated API. + async fn submit_authenticated_to_sequencer( + &self, + validator: ValidatorClient, + sequencer: SequencerClient, + request: proto::transaction::ProvenTransaction, + rebuilt_tx: ProvenTransaction, + ) -> tonic::Result { + let tx_inputs = get_tx_inputs(&self.store, &rebuilt_tx).await.map_err(|err| { + Status::internal(err.as_report_context("failed to get transaction inputs")) + })?; - block_producer - .submit_proven_tx(rebuilt_tx) + let authenticated_tx = + AuthenticatedTransaction::new_unchecked(rebuilt_tx.into(), tx_inputs).map_err( + |err| Status::internal(err.as_report_context("failed to authenticate transaction")), + )?; + + // Submit to validator. + let mut validator = validator; + validator.submit_proven_transaction(request).await?; + + // Submit to sequencer. + let mut sequencer = sequencer; + sequencer + .submit_authenticated_tx(proto::sequencer::AuthenticatedTransaction::from( + authenticated_tx, + )) .await - .map(Into::into) - .map_err(Into::into) + .map(tonic::Response::into_inner) } } diff --git a/crates/rpc/src/server/api/submit_proven_tx_batch.rs b/crates/rpc/src/server/api/submit_proven_tx_batch.rs index 18d96b229..90a78be3d 100644 --- a/crates/rpc/src/server/api/submit_proven_tx_batch.rs +++ b/crates/rpc/src/server/api/submit_proven_tx_batch.rs @@ -1,3 +1,5 @@ +use miden_node_block_producer::store::get_tx_inputs; +use miden_node_proto::clients::{SequencerClient, ValidatorClient}; use miden_node_proto::generated as proto; use miden_node_utils::ErrorReport; use miden_node_utils::spawn::spawn_blocking_in_current_span; @@ -115,64 +117,116 @@ impl proto::server::rpc_api::SubmitProvenTxBatch for RpcService { } // Verify batch transaction proofs. - // - // Need to do this because ProvenBatch has no real kernel yet, so we can only - // really check that the calculated proof matches the one given in the request. - let expected_proof = spawn_blocking_in_current_span({ - let proposed_batch = proposed_batch.clone(); - move || { - LocalBatchProver::new(MIN_PROOF_SECURITY_LEVEL).prove(proposed_batch).map_err( - |err| { - Status::invalid_argument( - err.as_report_context("proposed block proof failed"), - ) - }, - ) - } - }) - .await - .map_err(|err| { - Status::internal(format!("batch proof verification task failed: {err}")) - })??; + verify_batch_proof(&proven_batch, &proposed_batch).await?; - if expected_proof != proven_batch { - return Err(Status::invalid_argument("batch proof did not match proposed batch")); - } - - // In full node mode we forward the request to the source. - let (block_producer, validator) = match &self.mode { + match &self.mode { RpcMode::Sequencer { block_producer, validator } => { - (block_producer.as_ref(), validator.as_ref()) - }, - RpcMode::FullNode { source_rpc, .. } => { - let mut forwarded_request = Request::new(request); - if let Some(accept) = original_accept_header { - forwarded_request.metadata_mut().insert(http::header::ACCEPT.as_str(), accept); - } - return source_rpc - .as_ref() + validator .clone() - .submit_proven_tx_batch(forwarded_request) + .submit_batch(&proposed_batch, &request.transaction_inputs) + .await?; + block_producer + .submit_proven_tx_batch(proposed_batch) .await - .map(tonic::Response::into_inner); + .map(Into::into) + .map_err(Into::into) }, - }; + RpcMode::FullNode { source_rpc, validator, sequencer, .. } => { + match (validator, sequencer) { + (Some(validator), Some(sequencer)) => { + // Pre-authenticated transactions: validate and authenticate locally, then + // submit the authenticated batch to the sequencer's pre-authenticated API. + self.submit_authenticated_batch_to_sequencer( + *validator.clone(), + *sequencer.clone(), + proposed_batch, + &request.transaction_inputs, + ) + .await + }, + (None, None) => { + // Unauthenticated transactions: forward the request to the source verbatim. + let mut forwarded_request = Request::new(request); + if let Some(accept) = original_accept_header { + forwarded_request + .metadata_mut() + .insert(http::header::ACCEPT.as_str(), accept); + } + source_rpc + .as_ref() + .clone() + .submit_proven_tx_batch(forwarded_request) + .await + .map(tonic::Response::into_inner) + }, + (Some(_), None) | (None, Some(_)) => { + Err(Status::internal("one of validator or sequencer are not configured")) + }, + } + }, + } + } +} - // Submit each transaction to the validator. - // - // SAFETY: We checked earlier that the two iterators are the same length. - for (tx, inputs) in proposed_batch.transactions().iter().zip(&request.transaction_inputs) { - let request = proto::transaction::ProvenTransaction { - transaction: tx.to_bytes(), - transaction_inputs: inputs.clone().into(), - }; - validator.clone().submit_proven_transaction(request).await?; +impl RpcService { + /// Pre-authenticated transaction submission path for a batch. + /// + /// Re-executes each transaction via the validator, authenticates each against the + /// local (replica) store, then submits the authenticated batch to the sequencer's + /// pre-authenticated API. + async fn submit_authenticated_batch_to_sequencer( + &self, + mut validator: ValidatorClient, + mut sequencer: SequencerClient, + proposed_batch: ProposedBatch, + transaction_inputs: &[Vec], + ) -> tonic::Result { + validator.submit_batch(&proposed_batch, transaction_inputs).await?; + + let mut auth_inputs = Vec::with_capacity(proposed_batch.transactions().len()); + for tx in proposed_batch.transactions() { + let inputs = get_tx_inputs(&self.store, tx).await.map_err(|err| { + Status::internal(err.as_report_context("failed to authenticate transaction")) + })?; + auth_inputs.push(inputs.into()); } - block_producer - .submit_proven_tx_batch(proposed_batch) + let authenticated_batch = proto::sequencer::AuthenticatedTransactionBatch { + proposed_batch: proposed_batch.to_bytes(), + auth_inputs, + }; + sequencer + .submit_authenticated_tx_batch(authenticated_batch) .await - .map(Into::into) - .map_err(Into::into) + .map(tonic::Response::into_inner) } } + +/// Verifies the batch proof by re-proving the proposed batch and comparing against the submitted +/// proof. +/// +/// Need to do this because `ProvenBatch` has no real kernel yet, so we can only really check that +/// the calculated proof matches the one given in the request. +async fn verify_batch_proof( + proven_batch: &ProvenBatch, + proposed_batch: &ProposedBatch, +) -> tonic::Result<()> { + let expected_proof = spawn_blocking_in_current_span({ + let proposed_batch = proposed_batch.clone(); + move || { + LocalBatchProver::new(MIN_PROOF_SECURITY_LEVEL) + .prove(proposed_batch) + .map_err(|err| { + Status::invalid_argument(err.as_report_context("proposed block proof failed")) + }) + } + }) + .await + .map_err(|err| Status::internal(format!("batch proof verification task failed: {err}")))??; + + if &expected_proof != proven_batch { + return Err(Status::invalid_argument("batch proof did not match proposed batch")); + } + + Ok(()) +} diff --git a/crates/rpc/src/server/mod.rs b/crates/rpc/src/server/mod.rs index fa63779f7..1c70136fb 100644 --- a/crates/rpc/src/server/mod.rs +++ b/crates/rpc/src/server/mod.rs @@ -4,11 +4,16 @@ use std::sync::Arc; use accept::AcceptHeaderLayer; use anyhow::Context; use miden_node_block_producer::{BlockProducerApi, RpcReadiness, RpcSync}; -use miden_node_proto::clients::{NtxBuilderClient, RpcClient as SourceRpcClient, ValidatorClient}; -use miden_node_proto::server::rpc_api; +use miden_node_proto::clients::{ + NtxBuilderClient, + RpcClient as SourceRpcClient, + SequencerClient, + ValidatorClient, +}; +use miden_node_proto::server::{rpc_api, sequencer_api}; use miden_node_proto_build::rpc_api_descriptor; use miden_node_store::state::State; -use miden_node_utils::clap::GrpcOptionsExternal; +use miden_node_utils::clap::{GrpcOptionsExternal, GrpcOptionsInternal}; use miden_node_utils::cors::cors_for_grpc_web_layer; use miden_node_utils::grpc; use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn}; @@ -24,6 +29,7 @@ use tower_http::trace::TraceLayer; use tracing::info; use crate::COMPONENT; +use crate::server::api::SequencerInternalService; use crate::server::health::HealthCheckLayer; mod accept; @@ -55,13 +61,19 @@ pub enum RpcMode { block_producer: Box, validator: Box, }, - /// Full-node RPC forwards submissions to the source RPC. + /// Full-node RPC. /// - /// The caller is responsible for configuring this client with any request metadata the source - /// RPC requires. + /// By default it forwards submissions verbatim to the source RPC (the caller is responsible for + /// configuring this client with any request metadata the source RPC requires). + /// + /// When the validator and sequencer clients are set, the full-node will, instead of forwarding, + /// re-execute submissions through the validator and authenticate them against its store, then + /// submit the authenticated result directly to the sequencer's internal API. FullNode { source_rpc: Box, readiness_threshold: u32, + validator: Option>, + sequencer: Option>, }, } @@ -73,10 +85,17 @@ impl RpcMode { } } - pub fn full_node(source_rpc: SourceRpcClient, readiness_threshold: u32) -> Self { + pub fn full_node( + source_rpc: SourceRpcClient, + readiness_threshold: u32, + validator: Option, + sequencer: Option, + ) -> Self { Self::FullNode { source_rpc: Box::new(source_rpc), readiness_threshold, + sequencer: sequencer.map(Box::new), + validator: validator.map(Box::new), } } } @@ -122,7 +141,7 @@ impl Rpc { ) .await; }, - RpcMode::FullNode { source_rpc, readiness_threshold } => { + RpcMode::FullNode { source_rpc, readiness_threshold, .. } => { health_reporter .set_service_status( rpc_api::service_name(), @@ -196,3 +215,46 @@ impl Rpc { tasks.join_next_as_error().await } } + +// INTERNAL SEQUENCER +// ================================================================================================ + +/// The internal Sequencer server. +/// +/// Serves the private `sequencer.Api` gRPC service, which accepts already-authenticated +/// transactions from full nodes and submits them directly to the mempool *without* +/// re-verification. +/// +/// This must only ever be exposed on a private, network-isolated listener: callers can inject +/// transactions that the sequencer will not independently verify. +pub struct SequencerInternal { + /// The listener the service binds to. + pub listener: TcpListener, + /// The in-process block producer API submissions are forwarded to. + pub block_producer: BlockProducerApi, + /// gRPC server options for internal services (timeouts). + pub grpc_options: GrpcOptionsInternal, +} + +impl SequencerInternal { + /// Serves the internal sequencer API. + /// + /// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is + /// encountered. + pub async fn serve(self) -> anyhow::Result<()> { + info!(target: COMPONENT, endpoint = ?self.listener, "Internal sequencer server initialized"); + + let service = SequencerInternalService { block_producer: self.block_producer }; + + // Note: deliberately no accept-header / rate-limit / auth layers; this is a private, + // trusted interface and is expected to be network-isolated. + tonic::transport::Server::builder() + .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) + .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) + .timeout(self.grpc_options.request_timeout) + .add_service(sequencer_api::service(service)) + .serve_with_incoming(TcpListenerStream::new(self.listener)) + .await + .context("failed to serve internal sequencer API") + } +} diff --git a/crates/rpc/src/tests.rs b/crates/rpc/src/tests.rs index e33ccf161..a2b873410 100644 --- a/crates/rpc/src/tests.rs +++ b/crates/rpc/src/tests.rs @@ -471,7 +471,7 @@ async fn rpc_rejects_post_deployment_network_account_tx() { let service = RpcService::new( Arc::clone(&store.state), - RpcMode::full_node(source_rpc_client(), 100), + RpcMode::full_node(source_rpc_client(), 100, None, None), None, NonZeroUsize::new(1_000_000).unwrap(), None, @@ -631,7 +631,7 @@ async fn full_node_forwards_get_network_note_status_to_source_rpc() { let local_store = TestStore::start().await; let full_node = RpcService::new( Arc::clone(&local_store.state), - RpcMode::full_node(source_rpc, 100), + RpcMode::full_node(source_rpc, 100, None, None), None, NonZeroUsize::new(1_000).unwrap(), None, @@ -661,7 +661,7 @@ async fn full_node_preserves_original_accept_metadata_when_forwarding() { let local_store = TestStore::start().await; let full_node = RpcService::new( Arc::clone(&local_store.state), - RpcMode::full_node(source_rpc, 100), + RpcMode::full_node(source_rpc, 100, None, None), None, NonZeroUsize::new(1_000).unwrap(), None, diff --git a/proto/proto/internal/sequencer.proto b/proto/proto/internal/sequencer.proto new file mode 100644 index 000000000..db26af427 --- /dev/null +++ b/proto/proto/internal/sequencer.proto @@ -0,0 +1,87 @@ +// Specification of the internal Sequencer API. +// +// This service is served by the sequencer on a private, network-isolated listener. +syntax = "proto3"; +package sequencer; + +import "types/account.proto"; +import "types/blockchain.proto"; +import "types/primitives.proto"; +import "types/transaction.proto"; + +// INTERNAL SEQUENCER API +// ================================================================================================ + +service Api { + // Submits an already-authenticated transaction directly to the mempool. Returns the node's + // current block height. + rpc SubmitAuthenticatedTx(AuthenticatedTransaction) returns (blockchain.BlockNumber) {} + + // Submits a batch of already-authenticated transactions directly to the mempool. Returns the + // node's current block height. + rpc SubmitAuthenticatedTxBatch(AuthenticatedTransactionBatch) returns (blockchain.BlockNumber) {} +} + +// MESSAGES +// ================================================================================================ + +// An authenticated transaction. +message AuthenticatedTransaction { + // The proven transaction. + // + // Encoded using [miden_protocol::transaction::ProvenTransaction::to_bytes]. + bytes transaction = 1; + + // The account state provided by the store inputs, if any. + optional primitives.Digest store_account_state = 2; + + // Unauthenticated input note commitments that have since been committed to, and authenticated. + repeated primitives.Digest notes_authenticated_by_store = 3; + + // The chain height at which authentication took place. + fixed32 authentication_height = 4; +} + +// A proposed batch together with the inputs each of its transactions was authenticated against. +message AuthenticatedTransactionBatch { + // The proposed batch. + // + // Encoded using [miden_protocol::batch::ProposedBatch::to_bytes]. + bytes proposed_batch = 1; + + // The store inputs for each transaction in the batch. + // + // Must match the transaction ordering in the batch. + repeated AuthInputs auth_inputs = 2; +} + +// The store-derived inputs a transaction was authenticated against. +// +// This is the wire form of the block producer's lightweight transaction inputs (the result of +// authenticating a transaction against the store state), not the protocol's transaction inputs +// used for re-execution. +message AuthInputs { + // The account ID of the transaction. + account.AccountId account_id = 1; + + // The transaction account's current onchain commitment, if any. + optional primitives.Digest account_commitment = 2; + + // The transaction's nullifiers and the block in which each was consumed, if any. + repeated NullifierRecord nullifiers = 3; + + // Unauthenticated input note commitments which were found committed in the store. + repeated primitives.Digest found_unauthenticated_notes = 4; + + // The current chain tip. + fixed32 current_block_height = 5; +} + +// A nullifier together with the block in which it was consumed. +message NullifierRecord { + // The nullifier. + primitives.Digest nullifier = 1; + + // The block number in which the nullifier was consumed; 0 if it is unspent. + fixed32 block_num = 2; +} diff --git a/scripts/run-node.sh b/scripts/run-node.sh index a88cb3fac..0c7da6214 100755 --- a/scripts/run-node.sh +++ b/scripts/run-node.sh @@ -29,6 +29,7 @@ NTX_BUILDER_DIR="/tmp/ntx-builder" ACCOUNTS_DIR="/tmp/accounts" VALIDATOR_PORT=50101 +SEQUENCER_INTERNAL_PORT=50201 NTX_BUILDER_PORT=50301 RPC_PORT=57291 FULL_NODE_1_RPC_PORT=57292 @@ -48,7 +49,7 @@ cleanup() { trap cleanup EXIT INT TERM kill_ports() { - local ports=("$VALIDATOR_PORT" "$NTX_BUILDER_PORT" "$RPC_PORT" "$REMOTE_PROVER_PORT") + local ports=("$VALIDATOR_PORT" "$SEQUENCER_INTERNAL_PORT" "$NTX_BUILDER_PORT" "$RPC_PORT" "$REMOTE_PROVER_PORT") if [[ "$ENABLE_FULL_NODES" == "true" ]]; then ports+=("$FULL_NODE_1_RPC_PORT" "$FULL_NODE_2_RPC_PORT") @@ -157,6 +158,7 @@ OTEL_RESOURCE_ATTRIBUTES="$(node_resource_attributes sequencer)" \ --data-directory "$NODE_DIR" \ --validator.url "http://127.0.0.1:$VALIDATOR_PORT" \ --ntx-builder.url "http://127.0.0.1:$NTX_BUILDER_PORT" \ + --internal.listen "0.0.0.0:$SEQUENCER_INTERNAL_PORT" \ $EXTRA_ARGS & PIDS+=($!) @@ -180,12 +182,14 @@ echo "Starting network transaction builder..." PIDS+=($!) if [[ "$ENABLE_FULL_NODES" == "true" ]]; then - echo "Starting full node 1 (upstream: sequencer at 127.0.0.1:$RPC_PORT)..." + echo "Starting full node 1 (pre-authenticating; upstream: sequencer at 127.0.0.1:$RPC_PORT)..." OTEL_RESOURCE_ATTRIBUTES="$(node_resource_attributes full-node-1)" \ "$NODE_BINARY" full \ --rpc.listen "0.0.0.0:$FULL_NODE_1_RPC_PORT" \ --sync.block-source.url "http://127.0.0.1:$RPC_PORT" \ --data-directory "$FULL_NODE_1_DIR" \ + --validator.url "http://127.0.0.1:$VALIDATOR_PORT" \ + --sequencer.internal.url "http://127.0.0.1:$SEQUENCER_INTERNAL_PORT" \ $EXTRA_ARGS & PIDS+=($!) @@ -205,9 +209,10 @@ else fi echo "=== All components running. Ctrl+C to stop. ===" +echo "=== Sequencer internal endpoint: :$SEQUENCER_INTERNAL_PORT ===" if [[ "$ENABLE_FULL_NODES" == "true" ]]; then echo "=== Block propagation chain: :$RPC_PORT -> :$FULL_NODE_1_RPC_PORT -> :$FULL_NODE_2_RPC_PORT ===" - echo "=== RPC endpoints: :$RPC_PORT, :$FULL_NODE_1_RPC_PORT, :$FULL_NODE_2_RPC_PORT ===" + echo "=== RPC endpoints: :$RPC_PORT, :$FULL_NODE_1_RPC_PORT (pre-authenticated submitter), :$FULL_NODE_2_RPC_PORT ===" else echo "=== RPC endpoint: :$RPC_PORT ===" fi