From a6287d29856ccfdaf4c3d4c42bc60381bbca14ef Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 5 Jun 2026 15:06:05 +0000 Subject: [PATCH] Antithesis intake port --- Cargo.lock | 19 + Cargo.toml | 1 + test/antithesis/deploy/Dockerfile | 17 +- test/antithesis/deploy/docker-compose.yaml | 2 +- test/antithesis/intake/Cargo.toml | 47 +++ test/antithesis/intake/src/handlers.rs | 60 +++ test/antithesis/intake/src/http.rs | 116 ++++++ test/antithesis/intake/src/intake.rs | 279 +++++++++++++ test/antithesis/intake/src/main.rs | 124 ++++++ test/antithesis/intake/src/predicates.rs | 16 + .../intake/src/predicates/constants.rs | 25 ++ .../intake/src/predicates/metric_point.rs | 82 ++++ .../intake/src/predicates/payload.rs | 170 ++++++++ .../intake/src/predicates/series.rs | 373 ++++++++++++++++++ test/antithesis/intake/src/state.rs | 79 ++++ .../scratchbook/w-property-intake.md | 83 ++++ 16 files changed, 1485 insertions(+), 8 deletions(-) create mode 100644 test/antithesis/intake/Cargo.toml create mode 100644 test/antithesis/intake/src/handlers.rs create mode 100644 test/antithesis/intake/src/http.rs create mode 100644 test/antithesis/intake/src/intake.rs create mode 100644 test/antithesis/intake/src/main.rs create mode 100644 test/antithesis/intake/src/predicates.rs create mode 100644 test/antithesis/intake/src/predicates/constants.rs create mode 100644 test/antithesis/intake/src/predicates/metric_point.rs create mode 100644 test/antithesis/intake/src/predicates/payload.rs create mode 100644 test/antithesis/intake/src/predicates/series.rs create mode 100644 test/antithesis/intake/src/state.rs create mode 100644 test/antithesis/scratchbook/w-property-intake.md diff --git a/Cargo.lock b/Cargo.lock index e71935a4a7..fc6d7fdd19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -135,6 +135,25 @@ dependencies = [ "cc", ] +[[package]] +name = "antithesis-intake" +version = "0.1.0" +dependencies = [ + "antithesis_sdk", + "axum", + "datadog-protos", + "protobuf", + 
"saluki-error", + "serde", + "serde_json", + "stele", + "tokio", + "tower", + "tower-http", + "tracing", + "tracing-subscriber", +] + [[package]] name = "antithesis_sdk" version = "0.2.8" diff --git a/Cargo.toml b/Cargo.toml index 95585fed57..c1c0580de7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "lib/saluki-tls", "lib/stringtheory", "test/antithesis/harness", + "test/antithesis/intake", ] resolver = "2" diff --git a/test/antithesis/deploy/Dockerfile b/test/antithesis/deploy/Dockerfile index b0efa9304e..3ed7e0ee8d 100644 --- a/test/antithesis/deploy/Dockerfile +++ b/test/antithesis/deploy/Dockerfile @@ -4,7 +4,7 @@ # # Build context is the repository root. Three named targets: # - adp : agent-data-plane built WITH Antithesis coverage instrumentation + SDK (the SUT) -# - intake : datadog-intake mock Datadog intake (dependency) +# - intake : antithesis-intake mock Datadog intake + W-property assertions (dependency) # - workload : DogStatsD driver + test templates + setup-complete (the client) # # ADP is built native x86_64-unknown-linux-gnu (glibc), so no musl cross-compile headers are needed. @@ -67,7 +67,7 @@ RUN --mount=type=cache,target=/adp/target,id=antithesis-adp-target \ echo "Instrumentation symbols present." # --------------------------------------------------------------------------- -# Build the correctness tools (datadog-intake) and the test-command binaries, uninstrumented. +# Build the antithesis-intake mock and the test-command binaries, uninstrumented. # These are supporting harness components, not the SUT, so they need no coverage instrumentation. 
# --------------------------------------------------------------------------- FROM build-base AS tools-builder @@ -77,11 +77,11 @@ RUN --mount=type=cache,target=/tools/target,id=antithesis-tools-target \ --mount=type=cache,target=/root/.cargo/registry,id=cargo-registry \ --mount=type=cache,target=/root/.cargo/git,id=cargo-git \ cargo build --release \ - --bin datadog-intake \ + --bin antithesis-intake \ --bin parallel_driver_send_dogstatsd --bin parallel_driver_sketchburst \ --bin finally_verify_delivery --bin eventually_adp_alive \ --bin first_sample_config && \ - cp /tools/target/release/datadog-intake /usr/local/bin/datadog-intake && \ + cp /tools/target/release/antithesis-intake /usr/local/bin/antithesis-intake && \ cp /tools/target/release/parallel_driver_send_dogstatsd /usr/local/bin/parallel_driver_send_dogstatsd && \ cp /tools/target/release/parallel_driver_sketchburst /usr/local/bin/parallel_driver_sketchburst && \ cp /tools/target/release/finally_verify_delivery /usr/local/bin/finally_verify_delivery && \ @@ -118,12 +118,15 @@ ENTRYPOINT ["/entrypoint.sh"] CMD ["run"] # --------------------------------------------------------------------------- -# Runtime: datadog-intake (mock Datadog intake dependency). +# Runtime: antithesis-intake (mock Datadog intake + W-property assertions). # --------------------------------------------------------------------------- FROM ${APP_IMAGE} AS intake ENV NO_COLOR=1 -COPY --from=tools-builder /usr/local/bin/datadog-intake /usr/local/bin/datadog-intake -ENTRYPOINT ["/usr/local/bin/datadog-intake"] +# W17 resolves each series' host resource against this hostname. Keep it in sync +# with `hostname:` in adp/datadog.yaml and the DD_HOSTNAME default in the harness. 
+ENV DD_HOSTNAME=antithesis-adp +COPY --from=tools-builder /usr/local/bin/antithesis-intake /usr/local/bin/antithesis-intake +ENTRYPOINT ["/usr/local/bin/antithesis-intake"] # --------------------------------------------------------------------------- # Runtime: workload client (DogStatsD driver + test templates). diff --git a/test/antithesis/deploy/docker-compose.yaml b/test/antithesis/deploy/docker-compose.yaml index 4a54c2b95b..c64caaa23e 100644 --- a/test/antithesis/deploy/docker-compose.yaml +++ b/test/antithesis/deploy/docker-compose.yaml @@ -14,7 +14,7 @@ services: environment: NO_COLOR: "1" healthcheck: - # datadog-intake serves HTTP on :2049. /dev/tcp avoids needing curl in the image. + # antithesis-intake serves HTTP on :2049. /dev/tcp avoids needing curl in the image. test: ["CMD-SHELL", "bash -c 'exec 3<>/dev/tcp/localhost/2049'"] interval: 2s timeout: 2s diff --git a/test/antithesis/intake/Cargo.toml b/test/antithesis/intake/Cargo.toml new file mode 100644 index 0000000000..44accdfbbd --- /dev/null +++ b/test/antithesis/intake/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "antithesis-intake" +version = "0.1.0" +edition = { workspace = true } +license = { workspace = true } +repository = { workspace = true } +publish = false + +[[bin]] +name = "antithesis-intake" +path = "src/main.rs" + +[lints] +workspace = true + +[dependencies] +antithesis_sdk = { workspace = true, features = ["full"] } +axum = { workspace = true, features = ["http1", "json", "tokio", "tracing"] } +datadog-protos = { workspace = true } +protobuf = { workspace = true } +saluki-error = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +stele = { workspace = true } +tokio = { workspace = true, features = [ + "macros", + "net", + "rt", + "rt-multi-thread", + "signal", +] } +tower = { workspace = true } +tower-http = { workspace = true, features = [ + "decompression-deflate", + "decompression-gzip", + "decompression-zstd", +] } +tracing = { workspace = 
true } +tracing-subscriber = { workspace = true, features = [ + "ansi", + "env-filter", + "fmt", + "local-time", + "registry", + "std", + "tracing-log", +] } diff --git a/test/antithesis/intake/src/handlers.rs b/test/antithesis/intake/src/handlers.rs new file mode 100644 index 0000000000..85d8a76ae4 --- /dev/null +++ b/test/antithesis/intake/src/handlers.rs @@ -0,0 +1,60 @@ +//! Non-asserting handlers: the metrics dump, the v1-series and sketch merge +//! endpoints, and the permissive fallback. +//! +//! These do not carry W-property assertions. The v1-series and sketch +//! endpoints exist so the Agent's other metric submissions still reach +//! `/metrics/dump`, which `finally_verify_delivery` polls. The fallback returns +//! `200 OK` so the Agent's connectivity probes and any unmodelled endpoint +//! succeed, matching the prior `datadog-intake` behaviour. + +use axum::{body::Bytes, extract::State, http::StatusCode, http::Uri, Json}; +use datadog_protos::metrics::SketchPayload; +use protobuf::Message as _; +use stele::Metric; +use tracing::{debug, error}; + +use crate::state::AppState; + +/// `GET /metrics/dump`: returns every ingested metric in simplified form. +pub async fn handle_metrics_dump(State(state): State) -> Json> { + Json(state.metrics.dump_metrics()) +} + +/// `POST /api/v1/series`: merges a legacy JSON series payload into the dump +/// store. A `{}` body is a connectivity probe and is accepted without merging. +pub async fn handle_series_v1(State(state): State, body: Bytes) -> StatusCode { + if body == b"{}"[..] { + return StatusCode::ACCEPTED; + } + match state.metrics.merge_series_v1_payload(&body) { + Ok(()) => StatusCode::ACCEPTED, + Err(e) => { + error!(error = %e, "Failed to merge series v1 payload."); + StatusCode::BAD_REQUEST + } + } +} + +/// `POST /api/beta/sketches`: merges a sketch payload into the dump store. 
+pub async fn handle_sketch(State(state): State, body: Bytes) -> StatusCode { + let payload = match SketchPayload::parse_from_bytes(&body) { + Ok(payload) => payload, + Err(e) => { + error!(error = %e, "Failed to parse sketch payload."); + return StatusCode::BAD_REQUEST; + } + }; + match state.metrics.merge_sketch_payload(payload) { + Ok(()) => StatusCode::ACCEPTED, + Err(e) => { + error!(error = %e, "Failed to merge sketch payload."); + StatusCode::BAD_REQUEST + } + } +} + +/// Permissive fallback for unmodelled endpoints and connectivity probes. +pub async fn fallback(uri: Uri) -> StatusCode { + debug!("Got unhandled request: path={}", uri); + StatusCode::OK +} diff --git a/test/antithesis/intake/src/http.rs b/test/antithesis/intake/src/http.rs new file mode 100644 index 0000000000..4c649f045d --- /dev/null +++ b/test/antithesis/intake/src/http.rs @@ -0,0 +1,116 @@ +//! Axum HTTP surface for the intake. +//! +//! The `/api/v2/series` route stacks a measurement middleware ahead of the +//! decompression layer so the W5 (compressed size), W6 (uncompressed size), +//! and W22 (content-length) checks can read both the on-the-wire body length +//! and the decompressed body length. The middleware records the compressed +//! length, the `Content-Encoding`, and the declared `Content-Length` before +//! `RequestDecompressionLayer` consumes the encoding and strips the header, +//! then hands the request along unchanged with those measurements riding as +//! request extensions. + +use axum::{ + body::Body, + extract::{DefaultBodyLimit, Request}, + http::{HeaderMap, StatusCode}, + middleware::{from_fn, Next}, + response::{IntoResponse, Response}, + routing::{get, post}, + Router, +}; +use tower::ServiceBuilder; +use tower_http::decompression::RequestDecompressionLayer; + +use crate::{handlers, intake, state::AppState}; + +/// Memory backstop on the body buffered by the measurement middleware. 
Sits +/// well above any spec-admitted body so it never preempts the W5 strict-below +/// check. Decompressed sketch payloads can be tens of MiB. +const MAX_BUFFERED_BODY_BYTES: usize = 64 * 1024 * 1024; + +/// Compressed request-body length recorded before decompression and attached +/// as a request extension for the W5/W22 checks. +#[derive(Clone, Copy, Debug)] +pub struct CompressedLen(pub u64); + +/// Whether the request entered the decompression path. Recorded from the +/// `Content-Encoding` header before the decompression layer consumes it, +/// attached as a request extension so the handler knows whether the W6 +/// uncompressed bound applies. +#[derive(Clone, Copy, Debug)] +pub struct DecompressionApplied(pub bool); + +/// The `Content-Length` value declared on the wire, read before the +/// decompression layer strips the header. Attached as a request extension so +/// the W22 check compares the declared length against the measured wire body +/// length rather than against the post-decompression headers, where the header +/// is absent. +#[derive(Clone, Copy, Debug)] +pub struct DeclaredContentLength(pub Option); + +/// Build the intake router. `/api/v2/series` carries the W-property assertions; +/// the other metric routes merge into the same store so `/metrics/dump` +/// reflects everything the Agent delivered. The fallback returns `200 OK` so +/// the Agent's connectivity probes and any unmodelled endpoint succeed. +pub fn build_router(state: AppState) -> Router { + // W1-W6 and W22 need the compressed body and raw headers, so the series + // route runs `measure_compressed_size` outermost, then decompresses, then + // lifts the body limit (the middleware's own cap is the backstop). 
+ let series = post(intake::handle_series).layer( + ServiceBuilder::new() + .layer(from_fn(measure_compressed_size)) + .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)) + .layer(DefaultBodyLimit::disable()), + ); + // v1 series and sketches do not carry W assertions but still arrive + // compressed, so they decompress before merging into the dump store. + let decompress = || { + ServiceBuilder::new() + .layer(RequestDecompressionLayer::new().pass_through_unaccepted(true)) + .layer(DefaultBodyLimit::max(MAX_BUFFERED_BODY_BYTES)) + }; + + Router::new() + .route("/ready", get(|| async { StatusCode::OK })) + .route("/metrics/dump", get(handlers::handle_metrics_dump)) + .route("/api/v2/series", series) + .route("/api/v1/series", post(handlers::handle_series_v1).layer(decompress())) + .route("/api/beta/sketches", post(handlers::handle_sketch).layer(decompress())) + .fallback(handlers::fallback) + .with_state(state) +} + +fn declared_content_length(headers: &HeaderMap) -> Option { + headers + .get("content-length") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse::().ok()) +} + +fn decompression_applied(headers: &HeaderMap) -> bool { + let Some(enc) = headers.get("content-encoding").and_then(|v| v.to_str().ok()) else { + return false; + }; + let enc = enc.trim(); + ["deflate", "gzip", "zstd"] + .iter() + .any(|known| enc.eq_ignore_ascii_case(known)) +} + +/// Buffer the body, count its bytes, observe the `Content-Encoding` and +/// `Content-Length` headers before the decompression layer consumes them, then +/// hand the request along unchanged with the measurements riding as extensions. 
+async fn measure_compressed_size(req: Request, next: Next) -> Response { + let (parts, body) = req.into_parts(); + let Ok(bytes) = axum::body::to_bytes(body, MAX_BUFFERED_BODY_BYTES).await else { + return StatusCode::PAYLOAD_TOO_LARGE.into_response(); + }; + let len = bytes.len() as u64; + let applied = decompression_applied(&parts.headers); + let declared = declared_content_length(&parts.headers); + let mut req = Request::from_parts(parts, Body::from(bytes)); + req.extensions_mut().insert(CompressedLen(len)); + req.extensions_mut().insert(DecompressionApplied(applied)); + req.extensions_mut().insert(DeclaredContentLength(declared)); + next.run(req).await +} diff --git a/test/antithesis/intake/src/intake.rs b/test/antithesis/intake/src/intake.rs new file mode 100644 index 0000000000..0d7d7628bd --- /dev/null +++ b/test/antithesis/intake/src/intake.rs @@ -0,0 +1,279 @@ +//! `/api/v2/series` intake handler. +//! +//! Decodes the Agent's `MetricPayload` and fires an Antithesis SDK +//! `assert_always!` for every W property the handler can evaluate from the +//! request envelope and decoded payload. Assertion firing is independent of the +//! response code: the handler always returns `202 Accepted` on a decodable +//! body, mirroring real intake, which returns `202` even when individual series +//! carry validation errors. +//! +//! Ported from the `invariant-jig` checker. The property numbering and +//! semantics live in that project's `README.md` §Properties.Payloads. 
+ +use std::time::{SystemTime, UNIX_EPOCH}; + +use antithesis_sdk::prelude::*; +use axum::{ + body::to_bytes, + extract::{Request, State}, + http::{HeaderMap, StatusCode}, +}; +use datadog_protos::metrics::{metric_payload::MetricSeries, MetricPayload}; +use protobuf::Message as _; +use serde_json::json; +use tracing::{debug, error}; + +use crate::{ + http::{CompressedLen, DeclaredContentLength, DecompressionApplied}, + predicates::{metric_point, payload, series}, + state::AppState, +}; + +/// `serializer_max_series_points_per_payload` default (W8/W15). The Agent caps +/// both the per-payload total and the per-series count at this value. +const MAX_POINTS_PER_PAYLOAD: usize = 10_000; + +/// `MaxTags(orgID)` default (W13). +const MAX_TAGS_PER_SERIES: usize = 100; + +/// `MaxResources(orgID)` default (W18). +const MAX_RESOURCES_PER_SERIES: usize = 500; + +/// Future-bound window used by W21, in seconds. +const MAX_SECONDS_IN_FUTURE: i64 = 600; + +/// W5 compressed-body cap. The Agent caps at `<= 512000` bytes and intake +/// rejects at `>= 512000`, so the effective cross-cut bound is strict-below. +const W5_COMPRESSED_CAP_BYTES: u64 = 512_000; + +/// W6 uncompressed-body cap. Intake admits up to 5 MiB inclusive. +const W6_UNCOMPRESSED_CAP_BYTES: u64 = 5 * 1024 * 1024; + +/// Handler for `POST /api/v2/series`. Fires one SDK assertion per W property +/// per request (envelope-level) or per series/point (series/point-level). +/// Antithesis aggregates per assertion name. +pub async fn handle_series(State(state): State, request: Request) -> StatusCode { + // W21 bounds a point's timestamp against the intake wall clock at request + // receipt. Capture it before draining and decoding the body: those take + // noticeable time and would otherwise move the bound later than the request + // actually arrived. 
+ let now_secs = unix_now_secs(); + let (parts, body) = request.into_parts(); + let compressed_len = parts.extensions.get::().map_or(0, |c| c.0); + let decompression_applied = parts.extensions.get::().is_some_and(|d| d.0); + let declared_content_length = parts.extensions.get::().and_then(|d| d.0); + + let body_bytes = match to_bytes(body, usize::MAX).await { + Ok(bytes) => bytes, + Err(e) => { + error!(error = %e, "Failed to drain /api/v2/series body."); + return StatusCode::BAD_REQUEST; + } + }; + let headers = parts.headers; + + // The Agent sends a `{}` body to probe endpoint connectivity. That is not a + // metric payload, so skip the whole W pipeline: W7 would otherwise read the + // probe as a malformed `MetricPayload` and fire a spurious assertion on + // every probe. Real intake likewise short-circuits the diagnostic body. + if body_bytes.as_ref() == b"{}" { + debug!("Received diagnostic probe for /api/v2/series, ignoring."); + return StatusCode::ACCEPTED; + } + + evaluate_envelope(&headers); + evaluate_byte_sizes( + declared_content_length, + compressed_len, + decompression_applied, + body_bytes.len() as u64, + ); + + let decode_result = MetricPayload::parse_from_bytes(&body_bytes); + assert_always!( + decode_result.is_ok(), + "W7.decode_success", + &json!({ + "body_len": body_bytes.len(), + "decompression_applied": decompression_applied, + }) + ); + let metric_payload = match decode_result { + Ok(payload) => payload, + Err(e) => { + error!(error = %e, "Failed to parse /api/v2/series MetricPayload."); + return StatusCode::BAD_REQUEST; + } + }; + + if !metric_payload.series.is_empty() { + // Lets triage distinguish an Agent that came up and flushed from one + // that never did. A run where this never fires stalled before any + // /api/v2/series reached the intake. 
+ assert_reachable!( + "intake.first_series_observed", + &json!({ "series": metric_payload.series.len() }) + ); + } + + evaluate_payload(&metric_payload); + for ms in &metric_payload.series { + evaluate_series(ms, now_secs, &state.expected_hostname); + } + + debug!( + bytes = body_bytes.len(), + series = metric_payload.series.len(), + "received /api/v2/series" + ); + + // Feed the dump store so /metrics/dump reflects delivery. Best-effort: a + // merge failure (e.g. an UNSPECIFIED type, which W12 already flagged) must + // not change the response the Agent observes. + if let Err(e) = state.metrics.merge_series_v2_payload(metric_payload) { + debug!(error = %e, "Could not merge series v2 payload into dump store."); + } + + StatusCode::ACCEPTED +} + +fn evaluate_byte_sizes( + declared_content_length: Option, compressed_len: u64, decompression_applied: bool, uncompressed_len: u64, +) { + assert_always!( + payload::w5_compressed_size(compressed_len, W5_COMPRESSED_CAP_BYTES), + "W5.compressed_size", + &json!({ "compressed_bytes": compressed_len, "cap_bytes": W5_COMPRESSED_CAP_BYTES }) + ); + if decompression_applied { + assert_always!( + payload::w6_uncompressed_size(uncompressed_len, W6_UNCOMPRESSED_CAP_BYTES), + "W6.uncompressed_size", + &json!({ "uncompressed_bytes": uncompressed_len, "cap_bytes": W6_UNCOMPRESSED_CAP_BYTES }) + ); + } + assert_always!( + payload::w22_content_length(declared_content_length, compressed_len), + "W22.content_length", + &json!({ "compressed_bytes": compressed_len, "declared_content_length": declared_content_length }) + ); +} + +fn evaluate_envelope(headers: &HeaderMap) { + assert_always!( + payload::w1_content_type(headers), + "W1.content_type", + &json!({ "header": "Content-Type" }) + ); + assert_always!( + payload::w2_content_encoding(headers), + "W2.content_encoding", + &json!({ "header": "Content-Encoding" }) + ); + assert_always!( + payload::w3_api_key(headers), + "W3.api_key_present", + &json!({ "header": "DD-Api-Key" }) + ); + // W4 
(User-Agent prefix) is intentionally omitted. ADP sends + // `agent-data-plane/...`, not the Go Agent's `datadog-agent/`, and intake + // does not validate User-Agent. See predicates::payload for the rationale. +} + +fn evaluate_payload(metric_payload: &MetricPayload) { + let w8_violation = payload::w8_point_count(metric_payload, MAX_POINTS_PER_PAYLOAD); + assert_always!( + w8_violation.is_none(), + "W8.payload_point_count", + &json!({ "max_points": MAX_POINTS_PER_PAYLOAD, "observed": w8_violation }) + ); +} + +fn evaluate_series(ms: &MetricSeries, now_secs: i64, expected_hostname: &str) { + assert_always!( + series::w9_metric_non_empty(ms), + "W9.metric_non_empty", + &json!({ "metric": ms.metric() }) + ); + let w10 = series::w10_metric_name_too_long(ms); + assert_always!( + w10.is_none(), + "W10.metric_name_length", + &json!({ "metric": ms.metric(), "observed_len": w10 }) + ); + assert_always!( + !series::w11_metric_name_no_alpha(ms), + "W11.metric_name_alphabetic", + &json!({ "metric": ms.metric() }) + ); + let w12 = series::w12_type_out_of_domain(ms); + assert_always!( + w12.is_none(), + "W12.type_in_domain", + &json!({ "metric": ms.metric(), "out_of_domain_type": w12 }) + ); + let w13 = series::w13_tag_count(ms, MAX_TAGS_PER_SERIES); + assert_always!( + w13.is_none(), + "W13.tag_count", + &json!({ "metric": ms.metric(), "max_tags": MAX_TAGS_PER_SERIES, "observed": w13 }) + ); + let w14 = series::w14_reserved_tag_prefix(ms); + assert_always!( + w14.is_none(), + "W14.reserved_tag_prefix", + &json!({ "metric": ms.metric(), "offending": w14 }) + ); + let w15 = series::w15_point_count(ms, MAX_POINTS_PER_PAYLOAD); + assert_always!( + w15.is_none(), + "W15.series_point_count", + &json!({ "metric": ms.metric(), "observed": w15 }) + ); + let w17 = series::w17_host_unresolved(ms, expected_hostname); + let w17_observed = w17.map(|r| { + let (resolution, resolved) = r.as_detail(); + json!({ "resolution": resolution, "resolved": resolved }) + }); + assert_always!( + 
w17.is_none(), + "W17.host_resource_resolved", + &json!({ "metric": ms.metric(), "expected": expected_hostname, "observed": w17_observed }) + ); + let w18 = series::w18_resource_count(ms, MAX_RESOURCES_PER_SERIES); + assert_always!( + w18.is_none(), + "W18.resource_count", + &json!({ "metric": ms.metric(), "max_resources": MAX_RESOURCES_PER_SERIES, "observed": w18 }) + ); + let w19 = series::w19_host_name_too_long(ms); + assert_always!( + w19.is_none(), + "W19.host_name_length", + &json!({ "metric": ms.metric(), "observed": w19 }) + ); + let w16 = series::w16_origin_out_of_domain(ms); + assert_always!( + w16.is_none(), + "W16.origin_in_domain", + &json!({ "metric": ms.metric(), "out_of_domain": w16.map(|(field, value)| (field.as_str(), value)) }) + ); + let w20 = metric_point::w20_nan_violations(ms); + assert_always!( + w20.is_none(), + "W20.value_not_nan", + &json!({ "metric": ms.metric(), "observed": w20.map(|(idx, count)| (idx, count.get())) }) + ); + let w21 = metric_point::w21_future_violations(ms, now_secs, MAX_SECONDS_IN_FUTURE); + assert_always!( + w21.is_none(), + "W21.timestamp_future_bound", + &json!({ "metric": ms.metric(), "observed": w21.map(|(idx, count)| (idx, count.get())) }) + ); +} + +fn unix_now_secs() -> i64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_or(0, |d| i64::try_from(d.as_secs()).unwrap_or(i64::MAX)) +} diff --git a/test/antithesis/intake/src/main.rs b/test/antithesis/intake/src/main.rs new file mode 100644 index 0000000000..2c1ebcf4a7 --- /dev/null +++ b/test/antithesis/intake/src/main.rs @@ -0,0 +1,124 @@ +//! A mock Datadog intake for the Antithesis harness. +//! +//! Simulates the real `/api/v2/series` intake and, on every payload the Agent +//! Data Plane delivers, fires Antithesis SDK assertions for the W properties +//! (W1-W22) ported from the `invariant-jig` checker. These observe whether the +//! Agent honoured its wire contract. Ingested metrics are also recorded for +//! 
`/metrics/dump`, which the `finally_verify_delivery` test command polls to +//! confirm end-to-end delivery. + +#![deny(warnings)] +#![deny(missing_docs)] + +use std::sync::Arc; + +use antithesis_sdk::prelude::*; +use saluki_error::{ErrorContext as _, GenericError}; +use tokio::{net::TcpListener, sync::mpsc}; +use tracing::{error, info}; +use tracing_subscriber::{filter::LevelFilter, EnvFilter}; + +mod handlers; +mod http; +mod intake; +mod predicates; +mod state; + +use crate::state::{AppState, MetricsState}; + +const HTTP_LISTEN_ADDR: &str = "0.0.0.0:2049"; + +/// The Agent hostname W17 resolves each series against, when `DD_HOSTNAME` is +/// unset. Matches the default the harness writes into ADP's `datadog.yaml`. +const DEFAULT_HOSTNAME: &str = "antithesis-adp"; + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt() + .compact() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(), + ) + .with_ansi(std::env::var("NO_COLOR").unwrap_or_default().is_empty()) + .with_target(true) + .init(); + + antithesis_init(); + + match run().await { + Ok(()) => info!("antithesis-intake stopped."), + Err(e) => { + error!("{:?}", e); + std::process::exit(1); + } + } +} + +async fn run() -> Result<(), GenericError> { + info!("antithesis-intake starting..."); + + let expected_hostname: Arc = std::env::var("DD_HOSTNAME") + .unwrap_or_else(|_| DEFAULT_HOSTNAME.to_owned()) + .into(); + info!(hostname = %expected_hostname, "W17 resolves each series host against this hostname."); + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + spawn_signal_handlers(shutdown_tx).error_context("Failed to configure signal handlers.")?; + + let state = AppState { + metrics: MetricsState::new(), + expected_hostname, + }; + + let listener = TcpListener::bind(HTTP_LISTEN_ADDR) + .await + .error_context("Failed to bind HTTP intake listener.")?; + info!("antithesis-intake started: listening on {HTTP_LISTEN_ADDR}."); + + axum::serve(listener, http::build_router(state)) + 
.with_graceful_shutdown(async move { shutdown_rx.recv().await.unwrap_or(()) }) + .await + .map_err(Into::into) +} + +#[cfg(unix)] +fn spawn_signal_handlers(shutdown_tx: mpsc::Sender<()>) -> Result<(), GenericError> { + use tokio::{ + select, + signal::unix::{signal, SignalKind}, + }; + + let mut sigint_handler = signal(SignalKind::interrupt()).error_context("Failed to set up SIGINT handler.")?; + let mut sigterm_handler = signal(SignalKind::terminate()).error_context("Failed to set up SIGTERM handler.")?; + + tokio::spawn(async move { + select! { + _ = sigint_handler.recv() => info!("Received SIGINT, shutting down..."), + _ = sigterm_handler.recv() => info!("Received SIGTERM, shutting down..."), + } + + if let Err(e) = shutdown_tx.send(()).await { + error!("Failed to send shutdown signal: {:?}", e); + } + }); + + Ok(()) +} + +#[cfg(windows)] +fn spawn_signal_handlers(shutdown_tx: mpsc::Sender<()>) -> Result<(), GenericError> { + tokio::spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => info!("Received Ctrl-C, shutting down..."), + Err(e) => error!(error = %e, "Failed to receive Ctrl-C signal."), + } + + if let Err(e) = shutdown_tx.send(()).await { + error!("Failed to send shutdown signal: {:?}", e); + } + }); + + Ok(()) +} diff --git a/test/antithesis/intake/src/predicates.rs b/test/antithesis/intake/src/predicates.rs new file mode 100644 index 0000000000..85ec70bb98 --- /dev/null +++ b/test/antithesis/intake/src/predicates.rs @@ -0,0 +1,16 @@ +//! W-property predicates evaluated by the intake handler. +//! +//! Each W function answers "does this property hold for the given input?" -- +//! typically returning `bool` or `Option` where `Some` means a violation +//! was observed and carries the offending value. The intake handler turns each +//! return into an `assert_always!` call against the Antithesis SDK. +//! +//! Ported from the `invariant-jig` checker. The property definitions live in +//! that project's `README.md` §Properties.Payloads. 
The numbering (W1-W22) and +//! semantics are preserved verbatim; only the proto access layer changes, from +//! `prost` to the `rust-protobuf` types in `datadog_protos`. + +pub mod constants; +pub mod metric_point; +pub mod payload; +pub mod series; diff --git a/test/antithesis/intake/src/predicates/constants.rs b/test/antithesis/intake/src/predicates/constants.rs new file mode 100644 index 0000000000..6ecbebdd1c --- /dev/null +++ b/test/antithesis/intake/src/predicates/constants.rs @@ -0,0 +1,25 @@ +//! Spec-derived constants the W predicates depend on. +//! +//! Sourced from the `invariant-jig` `README.md` §Properties.Payloads and from +//! the Agent's published intake limits. + +/// Host name byte-length cap (W19). +pub const MAX_HOST_NAME_BYTES: usize = 255; + +/// Metric name byte-length cap (W10). +pub const MAX_METRIC_NAME_BYTES: usize = 350; + +/// Origin product ordinal upper bound (W16). +pub const ORIGIN_PRODUCT_MAX: u32 = 45; + +/// Origin category ordinal upper bound (W16). +pub const ORIGIN_CATEGORY_MAX: u32 = 87; + +/// Origin category reserved ordinals (W16). +pub const ORIGIN_CATEGORY_RESERVED: [u32; 2] = [1, 13]; + +/// Origin service ordinal upper bound (W16). +pub const ORIGIN_SERVICE_MAX: u32 = 519; + +/// Origin service reserved ordinals (W16). +pub const ORIGIN_SERVICE_RESERVED: [u32; 8] = [8, 31, 32, 33, 46, 88, 123, 159]; diff --git a/test/antithesis/intake/src/predicates/metric_point.rs b/test/antithesis/intake/src/predicates/metric_point.rs new file mode 100644 index 0000000000..92278f998b --- /dev/null +++ b/test/antithesis/intake/src/predicates/metric_point.rs @@ -0,0 +1,82 @@ +//! Per-point predicates from the `invariant-jig` `README.md` +//! §Properties.Payloads for the `MetricPoint` category. + +use std::num::NonZeroUsize; + +use datadog_protos::metrics::metric_payload::MetricSeries; + +/// W20 -- `MetricPoint`, Value Not-NaN. Specification §Properties.Payloads W20: +/// a point's `value` is not NaN. 
The W20 asymmetry note records intake +/// admitting `+/-Inf` and rejecting only NaN, so the predicate checks `is_nan` +/// alone. Returns the index of the first NaN point paired with the count of NaN +/// points across the series, or `None` when the series carries none. +#[must_use] +pub fn w20_nan_violations(series: &MetricSeries) -> Option<(usize, NonZeroUsize)> { + let first = series.points.iter().position(|p| p.value.is_nan())?; + let count = series.points.iter().filter(|p| p.value.is_nan()).count(); + NonZeroUsize::new(count).map(|count| (first, count)) +} + +/// W21 -- `MetricPoint`, Timestamp Future Bound. Specification +/// §Properties.Payloads W21: a point's `timestamp` is at most `intake_now + +/// MaxSecondsInFuture`. Intake drops points strictly past that bound, so the +/// predicate flags `timestamp > intake_now + max_future`. `intake_now_secs` is +/// the intake's wall clock at request receipt in epoch seconds. +/// `max_future_secs` is the bound. Both are epoch-second quantities far below +/// `i64::MAX`, so the sum does not overflow. Returns the index of the first +/// over-bound point paired with the count across the series, or `None` when +/// every point is within the bound. 
+#[must_use] +pub fn w21_future_violations( + series: &MetricSeries, intake_now_secs: i64, max_future_secs: i64, +) -> Option<(usize, NonZeroUsize)> { + let bound = intake_now_secs + max_future_secs; + let first = series.points.iter().position(|p| p.timestamp > bound)?; + let count = series.points.iter().filter(|p| p.timestamp > bound).count(); + NonZeroUsize::new(count).map(|count| (first, count)) +} + +#[cfg(test)] +mod tests { + use datadog_protos::metrics::metric_payload::{MetricPoint, MetricSeries}; + + use super::*; + + fn series_with(values: &[(i64, f64)]) -> MetricSeries { + let mut series = MetricSeries::new(); + for &(timestamp, value) in values { + let mut point = MetricPoint::new(); + point.timestamp = timestamp; + point.value = value; + series.points.push(point); + } + series + } + + #[test] + fn w20_admits_infinities_flags_nan() { + let series = series_with(&[(0, f64::INFINITY), (1, f64::NEG_INFINITY), (2, 1.0)]); + assert!(w20_nan_violations(&series).is_none()); + } + + #[test] + fn w20_reports_first_index_and_count() { + let series = series_with(&[(0, 1.0), (1, f64::NAN), (2, 2.0), (3, f64::NAN)]); + assert_eq!( + w20_nan_violations(&series).map(|(first, count)| (first, count.get())), + Some((1, 2)), + ); + } + + #[test] + fn w21_is_strict_at_the_future_bound() { + // bound = 1000 + 600 = 1600. A point at exactly 1600 holds; 1601 fires. + let at_bound = series_with(&[(1600, 1.0)]); + assert!(w21_future_violations(&at_bound, 1000, 600).is_none()); + let past_bound = series_with(&[(1601, 1.0)]); + assert_eq!( + w21_future_violations(&past_bound, 1000, 600).map(|(first, count)| (first, count.get())), + Some((0, 1)), + ); + } +} diff --git a/test/antithesis/intake/src/predicates/payload.rs b/test/antithesis/intake/src/predicates/payload.rs new file mode 100644 index 0000000000..27a6f2a7e3 --- /dev/null +++ b/test/antithesis/intake/src/predicates/payload.rs @@ -0,0 +1,170 @@ +//! Envelope and byte-level payload predicates from the `invariant-jig` +//! 
`README.md` §Properties.Payloads. + +use axum::http::HeaderMap; +use datadog_protos::metrics::MetricPayload; + +/// W1 -- Envelope, Content-Type. Specification §Properties.Payloads W1: +/// `Content-Type` in `{application/x-protobuf, application/json}` after +/// `mime.ParseMediaType` normalization. We do the equivalent normalization: +/// strip parameters (everything from the first `;`), trim surrounding +/// whitespace, and fold the media type to ASCII lowercase. An absent or empty +/// header fails the check. The Agent emits `application/x-protobuf` on metric +/// submissions and `application/json` on connectivity probes. +#[must_use] +pub fn w1_content_type(h: &HeaderMap) -> bool { + let Some(value) = h.get("content-type").and_then(|v| v.to_str().ok()) else { + return false; + }; + let essence = value.split(';').next().unwrap_or("").trim().to_ascii_lowercase(); + matches!(essence.as_str(), "application/x-protobuf" | "application/json") +} + +/// W2 -- Envelope, Content-Encoding. Specification §Properties.Payloads W2: +/// `Content-Encoding` in `{deflate, gzip, zstd, identity}`. An absent header is +/// identity per RFC 7231, so it holds. Content-coding values are +/// case-insensitive. Intake does not reject an unknown value at the header +/// layer. It falls through to no-compression decode, so W2 is +/// observation-only in the pipeline. +#[must_use] +pub fn w2_content_encoding(h: &HeaderMap) -> bool { + let Some(value) = h.get("content-encoding").map(axum::http::HeaderValue::as_bytes) else { + return true; + }; + [b"deflate".as_slice(), b"gzip", b"zstd", b"identity"] + .iter() + .any(|accepted| value.eq_ignore_ascii_case(accepted)) +} + +/// W3 -- Envelope, API Key. Specification §Properties.Payloads W3: `DD-Api-Key` +/// is present iff a non-empty key was configured. +#[must_use] +pub fn w3_api_key(h: &HeaderMap) -> bool { + h.get("dd-api-key").is_some_and(|v| !v.as_bytes().is_empty()) +} + +// W4 -- Envelope, User-Agent -- is intentionally not asserted. 
The invariant-jig +// W4 checks a `datadog-agent/` User-Agent prefix, sourced from the Go Agent's +// default forwarder. ADP identifies itself as `agent-data-plane/x.y.z` by design +// (see `common/datadog/middleware.rs`), so the Go Agent's prefix never holds. +// Intake does not validate User-Agent, so there is no cross-cut contract to +// assert here. Run 5ef26a05...-54-9 confirmed a 100% false-positive firing. + +/// W5 -- Bytes, Compressed Size. Specification §Properties.Payloads W5: +/// `body < 500 KiB compressed`. The Agent caps at `<= 512000` and intake +/// rejects at `>= 512000`, so the cross-cut bound is strict. +#[must_use] +pub fn w5_compressed_size(body_len: u64, cap: u64) -> bool { + body_len < cap +} + +/// W6 -- Bytes, Uncompressed Size. Specification §Properties.Payloads W6: +/// `body <= 5 MiB uncompressed`. The bound is inclusive. The caller passes the +/// decompressed body length. The asymmetry note records intake enforcing the +/// cap only on the decompression path. +#[must_use] +pub fn w6_uncompressed_size(uncompressed_len: u64, cap: u64) -> bool { + uncompressed_len <= cap +} + +/// W8 -- `MetricPayload`, Point Count. Specification §Properties.Payloads W8: +/// total points across every series at or below the configured +/// `serializer_max_series_points_per_payload` cap. The Agent never assembles a +/// payload past it, and intake does not re-validate, so W8 is observation-only +/// in the pipeline. Returns the observed total when it exceeds the cap, else +/// `None`. +#[must_use] +pub fn w8_point_count(payload: &MetricPayload, max_points: usize) -> Option { + let total: usize = payload.series.iter().map(|s| s.points.len()).sum(); + (total > max_points).then_some(total) +} + +/// W22 -- Bytes, Content-Length. Specification §Properties.Payloads W22: +/// `Content-Length` absent or its value equals the body byte count. Both the +/// Agent and intake leave this to HTTP framing, so W22 is observation-only in +/// the pipeline. 
`declared` is the `Content-Length` value read from the wire +/// before the decompression layer strips it, and `body_len` is the measured +/// wire body length, which is the byte count the header describes. +#[must_use] +pub fn w22_content_length(declared: Option, body_len: u64) -> bool { + declared.is_none_or(|declared| declared == body_len) +} + +#[cfg(test)] +mod tests { + use axum::http::HeaderValue; + + use super::*; + + #[test] + fn w1_absent_header_fails() { + assert!(!w1_content_type(&HeaderMap::new())); + } + + #[test] + fn w1_strips_parameters_and_folds_case() { + let mut h = HeaderMap::new(); + h.insert( + "content-type", + HeaderValue::from_static("Application/X-Protobuf; charset=utf-8"), + ); + assert!(w1_content_type(&h)); + } + + #[test] + fn w1_rejects_unknown_media_type() { + let mut h = HeaderMap::new(); + h.insert("content-type", HeaderValue::from_static("text/plain")); + assert!(!w1_content_type(&h)); + } + + #[test] + fn w2_absent_header_is_identity() { + assert!(w2_content_encoding(&HeaderMap::new())); + } + + #[test] + fn w2_case_insensitive_accepts() { + let mut h = HeaderMap::new(); + h.insert("content-encoding", HeaderValue::from_static("GZIP")); + assert!(w2_content_encoding(&h)); + } + + #[test] + fn w2_rejects_unknown_encoding() { + let mut h = HeaderMap::new(); + h.insert("content-encoding", HeaderValue::from_static("br")); + assert!(!w2_content_encoding(&h)); + } + + #[test] + fn w3_present_nonempty_holds() { + let mut h = HeaderMap::new(); + h.insert("dd-api-key", HeaderValue::from_static("abc")); + assert!(w3_api_key(&h)); + assert!(!w3_api_key(&HeaderMap::new())); + } + + #[test] + fn w5_is_strict_below_the_cap() { + let cap = 512_000_u64; + assert!(w5_compressed_size(511_999, cap)); + assert!(!w5_compressed_size(cap, cap)); + assert!(!w5_compressed_size(512_001, cap)); + } + + #[test] + fn w6_is_inclusive_at_the_cap() { + let cap = 5_242_880_u64; + assert!(w6_uncompressed_size(5_242_879, cap)); + assert!(w6_uncompressed_size(cap, 
cap)); + assert!(!w6_uncompressed_size(5_242_881, cap)); + } + + #[test] + fn w22_absent_holds_and_mismatch_fails() { + assert!(w22_content_length(None, 42)); + assert!(w22_content_length(Some(42), 42)); + assert!(!w22_content_length(Some(43), 42)); + } +} diff --git a/test/antithesis/intake/src/predicates/series.rs b/test/antithesis/intake/src/predicates/series.rs new file mode 100644 index 0000000000..f468562343 --- /dev/null +++ b/test/antithesis/intake/src/predicates/series.rs @@ -0,0 +1,373 @@ +//! Series-level predicates from the `invariant-jig` `README.md` +//! §Properties.Payloads for checks that read a whole `MetricSeries` rather than +//! a single point. + +use datadog_protos::metrics::metric_payload::{MetricSeries, Resource}; +use datadog_protos::metrics::MetricType; + +use crate::predicates::constants::{ + MAX_HOST_NAME_BYTES, MAX_METRIC_NAME_BYTES, ORIGIN_CATEGORY_MAX, ORIGIN_CATEGORY_RESERVED, ORIGIN_PRODUCT_MAX, + ORIGIN_SERVICE_MAX, ORIGIN_SERVICE_RESERVED, +}; + +/// Intake resolves a series' host with `series.Host()`, which scans the +/// resources and returns the first whose `type` is `"host"`. It ignores +/// position and ignores any later host-typed resource. A series with no +/// host-typed resource resolves to no host. The scan mirrors dd-source +/// `apiv2/custom.go MetricSeries.Host`. W17 and W19 both read the host through +/// this one helper, so the rig resolves the host exactly as intake does. +#[must_use] +pub fn host_resource(series: &MetricSeries) -> Option<&Resource> { + series.resources.iter().find(|r| r.type_() == "host") +} + +/// W19 -- Resource, Host Name Length. Specification §Properties.Payloads W19: +/// the host name is at most 255 bytes. Intake reads the host via +/// [`host_resource`], the first resource whose `type` is `"host"`. A series +/// with no host-typed resource carries no host name, so the check does not +/// apply. Returns the over-cap host name when it exceeds the bound, or `None` +/// otherwise. 
+#[must_use] +pub fn w19_host_name_too_long(series: &MetricSeries) -> Option<&str> { + let host = host_resource(series)?; + (host.name().len() > MAX_HOST_NAME_BYTES).then_some(host.name()) +} + +/// W13 -- `MetricSeries`, Tag Count. Specification §Properties.Payloads W13: a +/// series carries at most `MaxTags(orgID)` tags. The bound is inclusive, so +/// intake rejects a series strictly over the cap. An Agent reports to one org, +/// so the caller passes that single org's `max_tags` cap. Returns the series +/// tag count when it exceeds the cap, or `None` otherwise. The W13 asymmetry +/// records the Agent not capping tag count, so this is an intake-only check. +#[must_use] +pub fn w13_tag_count(series: &MetricSeries, max_tags: usize) -> Option { + let count = series.tags.len(); + (count > max_tags).then_some(count) +} + +/// W18 -- `MetricSeries`, Resource Count. Specification §Properties.Payloads +/// W18: a series carries at most `MaxResources(orgID)` resources. The bound is +/// inclusive, so intake rejects a series strictly over the cap. An Agent +/// reports to one org, so the caller passes that single org's `max_resources` +/// cap. Returns the series resource count when it exceeds the cap, or `None` +/// otherwise. The Agent does not cap resource count, so this is an intake-only +/// check. +#[must_use] +pub fn w18_resource_count(series: &MetricSeries, max_resources: usize) -> Option { + let count = series.resources.len(); + (count > max_resources).then_some(count) +} + +/// W9 -- `MetricSeries`, Metric Non-Empty. Specification §Properties.Payloads +/// W9: a series carries a non-empty `metric`. Whitespace-only names are +/// non-empty and pass this step. The Agent does not validate the metric name, +/// so this is an intake-only check. +#[must_use] +pub fn w9_metric_non_empty(series: &MetricSeries) -> bool { + !series.metric().is_empty() +} + +/// W10 -- `MetricSeries`, Metric Length. 
Specification §Properties.Payloads +/// W10: the metric name is at most 350 bytes. Intake measures the byte length +/// inside `ValidateMetricName`, after the empty check and before the +/// alphabetic check. Returns the over-cap byte length when it exceeds the +/// bound, or `None` otherwise. The Agent does not validate the metric name, so +/// this is intake-only. +#[must_use] +pub fn w10_metric_name_too_long(series: &MetricSeries) -> Option { + let len = series.metric().len(); + (len > MAX_METRIC_NAME_BYTES).then_some(len) +} + +/// W11 -- `MetricSeries`, Metric Alphabetic. Specification §Properties.Payloads +/// W11: a metric name contains at least one ASCII alphabetic character. +/// Intake's `ValidateMetricName` scans the name byte by byte with `isAlpha` +/// ([A-Za-z]) and rejects a name with no alphabetic byte, the last of its +/// three metric-name checks. The scan is byte-wise, so a name of only digits, +/// punctuation, or multibyte UTF-8 carries no alphabetic byte. Returns whether +/// the name lacks an alphabetic byte. The empty name is rejected upstream by +/// W9, so the pipeline reaches this check only with a non-empty name. +#[must_use] +pub fn w11_metric_name_no_alpha(series: &MetricSeries) -> bool { + !series.metric().bytes().any(|b| b.is_ascii_alphabetic()) +} + +/// W12 -- `MetricSeries`, Type Enum. Specification §Properties.Payloads W12: +/// the metric type is one of `COUNT`, `RATE`, or `GAUGE`. The Agent's +/// compile-time enum admits only those three, so any payload it sends carries +/// an in-domain type. Intake unmarshals the wire `int32` with no enum check +/// and admits any value, so W12 is observation-only. The proto3 wire default 0 +/// is `UNSPECIFIED`, itself outside the accepted set, so a series that never +/// set the field fires. Returns the out-of-domain wire value, or `None` when +/// the type is one of the three. 
+#[must_use] +pub fn w12_type_out_of_domain(series: &MetricSeries) -> Option { + let in_domain = matches!( + series.type_.enum_value(), + Ok(MetricType::COUNT | MetricType::RATE | MetricType::GAUGE) + ); + (!in_domain).then_some(series.type_.value()) +} + +/// The two tag prefixes the Agent reserves for resource promotion. A `device:` +/// tag promotes to a `(type='device', name=value)` resource and a +/// `dd.internal.resource:type:name` tag promotes to a `(type, name)` resource. +/// The Agent strips both before sending, so a survivor on the wire is the W14 +/// observation. +const RESERVED_TAG_PREFIXES: [&str; 2] = ["device:", "dd.internal.resource:"]; + +/// W14 -- `MetricSeries`, Tag Prefix Reserved. Specification §Properties.Payloads +/// W14: no tag starts with `device:` or `dd.internal.resource:`. The Agent +/// strips both prefixes pre-send, promoting the tag to a resource, so any +/// payload it sends carries none. Intake parses survivors as routing metadata +/// with no rejection, so W14 is observation-only. Returns the first +/// reserved-prefix tag, or `None` when no tag carries a reserved prefix. +#[must_use] +pub fn w14_reserved_tag_prefix(series: &MetricSeries) -> Option<&str> { + series + .tags + .iter() + .find(|tag| RESERVED_TAG_PREFIXES.iter().any(|prefix| tag.starts_with(prefix))) + .map(String::as_str) +} + +/// W15 -- `MetricSeries`, Per-Series Point Count. Specification +/// §Properties.Payloads W15: a series carries at most +/// `serializer_max_series_points_per_payload` points, the same cap W8 totals +/// across the whole payload, default 10 000. The Agent drops a series whose +/// point count exceeds the cap, so any payload it sends carries none over. +/// Intake does not re-validate, so W15 is observation-only. Returns the +/// over-cap point count, or `None` when the count is at or below the cap. 
+#[must_use] +pub fn w15_point_count(series: &MetricSeries, max_points: usize) -> Option { + let count = series.points.len(); + (count > max_points).then_some(count) +} + +/// The origin field W16 found out of domain. The handler names it in the +/// triage detail so a reader sees which of the three fields the Agent put out +/// of domain. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OriginField { + /// `MetricSeries.metadata.origin.origin_product`. + Product, + /// `MetricSeries.metadata.origin.origin_category`. + Category, + /// `MetricSeries.metadata.origin.origin_service`. + Service, +} + +impl OriginField { + /// The proto field name, for the triage detail. + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + OriginField::Product => "origin_product", + OriginField::Category => "origin_category", + OriginField::Service => "origin_service", + } + } +} + +/// A field value is in domain when it is at or below its enum maximum and the +/// proto does not mark it `reserved`. Value `0` is each enum's default +/// no-origin member and sits below the maximum, so it is in domain. +fn origin_field_in_domain(value: u32, max: u32, reserved: &[u32]) -> bool { + value <= max && !reserved.contains(&value) +} + +/// W16 -- `MetricSeries`, Origin Populated. Specification §Properties.Payloads +/// W16: each populated `Origin` field is a defined enum member. The Agent +/// derives all three from the metric source through a fixed map over the +/// `OriginProduct`, `OriginSubProduct`, and `OriginProductDetail` members, so a +/// series it built carries only defined values, and intake backfills an absent +/// `Origin` without validating the domain. W16 is observation-only. A series +/// with no `Metadata` or no `Origin` holds. Returns the first out-of-domain +/// field and its wire value, checking product then category then service, or +/// `None` when every populated field is in domain. 
+#[must_use] +pub fn w16_origin_out_of_domain(series: &MetricSeries) -> Option<(OriginField, u32)> { + let origin = series.metadata.as_ref()?.origin.as_ref()?; + if !origin_field_in_domain(origin.origin_product, ORIGIN_PRODUCT_MAX, &[]) { + return Some((OriginField::Product, origin.origin_product)); + } + if !origin_field_in_domain(origin.origin_category, ORIGIN_CATEGORY_MAX, &ORIGIN_CATEGORY_RESERVED) { + return Some((OriginField::Category, origin.origin_category)); + } + if !origin_field_in_domain(origin.origin_service, ORIGIN_SERVICE_MAX, &ORIGIN_SERVICE_RESERVED) { + return Some((OriginField::Service, origin.origin_service)); + } + None +} + +/// Why W17 found the host intake resolves is not the Agent hostname. The +/// handler names it in the triage detail so a reader sees whether the host +/// resource was absent or merely misnamed. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum HostResolution<'a> { + /// No resource has `type` `"host"`, so `Host()` resolves to `""`. + Missing, + /// A host resource resolved, but its name is not the Agent hostname. + /// Carries the resolved name, which may be empty when the series wrote a + /// host resource with an empty name. + Mismatch(&'a str), +} + +impl<'a> HostResolution<'a> { + /// A triage-friendly view of the resolution: a stable kind string and the + /// resolved host name when one was present. `Missing` means no `type=host` + /// resource at all. `Mismatch` carries the resolved name, which is the empty + /// string when the series wrote a host resource without a name. The handler + /// puts both in the assertion detail so a reader can tell an omitted host + /// resource from one named something other than the expected hostname. + #[must_use] + pub fn as_detail(self) -> (&'static str, Option<&'a str>) { + match self { + HostResolution::Missing => ("missing", None), + HostResolution::Mismatch(name) => ("mismatch", Some(name)), + } + } +} + +/// W17 -- Resource, Host Resource Resolved. 
Specification §Properties.Payloads +/// W17: intake's `Host()` scan resolves a host resource named the Agent +/// hostname. Intake reads the host via [`host_resource`], the first resource +/// whose `type` is `"host"`, ignoring position. The Agent writes that resource +/// named its configured hostname, so a series it built resolves to `expected`. +/// W17 is observation-only. Returns [`HostResolution::Missing`] when no +/// resource is host-typed, [`HostResolution::Mismatch`] with the resolved name +/// when it is not `expected`, or `None` when the resolved host name equals +/// `expected`. +#[must_use] +pub fn w17_host_unresolved<'a>(series: &'a MetricSeries, expected: &str) -> Option> { + match host_resource(series) { + None => Some(HostResolution::Missing), + Some(host) if host.name() != expected => Some(HostResolution::Mismatch(host.name())), + Some(_) => None, + } +} + +#[cfg(test)] +mod tests { + use datadog_protos::metrics::metric_payload::{MetricSeries, MetricType, Resource}; + use datadog_protos::metrics::{Metadata, Origin}; + + use super::*; + + fn host_res(name: &str) -> Resource { + let mut r = Resource::new(); + r.set_type("host".into()); + r.set_name(name.into()); + r + } + + #[test] + fn w9_w10_w11_metric_name_checks() { + let mut s = MetricSeries::new(); + assert!(!w9_metric_non_empty(&s)); + s.set_metric("system.cpu.idle".into()); + assert!(w9_metric_non_empty(&s)); + assert!(w10_metric_name_too_long(&s).is_none()); + assert!(!w11_metric_name_no_alpha(&s)); + + s.set_metric("a".repeat(351)); + assert_eq!(w10_metric_name_too_long(&s), Some(351)); + + s.set_metric("123.456".into()); + assert!(w11_metric_name_no_alpha(&s)); + } + + #[test] + fn w12_unspecified_is_out_of_domain() { + let mut s = MetricSeries::new(); + // proto3 default is UNSPECIFIED (0), outside the accepted set. 
+ assert_eq!(w12_type_out_of_domain(&s), Some(0)); + s.set_type(MetricType::GAUGE); + assert!(w12_type_out_of_domain(&s).is_none()); + } + + #[test] + fn w13_w15_w18_counts() { + let mut s = MetricSeries::new(); + for i in 0..101 { + s.tags.push(format!("k{i}:v")); + } + assert_eq!(w13_tag_count(&s, 100), Some(101)); + assert!(w13_tag_count(&s, 200).is_none()); + + let mut s2 = MetricSeries::new(); + s2.resources.push(host_res("h")); + assert!(w18_resource_count(&s2, 500).is_none()); + } + + #[test] + fn w14_reserved_prefix_detected() { + let mut s = MetricSeries::new(); + s.tags.push("env:prod".into()); + assert!(w14_reserved_tag_prefix(&s).is_none()); + s.tags.push("device:eth0".into()); + assert_eq!(w14_reserved_tag_prefix(&s), Some("device:eth0")); + } + + #[test] + fn w17_w19_host_resolution() { + let mut s = MetricSeries::new(); + // No host resource resolves to Missing. + assert_eq!(w17_host_unresolved(&s, "agent"), Some(HostResolution::Missing)); + + assert_eq!(HostResolution::Missing.as_detail(), ("missing", None)); + + s.resources.push(host_res("other")); + assert_eq!( + w17_host_unresolved(&s, "agent"), + Some(HostResolution::Mismatch("other")) + ); + + // An empty host name resolves to Mismatch(""), not Missing: the resource + // exists, it just has no name. The detail preserves that distinction. + let mut s_empty = MetricSeries::new(); + s_empty.resources.push(host_res("")); + assert_eq!( + w17_host_unresolved(&s_empty, "agent").map(HostResolution::as_detail), + Some(("mismatch", Some(""))) + ); + + let mut s2 = MetricSeries::new(); + s2.resources.push(host_res("agent")); + assert!(w17_host_unresolved(&s2, "agent").is_none()); + assert!(w19_host_name_too_long(&s2).is_none()); + + let mut s3 = MetricSeries::new(); + s3.resources.push(host_res(&"h".repeat(256))); + assert!(w19_host_name_too_long(&s3).is_some()); + } + + #[test] + fn w16_origin_domain_checks() { + let mut s = MetricSeries::new(); + // No metadata holds. 
+ assert!(w16_origin_out_of_domain(&s).is_none()); + + let mut origin = Origin::new(); + origin.origin_product = ORIGIN_PRODUCT_MAX + 1; + let mut meta = Metadata::new(); + meta.origin = Some(origin).into(); + s.metadata = Some(meta).into(); + assert_eq!( + w16_origin_out_of_domain(&s), + Some((OriginField::Product, ORIGIN_PRODUCT_MAX + 1)) + ); + + // A reserved category ordinal is out of domain even below the max. + let mut origin = Origin::new(); + origin.origin_category = ORIGIN_CATEGORY_RESERVED[0]; + let mut meta = Metadata::new(); + meta.origin = Some(origin).into(); + let mut s2 = MetricSeries::new(); + s2.metadata = Some(meta).into(); + assert_eq!( + w16_origin_out_of_domain(&s2), + Some((OriginField::Category, ORIGIN_CATEGORY_RESERVED[0])) + ); + } +} diff --git a/test/antithesis/intake/src/state.rs b/test/antithesis/intake/src/state.rs new file mode 100644 index 0000000000..9bd1a8516c --- /dev/null +++ b/test/antithesis/intake/src/state.rs @@ -0,0 +1,79 @@ +//! Shared application state for the intake. +//! +//! Holds the ingested-metric store that backs `/metrics/dump` and the expected +//! Agent hostname that W17 resolves each series against. + +use std::sync::{Arc, Mutex}; + +use datadog_protos::metrics::{MetricPayload, SketchPayload}; +use saluki_error::GenericError; +use stele::Metric; + +/// Application state passed to every handler. +#[derive(Clone)] +pub struct AppState { + /// Ingested metrics, exposed at `/metrics/dump`. + pub metrics: MetricsState, + /// The Agent hostname W17 resolves each series' host resource against. Set + /// from `DD_HOSTNAME` at startup to match the ADP `datadog.yaml`. + pub expected_hostname: Arc, +} + +/// In-memory store of every metric the intake has ingested, in the simplified +/// `stele::Metric` form. Reused by `/metrics/dump`, which the +/// `finally_verify_delivery` test command polls to confirm end-to-end delivery. 
+#[derive(Clone)] +pub struct MetricsState { + metrics: Arc>>, +} + +impl MetricsState { + /// Creates a new, empty `MetricsState`. + #[must_use] + pub fn new() -> Self { + Self { + metrics: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Dumps the current metrics state. + #[must_use] + pub fn dump_metrics(&self) -> Vec { + self.metrics.lock().unwrap().clone() + } + + /// Merges the given series v2 payload into the current metrics state. + /// + /// # Errors + /// + /// Returns an error when a series carries an `UNSPECIFIED` type or an + /// out-of-range timestamp that `stele` cannot represent. + pub fn merge_series_v2_payload(&self, payload: MetricPayload) -> Result<(), GenericError> { + let metrics = Metric::try_from_series_v2(payload)?; + self.metrics.lock().unwrap().extend(metrics); + Ok(()) + } + + /// Merges the given series v1 payload into the current metrics state. + /// + /// # Errors + /// + /// Returns an error when the bytes do not decode as a v1 series payload. + pub fn merge_series_v1_payload(&self, bytes: &[u8]) -> Result<(), GenericError> { + let metrics = Metric::try_from_series_v1(bytes)?; + self.metrics.lock().unwrap().extend(metrics); + Ok(()) + } + + /// Merges the given sketch payload into the current metrics state. + /// + /// # Errors + /// + /// Returns an error when a sketch carries an out-of-range timestamp that + /// `stele` cannot represent. + pub fn merge_sketch_payload(&self, payload: SketchPayload) -> Result<(), GenericError> { + let metrics = Metric::try_from_sketch(payload)?; + self.metrics.lock().unwrap().extend(metrics); + Ok(()) + } +} diff --git a/test/antithesis/scratchbook/w-property-intake.md b/test/antithesis/scratchbook/w-property-intake.md new file mode 100644 index 0000000000..aacb8195a1 --- /dev/null +++ b/test/antithesis/scratchbook/w-property-intake.md @@ -0,0 +1,83 @@ +# W-property intake + +The antithesis harness has its own intake. It lives in the `antithesis-intake` +crate at `test/antithesis/intake`. 
It replaces the shared `datadog-intake` in +the `intake` image. + +## Why a dedicated intake + +The shared `bin/correctness/datadog-intake` serves integration and correctness +tests. The W-property assertions belong to the antithesis run alone. A separate +binary keeps the shared intake clean and lets the antithesis intake carry the +Antithesis SDK and evolve on its own. + +## What it does + +It observes every `/api/v2/series` payload ADP delivers and fires an Antithesis +SDK `assert_always!` per W property. The properties come from the `invariant-jig` +checker. See that project's `README.md` §Properties.Payloads for the numbering +and the double-sourced Agent and intake references. + +The intake also keeps the `/metrics/dump` contract `finally_verify_delivery` +polls. It merges v1 series, v2 series, and sketches into a `stele::Metric` store +exposed at `/metrics/dump`. The fallback returns `200 OK` so ADP connectivity +probes and unmodelled endpoints succeed. + +## Properties + +W1-W22, the full set: + +- Envelope W1-W3 read raw headers. Content-Type, Content-Encoding, API key. + W4 is dropped. ADP sends `agent-data-plane/`, not the Go Agent's + `datadog-agent/`, and intake does not validate User-Agent, so there is no + cross-cut contract to assert. Run 5ef26a05 confirmed a 100% false positive. +- Bytes W5, W6, W22 read sizes. Compressed cap, uncompressed cap, content-length + equality. +- Decode W7 parses the body as a `MetricPayload`. +- Payload W8 totals points across the payload. +- Series W9-W19 read each `MetricSeries`. Name, type, tag and resource counts, + reserved tag prefixes, origin enum domain, host resolution and length. +- Point W20, W21 read each `MetricPoint`. NaN value, future timestamp bound. + +## Envelope measurement + +W1-W6 and W22 need the compressed body and the raw headers. The series route +runs a `measure_compressed_size` middleware ahead of the decompression layer. 
It +records the compressed length, the Content-Encoding, and the declared +Content-Length before the layer strips them, then passes the request along with +those riding as extensions. W6 reads the decompressed length in the handler. + +## Hostname and W17 + +W17 resolves each series host against `DD_HOSTNAME`, default `antithesis-adp`. +Keep it in sync with `hostname:` in `deploy/adp/datadog.yaml`. The `intake` +image sets `DD_HOSTNAME=antithesis-adp`. + +Source-confirmed contract. The Agent emits a `type=host` resource on every v2 +series unconditionally, named `serie.Host` (`datadog-agent` +`pkg/serializer/internal/metrics/iterable_series.go:296`). ADP does the same, +named `metric.metadata().hostname().unwrap_or_default()` +(`encoders/datadog/metrics/mod.rs:919`). So the host resource is never absent on +either side. W17 failures are therefore name mismatches, not missing resources. +The prime suspect is an empty hostname when dogstatsd metrics are not +host-enriched, where the Agent would write the configured hostname. + +W17 now records the resolution in the assertion detail. `observed` carries +`resolution` (`missing` or `mismatch`) and `resolved`, the host name on the +wire. An empty name reads as `mismatch` with `resolved: ""`, which distinguishes +an unnamed host resource from a wrong name. Run 5ef26a05 fired W17 781/851 but +the detail did not carry the resolved value, so the cause was undetermined. The +next run will show it. + +## Probe fast-path + +ADP probes endpoint connectivity with a `{}` body. That is not a metric payload. +The handler returns `202 Accepted` for it and skips the W pipeline so W7 does not +fire a spurious assertion on every probe. + +## Proto layer + +The checker predicates port from `prost` to the `rust-protobuf` types in +`datadog_protos`. Field semantics match. The agent-payload proto is identical. +Access changes only. `series.metric()`, `r.type_()`, `series.type_.enum_value()`, +`MessageField` for `metadata` and `origin`.