Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions bin/correctness/datadog-intake/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ workspace = true
axum = { workspace = true, features = ["http1", "json", "tokio", "tracing"] }
datadog-protos = { workspace = true }
protobuf = { workspace = true }
rcgen = { workspace = true, features = ["crypto", "aws_lc_rs"] }
rmp-serde = { workspace = true }
rustls = { workspace = true }
rustls-pki-types = { workspace = true }
saluki-error = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand All @@ -23,6 +26,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-rustls = { workspace = true }
tower-http = { workspace = true, features = [
"compression-zstd",
"decompression-deflate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use axum::{body::Bytes, extract::State, http::StatusCode, Json};
use serde_json::Value;
use tracing::{info, warn};

use super::ApmTelemetryState;

/// Handles `GET /agent-telemetry/dump` — returns all collected APM telemetry payloads as JSON.
pub async fn handle_agent_telemetry_dump(State(state): State<ApmTelemetryState>) -> Json<Vec<Value>> {
info!("Got request to dump agent telemetry payloads.");
Json(state.dump_payloads())
}

/// Handles `POST /api/v2/apmtelemetry` — stores the raw JSON payload for later retrieval.
pub async fn handle_apmtelemetry(State(state): State<ApmTelemetryState>, body: Bytes) -> StatusCode {
match serde_json::from_slice::<Value>(&body) {
Ok(payload) => {
info!("Received APM telemetry payload.");
state.add_payload(payload);
StatusCode::ACCEPTED
}
Err(e) => {
warn!(error = %e, bytes = body.len(), "Failed to parse APM telemetry payload as JSON.");
StatusCode::BAD_REQUEST
}
}
}
24 changes: 24 additions & 0 deletions bin/correctness/datadog-intake/src/app/agent_telemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use axum::{
routing::{get, post},
Router,
};

mod handlers;
use self::handlers::*;

mod state;
pub use self::state::ApmTelemetryState;

/// Builds the agent telemetry router.
///
/// Routes:
/// - `POST /api/v2/apmtelemetry` — receives agent telemetry payloads from the Datadog Agent's
/// `agenttelemetry` component and stores them for later analysis.
/// - `GET /agent-telemetry/dump` — returns all collected payloads as a JSON array.
pub fn build_agent_telemetry_router() -> Router {
let state = ApmTelemetryState::new();
Router::new()
.route("/api/v2/apmtelemetry", post(handle_apmtelemetry))
.route("/agent-telemetry/dump", get(handle_agent_telemetry_dump))
.with_state(state)
}
28 changes: 28 additions & 0 deletions bin/correctness/datadog-intake/src/app/agent_telemetry/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use std::sync::{Arc, Mutex};

use serde_json::Value;

/// Shared state for collected APM telemetry payloads.
#[derive(Clone)]
pub struct ApmTelemetryState {
payloads: Arc<Mutex<Vec<Value>>>,
}

impl ApmTelemetryState {
/// Creates a new `ApmTelemetryState`.
pub fn new() -> Self {
Self {
payloads: Arc::new(Mutex::new(Vec::new())),
}
}

/// Adds a raw JSON payload to the collected set.
pub fn add_payload(&self, payload: Value) {
self.payloads.lock().unwrap().push(payload);
}

/// Returns all collected payloads.
pub fn dump_payloads(&self) -> Vec<Value> {
self.payloads.lock().unwrap().clone()
}
}
2 changes: 2 additions & 0 deletions bin/correctness/datadog-intake/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use axum::{
use tower_http::{compression::CompressionLayer, decompression::RequestDecompressionLayer};
use tracing::info;

mod agent_telemetry;
mod events;
mod metrics;
mod misc;
Expand All @@ -17,6 +18,7 @@ pub fn initialize_app_router() -> Router {
let service_checks_state = service_checks::ServiceChecksState::new();

Router::new()
.merge(agent_telemetry::build_agent_telemetry_router())
.merge(events::build_events_router(events_state.clone()))
.merge(metrics::build_metrics_router())
.merge(service_checks::build_service_checks_router(service_checks_state))
Expand Down
120 changes: 115 additions & 5 deletions bin/correctness/datadog-intake/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,23 @@
#![deny(warnings)]
#![deny(missing_docs)]

use std::sync::Arc;

use rcgen::{generate_simple_self_signed, CertifiedKey};
use rustls::{
pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer},
ServerConfig,
};
use saluki_error::{ErrorContext as _, GenericError};
use tokio::{
net::TcpListener,
io,
net::{TcpListener, TcpStream},
select,
signal::unix::{signal, SignalKind},
sync::mpsc,
};
use tracing::{error, info};
use tokio_rustls::TlsAcceptor;
use tracing::{debug, error, info};
use tracing_subscriber::{filter::LevelFilter, EnvFilter};

mod app;
Expand Down Expand Up @@ -41,18 +50,119 @@ async fn main() {
async fn run() -> Result<(), GenericError> {
info!("datadog-intake starting...");

let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1);
// Initialize the AWS-LC crypto provider required by rustls.
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.map_err(|_| saluki_error::generic_error!("Failed to install rustls crypto provider."))?;

let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
spawn_signal_handlers(shutdown_tx).error_context("Failed to configure signal handlers.")?;

info!("datadog-intake started: listening on 0.0.0.0:2049");
// Build a self-signed TLS cert for the HTTPS proxy listener.
//
// The agent telemetry component always sends to an HTTPS endpoint (hardcoded in the agent),
// so we listen on port 2050 with TLS and proxy connections through to the plain HTTP intake
// on port 2049. The agent must be configured with `skip_ssl_validation: true` to accept the
// self-signed cert.
let tls_acceptor = build_tls_acceptor().error_context("Failed to build TLS acceptor for HTTPS proxy.")?;
let https_listener = TcpListener::bind("0.0.0.0:2050")
.await
.error_context("Failed to bind HTTPS proxy listener on port 2050.")?;
info!("datadog-intake HTTPS proxy: listening on 0.0.0.0:2050 (proxying to 0.0.0.0:2049)");

// Spawn the TLS-terminating proxy as a background task.
//
// Each accepted TLS connection is forwarded to the plain HTTP intake on localhost:2049 via
// a bidirectional byte-copy, so the existing HTTP handlers receive all agent telemetry
// payloads without any changes.
let (proxy_shutdown_tx, proxy_shutdown_rx) = mpsc::channel::<()>(1);
tokio::spawn(run_tls_proxy(https_listener, tls_acceptor, proxy_shutdown_rx));

info!("datadog-intake HTTP: listening on 0.0.0.0:2049");

let listener = TcpListener::bind("0.0.0.0:2049").await.unwrap();
axum::serve(listener, initialize_app_router())
.with_graceful_shutdown(async move { shutdown_rx.recv().await.unwrap_or(()) })
.with_graceful_shutdown(async move {
shutdown_rx.recv().await.unwrap_or(());
// Also stop the TLS proxy.
let _ = proxy_shutdown_tx.send(()).await;
})
.await
.map_err(Into::into)
}

/// Builds a [`TlsAcceptor`] backed by a freshly generated self-signed certificate.
///
/// The certificate covers the `datadog-intake` and `localhost` hostnames so that either
/// address works as a target in the agent configuration.
fn build_tls_acceptor() -> Result<TlsAcceptor, GenericError> {
let subject_alt_names = vec!["datadog-intake".to_string(), "localhost".to_string()];
let CertifiedKey { cert, signing_key } = generate_simple_self_signed(subject_alt_names)
.map_err(|e| saluki_error::generic_error!("Failed to generate self-signed certificate: {}", e))?;

let cert_chain = vec![cert.der().clone()];
let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(signing_key.serialize_der()));

let server_config = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(cert_chain, private_key)
.map_err(|e| saluki_error::generic_error!("Failed to build TLS server config: {}", e))?;

Ok(TlsAcceptor::from(Arc::new(server_config)))
}

/// Accepts TLS connections on `listener`, decrypts them, and pipes each connection to the plain
/// HTTP intake on `127.0.0.1:2049` via a bidirectional byte-copy.
async fn run_tls_proxy(listener: TcpListener, acceptor: TlsAcceptor, mut shutdown_rx: mpsc::Receiver<()>) {
loop {
select! {
result = listener.accept() => {
let (tcp_stream, peer_addr) = match result {
Ok(s) => s,
Err(e) => {
error!(error = %e, "TLS proxy: failed to accept connection.");
continue;
}
};
debug!(%peer_addr, "TLS proxy: accepted connection.");

let acceptor = acceptor.clone();
tokio::spawn(async move {
let tls_stream = match acceptor.accept(tcp_stream).await {
Ok(s) => s,
Err(e) => {
debug!(error = %e, "TLS proxy: TLS handshake failed.");
return;
}
};

let plain_stream = match TcpStream::connect("127.0.0.1:2049").await {
Ok(s) => s,
Err(e) => {
error!(error = %e, "TLS proxy: failed to connect to HTTP intake.");
return;
}
};

let (mut tls_read, mut tls_write) = io::split(tls_stream);
let (mut plain_read, mut plain_write) = io::split(plain_stream);

// Bidirectional byte-copy: client <-> plain HTTP intake.
let _ = tokio::join!(
io::copy(&mut tls_read, &mut plain_write),
io::copy(&mut plain_read, &mut tls_write),
);
});
}

_ = shutdown_rx.recv() => {
debug!("TLS proxy: received shutdown signal.");
break;
}
}
}
}

fn spawn_signal_handlers(shutdown_tx: mpsc::Sender<()>) -> Result<(), GenericError> {
let mut sigint_handler = signal(SignalKind::interrupt()).error_context("Failed to set up SIGINT handler.")?;
let mut sigterm_handler = signal(SignalKind::terminate()).error_context("Failed to set up SIGTERM handler.")?;
Expand Down
15 changes: 15 additions & 0 deletions bin/correctness/panoramic/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,19 @@ pub struct MatrixConfig {
#[serde(default)]
pub additional_span_ignore_fields: Vec<String>,

/// How long to wait after millstone exits before collecting data, in seconds.
///
/// Propagated unchanged to every expanded [`CorrectnessConfig`].
#[serde(default = "crate::correctness::config::default_flush_wait_secs")]
pub flush_wait_secs: u64,

/// Restrict metrics analysis to exactly these metric names.
///
/// Propagated unchanged to every expanded [`CorrectnessConfig`]. See
/// [`CorrectnessConfig::focus_metrics`] for full semantics.
#[serde(default)]
pub focus_metrics: Vec<String>,

/// Matrix variants. Each entry produces one expanded test case.
pub variants: Vec<MatrixVariant>,

Expand Down Expand Up @@ -591,6 +604,8 @@ impl MatrixConfig {
},
otlp_direct_analysis_mode: self.otlp_direct_analysis_mode,
additional_span_ignore_fields: self.additional_span_ignore_fields.clone(),
flush_wait_secs: self.flush_wait_secs,
focus_metrics: self.focus_metrics.clone(),
base_config_path: PathBuf::new(),
}
})
Expand Down
Loading
Loading