diff --git a/Cargo.lock b/Cargo.lock index a7ff3b6ee7d..3e087aa8754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1060,12 +1060,16 @@ dependencies = [ "axum", "datadog-protos", "protobuf", + "rcgen", "rmp-serde", + "rustls", + "rustls-pki-types", "saluki-error", "serde", "serde_json", "stele", "tokio", + "tokio-rustls", "tower-http 0.6.8", "tracing", "tracing-subscriber", diff --git a/bin/correctness/datadog-intake/Cargo.toml b/bin/correctness/datadog-intake/Cargo.toml index a7db1bbbe81..13c1de6aeed 100644 --- a/bin/correctness/datadog-intake/Cargo.toml +++ b/bin/correctness/datadog-intake/Cargo.toml @@ -12,7 +12,10 @@ workspace = true axum = { workspace = true, features = ["http1", "json", "tokio", "tracing"] } datadog-protos = { workspace = true } protobuf = { workspace = true } +rcgen = { workspace = true, features = ["crypto", "aws_lc_rs"] } rmp-serde = { workspace = true } +rustls = { workspace = true } +rustls-pki-types = { workspace = true } saluki-error = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -23,6 +26,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "signal", ] } +tokio-rustls = { workspace = true } tower-http = { workspace = true, features = [ "compression-zstd", "decompression-deflate", diff --git a/bin/correctness/datadog-intake/src/app/agent_telemetry/handlers.rs b/bin/correctness/datadog-intake/src/app/agent_telemetry/handlers.rs new file mode 100644 index 00000000000..32b1e787e34 --- /dev/null +++ b/bin/correctness/datadog-intake/src/app/agent_telemetry/handlers.rs @@ -0,0 +1,26 @@ +use axum::{body::Bytes, extract::State, http::StatusCode, Json}; +use serde_json::Value; +use tracing::{info, warn}; + +use super::ApmTelemetryState; + +/// Handles `GET /agent-telemetry/dump` — returns all collected APM telemetry payloads as JSON. +pub async fn handle_agent_telemetry_dump(State(state): State) -> Json> { + info!("Got request to dump agent telemetry payloads."); + Json(state.dump_payloads()) +} + +/// Handles `POST /api/v2/apmtelemetry` — stores the raw JSON payload for later retrieval. +pub async fn handle_apmtelemetry(State(state): State, body: Bytes) -> StatusCode { + match serde_json::from_slice::(&body) { + Ok(payload) => { + info!("Received APM telemetry payload."); + state.add_payload(payload); + StatusCode::ACCEPTED + } + Err(e) => { + warn!(error = %e, bytes = body.len(), "Failed to parse APM telemetry payload as JSON."); + StatusCode::BAD_REQUEST + } + } +} diff --git a/bin/correctness/datadog-intake/src/app/agent_telemetry/mod.rs b/bin/correctness/datadog-intake/src/app/agent_telemetry/mod.rs new file mode 100644 index 00000000000..a8482a4cb84 --- /dev/null +++ b/bin/correctness/datadog-intake/src/app/agent_telemetry/mod.rs @@ -0,0 +1,24 @@ +use axum::{ + routing::{get, post}, + Router, +}; + +mod handlers; +use self::handlers::*; + +mod state; +pub use self::state::ApmTelemetryState; + +/// Builds the agent telemetry router. +/// +/// Routes: +/// - `POST /api/v2/apmtelemetry` — receives agent telemetry payloads from the Datadog Agent's +/// `agenttelemetry` component and stores them for later analysis. +/// - `GET /agent-telemetry/dump` — returns all collected payloads as a JSON array. +pub fn build_agent_telemetry_router() -> Router { + let state = ApmTelemetryState::new(); + Router::new() + .route("/api/v2/apmtelemetry", post(handle_apmtelemetry)) + .route("/agent-telemetry/dump", get(handle_agent_telemetry_dump)) + .with_state(state) +} diff --git a/bin/correctness/datadog-intake/src/app/agent_telemetry/state.rs b/bin/correctness/datadog-intake/src/app/agent_telemetry/state.rs new file mode 100644 index 00000000000..a894b3e7a27 --- /dev/null +++ b/bin/correctness/datadog-intake/src/app/agent_telemetry/state.rs @@ -0,0 +1,28 @@ +use std::sync::{Arc, Mutex}; + +use serde_json::Value; + +/// Shared state for collected APM telemetry payloads. +#[derive(Clone)] +pub struct ApmTelemetryState { + payloads: Arc>>, +} + +impl ApmTelemetryState { + /// Creates a new `ApmTelemetryState`. + pub fn new() -> Self { + Self { + payloads: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Adds a raw JSON payload to the collected set. + pub fn add_payload(&self, payload: Value) { + self.payloads.lock().unwrap().push(payload); + } + + /// Returns all collected payloads. + pub fn dump_payloads(&self) -> Vec { + self.payloads.lock().unwrap().clone() + } +} diff --git a/bin/correctness/datadog-intake/src/app/mod.rs b/bin/correctness/datadog-intake/src/app/mod.rs index 1d81f5ab359..d0f4efb9567 100644 --- a/bin/correctness/datadog-intake/src/app/mod.rs +++ b/bin/correctness/datadog-intake/src/app/mod.rs @@ -6,6 +6,7 @@ use axum::{ use tower_http::{compression::CompressionLayer, decompression::RequestDecompressionLayer}; use tracing::info; +mod agent_telemetry; mod events; mod metrics; mod misc; @@ -17,6 +18,7 @@ pub fn initialize_app_router() -> Router { let service_checks_state = service_checks::ServiceChecksState::new(); Router::new() + .merge(agent_telemetry::build_agent_telemetry_router()) .merge(events::build_events_router(events_state.clone())) .merge(metrics::build_metrics_router()) .merge(service_checks::build_service_checks_router(service_checks_state)) diff --git a/bin/correctness/datadog-intake/src/main.rs b/bin/correctness/datadog-intake/src/main.rs index 11b05d3f774..85d79fbe8e8 100644 --- a/bin/correctness/datadog-intake/src/main.rs +++ b/bin/correctness/datadog-intake/src/main.rs @@ -3,14 +3,23 @@ #![deny(warnings)] #![deny(missing_docs)] +use std::sync::Arc; + +use rcgen::{generate_simple_self_signed, CertifiedKey}; +use rustls::{ + pki_types::{PrivateKeyDer, PrivatePkcs8KeyDer}, + ServerConfig, +}; use saluki_error::{ErrorContext as _, GenericError}; use tokio::{ - net::TcpListener, + io, + net::{TcpListener, TcpStream}, select, signal::unix::{signal, SignalKind}, sync::mpsc, }; -use tracing::{error, info}; +use tokio_rustls::TlsAcceptor; +use tracing::{debug, error, info}; use tracing_subscriber::{filter::LevelFilter, EnvFilter}; mod app; @@ -41,18 +50,119 @@ async fn main() { async fn run() -> Result<(), GenericError> { info!("datadog-intake starting..."); - let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + // Initialize the AWS-LC crypto provider required by rustls. + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .map_err(|_| saluki_error::generic_error!("Failed to install rustls crypto provider."))?; + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); spawn_signal_handlers(shutdown_tx).error_context("Failed to configure signal handlers.")?; - info!("datadog-intake started: listening on 0.0.0.0:2049"); + // Build a self-signed TLS cert for the HTTPS proxy listener. + // + // The agent telemetry component always sends to an HTTPS endpoint (hardcoded in the agent), + // so we listen on port 2050 with TLS and proxy connections through to the plain HTTP intake + // on port 2049. The agent must be configured with `skip_ssl_validation: true` to accept the + // self-signed cert. + let tls_acceptor = build_tls_acceptor().error_context("Failed to build TLS acceptor for HTTPS proxy.")?; + let https_listener = TcpListener::bind("0.0.0.0:2050") + .await + .error_context("Failed to bind HTTPS proxy listener on port 2050.")?; + info!("datadog-intake HTTPS proxy: listening on 0.0.0.0:2050 (proxying to 0.0.0.0:2049)"); + + // Spawn the TLS-terminating proxy as a background task. + // + // Each accepted TLS connection is forwarded to the plain HTTP intake on localhost:2049 via + // a bidirectional byte-copy, so the existing HTTP handlers receive all agent telemetry + // payloads without any changes. + let (proxy_shutdown_tx, proxy_shutdown_rx) = mpsc::channel::<()>(1); + tokio::spawn(run_tls_proxy(https_listener, tls_acceptor, proxy_shutdown_rx)); + + info!("datadog-intake HTTP: listening on 0.0.0.0:2049"); let listener = TcpListener::bind("0.0.0.0:2049").await.unwrap(); axum::serve(listener, initialize_app_router()) - .with_graceful_shutdown(async move { shutdown_rx.recv().await.unwrap_or(()) }) + .with_graceful_shutdown(async move { + shutdown_rx.recv().await.unwrap_or(()); + // Also stop the TLS proxy. + let _ = proxy_shutdown_tx.send(()).await; + }) .await .map_err(Into::into) } +/// Builds a [`TlsAcceptor`] backed by a freshly generated self-signed certificate. +/// +/// The certificate covers the `datadog-intake` and `localhost` hostnames so that either +/// address works as a target in the agent configuration. +fn build_tls_acceptor() -> Result { + let subject_alt_names = vec!["datadog-intake".to_string(), "localhost".to_string()]; + let CertifiedKey { cert, signing_key } = generate_simple_self_signed(subject_alt_names) + .map_err(|e| saluki_error::generic_error!("Failed to generate self-signed certificate: {}", e))?; + + let cert_chain = vec![cert.der().clone()]; + let private_key = PrivateKeyDer::Pkcs8(PrivatePkcs8KeyDer::from(signing_key.serialize_der())); + + let server_config = ServerConfig::builder() + .with_no_client_auth() + .with_single_cert(cert_chain, private_key) + .map_err(|e| saluki_error::generic_error!("Failed to build TLS server config: {}", e))?; + + Ok(TlsAcceptor::from(Arc::new(server_config))) +} + +/// Accepts TLS connections on `listener`, decrypts them, and pipes each connection to the plain +/// HTTP intake on `127.0.0.1:2049` via a bidirectional byte-copy. +async fn run_tls_proxy(listener: TcpListener, acceptor: TlsAcceptor, mut shutdown_rx: mpsc::Receiver<()>) { + loop { + select! { + result = listener.accept() => { + let (tcp_stream, peer_addr) = match result { + Ok(s) => s, + Err(e) => { + error!(error = %e, "TLS proxy: failed to accept connection."); + continue; + } + }; + debug!(%peer_addr, "TLS proxy: accepted connection."); + + let acceptor = acceptor.clone(); + tokio::spawn(async move { + let tls_stream = match acceptor.accept(tcp_stream).await { + Ok(s) => s, + Err(e) => { + debug!(error = %e, "TLS proxy: TLS handshake failed."); + return; + } + }; + + let plain_stream = match TcpStream::connect("127.0.0.1:2049").await { + Ok(s) => s, + Err(e) => { + error!(error = %e, "TLS proxy: failed to connect to HTTP intake."); + return; + } + }; + + let (mut tls_read, mut tls_write) = io::split(tls_stream); + let (mut plain_read, mut plain_write) = io::split(plain_stream); + + // Bidirectional byte-copy: client <-> plain HTTP intake. + let _ = tokio::join!( + io::copy(&mut tls_read, &mut plain_write), + io::copy(&mut plain_read, &mut tls_write), + ); + }); + } + + _ = shutdown_rx.recv() => { + debug!("TLS proxy: received shutdown signal."); + break; + } + } + } +} + fn spawn_signal_handlers(shutdown_tx: mpsc::Sender<()>) -> Result<(), GenericError> { let mut sigint_handler = signal(SignalKind::interrupt()).error_context("Failed to set up SIGINT handler.")?; let mut sigterm_handler = signal(SignalKind::terminate()).error_context("Failed to set up SIGTERM handler.")?; diff --git a/bin/correctness/panoramic/src/config.rs b/bin/correctness/panoramic/src/config.rs index 99f7548ebf0..7b8626065f5 100644 --- a/bin/correctness/panoramic/src/config.rs +++ b/bin/correctness/panoramic/src/config.rs @@ -495,6 +495,19 @@ pub struct MatrixConfig { #[serde(default)] pub additional_span_ignore_fields: Vec, + /// How long to wait after millstone exits before collecting data, in seconds. + /// + /// Propagated unchanged to every expanded [`CorrectnessConfig`]. + #[serde(default = "crate::correctness::config::default_flush_wait_secs")] + pub flush_wait_secs: u64, + + /// Restrict metrics analysis to exactly these metric names. + /// + /// Propagated unchanged to every expanded [`CorrectnessConfig`]. See + /// [`CorrectnessConfig::focus_metrics`] for full semantics. + #[serde(default)] + pub focus_metrics: Vec, + /// Matrix variants. Each entry produces one expanded test case. pub variants: Vec, @@ -591,6 +604,8 @@ impl MatrixConfig { }, otlp_direct_analysis_mode: self.otlp_direct_analysis_mode, additional_span_ignore_fields: self.additional_span_ignore_fields.clone(), + flush_wait_secs: self.flush_wait_secs, + focus_metrics: self.focus_metrics.clone(), base_config_path: PathBuf::new(), } }) diff --git a/bin/correctness/panoramic/src/correctness/analysis/agent_telemetry/mod.rs b/bin/correctness/panoramic/src/correctness/analysis/agent_telemetry/mod.rs new file mode 100644 index 00000000000..6aa7fb8694f --- /dev/null +++ b/bin/correctness/panoramic/src/correctness/analysis/agent_telemetry/mod.rs @@ -0,0 +1,367 @@ +use std::collections::BTreeMap; + +use saluki_error::{generic_error, GenericError}; +use serde_json::Value; +use stele::Metric as SteleMetric; +use tracing::{error, info, warn}; + +use crate::correctness::analysis::collected::CollectedData; + +/// A single normalized agent telemetry metric timeseries. +/// +/// A "context" is the unique combination of metric name and tag set. Each context maps to exactly +/// one value in a given telemetry payload. +#[derive(Debug, Clone)] +struct AtelMetric { + name: String, + metric_type: String, + /// Sorted tag pairs, keyed by tag name. + tags: BTreeMap, + value: f64, +} + +impl AtelMetric { + /// Returns the context key — a stable string that uniquely identifies this timeseries. + fn context_key(&self) -> String { + if self.tags.is_empty() { + self.name.clone() + } else { + let tags_str = self + .tags + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(", "); + format!("{}[{}]", self.name, tags_str) + } + } + + fn display_value(&self) -> String { + format!("{}({})", self.metric_type, self.value) + } +} + +/// Analyzes agent telemetry payloads for correctness. +/// +/// Compares the full set of metric timeseries (context + value) reported by the baseline and +/// comparison targets. Both targets must report the same contexts with the same values. +/// +/// A test failure surfaces one of three categories of mismatch: +/// - **Context mismatch**: a timeseries present on one side but absent from the other. +/// - **Value mismatch**: the same context reported with a different value on each side. +/// - **No payloads**: either side emitted no agent telemetry at all. +pub struct AgentTelemetryAnalyzer<'a> { + baseline_payloads: Vec, + comparison_payloads: Vec, + baseline_data: &'a CollectedData, + comparison_series_data: &'a CollectedData, +} + +impl<'a> AgentTelemetryAnalyzer<'a> { + /// Creates a new `AgentTelemetryAnalyzer` from the given collected data. + pub fn new(baseline_data: &'a CollectedData, comparison_data: &'a CollectedData) -> Self { + Self { + baseline_payloads: baseline_data.agent_telemetry_payloads().to_vec(), + comparison_payloads: comparison_data.agent_telemetry_payloads().to_vec(), + baseline_data, + comparison_series_data: comparison_data, + } + } + + /// Runs the analysis. + /// + /// # Errors + /// + /// Returns an error if either side emitted no agent telemetry payloads, if the sets of + /// reported contexts differ, or if any shared context has a different value on each side. + pub fn run_analysis(self) -> Result<(), (GenericError, Vec)> { + // For each side, count total series points and break them down per flush timestamp + // so we can identify exactly which flush cycle varies between runs. + log_series_point_breakdown("baseline", self.baseline_data.metrics()); + log_series_point_breakdown("comparison", self.comparison_series_data.metrics()); + + info!( + "Analyzing agent telemetry: {} payload(s) from baseline, {} payload(s) from comparison.", + self.baseline_payloads.len(), + self.comparison_payloads.len(), + ); + + if self.baseline_payloads.is_empty() { + return Err(( + generic_error!( + "Baseline emitted no agent telemetry payloads. \ + Check that `agent_telemetry.logs_dd_url` is set in datadog.yaml and \ + `skip_ssl_validation: true` is configured." + ), + vec![], + )); + } + if self.comparison_payloads.is_empty() { + return Err(( + generic_error!( + "Comparison emitted no agent telemetry payloads. \ + Check that `agent_telemetry.logs_dd_url` is set in datadog.yaml and \ + `skip_ssl_validation: true` is configured." + ), + vec![], + )); + } + + let baseline_metrics = extract_metrics(&self.baseline_payloads); + let comparison_metrics = extract_metrics(&self.comparison_payloads); + + info!( + "Extracted {} context(s) from baseline, {} from comparison.", + baseline_metrics.len(), + comparison_metrics.len(), + ); + + // Build context-keyed maps for lookup. + let baseline_map: BTreeMap = + baseline_metrics.iter().map(|m| (m.context_key(), m)).collect(); + let comparison_map: BTreeMap = + comparison_metrics.iter().map(|m| (m.context_key(), m)).collect(); + + let mut details: Vec = Vec::new(); + let mut context_mismatches = 0usize; + let mut value_mismatches = 0usize; + + // Phase 1: contexts in baseline but not comparison. + let baseline_only: Vec<&str> = baseline_map + .keys() + .filter(|k| !comparison_map.contains_key(*k)) + .map(|k| k.as_str()) + .collect(); + + if !baseline_only.is_empty() { + error!( + "Agent telemetry context(s) in baseline but not in comparison ({} total):", + baseline_only.len() + ); + for ctx in &baseline_only { + let m = &baseline_map[*ctx]; + error!(" - {} = {}", ctx, m.display_value()); + details.push(format!("baseline-only: {} = {}", ctx, m.display_value())); + context_mismatches += 1; + } + } + + // Phase 2: contexts in comparison but not baseline. + let comparison_only: Vec<&str> = comparison_map + .keys() + .filter(|k| !baseline_map.contains_key(*k)) + .map(|k| k.as_str()) + .collect(); + + if !comparison_only.is_empty() { + error!( + "Agent telemetry context(s) in comparison but not in baseline ({} total):", + comparison_only.len() + ); + for ctx in &comparison_only { + let m = &comparison_map[*ctx]; + error!(" - {} = {}", ctx, m.display_value()); + details.push(format!("comparison-only: {} = {}", ctx, m.display_value())); + context_mismatches += 1; + } + } + + // Phase 3: shared contexts — compare values. + // + // For GAUGE metrics we allow a tolerance of GAUGE_TOLERANCE points. The sole known source + // of between-run gauge variance is `datadog.agent.running`, which is appended + // unconditionally to every 15-second aggregator flush in + // `pkg/aggregator/aggregator.go:appendDefaultSeries`. It fires on every flush tick with + // the exact flush wall-clock time as its timestamp (not aligned to a DSD 10-second bucket + // boundary). Whether the final firing lands just before or just after the `start_after: 67` + // agenttelemetry snapshot boundary determines whether the run captures 3 or 4 occurrences + // of the metric — a ±1 point swing. There is no agent config to disable this metric; it is + // unconditional code in the aggregator. + // + // Within a single run both agents start at the same second and hit the same flush count, so + // the intra-run comparison is always exact. The tolerance exists solely to guard against + // unexpected future changes that push the intra-run delta above zero. If a WARN ever fires + // here the root cause should be investigated before the tolerance is widened. + const GAUGE_TOLERANCE: f64 = 1.0; + + let shared_keys: Vec<&str> = baseline_map + .keys() + .filter(|k| comparison_map.contains_key(*k)) + .map(|k| k.as_str()) + .collect(); + + for ctx in shared_keys { + let b = baseline_map[ctx]; + let c = comparison_map[ctx]; + + if b.value == c.value { + continue; + } + + let within_tolerance = b.metric_type == "gauge" && (b.value - c.value).abs() <= GAUGE_TOLERANCE; + + if within_tolerance { + warn!( + "Agent telemetry gauge '{}' differs by {} point(s) (within ±{} tolerance — \ + likely datadog.agent.running flush boundary timing):", + ctx, + (b.value - c.value).abs(), + GAUGE_TOLERANCE, + ); + warn!(" baseline: {}", b.display_value()); + warn!(" comparison: {}", c.display_value()); + } else { + value_mismatches += 1; + error!("Agent telemetry value mismatch for '{}':", ctx); + error!(" baseline: {}", b.display_value()); + error!(" comparison: {}", c.display_value()); + let detail = format!( + " {}\n baseline: {}\n comparison: {}", + ctx, + b.display_value(), + c.display_value() + ); + details.push(detail); + } + } + + if context_mismatches == 0 && value_mismatches == 0 { + info!( + "Baseline and comparison agent telemetry match across {} context(s).", + baseline_metrics.len() + ); + return Ok(()); + } + + Err(( + generic_error!( + "Agent telemetry mismatch: {} context difference(s), {} value mismatch(es).", + context_mismatches, + value_mismatches, + ), + details, + )) + } +} + +// --------------------------------------------------------------------------- +// Series point breakdown (diagnostic) +// --------------------------------------------------------------------------- + +/// Logs total series points and a per-flush-timestamp breakdown for one side. +/// +/// The agent serializes all metrics from one aggregation bucket into one series payload, +/// so grouping by timestamp directly mirrors the per-payload point count that increments +/// `point.sent` in the Go forwarder. +fn log_series_point_breakdown(label: &str, metrics: &[SteleMetric]) { + // Map timestamp -> (metric_count, point_count) + let mut by_ts: BTreeMap = BTreeMap::new(); + let mut total_points = 0usize; + + for m in metrics { + for (ts, _val) in m.values() { + let entry = by_ts.entry(*ts).or_default(); + entry.0 += 1; // one more metric context at this timestamp + entry.1 += 1; // one more point + total_points += 1; + } + } + + info!( + "{}: {} total points across {} flush timestamps (from {} metric contexts)", + label, + total_points, + by_ts.len(), + metrics.len(), + ); + for (ts, (ctx_count, point_count)) in &by_ts { + if *ctx_count <= 3 { + // Small bucket — show metric names so we can identify non-DSD sources. + let names: Vec<&str> = metrics + .iter() + .filter(|m| m.values().iter().any(|(t, _)| t == ts)) + .map(|m| m.context().name()) + .collect(); + info!( + " ts={}: {} contexts, {} points [{}]", + ts, + ctx_count, + point_count, + names.join(", ") + ); + } else { + info!(" ts={}: {} contexts, {} points", ts, ctx_count, point_count); + } + } +} + +// --------------------------------------------------------------------------- +// Extraction helpers +// --------------------------------------------------------------------------- + +/// Extracts and sorts all metric timeseries from a set of raw APM telemetry payloads. +fn extract_metrics(payloads: &[Value]) -> Vec { + let mut metrics = Vec::new(); + for payload in payloads { + collect_from_payload(payload, &mut metrics); + } + metrics.sort_by_key(|a| a.context_key()); + metrics +} + +fn collect_from_payload(payload: &Value, out: &mut Vec) { + let request_type = payload.get("request_type").and_then(Value::as_str).unwrap_or(""); + match request_type { + "agent-metrics" => collect_from_metrics_payload(payload.get("payload"), out), + "message-batch" => { + if let Some(batch) = payload.get("payload").and_then(Value::as_array) { + for item in batch { + collect_from_payload(item, out); + } + } + } + other => { + warn!("Skipping unknown agent telemetry request_type: {:?}", other); + } + } +} + +fn collect_from_metrics_payload(payload: Option<&Value>, out: &mut Vec) { + let metrics_map = match payload.and_then(|p| p.get("metrics")).and_then(Value::as_object) { + Some(m) => m, + None => return, + }; + + for (name, val) in metrics_map { + if name == "agent_metadata" { + continue; + } + + let metric_type = val.get("type").and_then(Value::as_str).unwrap_or("?").to_string(); + + let value = match val.get("value").and_then(Value::as_f64) { + Some(v) => v, + None => { + warn!("Agent telemetry metric '{}' has no numeric value; skipping.", name); + continue; + } + }; + + let tags: BTreeMap = val + .get("tags") + .and_then(Value::as_object) + .map(|t| { + t.iter() + .map(|(k, v)| (k.clone(), v.as_str().unwrap_or("").to_string())) + .collect() + }) + .unwrap_or_default(); + + out.push(AtelMetric { + name: name.clone(), + metric_type, + tags, + value, + }); + } +} diff --git a/bin/correctness/panoramic/src/correctness/analysis/collected.rs b/bin/correctness/panoramic/src/correctness/analysis/collected.rs index d5695174fcb..88533c0cce0 100644 --- a/bin/correctness/panoramic/src/correctness/analysis/collected.rs +++ b/bin/correctness/panoramic/src/correctness/analysis/collected.rs @@ -1,4 +1,5 @@ use saluki_error::{generic_error, ErrorContext as _, GenericError}; +use serde_json::Value; use stele::{ClientStatisticsAggregator, Event, Metric, ServiceCheck, Span}; use tracing::debug; @@ -6,6 +7,7 @@ use tracing::debug; /// /// Holds all telemetry data sent by the test target to the `datadog-intake` server spawned for the test run. pub struct CollectedData { + agent_telemetry_payloads: Vec, events: Vec, metrics: Vec, service_checks: Vec, @@ -20,6 +22,7 @@ impl CollectedData { /// /// If the collected data cannot be retrieved from the `datadog-intake` server, an error is returned. pub async fn for_port(datadog_intake_port: u16) -> Result { + let agent_telemetry_payloads = get_captured_agent_telemetry(datadog_intake_port).await?; let events = get_captured_events(datadog_intake_port).await?; let metrics = get_captured_metrics(datadog_intake_port).await?; let service_checks = get_captured_service_checks(datadog_intake_port).await?; @@ -27,6 +30,7 @@ impl CollectedData { let trace_stats = get_captured_trace_stats(datadog_intake_port).await?; Ok(Self { + agent_telemetry_payloads, events, metrics, service_checks, @@ -35,6 +39,11 @@ impl CollectedData { }) } + /// Returns a reference to the collected agent telemetry payloads. + pub fn agent_telemetry_payloads(&self) -> &[Value] { + &self.agent_telemetry_payloads + } + /// Returns a reference to the collected events. pub fn events(&self) -> &[Event] { &self.events @@ -61,6 +70,25 @@ impl CollectedData { } } +async fn get_captured_agent_telemetry(datadog_intake_port: u16) -> Result, GenericError> { + let client = reqwest::Client::new(); + let payloads = client + .get(format!("http://localhost:{}/agent-telemetry/dump", datadog_intake_port)) + .send() + .await + .error_context("Failed to call agent-telemetry dump endpoint on datadog-intake server.")? + .json::>() + .await + .error_context("Failed to decode dumped agent telemetry from datadog-intake response.")?; + + debug!( + "Agent telemetry payloads dumped successfully ({} payload(s)).", + payloads.len() + ); + + Ok(payloads) +} + async fn get_captured_events(datadog_intake_port: u16) -> Result, GenericError> { let client = reqwest::Client::new(); let events = client diff --git a/bin/correctness/panoramic/src/correctness/analysis/metrics/mod.rs b/bin/correctness/panoramic/src/correctness/analysis/metrics/mod.rs index 900dccdc6d8..c2e3182c281 100644 --- a/bin/correctness/panoramic/src/correctness/analysis/metrics/mod.rs +++ b/bin/correctness/panoramic/src/correctness/analysis/metrics/mod.rs @@ -10,11 +10,19 @@ use crate::correctness::analysis::collected::CollectedData; pub struct MetricsAnalyzer { baseline_metrics: NormalizedMetrics, comparison_metrics: NormalizedMetrics, + /// When non-empty, only metrics whose names appear in this list are retained. + focus_metrics: Vec, } impl MetricsAnalyzer { /// Creates a new `MetricsAnalyzer` instance with the given baseline/comparison data. - pub fn new(baseline_data: &CollectedData, comparison_data: &CollectedData) -> Result { + /// + /// When `focus_metrics` is non-empty, the standard internal-telemetry filter is replaced by an + /// allowlist filter that keeps only the named metrics. This allows validating `datadog.*` + /// metrics that would otherwise be stripped. + pub fn new( + baseline_data: &CollectedData, comparison_data: &CollectedData, focus_metrics: Vec, + ) -> Result { let baseline_metrics = NormalizedMetrics::try_from_stele_metrics(baseline_data.metrics()) .error_context("Failed to normalize baseline metrics.")?; @@ -24,6 +32,7 @@ impl MetricsAnalyzer { Ok(Self { baseline_metrics, comparison_metrics, + focus_metrics, }) } @@ -42,8 +51,14 @@ impl MetricsAnalyzer { comparison_metrics.len() ); - // Filter out internal telemetry metrics. - filter_internal_telemetry_metrics(&mut baseline_metrics, &mut comparison_metrics); + // Filter metrics: focus list (allowlist) takes priority over the standard internal-telemetry + // filter. When `focus_metrics` is non-empty we keep only the named metrics; otherwise the + // standard filter removes `datadog.*` and similar internal namespaces. + if !self.focus_metrics.is_empty() { + filter_to_focus_metrics(&mut baseline_metrics, &mut comparison_metrics, &self.focus_metrics); + } else { + filter_internal_telemetry_metrics(&mut baseline_metrics, &mut comparison_metrics); + } // Make sure both the baseline and comparison targets emitted the same unique set of metrics. // @@ -142,6 +157,25 @@ fn compare_metric_values( } } +fn filter_to_focus_metrics( + baseline_metrics: &mut NormalizedMetrics, comparison_metrics: &mut NormalizedMetrics, focus: &[String], +) { + let baseline_before = baseline_metrics.len(); + let comparison_before = comparison_metrics.len(); + + baseline_metrics.remove_matching(|m| !focus.contains(&m.context().name().to_string())); + comparison_metrics.remove_matching(|m| !focus.contains(&m.context().name().to_string())); + + info!( + "Focus filter kept {}/{} baseline metric(s) and {}/{} comparison metric(s) (focus list: [{}]).", + baseline_metrics.len(), + baseline_before, + comparison_metrics.len(), + comparison_before, + focus.join(", "), + ); +} + fn filter_internal_telemetry_metrics( baseline_metrics: &mut NormalizedMetrics, comparison_metrics: &mut NormalizedMetrics, ) { diff --git a/bin/correctness/panoramic/src/correctness/analysis/mod.rs b/bin/correctness/panoramic/src/correctness/analysis/mod.rs index 01786eb5668..24003a2296c 100644 --- a/bin/correctness/panoramic/src/correctness/analysis/mod.rs +++ b/bin/correctness/panoramic/src/correctness/analysis/mod.rs @@ -1,6 +1,7 @@ use saluki_error::GenericError; use serde::Deserialize; +mod agent_telemetry; mod collected; pub use self::collected::CollectedData; @@ -13,6 +14,13 @@ mod traces; #[derive(Clone, Deserialize)] #[serde(rename_all = "snake_case")] pub enum AnalysisMode { + /// Compares agent telemetry payloads between the baseline and comparison targets. + /// + /// Checks that both targets report the same set of metric names via the `agenttelemetry` + /// component. Requires the agent to be configured with `agent_telemetry.logs_dd_url` pointing + /// to the intake's HTTPS listener (port 2050) and `skip_ssl_validation: true`. + AgentTelemetry, + /// Compares events between the baseline and comparison targets. Events, @@ -41,21 +49,27 @@ pub struct AnalysisRunner { baseline_data: CollectedData, comparison_data: CollectedData, traces_options: Option, + focus_metrics: Vec, } impl AnalysisRunner { /// Creates a new `AnalysisRunner` with the given analysis mode, baseline data, and comparison data. /// /// When mode is `Traces`, `traces_options` should be `Some(...)`; otherwise it is ignored. + /// + /// `focus_metrics` is forwarded to `MetricsAnalyzer` when mode is `Metrics`: when non-empty, + /// only the named metrics are retained before comparison (bypassing the standard + /// internal-telemetry filter). Pass an empty `Vec` to use the default filter. pub fn new( mode: AnalysisMode, baseline_data: CollectedData, comparison_data: CollectedData, - traces_options: Option, + traces_options: Option, focus_metrics: Vec, ) -> Self { Self { mode, baseline_data, comparison_data, traces_options, + focus_metrics, } } @@ -67,13 +81,18 @@ impl AnalysisRunner { /// an error is returned alongside the full list of mismatch details (for log output). pub fn run_analysis(self) -> Result<(), (GenericError, Vec)> { match self.mode { + AnalysisMode::AgentTelemetry => { + let analyzer = agent_telemetry::AgentTelemetryAnalyzer::new(&self.baseline_data, &self.comparison_data); + analyzer.run_analysis() + } AnalysisMode::Events => { let analyzer = events::EventsAnalyzer::new(&self.baseline_data, &self.comparison_data); analyzer.run_analysis() } AnalysisMode::Metrics => { - let analyzer = metrics::MetricsAnalyzer::new(&self.baseline_data, &self.comparison_data) - .map_err(|e| (e, vec![]))?; + let analyzer = + metrics::MetricsAnalyzer::new(&self.baseline_data, &self.comparison_data, self.focus_metrics) + .map_err(|e| (e, vec![]))?; analyzer.run_analysis() } AnalysisMode::ServiceChecks => { diff --git a/bin/correctness/panoramic/src/correctness/config.rs b/bin/correctness/panoramic/src/correctness/config.rs index decdcc671e1..5f4f7c694db 100644 --- a/bin/correctness/panoramic/src/correctness/config.rs +++ b/bin/correctness/panoramic/src/correctness/config.rs @@ -44,6 +44,10 @@ fn default_otlp_direct_analysis_mode() -> bool { false } +pub(crate) fn default_flush_wait_secs() -> u64 { + 32 +} + #[derive(Clone, Deserialize)] pub struct Config { #[serde(skip)] @@ -77,6 +81,34 @@ pub struct Config { #[serde(default)] pub additional_span_ignore_fields: Vec, + /// How long to wait after millstone exits before collecting data, in seconds. + /// + /// This gives agents time to flush any remaining aggregated metrics. Defaults to 32 seconds, + /// which covers approximately three 10-second aggregation flush cycles. + /// + /// Tests that need to capture metrics from periodic internal telemetry components (for example, + /// `analysis_mode: agent_telemetry`) should set this high enough that the telemetry fires at + /// least once after all traffic has been fully flushed. The `agent_telemetry.start_after` value + /// in `datadog.yaml` must be less than `(agent_startup_time + flush_wait_secs)` so the payload + /// arrives before data collection. + #[serde(default = "default_flush_wait_secs")] + pub flush_wait_secs: u64, + + /// When non-empty and `analysis_mode` is `metrics`, restrict analysis to exactly these metric + /// names. + /// + /// Instead of applying the standard internal-telemetry filter (which removes `datadog.*`, + /// `system.*`, and similar prefixes), only metrics whose names appear in this list are kept. + /// All other metrics — including user-submitted ones — are discarded before comparison. + /// + /// This is useful when the test goal is to validate specific agent-emitted metrics such as + /// `datadog.agent.point.sent` and `datadog.agent.point.dropped`, which would otherwise be + /// stripped by the default filter. + /// + /// Defaults to empty (standard filtering applies). + #[serde(default)] + pub focus_metrics: Vec, + #[serde(skip, default = "PathBuf::new")] pub(crate) base_config_path: PathBuf, } diff --git a/bin/correctness/panoramic/src/correctness/k8s.rs b/bin/correctness/panoramic/src/correctness/k8s.rs index b7a3b50a244..a164ecf3fe1 100644 --- a/bin/correctness/panoramic/src/correctness/k8s.rs +++ b/bin/correctness/panoramic/src/correctness/k8s.rs @@ -33,7 +33,7 @@ use crate::{ }; const POD_NAME: &str = "correctness-pod"; -const FLUSH_WAIT: Duration = Duration::from_secs(30); +const DEFAULT_K8S_FLUSH_WAIT: Duration = Duration::from_secs(30); const POD_POLL_INTERVAL: Duration = Duration::from_secs(1); const POD_READY_TIMEOUT: Duration = Duration::from_secs(120); // Allow enough time for millstone to finish sending all metrics plus a buffer. @@ -291,8 +291,9 @@ pub async fn run_k8s_correctness_test(name: String, config: Config, tctx: TestCo return make_error_result(name, started, "millstone_exit", cleanup(e).await); } - debug!("Millstone completed. Waiting {:?} for flush...", FLUSH_WAIT); - sleep(FLUSH_WAIT).await; + let flush_wait = Duration::from_secs(config.flush_wait_secs).max(DEFAULT_K8S_FLUSH_WAIT); + debug!("Millstone completed. Waiting {:?} for flush...", flush_wait); + sleep(flush_wait).await; // Phase 5: Collect data from both agent pods in parallel. let (baseline_result, comparison_result) = tokio::join!( @@ -337,7 +338,13 @@ pub async fn run_k8s_correctness_test(name: String, config: Config, tctx: TestCo }), _ => None, }; - let analysis_runner = AnalysisRunner::new(config.analysis_mode, baseline_data, comparison_data, traces_options); + let analysis_runner = AnalysisRunner::new( + config.analysis_mode, + baseline_data, + comparison_data, + traces_options, + config.focus_metrics, + ); let analysis_result = analysis_runner.run_analysis(); let analysis_duration = analysis_start.elapsed(); diff --git a/bin/correctness/panoramic/src/correctness/runner.rs b/bin/correctness/panoramic/src/correctness/runner.rs index 079842f7e75..2693cb07d15 100644 --- a/bin/correctness/panoramic/src/correctness/runner.rs +++ b/bin/correctness/panoramic/src/correctness/runner.rs @@ -34,12 +34,6 @@ use crate::{ /// This mirrors the path used by the single-millstone setup so the same bind-mount machinery works. const MILLSTONE_CONFIG_INTERNAL: &str = "/etc/millstone/config.toml"; -/// How long to wait after millstone exits before querying datadog-intake for data. -/// -/// This gives the agents time to flush any remaining aggregated metrics after millstone stops -/// sending. The value is slightly longer than a full aggregation bucket width. -const FLUSH_WAIT: Duration = Duration::from_secs(32); - /// Run a single correctness test and return a panoramic `TestResult`. pub async fn run_correctness_test(name: String, config: Config, tctx: TestContext) -> TestResult { match config.runtime { @@ -74,9 +68,17 @@ async fn run_docker_correctness_test(name: String, config: Config, tctx: TestCon otlp_direct_analysis_mode: config.otlp_direct_analysis_mode, additional_span_ignore_fields: config.additional_span_ignore_fields.clone(), }), - AnalysisMode::Events | AnalysisMode::Metrics | AnalysisMode::ServiceChecks => None, + AnalysisMode::AgentTelemetry | AnalysisMode::Events | AnalysisMode::Metrics | AnalysisMode::ServiceChecks => { + None + } }; - let analysis_runner = AnalysisRunner::new(config.analysis_mode, baseline_data, comparison_data, traces_options); + let analysis_runner = AnalysisRunner::new( + config.analysis_mode, + baseline_data, + comparison_data, + traces_options, + config.focus_metrics, + ); let analysis_result = analysis_runner.run_analysis(); let analysis_duration = analysis_start.elapsed(); @@ -172,6 +174,7 @@ pub struct CorrectnessRunner { millstone_config: MillstoneConfig, baseline_target_driver_config: DriverConfig, comparison_target_driver_config: DriverConfig, + flush_wait: Duration, tctx: TestContext, baseline_coordinator: Coordinator, comparison_coordinator: Coordinator, @@ -189,6 +192,7 @@ impl CorrectnessRunner { millstone_config: config.millstone_config(), baseline_target_driver_config: baseline, comparison_target_driver_config: comparison, + flush_wait: Duration::from_secs(config.flush_wait_secs), tctx, baseline_coordinator: Coordinator::new(), comparison_coordinator: Coordinator::new(), @@ -454,16 +458,16 @@ impl CorrectnessRunner { } debug!( "Shared millstone completed. Waiting {:?} for agents to flush...", - FLUSH_WAIT + self.flush_wait ); // Phase 6: Give agents time to flush all remaining aggregated metrics. // - // TODO: This should maybe be configurable, or perhaps we can figure out a better way to - // determine when the next flush has happened... and further, we might not need to care - // about this for particular analysis modes if the functionality we're testing doesn't rely - // on flushing like metrics does. - sleep(FLUSH_WAIT).await; + // The wait duration is controlled by `flush_wait_secs` in the test config. Tests that + // capture periodic internal telemetry (for example `agent_telemetry` analysis mode) need + // a longer wait so that the telemetry component fires at least once after all traffic has + // been fully flushed. + sleep(self.flush_wait).await; // Phase 7: Collect data from both datadog-intake containers, then shut everything down. info!("Collecting data from baseline and comparison intake containers..."); diff --git a/docker/Dockerfile.datadog-agent b/docker/Dockerfile.datadog-agent index 31758d10057..da3cbd0ab56 100644 --- a/docker/Dockerfile.datadog-agent +++ b/docker/Dockerfile.datadog-agent @@ -15,8 +15,12 @@ COPY --from=adp /opt/datadog/agent-data-plane /opt/datadog/agent-data-plane # Add the s6 service files for Agent Data Plane. # # ADP will only run when the `DD_DATA_PLANE_ENABLED` environment variable is set to `true`. +# +# Newer agent images (>= v112974386) already include a built-in `data-plane` s6 service at +# /etc/services.d/data-plane/ that starts the ADP binary. Copying our own s6-services entry +# would create a second competing service and cause a startup crash, so only cont-init.d is +# copied here; the built-in service handles ADP lifecycle. COPY docker/cont-init.d /etc/cont-init.d/ -COPY docker/s6-services /etc/services.d/ COPY --chmod=755 docker/entrypoint.sh /entrypoint.sh diff --git a/test/correctness/agent-point-metrics/conf.d/telemetry.d/conf.yaml b/test/correctness/agent-point-metrics/conf.d/telemetry.d/conf.yaml new file mode 100644 index 00000000000..00d9a2dbba2 --- /dev/null +++ b/test/correctness/agent-point-metrics/conf.d/telemetry.d/conf.yaml @@ -0,0 +1,2 @@ +instances: + - {} diff --git a/test/correctness/agent-point-metrics/config.yaml b/test/correctness/agent-point-metrics/config.yaml new file mode 100644 index 00000000000..412ff3b1713 --- /dev/null +++ b/test/correctness/agent-point-metrics/config.yaml @@ -0,0 +1,44 @@ +type: correctness +runtime: docker +analysis_mode: metrics + +# Restrict analysis to only the customer-facing agent point metrics. All user DSD metrics and +# other `datadog.*` internal metrics are discarded; only these two are compared between the +# baseline and comparison targets. +focus_metrics: + - datadog.agent.point.sent + - datadog.agent.point.dropped + +millstone: + image: saluki-images/correctness-tools:latest + config_path: millstone.yaml + +datadog_intake: + image: saluki-images/correctness-tools:latest + config_path: ../datadog-intake.yaml + +# Allow enough time after millstone exits for: +# 1. The final aggregation flush cycle (~10 s) to deliver user metrics to the intake. +# 2. The agent telemetry check to fire (~15 s collection interval) and submit +# `datadog.agent.point.sent` / `datadog.agent.point.dropped` via internal DSD. +# 3. A second aggregation flush cycle (~10 s) to deliver those DSD metrics to the intake. +# 45 s comfortably covers all three steps. +flush_wait_secs: 45 + +baseline: + image: saluki-images/datadog-agent:testing-release + files: + - datadog.yaml:/etc/datadog-agent/datadog.yaml + - conf.d/telemetry.d/conf.yaml:/etc/datadog-agent/conf.d/telemetry.d/conf.yaml + additional_env_vars: + - DD_API_KEY=correctness-test + +comparison: + image: saluki-images/datadog-agent:testing-release + files: + - datadog.yaml:/etc/datadog-agent/datadog.yaml + - conf.d/telemetry.d/conf.yaml:/etc/datadog-agent/conf.d/telemetry.d/conf.yaml + additional_env_vars: + - DD_API_KEY=correctness-test + - DD_DATA_PLANE_ENABLED=true + - DD_AGGREGATE_CONTEXT_LIMIT=500000 diff --git a/test/correctness/agent-point-metrics/datadog.yaml b/test/correctness/agent-point-metrics/datadog.yaml new file mode 100644 index 00000000000..b9946c4bf00 --- /dev/null +++ b/test/correctness/agent-point-metrics/datadog.yaml @@ -0,0 +1,32 @@ +# Using a fixed hostname is both required to avoid errors, and also will ensure consistent tags +# between DSD/ADP. +hostname: "correctness-testing" + +# Dummy API key. +api_key: dummy-api-key-correctness-testing + +# We have to specifically configure the health port to use. +health_port: 5555 + +# Point ourselves at the datadog-intake service. +dd_url: "http://datadog-intake:2049" + +# Turn off UDP and listen on a UDS socket instead. +use_dogstatsd: true +dogstatsd_port: 0 +dogstatsd_socket: /airlock/metrics.sock + +# Ensure origin detection is disabled since we can't support it with ADP in standalone mode. +dogstatsd_origin_detection: false + +# Keep a single DSD worker so gauge last-write-wins ordering is deterministic. +dogstatsd_workers_count: 1 + +# Disable the APM agent to reduce noise from unrelated internal DSD contexts. +apm_config: + enabled: false + +# Enable ADP's TelemetryProvider so the Go agent can collect ADP's point__sent gauge via the +# Remote Agent Registry and merge it into the customer-facing datadog.agent.point.sent metric. +data_plane: + telemetry_enabled: true diff --git a/test/correctness/agent-point-metrics/millstone.yaml b/test/correctness/agent-point-metrics/millstone.yaml new file mode 100644 index 00000000000..1ad1eac1130 --- /dev/null +++ b/test/correctness/agent-point-metrics/millstone.yaml @@ -0,0 +1,52 @@ +seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] +target: "unixgram:///$GROUP-airlock/metrics.sock" +aggregation_bucket_width_secs: 10 + +# Moderate volume: enough points to produce a clearly non-zero `datadog.agent.point.sent` +# reading, but small enough for a fast test cycle. +volume: 5000 + +corpus: + size: 5000 + payload: + dogstatsd: + # Fixed context count so that both sides send the same number of unique timeseries, which + # means the aggregator flushes the same number of series points on each side. + contexts: + constant: 500 + name_length: + inclusive: + min: 4 + max: 24 + tag_length: + inclusive: + min: 3 + max: 12 + tags_per_msg: + inclusive: + min: 1 + max: 4 + value: + float_probability: 0.0 + range: + inclusive: + min: 0 + max: 9999 + multivalue_count: + inclusive: + min: 1 + max: 1 + multivalue_pack_probability: 0.0 + kind_weights: + metric: 100 + event: 0 + service_check: 0 + # Counts only: no sketches (distributions/histograms) to keep the point arithmetic clean + # and avoid any sketch-vs-series type mismatches. + metric_weights: + count: 1 + gauge: 0 + timer: 0 + distribution: 0 + set: 0 + histogram: 0 diff --git a/test/correctness/agent-telemetry/config.yaml b/test/correctness/agent-telemetry/config.yaml new file mode 100644 index 00000000000..87f4936145b --- /dev/null +++ b/test/correctness/agent-telemetry/config.yaml @@ -0,0 +1,27 @@ +type: correctness +runtime: docker +analysis_mode: agent_telemetry +# Flush wait must exceed (start_after - agent_startup_time) so data collection happens after the +# COAT snapshot arrives at the intake. With start_after=37 and agents ready ~2s before millstone, +# the snapshot arrives at ~t=35s after millstone exits. 60s gives comfortable headroom. +flush_wait_secs: 60 +millstone: + image: saluki-images/correctness-tools:latest + config_path: millstone.yaml +datadog_intake: + image: saluki-images/correctness-tools:latest + config_path: ../datadog-intake.yaml +baseline: + image: saluki-images/datadog-agent:testing-release + files: + - datadog.yaml:/etc/datadog-agent/datadog.yaml + additional_env_vars: + - DD_API_KEY=correctness-test +comparison: + image: saluki-images/datadog-agent:testing-release + files: + - datadog.yaml:/etc/datadog-agent/datadog.yaml + additional_env_vars: + - DD_API_KEY=correctness-test + - DD_DATA_PLANE_ENABLED=true + - DD_AGGREGATE_CONTEXT_LIMIT=500000 diff --git a/test/correctness/agent-telemetry/datadog.yaml b/test/correctness/agent-telemetry/datadog.yaml new file mode 100644 index 00000000000..aa6caee4570 --- /dev/null +++ b/test/correctness/agent-telemetry/datadog.yaml @@ -0,0 +1,81 @@ +# Using a fixed hostname is both required to avoid errors, and also will ensure consistent tags between DSD/ADP. +hostname: "correctness-testing" + +# Dummy API key. +api_key: dummy-api-key-correctness-testing + +# We have to specifically configure the health port to use. +health_port: 5555 + +# Point ourselves at the datadog-intake service. +dd_url: "http://datadog-intake:2049" + +# Turn off UDP and listen on a UDS socket instead. +# NOTE(temporary): newer agent images require use_dogstatsd: true to be explicit when +# dogstatsd_port: 0 is set, otherwise the agent forces data_plane.dogstatsd.enabled=false. +use_dogstatsd: true +dogstatsd_port: 0 +dogstatsd_socket: /airlock/metrics.sock + +# Ensure origin detection is disabled since we can't support it with ADP in standalone mode. +dogstatsd_origin_detection: false + +# Gauges can be processed out-of-order when multiple workers are used, while ADP does not use multiple workers, so ADP +# always ends up with the correct (last seen) value, while DSD might return the last seen value... or the value seen +# four updates ago, etc etc. +dogstatsd_workers_count: 1 +dogstatsd_context_expiry_seconds: 120 + +# Disable the trace agent to eliminate its internal DogStatsD telemetry (datadog.trace_agent.*, +# datadog.dogstatsd.client.*) from the series flush buckets. This reduces per-bucket context +# count to just millstone load + a handful of agent internals, making point.sent cleaner. +apm_config: + enabled: false + +# Allow the agent to connect to our self-signed HTTPS listener without certificate validation. +# This is required because the agenttelemetry sender always uses HTTPS regardless of the configured URL scheme. +skip_ssl_validation: true + +# Route agent telemetry to our local intake's HTTPS listener (port 2050) instead of the default +# instrumentation-telemetry-intake endpoint. The intake's TLS proxy decrypts the connection and +# forwards to the plain HTTP intake on port 2049. +# +# The profile below is scoped to the forwarder metrics most relevant for ADP correctness testing. +# Both the baseline (plain agent) and comparison (agent + ADP) should emit these, so any gap +# surfaces a real difference in how ADP integrates with the agent's telemetry component. +# Enable ADP's remote agent telemetry provider so the agent can gather ADP's Prometheus +# metrics (including point__sent) via the Remote Agent Registry. Without this, ADP only +# advertises StatusProvider and FlareProvider, and PR 50750's telemetry merge can't work. +data_plane: + telemetry_enabled: true + +agent_telemetry: + enabled: true + logs_dd_url: "https://datadog-intake:2050" + profiles: + - name: adp-forwarder + metric: + metrics: + - name: point.sent + - name: point.dropped + - name: transactions.input_count + - name: transactions.dropped + # transactions.success and transactions.errors are excluded: baseline sends user+internal + # in one Go payload per flush cycle while comparison splits them across separate ADP and + # Go payloads. Transaction counts and per-error-type counters are structurally different + # by design and cannot be compared between the two sides. + - name: transactions.http_errors + aggregate_tags: + - code + - endpoint + schedule: + # Fire 37 seconds after agent start. Both Go and ADP flush every ~15s but are offset by + # ~4s from each other (ADP starts slightly after Go). The first user DSD flush on both + # sides lands at ~t=30s; ADP's second flush lands at ~t=44s and Go's at ~t=45s. + # t=37 sits safely after both first flushes and before either second flush, so the snapshot + # always captures exactly one user DSD flush cycle on each side, making point.sent + # directly comparable between baseline and comparison. + start_after: 37 + # Exactly one snapshot per test run. + iterations: 1 + period: 900 diff --git a/test/correctness/agent-telemetry/millstone.yaml b/test/correctness/agent-telemetry/millstone.yaml new file mode 100644 index 00000000000..b38df2c4692 --- /dev/null +++ b/test/correctness/agent-telemetry/millstone.yaml @@ -0,0 +1,44 @@ +seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131] +target: "unixgram:///$GROUP-airlock/metrics.sock" +aggregation_bucket_width_secs: 10 +volume: 500 +corpus: + size: 500 + payload: + dogstatsd: + contexts: + constant: 100 + name_length: + inclusive: + min: 1 + max: 32 + tag_length: + inclusive: + min: 3 + max: 16 + tags_per_msg: + inclusive: + min: 2 + max: 4 + value: + float_probability: 0.5 + range: + inclusive: + min: 0 + max: 9999 + multivalue_count: + inclusive: + min: 1 + max: 4 + multivalue_pack_probability: 0.0 + kind_weights: + metric: 100 + event: 0 + service_check: 0 + metric_weights: + count: 1 + gauge: 0 + timer: 0 + distribution: 0 + set: 0 + histogram: 0