Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion bin/node/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pub use recover::RecoverCommand;
const ENV_DATA_DIRECTORY: &str = "MIDEN_NODE_DATA_DIRECTORY";

#[derive(Subcommand, Debug)]
#[expect(clippy::large_enum_variant, reason = "cli is a once-off usage")]
pub enum Command {
/// Start the node in sequencer mode.
///
Expand Down
102 changes: 96 additions & 6 deletions bin/node/src/commands/modes.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use miden_node_block_producer::{DEFAULT_VALIDATOR_TIMEOUT, Sequencer};
use miden_node_proto::clients::{Builder, NtxBuilderClient, RpcClient, ValidatorClient};
use miden_node_rpc::{Rpc, RpcMode};
use miden_node_proto::clients::{
Builder,
NtxBuilderClient,
RpcClient,
TrustedClient,
ValidatorClient,
};
use miden_node_rpc::{Rpc, RpcMode, Trusted, TrustedSubmission};
use miden_node_store::State;
use miden_node_utils::clap::duration_to_human_readable_string;
use miden_node_utils::clap::{GrpcOptionsInternal, duration_to_human_readable_string};
use miden_node_utils::tasks::Tasks;
use tokio::net::TcpListener;
use url::Url;
Expand All @@ -32,6 +39,17 @@ pub struct SequencerCommand {

#[command(flatten)]
pub store: StoreOptions,

/// Socket address at which to serve the private trusted submission API.
///
/// When unset the trusted submission service is not exposed. This interface accepts
/// already-authenticated transactions from trusted full nodes *without* re-verification.
#[arg(
long = "trusted.listen",
env = "MIDEN_NODE_TRUSTED_LISTEN",
value_name = "LISTEN"
)]
pub trusted_listen: Option<SocketAddr>,
}

impl SequencerCommand {
Expand Down Expand Up @@ -63,14 +81,25 @@ impl SequencerCommand {
let rpc = Rpc {
listener: bind_rpc(runtime.rpc_listen).await?,
store: state,
mode: RpcMode::sequencer(block_producer, self.external_services.validator_client()?),
mode: RpcMode::sequencer(
block_producer.clone(),
self.external_services.validator_client()?,
),
ntx_builder: Some(self.external_services.ntx_builder_client()?),
grpc_options: runtime.external_grpc_options,
network_tx_auth,
};
let mut tasks = Tasks::new();
tasks.spawn("sequencer", sequencer.wait());
tasks.spawn("RPC server", rpc.serve());
if let Some(trusted_listen) = self.trusted_listen {
let trusted = Trusted {
listener: bind_rpc(trusted_listen).await?,
block_producer,
grpc_options: GrpcOptionsInternal::from(runtime.external_grpc_options),
};
tasks.spawn("trusted submission server", trusted.serve());
}

tasks.join_next_as_error().await
}
Expand Down Expand Up @@ -132,20 +161,24 @@ pub struct FullNodeCommand {

#[command(flatten)]
pub store: StoreOptions,

#[command(flatten)]
pub trusted: TrustedFullNodeOptions,
}

impl FullNodeCommand {
pub async fn handle(self) -> anyhow::Result<()> {
let runtime = self.runtime.runtime_config(&self.store);
let source_rpc = self.sync.source_rpc_client()?;
let trusted = self.trusted.trusted_submission()?;
let network_tx_auth = self.runtime.rpc.network_tx_auth()?;
let state = load_state(&runtime).await?;
let _disk_monitor = state.spawn_disk_monitor();

let rpc = Rpc {
listener: bind_rpc(runtime.rpc_listen).await?,
store: state,
mode: RpcMode::full_node(source_rpc, self.sync.readiness_threshold),
mode: RpcMode::full_node(source_rpc, self.sync.readiness_threshold, trusted),
ntx_builder: None,
grpc_options: runtime.external_grpc_options,
network_tx_auth,
Expand All @@ -157,6 +190,63 @@ impl FullNodeCommand {
}
}

/// Options that turn a full node into a *trusted* full node.
///
/// When both URLs are set the full node validates and authenticates submissions locally and
/// forwards the authenticated result to the sequencer's trusted submission API, rather than
/// forwarding the raw transaction upstream. Both must be provided together.
#[derive(clap::Args, Clone, Debug)]
pub struct TrustedFullNodeOptions {
/// The validator service gRPC URL.
#[arg(
long = "validator.url",
env = "MIDEN_NODE_VALIDATOR_URL",
value_name = "URL",
requires = "sequencer_url"
)]
pub validator_url: Option<Url>,

/// The sequencer's private trusted submission gRPC URL.
#[arg(
long = "sequencer.url",
env = "MIDEN_NODE_SEQUENCER_URL",
value_name = "URL",
requires = "validator_url"
)]
pub sequencer_url: Option<Url>,
}

impl TrustedFullNodeOptions {
/// Builds the trusted submission clients, or `None` if this full node is not trusted.
fn trusted_submission(&self) -> anyhow::Result<Option<TrustedSubmission>> {
let (Some(validator_url), Some(sequencer_url)) = (&self.validator_url, &self.sequencer_url)
else {
return Ok(None);
};

let validator = Builder::new(validator_url.clone())
.with_tls()?
.with_timeout(Duration::from_secs(20))
.without_metadata_version()
.without_metadata_genesis()
.with_otel_context_injection()
.connect_lazy::<ValidatorClient>();

let sequencer = Builder::new(sequencer_url.clone())
.with_tls()?
.with_timeout(Duration::from_secs(20))
Comment thread
sergerad marked this conversation as resolved.
Outdated
.without_metadata_version()
.without_metadata_genesis()
.with_otel_context_injection()
.connect_lazy::<TrustedClient>();

Ok(Some(TrustedSubmission {
validator: Box::new(validator),
sequencer: Box::new(sequencer),
}))
}
}

impl SyncOptions {
fn source_rpc_client(&self) -> anyhow::Result<RpcClient> {
Ok(Builder::new(self.block_source_url.clone())
Expand All @@ -181,7 +271,7 @@ async fn load_state(runtime: &RuntimeConfig) -> anyhow::Result<Arc<State>> {
Ok(Arc::new(state))
}

async fn bind_rpc(listen: std::net::SocketAddr) -> anyhow::Result<TcpListener> {
async fn bind_rpc(listen: SocketAddr) -> anyhow::Result<TcpListener> {
TcpListener::bind(listen)
.await
.with_context(|| format!("failed to bind RPC listener to {listen}"))
Expand Down
71 changes: 61 additions & 10 deletions crates/block-producer/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::block_prover::BlockProver;
use crate::domain::transaction::AuthenticatedTransaction;
use crate::errors::MempoolSubmissionError;
use crate::mempool::{BatchBudget, BlockBudget, Mempool, MempoolConfig, SharedMempool};
use crate::store::{TransactionInputs, get_tx_inputs};
use crate::validator::BlockProducerValidatorClient;
use crate::{
CACHED_MEMPOOL_STATS_UPDATE_INTERVAL,
Expand Down Expand Up @@ -285,16 +286,13 @@ impl BlockProducerApi {
skip_all,
err
)]
#[expect(clippy::let_and_return)]
pub async fn submit_proven_tx(
&self,
tx: ProvenTransaction,
) -> Result<BlockNumber, MempoolSubmissionError> {
let tx_id = tx.id();

debug!(
target: COMPONENT,
tx_id = %tx_id.to_hex(),
tx_id = %tx.id().to_hex(),
account_id = %tx.account_id().to_hex(),
initial_state_commitment = %tx.account_update().initial_state_commitment(),
final_state_commitment = %tx.account_update().final_state_commitment(),
Expand All @@ -305,10 +303,32 @@ impl BlockProducerApi {
);
debug!(target: COMPONENT, proof = ?tx.proof());

let inputs = crate::store::get_tx_inputs(&self.store, &tx)
// Authenticate against the local store, then add to the mempool.
let inputs = get_tx_inputs(&self.store, &tx)
.await
.map_err(MempoolSubmissionError::StoreStateReadFailed)?;

self.submit_authenticated_tx(tx, inputs).await
}

/// Adds a transaction that has already been authenticated against the store to the mempool.
///
/// Unlike [`Self::submit_proven_tx`] this skips the store authentication step
/// ([`crate::store::get_tx_inputs`]) and trusts the supplied [`TransactionInputs`]. It is used
/// by the trusted submission path where a trusted full node performs authentication on the
/// sequencer's behalf.
#[instrument(
target = COMPONENT,
name = "block_producer.api.submit_authenticated_tx",
skip_all,
err
)]
#[expect(clippy::let_and_return)]
Comment thread
sergerad marked this conversation as resolved.
Outdated
pub async fn submit_authenticated_tx(
&self,
tx: ProvenTransaction,
inputs: TransactionInputs,
Comment thread
sergerad marked this conversation as resolved.
Outdated
) -> Result<BlockNumber, MempoolSubmissionError> {
// SAFETY: we assume that the rpc component has verified the transaction proof already.
let tx = AuthenticatedTransaction::new_unchecked(Arc::new(tx), inputs)
.map(Arc::new)
Expand All @@ -329,20 +349,51 @@ impl BlockProducerApi {
skip_all,
err
)]
#[expect(clippy::let_and_return)]
pub async fn submit_proven_tx_batch(
&self,
batch: ProposedBatch,
) -> Result<BlockNumber, MempoolSubmissionError> {
// We assume that the rpc component has verified everything, including the transaction
// proofs.

let mut txs = Vec::with_capacity(batch.transactions().len());
// Authenticate each transaction against the local store, then add the batch to the mempool.
let mut inputs = Vec::with_capacity(batch.transactions().len());
for tx in batch.transactions() {
let inputs = crate::store::get_tx_inputs(&self.store, tx)
.await
.map_err(MempoolSubmissionError::StoreStateReadFailed)?;
inputs.push(
get_tx_inputs(&self.store, tx)
.await
.map_err(MempoolSubmissionError::StoreStateReadFailed)?,
);
}

self.submit_authenticated_tx_batch(batch, inputs).await
}

/// Adds a batch whose transactions have already been authenticated against the store to the
/// mempool.
///
/// Counterpart to [`Self::submit_authenticated_tx`] for batches. The `inputs` must be in the
/// same order as the batch's transactions and have the same length.
#[instrument(
target = COMPONENT,
name = "block_producer.api.submit_authenticated_tx_batch",
skip_all,
err
)]
#[expect(clippy::let_and_return)]
pub async fn submit_authenticated_tx_batch(
&self,
batch: ProposedBatch,
inputs: Vec<TransactionInputs>,
) -> Result<BlockNumber, MempoolSubmissionError> {
debug_assert_eq!(
batch.transactions().len(),
inputs.len(),
"transaction inputs must match the batch's transactions"
);
Comment thread
sergerad marked this conversation as resolved.
Outdated

let mut txs = Vec::with_capacity(batch.transactions().len());
for (tx, inputs) in batch.transactions().iter().zip(inputs) {
// SAFETY: We assume that the rpc component has verified the transaction proofs, as well
// as the batch integrity itself.
let tx = AuthenticatedTransaction::new_unchecked(Arc::clone(tx), inputs)
Expand Down
81 changes: 81 additions & 0 deletions crates/block-producer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::fmt::{Display, Formatter};
use std::num::NonZeroU32;

use itertools::Itertools;
use miden_node_proto::errors::ConversionError;
use miden_node_proto::generated::trusted;
use miden_node_store::state::{Finality, State, TransactionInputs as StoreTransactionInputs};
use miden_node_utils::formatting::format_opt;
use miden_protocol::Word;
Expand Down Expand Up @@ -65,6 +67,73 @@ impl TransactionInputs {
}
}

// PROTO CONVERSIONS
// ------------------------------------------------------------------------------------------------

impl From<TransactionInputs> for trusted::AuthInputs {
fn from(value: TransactionInputs) -> Self {
Self {
account_id: Some(value.account_id.into()),
account_commitment: value.account_commitment.map(Into::into),
nullifiers: value
.nullifiers
.into_iter()
.map(|(nullifier, block_num)| trusted::NullifierRecord {
nullifier: Some(nullifier.into()),
block_num: block_num.map_or(0, NonZeroU32::get),
})
.collect(),
found_unauthenticated_notes: value
.found_unauthenticated_notes
.into_iter()
.map(Into::into)
.collect(),
current_block_height: value.current_block_height.as_u32(),
}
}
}

impl TryFrom<trusted::AuthInputs> for TransactionInputs {
type Error = ConversionError;

fn try_from(value: trusted::AuthInputs) -> Result<Self, Self::Error> {
let account_id = value
.account_id
.ok_or_else(|| ConversionError::missing_field::<trusted::AuthInputs>("account_id"))?
.try_into()?;

let account_commitment = value.account_commitment.map(Word::try_from).transpose()?;

let nullifiers = value
.nullifiers
.into_iter()
.map(|record| {
let nullifier = record
.nullifier
.ok_or_else(|| {
ConversionError::missing_field::<trusted::NullifierRecord>("nullifier")
})?
.try_into()?;
Ok((nullifier, NonZeroU32::new(record.block_num)))
})
.collect::<Result<_, ConversionError>>()?;

let found_unauthenticated_notes = value
.found_unauthenticated_notes
.into_iter()
.map(Word::try_from)
.collect::<Result<_, ConversionError>>()?;

Ok(Self {
account_id,
account_commitment,
nullifiers,
found_unauthenticated_notes,
current_block_height: value.current_block_height.into(),
})
}
}

impl Display for TransactionInputs {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let nullifiers = self
Expand All @@ -91,6 +160,18 @@ impl Display for TransactionInputs {
// STORE STATE
// ================================================================================================

/// Authenticates a proven transaction against the store, returning the [`TransactionInputs`]
/// needed to admit it to the mempool.
///
/// This reads the committed state relevant to the transaction: the account's current commitment,
/// the consumption status of each of the transaction's nullifiers, and which of its unauthenticated
/// input notes have since been committed. The result is captured at the store's current committed
/// chain tip.
///
/// # Errors
///
/// Returns an error if the store query fails, or if the transaction creates a new account whose ID
/// prefix already exists in the store.
#[instrument(target = COMPONENT, name = "store.state.get_tx_inputs", skip_all, err)]
pub async fn get_tx_inputs(
state: &State,
Expand Down
Loading
Loading