diff --git a/Cargo.lock b/Cargo.lock index e4de0a4894..e9e764aabc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7743,6 +7743,7 @@ dependencies = [ "observability", "oid-registry", "prost", + "rand 0.8.5", "rcgen", "rustls-webpki 0.103.9", "serde", diff --git a/core/grpc/proto/kms-service-insecure.v1.proto b/core/grpc/proto/kms-service-insecure.v1.proto index 9b87709b96..d0a018d855 100644 --- a/core/grpc/proto/kms-service-insecure.v1.proto +++ b/core/grpc/proto/kms-service-insecure.v1.proto @@ -73,4 +73,6 @@ service CoreServiceEndpoint { rpc GetKeyMaterialAvailability(kms.v1.Empty) returns (kms.v1.KeyMaterialAvailabilityResponse); rpc GetHealthStatus(kms.v1.Empty) returns (kms.v1.HealthStatusResponse); + + rpc BandwidthBenchmark(kms.v1.BandwidthBenchmarkRequest) returns (kms.v1.BandwidthBenchmarkResponse); } diff --git a/core/grpc/proto/kms-service.v1.proto b/core/grpc/proto/kms-service.v1.proto index 363ccc3e9a..9364f4243c 100644 --- a/core/grpc/proto/kms-service.v1.proto +++ b/core/grpc/proto/kms-service.v1.proto @@ -298,7 +298,7 @@ service CoreServiceEndpoint { // * The `crs_id` in `request` will not be usable again. // * All data associated with the `crs_id` has been removed. rpc AbortCrsGen(kms.v1.RequestId) returns (kms.v1.Empty); - + // Constructs a new MPC context. // // That is, updates the internal state and configuration files to support a new MPC context. @@ -494,4 +494,7 @@ service CoreServiceEndpoint { // ## Pre-condition: KMS service must be initialized // ## Post-condition: No state changes, read-only operation rpc GetHealthStatus(kms.v1.Empty) returns (kms.v1.HealthStatusResponse); + + + rpc BandwidthBenchmark(kms.v1.BandwidthBenchmarkRequest) returns (kms.v1.BandwidthBenchmarkResponse); } diff --git a/core/grpc/proto/kms.v1.proto b/core/grpc/proto/kms.v1.proto index b8520fc582..4d9dc44862 100644 --- a/core/grpc/proto/kms.v1.proto +++ b/core/grpc/proto/kms.v1.proto @@ -830,3 +830,57 @@ message EpochResultResponse { message DestroyMpcEpochRequest { RequestId epoch_id = 1; } + +enum BandwidthKind { + Duration = 0; + Once = 1; +} + +message BandwidthBenchmarkRequest { + // The KMS context for which to perform the benchmark + RequestId context_id = 1; + // How long we keep sending bytes (s) + uint64 duration = 2; + // Number of sessions trying to send bytes in parallel + uint32 number_sessions = 3; + // Size of the payload each session will send at a time + uint32 payload_size_per_session = 4; + // Number of independent gRPC connections to open per peer for the + // benchmark. The `number_sessions` sessions are striped across these + // connections (round-robin) so they no longer all share a single + // HTTP/2 codec task. A value of 0 is treated as 1, which preserves + // the historical single-connection behavior. Useful for measuring + // small-payload throughput, where the per-connection codec task is + // typically the bottleneck. + uint32 connections_per_peer = 5; + BandwidthKind kind = 6; +} + +// All latency info are in ms +message LatencyInfo { + uint64 average = 1; + uint64 p50 = 2; + uint64 p90 = 3; + uint64 p99 = 4; + uint64 slowest = 5; + uint64 fastest = 6; +} + +message PeerBandwidthInfo { + // Peer party ID + uint32 peer_id = 1; + // Peer endpoint address + string endpoint = 2; + // Number of bytes sent to this peer + uint64 bytes_sent = 3; + // Effective time spent averaged over all sessions + // (may differ from requested duration because we don't stop the experiment abruptly) + uint64 duration = 4; + // Some measure of latency over all messages sent + LatencyInfo latency = 5; +} + +message BandwidthBenchmarkResponse { + // Benchmark result for each of the peers + repeated PeerBandwidthInfo peers_info = 1; +} \ No newline at end of file diff --git a/core/service/src/engine/centralized/endpoint.rs b/core/service/src/engine/centralized/endpoint.rs index 61b1b8aac4..7f3b7b9ea7 100644 --- a/core/service/src/engine/centralized/endpoint.rs +++ b/core/service/src/engine/centralized/endpoint.rs @@ -404,4 +404,11 @@ impl< ) -> Result, Status> { unimplemented!("MPC epochs are not supported in centralized KMS"); } + + async fn bandwidth_benchmark( + &self, + _request: Request, + ) -> Result, Status> { + unimplemented!("Bandwidth benchmark is not supported in centralized KMS"); + } } diff --git a/core/service/src/engine/threshold/bandwidth_bench.rs b/core/service/src/engine/threshold/bandwidth_bench.rs new file mode 100644 index 0000000000..d191106347 --- /dev/null +++ b/core/service/src/engine/threshold/bandwidth_bench.rs @@ -0,0 +1,147 @@ +use std::{collections::HashMap, ops::AddAssign, time::Duration}; + +use itertools::Itertools; +use kms_grpc::{ + ContextId, + kms::v1::{ + BandwidthBenchmarkRequest, BandwidthBenchmarkResponse, BandwidthKind, LatencyInfo, + PeerBandwidthInfo, + }, +}; +use threshold_networking::health_check::{BenchKind, HealthCheckStatus}; +use tonic::{Request, Response, Status}; + +use crate::engine::{ + threshold::service::session::ImmutableSessionMaker, + validation::{RequestIdParsingErr, parse_optional_grpc_request_id}, +}; + +pub(crate) async fn run_bandwidth_benchmark( + request: Request, + session_maker: ImmutableSessionMaker, +) -> Result, Status> { + tracing::info!("Received bandwidth benchmark request: {:?}", request); + let request = request.into_inner(); + let context_id: ContextId = + parse_optional_grpc_request_id(&request.context_id, RequestIdParsingErr::Context)?; + let payload_size = request.payload_size_per_session as usize; + let num_sessions = request.number_sessions as usize; + let kind: BandwidthKind = request.kind.try_into().map_err(|e| { + Status::invalid_argument(format!( + "Invalid bandwidth benchmark kind {}: {}", + request.kind, e + )) + })?; + let duration = match kind { + BandwidthKind::Once => BenchKind::Once, + BandwidthKind::Duration => BenchKind::Duration(Duration::from_secs(request.duration)), + }; + // A value of 0 (the proto default for older clients) is treated as 1 + // so this remains backward-compatible with the historical + // single-connection benchmark. + let connections_per_peer = (request.connections_per_peer as usize).max(1); + + let mut join_set = tokio::task::JoinSet::new(); + for session_idx in 0..num_sessions { + let session = session_maker + .get_healthcheck_session_with_pool(&context_id, connections_per_peer) + .await + .map_err(|e| { + Status::internal(format!( + "Failed to create health check session for context {}: {}", + context_id, e + )) + })?; + join_set.spawn(async move { + session + .run_bandwidth_benchmark(payload_size, duration, session_idx) + .await + }); + } + + let mut results = HashMap::new(); + + while let Some(result) = join_set.join_next().await { + let result = result + .map_err(|e| { + Status::internal(format!("Failed to join bandwidth benchmark task: {}", e)) + })? + .map_err(|e| Status::internal(format!("Bandwidth benchmark task failed: {}", e)))?; + for ((role, id), (bytes_sent, duration, status)) in result { + let (entry_sent, entry_duration, entry_status) = results + .entry((role, id)) + .or_insert_with(|| (0, vec![], vec![])); + entry_sent.add_assign(bytes_sent); + entry_duration.push(duration); + entry_status.extend(status); + } + } + + let peers_info = results + .into_iter() + .map(|((role, id), (bytes_sent, durations, status))| { + // Fill up tha latency struct + let latency = make_latency(status) + .inspect_err(|e| tracing::warn!("Error computing latency info: {}", e)) + .ok(); + let duration = match kind { + BandwidthKind::Once => durations + .into_iter() + .map(|d| d.as_secs()) + .max() + .unwrap_or(0), + BandwidthKind::Duration => { + (durations.iter().sum::().as_secs_f64() / durations.len() as f64) + as u64 + } + }; + PeerBandwidthInfo { + peer_id: role.one_based() as u32, + endpoint: id.hostname().to_string(), + bytes_sent: bytes_sent as u64, + duration, + latency, + } + }) + .collect_vec(); + + tracing::info!("Bandwidth benchmark completed. Results: {:?}", peers_info); + + Ok(Response::new(BandwidthBenchmarkResponse { peers_info })) +} + +fn make_latency(status: Vec) -> Result { + let latencies: Vec = status + .iter() + .map(|s| match s { + HealthCheckStatus::Ok(duration) => Ok(duration.as_millis()), + HealthCheckStatus::Error(_) => { + Err("Some error sending at least one payload".to_string()) + } + HealthCheckStatus::TimeOut(_) => { + Err("Timeout sending at least one payload".to_string()) + } + }) + .try_collect()?; + + let sorted_latencies = latencies.into_iter().sorted().collect_vec(); + + let average = (sorted_latencies.iter().sum::() as f64 / status.len() as f64) as u64; + let p50_idx = status.len() / 2; + let p50 = sorted_latencies.get(p50_idx).copied().unwrap_or(0) as u64; + let p90_idx = (status.len() * 90) / 100; + let p90 = sorted_latencies.get(p90_idx).copied().unwrap_or(0) as u64; + let p99_idx = (status.len() * 99) / 100; + let p99 = sorted_latencies.get(p99_idx).copied().unwrap_or(0) as u64; + let slowest = sorted_latencies.last().copied().unwrap_or(0) as u64; + let fastest = sorted_latencies.first().copied().unwrap_or(0) as u64; + + Ok(LatencyInfo { + average, + p50, + p90, + p99, + slowest, + fastest, + }) +} diff --git a/core/service/src/engine/threshold/endpoint.rs b/core/service/src/engine/threshold/endpoint.rs index 62170c2da8..5c576e645a 100644 --- a/core/service/src/engine/threshold/endpoint.rs +++ b/core/service/src/engine/threshold/endpoint.rs @@ -1,3 +1,4 @@ +use crate::engine::threshold::bandwidth_bench::run_bandwidth_benchmark; use crate::engine::threshold::threshold_kms::ThresholdKms; use crate::engine::threshold::traits::{ CrsGenerator, KeyGenPreprocessor, KeyGenerator, PublicDecryptor, UserDecryptor, @@ -408,5 +409,14 @@ impl_endpoint! { Ok(Response::new(response)) } + + + #[tracing::instrument(skip(self, request))] + async fn bandwidth_benchmark( + &self, + request: Request, + ) -> Result, Status> { + run_bandwidth_benchmark(request, self.session_maker.clone()).await + } } } diff --git a/core/service/src/engine/threshold/mod.rs b/core/service/src/engine/threshold/mod.rs index df39c25831..2b7df489a0 100644 --- a/core/service/src/engine/threshold/mod.rs +++ b/core/service/src/engine/threshold/mod.rs @@ -1,3 +1,4 @@ +pub mod bandwidth_bench; mod endpoint; pub mod service; pub mod threshold_kms; diff --git a/core/service/src/engine/threshold/service/session.rs b/core/service/src/engine/threshold/service/session.rs index 5c62b90c62..089a8e3302 100644 --- a/core/service/src/engine/threshold/service/session.rs +++ b/core/service/src/engine/threshold/service/session.rs @@ -244,13 +244,28 @@ impl SessionMaker { async fn get_healthcheck_session( &self, context_id: &ContextId, + ) -> anyhow::Result> { + self.get_healthcheck_session_with_pool(context_id, 1).await + } + + // Returns a health check session whose per-peer connection pool has + // `connections_per_peer` independent gRPC connections. With + // `connections_per_peer = 1` this matches [`get_healthcheck_session`]. + // Used by the bandwidth benchmark to stripe parallel sessions across + // multiple HTTP/2 codec tasks. + async fn get_healthcheck_session_with_pool( + &self, + context_id: &ContextId, + connections_per_peer: usize, ) -> anyhow::Result> { let nm = self.networking_manager.read().await; let role_assignment = self.get_role_assignment(context_id).await?; let my_role = self.my_role(context_id).await?; if let Some(role) = my_role { - Ok(nm.make_healthcheck_session(&role_assignment, role).await?) + Ok(nm + .make_healthcheck_session_with_pool(&role_assignment, role, connections_per_peer) + .await?) } else { Err(anyhow::anyhow!( "My role is not defined for context {}", @@ -861,6 +876,28 @@ impl ImmutableSessionMaker { ) -> anyhow::Result>> { self.inner.get_healthcheck_session_all_contexts().await } + + // Returns a health check session for the given context. + //pub(crate) async fn get_healthcheck_session( + // &self, + // context_id: &ContextId, + //) -> anyhow::Result> { + // self.inner.get_healthcheck_session(context_id).await + //} + + // Returns a health check session for the given context whose per-peer + // connection pool has `connections_per_peer` independent gRPC + // connections. With `connections_per_peer = 1` this matches + // [`get_healthcheck_session`]. Used by the bandwidth benchmark. + pub(crate) async fn get_healthcheck_session_with_pool( + &self, + context_id: &ContextId, + connections_per_peer: usize, + ) -> anyhow::Result> { + self.inner + .get_healthcheck_session_with_pool(context_id, connections_per_peer) + .await + } } /// Validates that a context and epoch ID exists and returns the role of the current server in this context. diff --git a/core/threshold-networking/Cargo.toml b/core/threshold-networking/Cargo.toml index f8403aba3f..c580761a1a 100644 --- a/core/threshold-networking/Cargo.toml +++ b/core/threshold-networking/Cargo.toml @@ -10,6 +10,7 @@ description = "Provides networking for all threshold network components. GRPC, s [dependencies] anyhow.workspace = true clap = { workspace = true, features = ["derive", "env"] } +rand.workspace = true rcgen.workspace = true async-trait.workspace = true attestation-doc-validation.workspace = true diff --git a/core/threshold-networking/protos/gnetworking.proto b/core/threshold-networking/protos/gnetworking.proto index 131ca0ded5..6f062c47af 100644 --- a/core/threshold-networking/protos/gnetworking.proto +++ b/core/threshold-networking/protos/gnetworking.proto @@ -27,6 +27,7 @@ enum Status { message HealthCheckRequest { bytes tag = 1; + bytes payload = 2; // throwaway bytes for bandwidth testing } message HealthCheckResponse {} \ No newline at end of file diff --git a/core/threshold-networking/src/grpc.rs b/core/threshold-networking/src/grpc.rs index cacd85bf1e..5cf68a3a86 100644 --- a/core/threshold-networking/src/grpc.rs +++ b/core/threshold-networking/src/grpc.rs @@ -374,10 +374,35 @@ impl GrpcNetworkingManager { }) } + /// Build a [`HealthCheckSession`] with a single (cached) gRPC connection + /// per peer. This is the production health-check path. pub async fn make_healthcheck_session( &self, role_assignment: &RoleAssignment, my_role: R, + ) -> anyhow::Result> { + self.make_healthcheck_session_with_pool(role_assignment, my_role, 1) + .await + } + + /// Build a [`HealthCheckSession`] with `connections_per_peer` independent + /// gRPC connections per peer. + /// + /// `connections_per_peer` is clamped to at least 1. With a value of 1 + /// this is identical to [`Self::make_healthcheck_session`]. Larger + /// values are intended for the bandwidth benchmark, which spreads its + /// parallel sessions across the pool to break out of the + /// single-codec-task throughput ceiling that limits small-payload + /// throughput on a single shared HTTP/2 connection. + /// + /// Only the first connection per peer is cached in the + /// [`crate::sending_service::GrpcSendingService`] channel map; the rest + /// live for the duration of the returned session. + pub async fn make_healthcheck_session_with_pool( + &self, + role_assignment: &RoleAssignment, + my_role: R, + connections_per_peer: usize, ) -> anyhow::Result> { let mut others = role_assignment.clone(); @@ -397,8 +422,11 @@ impl GrpcNetworkingManager { let mut connection_channels = HashMap::new(); for (role, identity) in others.inner.into_iter() { - let channel = self.sending_service.connect_to_party(&identity).await?; - connection_channels.insert((role, identity), channel); + let pool = self + .sending_service + .connect_to_party_pool(&identity, connections_per_peer) + .await?; + connection_channels.insert((role, identity), pool); } Ok(HealthCheckSession::new( @@ -1020,7 +1048,7 @@ impl Gnetworking for NetworkingImpl { ) .map_err(|e| *e)?; - tracing::info!("Received a HealthPing from {}", health_tag.sender); + tracing::debug!("Received a HealthPing from {}", health_tag.sender); Ok(tonic::Response::new(HealthCheckResponse::default())) } diff --git a/core/threshold-networking/src/health_check.rs b/core/threshold-networking/src/health_check.rs index 87bd6f9c5f..75142f4ded 100644 --- a/core/threshold-networking/src/health_check.rs +++ b/core/threshold-networking/src/health_check.rs @@ -1,3 +1,4 @@ +#![allow(clippy::type_complexity)] use super::ggen::gnetworking_client::GnetworkingClient; use crate::grpc::HealthTag; use error_utils::anyhow_error_and_log; @@ -14,8 +15,15 @@ pub struct HealthCheckSession { /// My own [`Role`] pub(crate) my_role: R, pub(crate) timeout: Duration, - pub(crate) connection_channels: - HashMap<(R, Identity), GnetworkingClient>>, + /// One or more gRPC clients per peer. The first entry is the cached, + /// shared client used for ordinary health checks. Additional entries + /// (only populated by [`crate::grpc::GrpcNetworkingManager::make_healthcheck_session_with_pool`]) + /// are independent connections used by the bandwidth benchmark to + /// stripe sessions across multiple HTTP/2 codec tasks. + pub(crate) connection_channels: HashMap< + (R, Identity), + Vec>>, + >, } pub enum HealthCheckStatus { @@ -24,7 +32,15 @@ pub enum HealthCheckStatus { TimeOut(Duration), } +#[derive(Clone, Copy, Debug)] +pub enum BenchKind { + Once, + Duration(Duration), +} + pub type HealthCheckResult = HashMap<(R, Identity), HealthCheckStatus>; +pub type BandwidthBenchmarkResult = + HashMap<(R, Identity), (usize, Duration, Vec)>; impl HealthCheckSession { pub fn new( @@ -33,7 +49,7 @@ impl HealthCheckSession { timeout: Duration, connection_channels: HashMap< (R, Identity), - GnetworkingClient>, + Vec>>, >, ) -> Self { Self { @@ -66,7 +82,18 @@ impl HealthCheckSession { .map_err(|_| anyhow_error_and_log("Failed to serialize the Health Check Tag"))?; let mut tasks = JoinSet::new(); - for ((role, id), client) in self.connection_channels.iter() { + for ((role, id), clients) in self.connection_channels.iter() { + // For an ordinary health check we just use the first (cached) + // client; any extra pool slots are reserved for the bandwidth + // benchmark. `connection_channels` is built such that every + // peer has at least one client — see + // [`crate::grpc::GrpcNetworkingManager::make_healthcheck_session_with_pool`]. + let client = clients.first().ok_or_else(|| { + anyhow_error_and_log(format!( + "Empty connection pool for peer {:?} in HealthCheckSession", + id + )) + })?; let (role, id, client, tag_serialized, timeout) = ( *role, id.clone(), @@ -74,22 +101,14 @@ impl HealthCheckSession { tag_serialized.clone(), self.timeout, ); - tasks.spawn(async move { - let start = std::time::Instant::now(); - let request = tonic::Request::new(super::ggen::HealthCheckRequest { - tag: tag_serialized, - }); - let response = - tokio::time::timeout(timeout, client.clone().health_check(request)).await; - let duration = start.elapsed(); - - let response = match response { - Ok(Ok(_)) => HealthCheckStatus::Ok(duration), - Ok(Err(e)) => HealthCheckStatus::Error((duration, e)), - Err(_e) => HealthCheckStatus::TimeOut(timeout), - }; - (role, id, response) - }); + tasks.spawn(Self::send( + tag_serialized, + client, + timeout, + vec![], + role, + id, + )); } let mut results = HashMap::new(); @@ -102,4 +121,126 @@ impl HealthCheckSession { } Ok(results) } + + pub async fn run_bandwidth_benchmark( + &self, + payload_size: usize, + duration: BenchKind, + session_idx: usize, + ) -> anyhow::Result> { + // For duration, hit all the other parties with a payload of the given size. + // As soon as the other party has answered, hit it with the next payload until the duration has elapsed. + // + // `session_idx` is the index of this session within the surrounding + // benchmark request (see `run_bandwidth_benchmark` in + // `core/service/src/engine/threshold/bandwidth_bench.rs`). It is used to + // stripe sessions across the per-peer connection pool so that + // independent sessions land on independent HTTP/2 codec tasks. + + let tag = HealthTag { + sender: self.owner.mpc_identity(), + }; + + let tag_serialized = bc2wrap::serialize(&tag).map_err(|_| { + anyhow_error_and_log("Failed to serialize the Health Check Tag for Bandwidth Benchmark") + })?; + + // Spawn a task for each party to run the bandwidth benchmark in parallel. + let mut join_set = JoinSet::new(); + + // Be safe and use random bytes as payload to avoid any compression that + // could happen before TLS layer + let payload = (0..payload_size) + .map(|_| rand::random::()) + .collect::>(); + + for ((role, id), clients) in self.connection_channels.iter() { + // Stripe sessions across the per-peer pool. The pool is built by + // `make_healthcheck_session_with_pool` and always has at least + // one entry; the empty-pool branch below should be unreachable, + // but guarding it keeps the modulo from trapping on a malformed + // session. + let pool_len = clients.len(); + if pool_len == 0 { + return Err(anyhow_error_and_log(format!( + "Empty connection pool for peer {:?} in HealthCheckSession", + id + ))); + } + let client = clients[session_idx % pool_len].clone(); + let (role, id, tag_serialized, timeout) = + (*role, id.clone(), tag_serialized.clone(), self.timeout); + + let payload = payload.clone(); + join_set.spawn(async move { + let mut total_bytes_sent = 0; + let start = std::time::Instant::now(); + let mut answers = Vec::new(); + match duration { + BenchKind::Once => { + // For the "once" kind, we just send one payload and return the result. + answers.push( + Self::send( + tag_serialized.clone(), + client.clone(), + timeout, + payload.clone(), + role, + id.clone(), + ) + .await + .2, + ); + total_bytes_sent += payload_size; + } + BenchKind::Duration(target_duration) => { + // For the "duration" kind, we keep sending payloads until the target duration has elapsed. + while start.elapsed() < target_duration { + answers.push( + Self::send( + tag_serialized.clone(), + client.clone(), + timeout, + payload.clone(), + role, + id.clone(), + ) + .await + .2, + ); + total_bytes_sent += payload_size; + } + } + } + tracing::debug!("Total bytes sent to party {}: {}", id, total_bytes_sent); + ((role, id), (total_bytes_sent, start.elapsed(), answers)) + }); + } + + Ok(join_set.join_all().await.into_iter().collect()) + } + + async fn send( + tag_serialized: Vec, + client: GnetworkingClient>, + timeout: Duration, + payload: Vec, + role: R, + id: Identity, + ) -> (R, Identity, HealthCheckStatus) { + let start = std::time::Instant::now(); + let request = tonic::Request::new(super::ggen::HealthCheckRequest { + tag: tag_serialized, + payload, + }); + let response = tokio::time::timeout(timeout, client.clone().health_check(request)).await; + let duration = start.elapsed(); + + let response = match response { + Ok(Ok(_)) => HealthCheckStatus::Ok(duration), + Ok(Err(e)) => HealthCheckStatus::Error((duration, e)), + Err(_e) => HealthCheckStatus::TimeOut(timeout), + }; + (role, id, response) + } } diff --git a/core/threshold-networking/src/sending_service.rs b/core/threshold-networking/src/sending_service.rs index b28ab23f11..83f9365c1d 100644 --- a/core/threshold-networking/src/sending_service.rs +++ b/core/threshold-networking/src/sending_service.rs @@ -77,8 +77,16 @@ pub trait SendingService: Send + Sync { )>; } +/// Per-peer connection pool. The vector contains one entry per cached +/// gRPC client (each backed by an independent TCP/HTTP-2 connection). +/// +/// For ordinary traffic we only ever read index 0; the bandwidth benchmark +/// can grow the pool via [`GrpcSendingService::connect_to_party_pool`] so +/// that parallel sessions can be striped across independent HTTP/2 codec +/// tasks. All sessions targeting the same peer share the same underlying +/// pool — the entries are clones of an `Arc`-backed [`Channel`]. type ChannelMap = - HashMap>>; + HashMap>>>; #[derive(Debug, Clone)] pub struct GrpcSendingService { @@ -94,14 +102,20 @@ pub struct GrpcSendingService { impl GrpcSendingService { /// Create the network channel between self and the grpc server of the other party - /// or retrieve it if one already exists + /// or retrieve it if one already exists. + /// + /// Returns the first connection of the peer's pool (creating a + /// one-element pool if no connection is cached yet). Production MPC + /// traffic should keep using this method. pub(crate) async fn connect_to_party( &self, receiver: &Identity, ) -> anyhow::Result>> { - if let Some(channel) = self.channel_map.read().await.get(receiver) { + if let Some(pool) = self.channel_map.read().await.get(receiver) + && let Some(client) = pool.first() + { tracing::debug!("Channel to {:?} already existed, retrieving it.", receiver); - return Ok(channel.clone()); + return Ok(client.clone()); } // Hold a write lock on the entry to avoid duplicate connections @@ -109,14 +123,86 @@ impl GrpcSendingService { let entry = channel_map_write_lock.entry(receiver.clone()); // First thing we do is re-check whether connection has been established while waiting for the lock - if let Entry::Occupied(channel) = entry { + if let Entry::Occupied(pool) = entry + && let Some(client) = pool.get().first() + { tracing::debug!( "Channel to {:?} was created while waiting for the lock, retrieving it.", receiver ); - return Ok(channel.get().clone()); + return Ok(client.clone()); + } + + // `build_client` is synchronous in practice (it only constructs a + // lazy `Channel`), so holding the write lock across it does not + // yield control to other tasks. + let client = self.build_client(receiver)?; + let pool = channel_map_write_lock.entry(receiver.clone()).or_default(); + if pool.is_empty() { + pool.push(client.clone()); + } + Ok(client) + } + + /// Ensure that `receiver`'s connection pool has at least `pool_size` + /// entries and return clones of the first `pool_size` of them. + /// + /// All callers asking for the same peer share the same underlying + /// connections — the pool is cached in [`Self::channel_map`] just like + /// the single-connection variant. Index 0 is the same client that + /// [`Self::connect_to_party`] returns. Indices `1..pool_size` are + /// additional, independent TCP/HTTP-2 connections (and therefore have + /// independent h2 codec tasks). + /// + /// `pool_size` is clamped to at least 1. Once the pool has grown the + /// extra connections stay alive for the lifetime of the + /// `GrpcSendingService`; that is intentional so that repeated benchmark + /// runs do not pay the connection-establishment cost each time, but it + /// does mean shrinking is not supported. Intended for the bandwidth + /// benchmark; production MPC traffic should keep using + /// [`Self::connect_to_party`] until we see real benefits of pooling. + pub(crate) async fn connect_to_party_pool( + &self, + receiver: &Identity, + pool_size: usize, + ) -> anyhow::Result>>> + { + let pool_size = pool_size.max(1); + + // Fast path: the cache already has at least `pool_size` entries. + if let Some(pool) = self.channel_map.read().await.get(receiver) + && pool.len() >= pool_size + { + return Ok(pool[..pool_size].to_vec()); } + // Slow path: take the write lock, top up the pool to `pool_size` + // entries, and return clones. `build_client` is synchronous in + // practice (lazy channel construction), so the write lock is not + // held across an actual yield point. + let mut channel_map_write_lock = self.channel_map.write().await; + let pool = channel_map_write_lock.entry(receiver.clone()).or_default(); + while pool.len() < pool_size { + pool.push(self.build_client(receiver)?); + } + Ok(pool[..pool_size].to_vec()) + } + + /// Build a fresh, uncached `GnetworkingClient` (TLS or plaintext) to + /// `receiver`. Every call opens a new underlying TCP/HTTP-2 connection + /// (lazily — the actual connect happens on first use). + /// + /// This function is intentionally **not** `async` even though the + /// surrounding API is: every operation in the body — URI parsing, + /// rustls config cloning, `Channel::new` / `Channel::builder().connect_lazy()`, + /// `GnetworkingClient::with_interceptor` — is synchronous. Keeping it + /// synchronous lets [`Self::connect_to_party`] and + /// [`Self::connect_to_party_pool`] safely call it while holding the + /// `channel_map` write lock. + fn build_client( + &self, + receiver: &Identity, + ) -> anyhow::Result>> { let proto = match self.tls_config { Some(_) => "https", None => "http", @@ -202,7 +288,6 @@ impl GrpcSendingService { let client = GnetworkingClient::with_interceptor(channel, ContextPropagator) .max_decoding_message_size(self.config.get_max_en_decode_message_size()) .max_encoding_message_size(self.config.get_max_en_decode_message_size()); - entry.insert_entry(client.clone()); Ok(client) } diff --git a/tools/kms-health-check/README.md b/tools/kms-health-check/README.md index 245cd3f153..34dc45d710 100644 --- a/tools/kms-health-check/README.md +++ b/tools/kms-health-check/README.md @@ -35,6 +35,7 @@ kms-health-check live --endpoint localhost:50100 --health-config health-check.to # Using environment variables for timeouts (note the double underscore separator) HEALTH_CHECK__CONNECTION_TIMEOUT_SECS=10 HEALTH_CHECK__REQUEST_TIMEOUT_SECS=30 kms-health-check live --endpoint localhost:50100 + ``` ## Configuration @@ -335,3 +336,134 @@ docker run -v $(pwd):/workspace \ - **Operator Key**: In threshold mode, only available if backup vault uses `SecretSharing` keychain - **Docker Resolution**: Automatically translates Docker service names to localhost when needed + +## Bandwidth Benchmark + +The tool can also be used to perform a bandwidth benchmark between the different parties. +As for the healthcheck, the benchmark will use the same gRPC server as the MPC protocol, +and will also perform the party authentication. Both a text and a json output format are available. + + +The bandwidth benchmark expects the following parameters: +- `CONTEXT_ID` MPC context we want to benchmark (i.e. corresponds to the set of parties) +- `DURATION_SEC` the duration of the experiment (in seconds) +- `NUM_SESSIONS` the number of sessions spawned in parallel +- `PAYLOAD_SIZE` the size of the payload sent over and over by each sessions (in bytes) +- `ENDPOINT` the address of the nodes that will be sending data +- `CONNECTIONS_PER_PEER` the number of dedicated TCP connections to the other peers + +The bandwidth benchmark consists in spawning `NUM_SESSIONS` sessions (each in their own tokio task) that will send a payload of `PAYLOAD_SIZE` bytes, wait for the ack from the other party and repeat. +This is done over a period of `DURATION_SEC` after which the results are collected and displayed. + +If the `DURATION_SEC` is set to 0, then we only send a single payload per session; this may be useful to test how long it takes to clear a big number of sessions. + +To better emulate what happens during the execution of an MPC protocol, it's best to perform the bandwidth benchmark on all the parties at the same time, such that all the parties send and receive the same amount of data. + + +__NOTE__: The gRPC timeout between the tool and the nodes is set to be `DURATION_SEC` plus the usual request timeout configurable via environment variable as described above. + + +```bash +# Running the bandwidth benchmark on all 4 parties at the same time +kms-health-check bandwidth-bench -c -d -n -p -e localhost:50100 -e -e -e <...> --connections-per-peer +``` + + +An example output is: + +``` +❯ ./target/release/kms-health-check bandwidth-bench -c 0700000000000000000000000000000000000000000000000000000000000001 -d 30 -n 100 -p 100000 -e localhost:50100 -e localhost:50200 -e localhost:50300 -e localhost:50400 --connections-per-peer 3 + +[KMS BANDWIDTH BENCHMARK] +============================================================================== +Benchmark type: Duration-based (30 seconds) +Parallel sessions: 100 +Payload per session: 100000 bytes +Connections / peer: 3 + +------------------------------------------------------------------------------ +Endpoint: localhost:50400 +Peers: 3 +------------------------------------------------------------------------------ + Peer Address Sent (MiB) Secs MiB/s +------------------------------------------------------------------------------ + 2 abcd.dev-kms-core-2.com 8339.98 30 278.00 + latency(ms) avg: 33 + p50/p90/p99: 32/52/74 + slowest/fastest: 111/0 + 1 abcd.dev-kms-core-1.com 8475.02 30 282.50 + latency(ms) avg: 33 + p50/p90/p99: 31/51/78 + slowest/fastest: 172/0 + 3 abcd.dev-kms-core-3.com 7073.50 30 235.78 + latency(ms) avg: 39 + p50/p90/p99: 29/54/343 + slowest/fastest: 999/0 +------------------------------------------------------------------------------ +Summary: total 23888.49 MiB in ~30s, aggregate 796.28 MiB/s + +------------------------------------------------------------------------------ +Endpoint: localhost:50100 +Peers: 3 +------------------------------------------------------------------------------ + Peer Address Sent (MiB) Secs MiB/s +------------------------------------------------------------------------------ + 2 abcd.dev-kms-core-2.com 11069.58 30 368.99 + latency(ms) avg: 25 + p50/p90/p99: 22/42/73 + slowest/fastest: 154/1 + 3 abcd.dev-kms-core-3.com 4642.01 30 154.73 + latency(ms) avg: 61 + p50/p90/p99: 26/164/395 + slowest/fastest: 908/1 + 4 abcd.dev-kms-core-4.com 9154.03 30 305.13 + latency(ms) avg: 30 + p50/p90/p99: 28/48/85 + slowest/fastest: 224/1 +------------------------------------------------------------------------------ +Summary: total 24865.63 MiB in ~30s, aggregate 828.85 MiB/s + +------------------------------------------------------------------------------ +Endpoint: localhost:50200 +Peers: 3 +------------------------------------------------------------------------------ + Peer Address Sent (MiB) Secs MiB/s +------------------------------------------------------------------------------ + 3 abcd.dev-kms-core-3.com 7960.61 30 265.35 + latency(ms) avg: 35 + p50/p90/p99: 18/53/331 + slowest/fastest: 965/0 + 4 abcd.dev-kms-core-4.com 11151.60 30 371.72 + latency(ms) avg: 25 + p50/p90/p99: 24/39/56 + slowest/fastest: 97/0 + 1 abcd.dev-kms-core-1.com 9324.65 30 310.82 + latency(ms) avg: 30 + p50/p90/p99: 20/37/291 + slowest/fastest: 818/0 +------------------------------------------------------------------------------ +Summary: total 28436.85 MiB in ~30s, aggregate 947.90 MiB/s + +------------------------------------------------------------------------------ +Endpoint: localhost:50300 +Peers: 3 +------------------------------------------------------------------------------ + Peer Address Sent (MiB) Secs MiB/s +------------------------------------------------------------------------------ + 4 abcd.dev-kms-core-4.com 10526.08 30 350.87 + latency(ms) avg: 26 + p50/p90/p99: 24/42/77 + slowest/fastest: 241/0 + 2 abcd.dev-kms-core-2.com 7986.55 30 266.22 + latency(ms) avg: 35 + p50/p90/p99: 21/56/332 + slowest/fastest: 805/1 + 1 abcd.dev-kms-core-1.com 15588.28 30 519.61 + latency(ms) avg: 17 + p50/p90/p99: 16/29/43 + slowest/fastest: 75/0 +------------------------------------------------------------------------------ +Summary: total 34100.91 MiB in ~30s, aggregate 1136.70 MiB/s + +============================================================================== +``` \ No newline at end of file diff --git a/tools/kms-health-check/src/checks.rs b/tools/kms-health-check/src/checks.rs index a114a43b9e..a97750130a 100644 --- a/tools/kms-health-check/src/checks.rs +++ b/tools/kms-health-check/src/checks.rs @@ -1,7 +1,10 @@ use anyhow::Result; -use kms_grpc::kms::v1::RequestId; +use kms_grpc::kms::v1::{ + BandwidthBenchmarkRequest, BandwidthBenchmarkResponse, BandwidthKind, RequestId, +}; use serde::{Deserialize, Serialize}; use std::path::Path; +use std::time::Duration; use crate::config::{self}; use crate::grpc_client::GrpcHealthClient; @@ -501,3 +504,34 @@ pub async fn run_full_check( Ok(result) } + +pub async fn run_bandwidth_benchmark( + endpoint: &str, + context_id: String, + duration: u64, + num_sessions: u32, + payload_size: u32, + connections_per_peer: u32, +) -> Result { + let kind = if duration == 0 { + BandwidthKind::Once + } else { + BandwidthKind::Duration + } + .into(); + let request = BandwidthBenchmarkRequest { + duration, + number_sessions: num_sessions, + payload_size_per_session: payload_size, + context_id: Some(RequestId { + request_id: context_id, + }), + connections_per_peer, + kind, + }; + + let client = GrpcHealthClient::new(endpoint); + client + .run_bandwidth_benchmark(request, Duration::from_secs(duration)) + .await +} diff --git a/tools/kms-health-check/src/grpc_client.rs b/tools/kms-health-check/src/grpc_client.rs index 4bb086f521..fa2c79b084 100644 --- a/tools/kms-health-check/src/grpc_client.rs +++ b/tools/kms-health-check/src/grpc_client.rs @@ -1,10 +1,12 @@ use crate::config::HealthCheckConfig; use anyhow::{Context, Result}; use kms_grpc::kms::v1::{ - Empty, HealthStatusResponse, KeyMaterialAvailabilityResponse, OperatorPublicKey, + BandwidthBenchmarkRequest, BandwidthBenchmarkResponse, Empty, HealthStatusResponse, + KeyMaterialAvailabilityResponse, OperatorPublicKey, }; use kms_grpc::kms_service::v1::core_service_endpoint_client::CoreServiceEndpointClient; use std::time::{Duration, Instant}; +use tonic::Request; use tonic::transport::Channel; // Global timeout configuration @@ -102,4 +104,19 @@ impl GrpcHealthClient { let response = client.get_health_status(Empty {}).await?; Ok(response.into_inner()) } + + pub async fn run_bandwidth_benchmark( + &self, + request: BandwidthBenchmarkRequest, + duration: std::time::Duration, + ) -> Result { + let channel = Channel::from_shared(self.endpoint.clone())? + .timeout(duration + self.timeouts.request_timeout) // Set timeout to benchmark duration + buffer + .connect() + .await?; + + let mut client = CoreServiceEndpointClient::new(channel.clone()); + let response = client.bandwidth_benchmark(Request::new(request)).await?; + Ok(response.into_inner()) + } } diff --git a/tools/kms-health-check/src/main.rs b/tools/kms-health-check/src/main.rs index 4e34ae2135..edba0ae99a 100644 --- a/tools/kms-health-check/src/main.rs +++ b/tools/kms-health-check/src/main.rs @@ -4,6 +4,8 @@ use clap::{Parser, Subcommand}; use std::path::Path; use std::path::PathBuf; +use crate::output::{print_bandwidth_benchmark_json, print_bandwidth_benchmark_text}; + mod checks; mod config; mod grpc_client; @@ -55,6 +57,39 @@ enum Commands { #[arg(short, long)] config: PathBuf, }, + /// Runs a bandwidth benchmark against a lit of KMS endpoints. + /// NOTE: It makes more sense to run it on all the parties at the same time to emulate real bandwidth usage, but it can be run on a subset of parties as well. + BandwidthBench { + /// KMS endpoints to test (e.g., --endpoints host1:50100 --endpoints host2:50100) + #[arg(short, long)] + endpoints: Vec, + + /// Context id of the MPC context to test + #[arg(short, long)] + context_id: String, + + /// Duration of the benchmark in seconds + /// If Duration is set to 0 we only send one payload per session to each party and return the result immediately. + #[arg(short, long)] + duration: u64, + + /// Number of sessions trying to send bytes in parallel + #[arg(short, long)] + num_sessions: u32, + + /// Payload size per session in bytes + #[arg(short, long)] + payload_size: u32, + + /// Number of independent gRPC connections to open per peer. + /// Sessions are striped round-robin across these connections so + /// they no longer all share a single HTTP/2 codec task. + /// Defaults to 1, which preserves the historical single-connection + /// behavior; raise it (e.g. 8) when investigating small-payload + /// throughput. + #[arg(long, default_value_t = 1)] + connections_per_peer: u32, + }, } #[derive(Debug, Clone, ValueEnum)] @@ -83,16 +118,73 @@ async fn main() -> Result<()> { tracing::info!("Log level: {}", log_level); tracing::info!("Output format: {:?}", cli.format); - let result = match cli.command { - Commands::Config { file } => checks::run_config_validation(file.to_str().unwrap()).await?, + match cli.command { + Commands::Config { file } => { + output::print_result( + checks::run_config_validation(file.to_str().unwrap()).await?, + &cli.format, + )?; + } Commands::Live { endpoint, config } => { - checks::check_live(&endpoint, config.as_deref().map(Path::new)).await? + output::print_result( + checks::check_live(&endpoint, config.as_deref().map(Path::new)).await?, + &cli.format, + )?; } Commands::Full { endpoint, config } => { - checks::run_full_check(Some(config.to_str().unwrap()), &endpoint).await? + output::print_result( + checks::run_full_check(Some(config.to_str().unwrap()), &endpoint).await?, + &cli.format, + )?; + } + Commands::BandwidthBench { + endpoints, + context_id, + duration, + num_sessions, + payload_size, + connections_per_peer, + } => { + let mut joinset = tokio::task::JoinSet::new(); + for ep in endpoints { + let context_id = context_id.clone(); + joinset.spawn(async move { + let result = checks::run_bandwidth_benchmark( + &ep, + context_id, + duration, + num_sessions, + payload_size, + connections_per_peer, + ) + .await; + (ep, result) + }); + } + let mut results = Vec::new(); + while let Some(res) = joinset.join_next().await { + let (endpoint, result) = res?; + let result = result?; + results.push((endpoint, result)); + } + match cli.format { + OutputFormat::Json => print_bandwidth_benchmark_json( + duration, + num_sessions, + payload_size, + connections_per_peer, + results, + )?, + OutputFormat::Text => print_bandwidth_benchmark_text( + duration, + num_sessions, + payload_size, + connections_per_peer, + results, + )?, + } } }; - output::print_result(result, &cli.format)?; Ok(()) } diff --git a/tools/kms-health-check/src/output.rs b/tools/kms-health-check/src/output.rs index 786db052f9..826fac7c78 100644 --- a/tools/kms-health-check/src/output.rs +++ b/tools/kms-health-check/src/output.rs @@ -1,5 +1,6 @@ use crate::checks::{HealthCheckResult, HealthStatus}; use anyhow::Result; +use kms_grpc::kms::v1::BandwidthBenchmarkResponse; use std::fmt::Write; pub fn print_result(result: HealthCheckResult, format: &crate::OutputFormat) -> Result<()> { @@ -200,3 +201,199 @@ fn print_json(result: &HealthCheckResult) -> Result<()> { println!("{}", json); Ok(()) } + +pub fn print_bandwidth_benchmark_text( + duration: u64, + num_sessions: u32, + payload_size: u32, + connections_per_peer: u32, + results: Vec<(String, BandwidthBenchmarkResponse)>, +) -> Result<()> { + let oneshot = duration == 0; + let mut output = String::with_capacity(4096); + writeln!(output, "\n[KMS BANDWIDTH BENCHMARK]")?; + writeln!(output, "{}", "=".repeat(78))?; + if oneshot { + writeln!( + output, + "Benchmark type: One-shot (single payload per session)" + )?; + } else { + writeln!( + output, + "Benchmark type: Duration-based ({} seconds)", + duration + )?; + } + writeln!(output, "Parallel sessions: {}", num_sessions)?; + writeln!(output, "Payload per session: {} bytes", payload_size)?; + writeln!( + output, + "Connections / peer: {}", + connections_per_peer.max(1) + )?; + + for (endpoint, result) in results { + writeln!(output, "\n{}", "-".repeat(78))?; + writeln!(output, "Endpoint: {}", endpoint)?; + writeln!(output, "Peers: {}", result.peers_info.len())?; + writeln!(output, "{}", "-".repeat(78))?; + writeln!( + output, + "{:>7} {:<24} {:>12} {:>8} {:>10}", + "Peer", "Address", "Sent (MiB)", "Secs", "MiB/s" + )?; + writeln!(output, "{}", "-".repeat(78))?; + + let mut total_bytes_sent: u64 = 0; + let mut max_duration_secs: u64 = 0; + + for peer in &result.peers_info { + total_bytes_sent = total_bytes_sent.saturating_add(peer.bytes_sent); + max_duration_secs = max_duration_secs.max(peer.duration); + + let sent_mib = peer.bytes_sent as f64 / (1024.0 * 1024.0); + let throughput_mib_per_sec = if peer.duration == 0 { + 0.0 + } else { + sent_mib / (peer.duration as f64) + }; + + writeln!( + output, + "{:>7} {:<24} {:>12.2} {:>8} {:>10.2}", + peer.peer_id, peer.endpoint, sent_mib, peer.duration, throughput_mib_per_sec + )?; + + if let Some(latency) = &peer.latency { + writeln!(output, " latency(ms) avg: {}", latency.average,)?; + writeln!( + output, + " p50/p90/p99: {}/{}/{}", + latency.p50, latency.p90, latency.p99, + )?; + writeln!( + output, + " slowest/fastest: {}/{}", + latency.slowest, latency.fastest + )?; + } + } + + let total_mib = total_bytes_sent as f64 / (1024.0 * 1024.0); + let aggregate_mib_per_sec = if max_duration_secs == 0 { + 0.0 + } else { + total_mib / (max_duration_secs as f64) + }; + writeln!(output, "{}", "-".repeat(78))?; + writeln!( + output, + "Summary: total {:.2} MiB in ~{}s, aggregate {:.2} MiB/s", + total_mib, max_duration_secs, aggregate_mib_per_sec + )?; + } + + writeln!(output, "\n{}", "=".repeat(78))?; + print!("{}", output); + Ok(()) +} + +/// JSON-formatted equivalent of [`print_bandwidth_benchmark_text`]. +/// +/// Emits a single pretty-printed JSON document to stdout containing the +/// same configuration, per-peer measurements, and per-endpoint summary +/// statistics. The shape is intentionally machine-friendly (no padding, +/// fully explicit unit suffixes in field names) so downstream tooling can +/// parse it with `jq` or `serde_json` directly. +/// +/// All MiB values are computed with a 1024 × 1024 divisor (matching the +/// text formatter). Throughput fields are 0.0 when the relevant duration +/// is 0 (one-shot mode), since per-second rates are not meaningful in +/// that case. +pub fn print_bandwidth_benchmark_json( + duration: u64, + num_sessions: u32, + payload_size: u32, + connections_per_peer: u32, + results: Vec<(String, BandwidthBenchmarkResponse)>, +) -> Result<()> { + let oneshot = duration == 0; + + let endpoints: Vec = results + .into_iter() + .map(|(endpoint, result)| { + let mut total_bytes_sent: u64 = 0; + let mut max_duration_secs: u64 = 0; + + let peers: Vec = result + .peers_info + .iter() + .map(|peer| { + total_bytes_sent = total_bytes_sent.saturating_add(peer.bytes_sent); + max_duration_secs = max_duration_secs.max(peer.duration); + + let sent_mib = peer.bytes_sent as f64 / (1024.0 * 1024.0); + let throughput_mib_per_sec = if peer.duration == 0 { + 0.0 + } else { + sent_mib / (peer.duration as f64) + }; + + let latency = peer.latency.as_ref().map(|l| { + serde_json::json!({ + "average_ms": l.average, + "p50_ms": l.p50, + "p90_ms": l.p90, + "p99_ms": l.p99, + "slowest_ms": l.slowest, + "fastest_ms": l.fastest, + }) + }); + + serde_json::json!({ + "peer_id": peer.peer_id, + "endpoint": peer.endpoint, + "bytes_sent": peer.bytes_sent, + "duration_secs": peer.duration, + "throughput_mib_per_sec": throughput_mib_per_sec, + "latency": latency, + }) + }) + .collect(); + + let total_mib = total_bytes_sent as f64 / (1024.0 * 1024.0); + let aggregate_mib_per_sec = if max_duration_secs == 0 { + 0.0 + } else { + total_mib / (max_duration_secs as f64) + }; + + serde_json::json!({ + "endpoint": endpoint, + "num_peers": peers.len(), + "peers": peers, + "summary": { + "total_bytes_sent": total_bytes_sent, + "total_mib": total_mib, + "duration_secs": max_duration_secs, + "aggregate_mib_per_sec": aggregate_mib_per_sec, + }, + }) + }) + .collect(); + + let report = serde_json::json!({ + "config": { + "benchmark_type": if oneshot { "oneshot" } else { "duration" }, + "duration_secs": duration, + "num_sessions": num_sessions, + "payload_size_bytes": payload_size, + "connections_per_peer": connections_per_peer.max(1), + }, + "endpoints": endpoints, + }); + + println!("{}", serde_json::to_string_pretty(&report)?); + Ok(()) +}