Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/grpc/proto/kms-service-insecure.v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ service CoreServiceEndpoint {
rpc GetKeyMaterialAvailability(kms.v1.Empty) returns (kms.v1.KeyMaterialAvailabilityResponse);

rpc GetHealthStatus(kms.v1.Empty) returns (kms.v1.HealthStatusResponse);

rpc BandwidthBenchmark(kms.v1.BandwidthBenchmarkRequest) returns (kms.v1.BandwidthBenchmarkResponse);
}
5 changes: 4 additions & 1 deletion core/grpc/proto/kms-service.v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ service CoreServiceEndpoint {
// * The `crs_id` in `request` will not be usable again.
// * All data associated with the `crs_id` has been removed.
rpc AbortCrsGen(kms.v1.RequestId) returns (kms.v1.Empty);

// Constructs a new MPC context.
//
// That is, updates the internal state and configuration files to support a new MPC context.
Expand Down Expand Up @@ -494,4 +494,7 @@ service CoreServiceEndpoint {
// ## Pre-condition: KMS service must be initialized
// ## Post-condition: No state changes, read-only operation
rpc GetHealthStatus(kms.v1.Empty) returns (kms.v1.HealthStatusResponse);


rpc BandwidthBenchmark(kms.v1.BandwidthBenchmarkRequest) returns (kms.v1.BandwidthBenchmarkResponse);
}
54 changes: 54 additions & 0 deletions core/grpc/proto/kms.v1.proto
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,57 @@ message EpochResultResponse {
message DestroyMpcEpochRequest {
RequestId epoch_id = 1;
}

enum BandwidthKind {
Duration = 0;
Once = 1;
}

message BandwidthBenchmarkRequest {
// The KMS context for which to perform the benchmark
RequestId context_id = 1;
// How long we keep sending bytes (s)
uint64 duration = 2;
// Number of sessions trying to send bytes in parallel
uint32 number_sessions = 3;
// Size of the payload each session will send at a time
uint32 payload_size_per_session = 4;
// Number of independent gRPC connections to open per peer for the
// benchmark. The `number_sessions` sessions are striped across these
// connections (round-robin) so they no longer all share a single
// HTTP/2 codec task. A value of 0 is treated as 1, which preserves
// the historical single-connection behavior. Useful for measuring
// small-payload throughput, where the per-connection codec task is
// typically the bottleneck.
uint32 connections_per_peer = 5;
BandwidthKind kind = 6;
}

// All latency info are in ms
message LatencyInfo {
uint64 average = 1;
uint64 p50 = 2;
uint64 p90 = 3;
uint64 p99 = 4;
uint64 slowest = 5;
uint64 fastest = 6;
}

message PeerBandwidthInfo {
// Peer party ID
uint32 peer_id = 1;
// Peer endpoint address
string endpoint = 2;
// Number of bytes sent to this peer
uint64 bytes_sent = 3;
// Effective time spent averaged over all sessions
// (may differ from requested duration because we don't stop the experiment abruptly)
uint64 duration = 4;
// Some measure of latency over all messages sent
LatencyInfo latency = 5;
}

message BandwidthBenchmarkResponse {
// Benchmark result for each of the peers
repeated PeerBandwidthInfo peers_info = 1;
}
7 changes: 7 additions & 0 deletions core/service/src/engine/centralized/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,11 @@ impl<
) -> Result<Response<Empty>, Status> {
unimplemented!("MPC epochs are not supported in centralized KMS");
}

async fn bandwidth_benchmark(
&self,
_request: Request<kms_grpc::kms::v1::BandwidthBenchmarkRequest>,
) -> Result<Response<kms_grpc::kms::v1::BandwidthBenchmarkResponse>, Status> {
unimplemented!("Bandwidth benchmark is not supported in centralized KMS");
}
Comment on lines +408 to +413
}
147 changes: 147 additions & 0 deletions core/service/src/engine/threshold/bandwidth_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use std::{collections::HashMap, ops::AddAssign, time::Duration};

use itertools::Itertools;
use kms_grpc::{
ContextId,
kms::v1::{
BandwidthBenchmarkRequest, BandwidthBenchmarkResponse, BandwidthKind, LatencyInfo,
PeerBandwidthInfo,
},
};
use threshold_networking::health_check::{BenchKind, HealthCheckStatus};
use tonic::{Request, Response, Status};

use crate::engine::{
threshold::service::session::ImmutableSessionMaker,
validation::{RequestIdParsingErr, parse_optional_grpc_request_id},
};

pub(crate) async fn run_bandwidth_benchmark(
request: Request<BandwidthBenchmarkRequest>,
session_maker: ImmutableSessionMaker,
) -> Result<Response<BandwidthBenchmarkResponse>, Status> {
tracing::info!("Received bandwidth benchmark request: {:?}", request);
let request = request.into_inner();
Comment on lines +19 to +24
let context_id: ContextId =
parse_optional_grpc_request_id(&request.context_id, RequestIdParsingErr::Context)?;
let payload_size = request.payload_size_per_session as usize;
let num_sessions = request.number_sessions as usize;
let kind: BandwidthKind = request.kind.try_into().map_err(|e| {
Status::invalid_argument(format!(
"Invalid bandwidth benchmark kind {}: {}",
request.kind, e
))
})?;
let duration = match kind {
BandwidthKind::Once => BenchKind::Once,
BandwidthKind::Duration => BenchKind::Duration(Duration::from_secs(request.duration)),
};
// A value of 0 (the proto default for older clients) is treated as 1
// so this remains backward-compatible with the historical
// single-connection benchmark.
let connections_per_peer = (request.connections_per_peer as usize).max(1);

let mut join_set = tokio::task::JoinSet::new();
for session_idx in 0..num_sessions {
let session = session_maker
.get_healthcheck_session_with_pool(&context_id, connections_per_peer)
.await
.map_err(|e| {
Status::internal(format!(
"Failed to create health check session for context {}: {}",
context_id, e
))
})?;
join_set.spawn(async move {
session
.run_bandwidth_benchmark(payload_size, duration, session_idx)
.await
});
}

let mut results = HashMap::new();

while let Some(result) = join_set.join_next().await {
let result = result
.map_err(|e| {
Status::internal(format!("Failed to join bandwidth benchmark task: {}", e))
})?
.map_err(|e| Status::internal(format!("Bandwidth benchmark task failed: {}", e)))?;
for ((role, id), (bytes_sent, duration, status)) in result {
let (entry_sent, entry_duration, entry_status) = results
.entry((role, id))
.or_insert_with(|| (0, vec![], vec![]));
entry_sent.add_assign(bytes_sent);
entry_duration.push(duration);
entry_status.extend(status);
Comment on lines +70 to +76
}
}

let peers_info = results
.into_iter()
.map(|((role, id), (bytes_sent, durations, status))| {
// Fill up tha latency struct
let latency = make_latency(status)
Comment on lines +82 to +84
.inspect_err(|e| tracing::warn!("Error computing latency info: {}", e))
.ok();
let duration = match kind {
BandwidthKind::Once => durations
.into_iter()
.map(|d| d.as_secs())
.max()
.unwrap_or(0),
BandwidthKind::Duration => {
(durations.iter().sum::<Duration>().as_secs_f64() / durations.len() as f64)
as u64
}
};
PeerBandwidthInfo {
peer_id: role.one_based() as u32,
endpoint: id.hostname().to_string(),
bytes_sent: bytes_sent as u64,
duration,
latency,
}
})
.collect_vec();

tracing::info!("Bandwidth benchmark completed. Results: {:?}", peers_info);

Ok(Response::new(BandwidthBenchmarkResponse { peers_info }))
}

fn make_latency(status: Vec<HealthCheckStatus>) -> Result<LatencyInfo, String> {
let latencies: Vec<u128> = status
.iter()
.map(|s| match s {
HealthCheckStatus::Ok(duration) => Ok(duration.as_millis()),
HealthCheckStatus::Error(_) => {
Err("Some error sending at least one payload".to_string())
}
HealthCheckStatus::TimeOut(_) => {
Err("Timeout sending at least one payload".to_string())
}
})
.try_collect()?;

let sorted_latencies = latencies.into_iter().sorted().collect_vec();

let average = (sorted_latencies.iter().sum::<u128>() as f64 / status.len() as f64) as u64;
let p50_idx = status.len() / 2;
let p50 = sorted_latencies.get(p50_idx).copied().unwrap_or(0) as u64;
let p90_idx = (status.len() * 90) / 100;
let p90 = sorted_latencies.get(p90_idx).copied().unwrap_or(0) as u64;
let p99_idx = (status.len() * 99) / 100;
let p99 = sorted_latencies.get(p99_idx).copied().unwrap_or(0) as u64;
let slowest = sorted_latencies.last().copied().unwrap_or(0) as u64;
let fastest = sorted_latencies.first().copied().unwrap_or(0) as u64;

Ok(LatencyInfo {
average,
p50,
p90,
p99,
slowest,
fastest,
})
}
10 changes: 10 additions & 0 deletions core/service/src/engine/threshold/endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::engine::threshold::bandwidth_bench::run_bandwidth_benchmark;
use crate::engine::threshold::threshold_kms::ThresholdKms;
use crate::engine::threshold::traits::{
CrsGenerator, KeyGenPreprocessor, KeyGenerator, PublicDecryptor, UserDecryptor,
Expand Down Expand Up @@ -408,5 +409,14 @@ impl_endpoint! {

Ok(Response::new(response))
}


#[tracing::instrument(skip(self, request))]
async fn bandwidth_benchmark(
&self,
request: Request<kms_grpc::kms::v1::BandwidthBenchmarkRequest>,
) -> Result<Response<kms_grpc::kms::v1::BandwidthBenchmarkResponse>, Status> {
run_bandwidth_benchmark(request, self.session_maker.clone()).await
}
}
}
1 change: 1 addition & 0 deletions core/service/src/engine/threshold/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod bandwidth_bench;
mod endpoint;
pub mod service;
pub mod threshold_kms;
Expand Down
39 changes: 38 additions & 1 deletion core/service/src/engine/threshold/service/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,28 @@ impl SessionMaker {
async fn get_healthcheck_session(
&self,
context_id: &ContextId,
) -> anyhow::Result<HealthCheckSession<Role>> {
self.get_healthcheck_session_with_pool(context_id, 1).await
}

// Returns a health check session whose per-peer connection pool has
// `connections_per_peer` independent gRPC connections. With
// `connections_per_peer = 1` this matches [`get_healthcheck_session`].
// Used by the bandwidth benchmark to stripe parallel sessions across
// multiple HTTP/2 codec tasks.
async fn get_healthcheck_session_with_pool(
&self,
context_id: &ContextId,
connections_per_peer: usize,
) -> anyhow::Result<HealthCheckSession<Role>> {
let nm = self.networking_manager.read().await;
let role_assignment = self.get_role_assignment(context_id).await?;
let my_role = self.my_role(context_id).await?;

if let Some(role) = my_role {
Ok(nm.make_healthcheck_session(&role_assignment, role).await?)
Ok(nm
.make_healthcheck_session_with_pool(&role_assignment, role, connections_per_peer)
.await?)
} else {
Err(anyhow::anyhow!(
"My role is not defined for context {}",
Expand Down Expand Up @@ -861,6 +876,28 @@ impl ImmutableSessionMaker {
) -> anyhow::Result<HashMap<ContextId, HealthCheckSession<Role>>> {
self.inner.get_healthcheck_session_all_contexts().await
}

// Returns a health check session for the given context.
//pub(crate) async fn get_healthcheck_session(
// &self,
// context_id: &ContextId,
//) -> anyhow::Result<HealthCheckSession<Role>> {
Comment on lines +880 to +884
// self.inner.get_healthcheck_session(context_id).await
//}

// Returns a health check session for the given context whose per-peer
// connection pool has `connections_per_peer` independent gRPC
// connections. With `connections_per_peer = 1` this matches
// [`get_healthcheck_session`]. Used by the bandwidth benchmark.
pub(crate) async fn get_healthcheck_session_with_pool(
&self,
context_id: &ContextId,
connections_per_peer: usize,
) -> anyhow::Result<HealthCheckSession<Role>> {
self.inner
.get_healthcheck_session_with_pool(context_id, connections_per_peer)
.await
}
}

/// Validates that a context and epoch ID exists and returns the role of the current server in this context.
Expand Down
1 change: 1 addition & 0 deletions core/threshold-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ description = "Provides networking for all threshold network components. GRPC, s
[dependencies]
anyhow.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
rand.workspace = true
rcgen.workspace = true
async-trait.workspace = true
attestation-doc-validation.workspace = true
Expand Down
1 change: 1 addition & 0 deletions core/threshold-networking/protos/gnetworking.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum Status {

message HealthCheckRequest {
bytes tag = 1;
bytes payload = 2; // throwaway bytes for bandwidth testing
}

message HealthCheckResponse {}
Loading
Loading