Skip to content
Open
239 changes: 119 additions & 120 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ serde_cbor = "0.11.2"
serde_json = "1.0.132"
sha2 = "0.11.0"
strum = { version = "0.28.0", features = ["derive"] }
thiserror = "2.0"
tikv-jemallocator = "0.7.0"
tikv-jemalloc-ctl = { version = "0.7.0", features = ["stats"] }
time = { version = "0.3.47", features = ["macros", "serde"] }
Expand Down
28 changes: 27 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,37 @@ pub struct Ic {
#[clap(env, long)]
pub ic_use_discovery: bool,

/// In dynamic routing mode, limits routing to the top K API nodes with best score (ranked by latency and availability).
/// Dynamic routing mode: limits routing to the K top-scored API nodes (ranked by latency and availability).
/// If not set, routing uses all healthy API nodes.
#[clap(env, long)]
pub ic_use_k_top_api_nodes: Option<usize>,

/// Dynamic routing mode: how frequently to update healthy node list when no health state changes occur.
#[clap(env, long, default_value = "5s", value_parser = parse_duration)]
pub ic_discovery_idle_interval: Duration,

/// Dynamic routing mode: how frequently to health check each node
#[clap(env, long, default_value = "1s", value_parser = parse_duration)]
pub ic_discovery_health_check_interval: Duration,

/// Dynamic routing mode: health check timeout
#[clap(env, long, default_value = "3s", value_parser = parse_duration)]
pub ic_discovery_health_check_timeout: Duration,

/// Dynamic routing mode: how frequently to fetch a fresh list of API BNs
#[clap(env, long, default_value = "5m", value_parser = parse_duration)]
pub ic_discovery_node_fetch_interval: Duration,

/// Dynamic routing mode: EWMA alpha parameter, should be between 0.0 and 1.0.
/// The lower the value, the more heavily recent observations are weighted relative to older ones.
#[clap(env, long, default_value = "0.5")]
pub ic_discovery_ewma_alpha: f64,

/// Dynamic routing mode: weight of the reliability metric, relative to the latency,
/// should be between 0.0 and 1.0. The weight of the latency metric will be (1.0 - reliability_weight).
#[clap(env, long, default_value = "0.9")]
pub ic_discovery_reliability_weight: f64,

/// Path to an IC root key. Must be DER-encoded.
/// If not specified, either the hardcoded key or a fetched one (see `--ic-unsafe-root-key-fetch`) will be used.
#[clap(env, long)]
Expand Down
58 changes: 36 additions & 22 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use ic_bn_lib::{
vector::{self, VectorOptions, client::Vector},
};
use ic_bn_lib_common::{
principal,
traits::{custom_domains::ProvidesCustomDomains, tls::ProvidesCertificates},
types::{
dns::Options as DnsOptions,
Expand Down Expand Up @@ -124,36 +123,51 @@ pub async fn main(

http_client_opts.tls_config = Some(tls_config);

// Bare reqwest client is for now needed for the Route Provider and 2nd Agent
// TODO improve
let reqwest_client =
bnhttp::client::clients_reqwest::new(http_client_opts.clone(), Some(dns_resolver.clone()))?;
// Reqwest-based HTTP client
let http_client = Arc::new(bnhttp::ReqwestClient::new(
http_client_opts.clone(),
Some(dns_resolver.clone()),
)?);

// Simple Hyper-based HTTP client & an HTTP-service for the Agents backed by it.
// Used by lower-load tasks like RouteProvider
let http_client_hyper = Arc::new(bnhttp::HyperClient::new(
http_client_opts.clone(),
dns_resolver.clone(),
));

let http_service = Arc::new(AgentHttpService::new(
http_client_hyper.clone(),
cli.ic.ic_request_retry_interval,
));

// Create route provider
let route_provider = setup_route_provider(cli, reqwest_client.clone()).await?;
let (route_provider, dynamic_route_provider) =
setup_route_provider(cli, http_client_hyper, http_service.clone()).await?;
health_manager.add(Arc::new(RouteProviderWrapper::new(route_provider.clone())));

// Create a separate Agent backed by Reqwest to use solely with Resolver.
// Create a separate Agent to use solely with Resolver.
// This way we avoid a chicken-and-egg problem:
// - Hyper client needs resolver
// - Resolver needs Agent
// - Agent needs Hyper client
let ic_agent_resolver =
create_agent(cli, Arc::new(reqwest_client), route_provider.clone()).await?;
let ic_agent_resolver = create_agent(cli, http_service, route_provider.clone()).await?;
let api_bn_resolver = ApiBnResolver::new(dns_resolver.clone(), ic_agent_resolver);

let http_client = Arc::new(bnhttp::ReqwestClient::new(
http_client_opts.clone(),
Some(dns_resolver.clone()),
)?);

let http_client_hyper = Arc::new(bnhttp::HyperClientLeastLoaded::new(
// Least-loaded Hyper-based HTTP client
let http_client_hyper_ll = Arc::new(bnhttp::HyperClientLeastLoaded::new(
http_client_opts,
api_bn_resolver.clone(),
cli.network.network_http_client_count as usize,
Some(&registry),
));

// HTTP service for the agents
let http_service_ll = Arc::new(AgentHttpService::new(
http_client_hyper_ll.clone(),
cli.ic.ic_request_retry_interval,
));

// Event sinks
let vector_metrics = vector::client::Metrics::new(&registry);
let vector_http = if cli.log.vector.log_vector_url.is_some() {
Expand Down Expand Up @@ -253,18 +267,14 @@ pub async fn main(
}

// Create IC Agent for use by RoutingTableManager / SMTP
let http_service = Arc::new(AgentHttpService::new(
http_client_hyper.clone(),
cli.ic.ic_request_retry_interval,
));
let ic_agent = create_agent(cli, http_service, route_provider.clone())
let ic_agent = create_agent(cli, http_service_ll, route_provider.clone())
.await
.context("unable to create agent for subnets info fetcher")?;

// Create a routing table manager that handles per-subnet information fetching
let routing_table_manager = Arc::new(RoutingTableManager::new(
ic_agent.clone(),
principal!(MAINNET_ROOT_SUBNET_ID),
MAINNET_ROOT_SUBNET_ID,
cli.ic.ic_routing_table_poll_interval,
));
health_manager.add(routing_table_manager.clone());
Expand Down Expand Up @@ -298,7 +308,7 @@ pub async fn main(
&mut tasks,
health_manager.clone(),
http_client.clone(),
http_client_hyper,
http_client_hyper_ll,
Comment thread
Bownairo marked this conversation as resolved.
route_provider.clone(),
&registry,
shutdown_token.clone(),
Expand Down Expand Up @@ -409,6 +419,10 @@ pub async fn main(
warn!("Shutdown signal received, cleaning up");
tasks.stop().await;

if let Some(v) = &dynamic_route_provider {
v.stop().await;
}

// Vector should stop last to ensure that all requests are finished & flushed
if let Some(v) = vector_http {
v.stop().await;
Expand Down
3 changes: 2 additions & 1 deletion src/routing/ic/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{cell::RefCell, sync::Arc, time::Duration};
use anyhow::anyhow;
use async_trait::async_trait;
use bytes::Bytes;
use derive_new::new;
use http::{Request, Response, StatusCode};
use http_body_util::{BodyExt, Full, Limited};
use ic_bn_lib::ic_agent::{AgentError, agent::HttpService};
Expand Down Expand Up @@ -32,7 +33,7 @@ task_local! {
}

/// Service that executes requests on IC-Agent's behalf
#[derive(Debug, derive_new::new)]
#[derive(Debug, new)]
pub struct AgentHttpService {
client: Arc<dyn ClientHttp<Full<Bytes>>>,
retry_interval: Duration,
Expand Down
16 changes: 11 additions & 5 deletions src/routing/ic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ pub mod http_service;
pub mod route_provider;
pub mod routing_table_manager;

pub const MAINNET_ROOT_SUBNET_ID: &str =
"tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe";

use std::{fs, sync::Arc};
use std::sync::Arc;

use anyhow::{Context, Error};
use bytes::Bytes;
use candid::Principal;
use http::{HeaderMap, StatusCode, header::HeaderName};
use http_body_util::{Either, Full};
use ic_bn_lib::{
Expand All @@ -34,6 +32,12 @@ use tracing::warn;

use crate::Cli;

/// Principal of the mainnet root subnet, built at compile time from its raw bytes.
///
/// Textual representation:
/// `tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe`
pub const MAINNET_ROOT_SUBNET_ID: Principal = Principal::from_slice(&[
207, 242, 128, 227, 45, 127, 92, 205, 34, 70, 136, 47, 148, 175, 178, 15, 84, 202, 97, 162, 23,
101, 231, 18, 212, 61, 39, 137, 2,
]);

/// Metadata about the request to an API Boundary Node (ic-boundary)
#[derive(Clone, Debug, Default)]
pub struct BNRequestMetadata {
Expand Down Expand Up @@ -123,7 +127,9 @@ pub async fn create_agent(
.context("unable to build Agent")?;

if let Some(v) = &cli.ic.ic_root_key {
let key = fs::read(v).context("unable to read IC root key")?;
let key = tokio::fs::read(v)
.await
.context("unable to read IC root key")?;
agent.set_root_key(key);
} else if cli.ic.ic_unsafe_root_key_fetch {
warn!("Fetching IC root key (UNSAFE)");
Expand Down
88 changes: 0 additions & 88 deletions src/routing/ic/route_provider.rs

This file was deleted.

Loading
Loading