Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ members = [
"lib/saluki-tls",
"lib/stringtheory",
"test/antithesis/harness",
"test/antithesis/intake",
]
resolver = "2"

Expand Down
17 changes: 10 additions & 7 deletions test/antithesis/deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
# Build context is the repository root. Three named targets:
# - adp : agent-data-plane built WITH Antithesis coverage instrumentation + SDK (the SUT)
# - intake : datadog-intake mock Datadog intake (dependency)
# - intake : antithesis-intake mock Datadog intake + W-property assertions (dependency)
# - workload : DogStatsD driver + test templates + setup-complete (the client)
#
# ADP is built native x86_64-unknown-linux-gnu (glibc), so no musl cross-compile headers are needed.
Expand Down Expand Up @@ -67,7 +67,7 @@ RUN --mount=type=cache,target=/adp/target,id=antithesis-adp-target \
echo "Instrumentation symbols present."

# ---------------------------------------------------------------------------
# Build the correctness tools (datadog-intake) and the test-command binaries, uninstrumented.
# Build the antithesis-intake mock and the test-command binaries, uninstrumented.
# These are supporting harness components, not the SUT, so they need no coverage instrumentation.
# ---------------------------------------------------------------------------
FROM build-base AS tools-builder
Expand All @@ -77,11 +77,11 @@ RUN --mount=type=cache,target=/tools/target,id=antithesis-tools-target \
--mount=type=cache,target=/root/.cargo/registry,id=cargo-registry \
--mount=type=cache,target=/root/.cargo/git,id=cargo-git \
cargo build --release \
--bin datadog-intake \
--bin antithesis-intake \
--bin parallel_driver_send_dogstatsd --bin parallel_driver_sketchburst \
--bin finally_verify_delivery --bin eventually_adp_alive \
--bin first_sample_config && \
cp /tools/target/release/datadog-intake /usr/local/bin/datadog-intake && \
cp /tools/target/release/antithesis-intake /usr/local/bin/antithesis-intake && \
cp /tools/target/release/parallel_driver_send_dogstatsd /usr/local/bin/parallel_driver_send_dogstatsd && \
cp /tools/target/release/parallel_driver_sketchburst /usr/local/bin/parallel_driver_sketchburst && \
cp /tools/target/release/finally_verify_delivery /usr/local/bin/finally_verify_delivery && \
Expand Down Expand Up @@ -118,12 +118,15 @@ ENTRYPOINT ["/entrypoint.sh"]
CMD ["run"]

# ---------------------------------------------------------------------------
# Runtime: datadog-intake (mock Datadog intake dependency).
# Runtime: antithesis-intake (mock Datadog intake + W-property assertions).
# ---------------------------------------------------------------------------
FROM ${APP_IMAGE} AS intake
ENV NO_COLOR=1
COPY --from=tools-builder /usr/local/bin/datadog-intake /usr/local/bin/datadog-intake
ENTRYPOINT ["/usr/local/bin/datadog-intake"]
# W17 resolves each series' host resource against this hostname. Keep it in sync
# with `hostname:` in adp/datadog.yaml and the DD_HOSTNAME default in the harness.
ENV DD_HOSTNAME=antithesis-adp
COPY --from=tools-builder /usr/local/bin/antithesis-intake /usr/local/bin/antithesis-intake
ENTRYPOINT ["/usr/local/bin/antithesis-intake"]

# ---------------------------------------------------------------------------
# Runtime: workload client (DogStatsD driver + test templates).
Expand Down
2 changes: 1 addition & 1 deletion test/antithesis/deploy/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ services:
environment:
NO_COLOR: "1"
healthcheck:
# datadog-intake serves HTTP on :2049. /dev/tcp avoids needing curl in the image.
# antithesis-intake serves HTTP on :2049. /dev/tcp avoids needing curl in the image.
test: ["CMD-SHELL", "bash -c 'exec 3<>/dev/tcp/localhost/2049'"]
interval: 2s
timeout: 2s
Expand Down
47 changes: 47 additions & 0 deletions test/antithesis/intake/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[package]
name = "antithesis-intake"
version = "0.1.0"
edition = { workspace = true }
license = { workspace = true }
repository = { workspace = true }
publish = false

[[bin]]
name = "antithesis-intake"
path = "src/main.rs"

[lints]
workspace = true

[dependencies]
antithesis_sdk = { workspace = true, features = ["full"] }
axum = { workspace = true, features = ["http1", "json", "tokio", "tracing"] }
datadog-protos = { workspace = true }
protobuf = { workspace = true }
saluki-error = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
stele = { workspace = true }
tokio = { workspace = true, features = [
"macros",
"net",
"rt",
"rt-multi-thread",
"signal",
] }
tower = { workspace = true }
tower-http = { workspace = true, features = [
"decompression-deflate",
"decompression-gzip",
"decompression-zstd",
] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"ansi",
"env-filter",
"fmt",
"local-time",
"registry",
"std",
"tracing-log",
] }
60 changes: 60 additions & 0 deletions test/antithesis/intake/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//! Non-asserting handlers: the metrics dump, the v1-series and sketch merge
//! endpoints, and the permissive fallback.
//!
//! These do not carry W-property assertions. The v1-series and sketch
//! endpoints exist so the Agent's other metric submissions still reach
//! `/metrics/dump`, which `finally_verify_delivery` polls. The fallback returns
//! `200 OK` so the Agent's connectivity probes and any unmodelled endpoint
//! succeed, matching the prior `datadog-intake` behaviour.

use axum::{body::Bytes, extract::State, http::StatusCode, http::Uri, Json};
use datadog_protos::metrics::SketchPayload;
use protobuf::Message as _;
use stele::Metric;
use tracing::{debug, error};

use crate::state::AppState;

/// `GET /metrics/dump`: returns every ingested metric in simplified form.
pub async fn handle_metrics_dump(State(state): State<AppState>) -> Json<Vec<Metric>> {
Json(state.metrics.dump_metrics())
}

/// `POST /api/v1/series`: merges a legacy JSON series payload into the dump
/// store. A `{}` body is a connectivity probe and is accepted without merging.
pub async fn handle_series_v1(State(state): State<AppState>, body: Bytes) -> StatusCode {
if body == b"{}"[..] {
return StatusCode::ACCEPTED;
}
match state.metrics.merge_series_v1_payload(&body) {
Ok(()) => StatusCode::ACCEPTED,
Err(e) => {
error!(error = %e, "Failed to merge series v1 payload.");
StatusCode::BAD_REQUEST
}
}
}

/// `POST /api/beta/sketches`: merges a sketch payload into the dump store.
pub async fn handle_sketch(State(state): State<AppState>, body: Bytes) -> StatusCode {
let payload = match SketchPayload::parse_from_bytes(&body) {
Ok(payload) => payload,
Err(e) => {
error!(error = %e, "Failed to parse sketch payload.");
return StatusCode::BAD_REQUEST;
}
};
match state.metrics.merge_sketch_payload(payload) {
Ok(()) => StatusCode::ACCEPTED,
Err(e) => {
error!(error = %e, "Failed to merge sketch payload.");
StatusCode::BAD_REQUEST
}
}
}

/// Permissive fallback for unmodelled endpoints and connectivity probes.
pub async fn fallback(uri: Uri) -> StatusCode {
debug!("Got unhandled request: path={}", uri);
StatusCode::OK
}
116 changes: 116 additions & 0 deletions test/antithesis/intake/src/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//! Axum HTTP surface for the intake.
//!
//! The `/api/v2/series` route stacks a measurement middleware ahead of the
//! decompression layer so the W5 (compressed size), W6 (uncompressed size),
//! and W22 (content-length) checks can read both the on-the-wire body length
//! and the decompressed body length. The middleware records the compressed
//! length, the `Content-Encoding`, and the declared `Content-Length` before
//! `RequestDecompressionLayer` consumes the encoding and strips the header,
//! then hands the request along unchanged with those measurements riding as
//! request extensions.

use axum::{
body::Body,
extract::{DefaultBodyLimit, Request},
http::{HeaderMap, StatusCode},
middleware::{from_fn, Next},
response::{IntoResponse, Response},
routing::{get, post},
Router,
};
use tower::ServiceBuilder;
use tower_http::decompression::RequestDecompressionLayer;

use crate::{handlers, intake, state::AppState};

/// Memory backstop on the body buffered by the measurement middleware. Sits
/// well above any spec-admitted body so it never preempts the W5 strict-below
/// check. Decompressed sketch payloads can be tens of MiB.
const MAX_BUFFERED_BODY_BYTES: usize = 64 * 1024 * 1024;

/// Compressed request-body length recorded before decompression and attached
/// as a request extension for the W5/W22 checks.
#[derive(Clone, Copy, Debug)]
pub struct CompressedLen(pub u64);

/// Whether the request entered the decompression path. Recorded from the
/// `Content-Encoding` header before the decompression layer consumes it,
/// attached as a request extension so the handler knows whether the W6
/// uncompressed bound applies.
#[derive(Clone, Copy, Debug)]
pub struct DecompressionApplied(pub bool);

/// The `Content-Length` value declared on the wire, read before the
/// decompression layer strips the header. Attached as a request extension so
/// the W22 check compares the declared length against the measured wire body
/// length rather than against the post-decompression headers, where the header
/// is absent.
#[derive(Clone, Copy, Debug)]
pub struct DeclaredContentLength(pub Option<u64>);

/// Build the intake router. `/api/v2/series` carries the W-property assertions;
/// the other metric routes merge into the same store so `/metrics/dump`
/// reflects everything the Agent delivered. The fallback returns `200 OK` so
/// the Agent's connectivity probes and any unmodelled endpoint succeed.
pub fn build_router(state: AppState) -> Router {
// W1-W6 and W22 need the compressed body and raw headers, so the series
// route runs `measure_compressed_size` outermost, then decompresses, then
// lifts the body limit (the middleware's own cap is the backstop).
let series = post(intake::handle_series).layer(
ServiceBuilder::new()
.layer(from_fn(measure_compressed_size))
.layer(RequestDecompressionLayer::new().pass_through_unaccepted(true))
.layer(DefaultBodyLimit::disable()),
);
// v1 series and sketches do not carry W assertions but still arrive
// compressed, so they decompress before merging into the dump store.
let decompress = || {
ServiceBuilder::new()
.layer(RequestDecompressionLayer::new().pass_through_unaccepted(true))
.layer(DefaultBodyLimit::max(MAX_BUFFERED_BODY_BYTES))
};

Router::new()
.route("/ready", get(|| async { StatusCode::OK }))
.route("/metrics/dump", get(handlers::handle_metrics_dump))
.route("/api/v2/series", series)
.route("/api/v1/series", post(handlers::handle_series_v1).layer(decompress()))
.route("/api/beta/sketches", post(handlers::handle_sketch).layer(decompress()))
.fallback(handlers::fallback)
.with_state(state)
}

fn declared_content_length(headers: &HeaderMap) -> Option<u64> {
headers
.get("content-length")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u64>().ok())
}

fn decompression_applied(headers: &HeaderMap) -> bool {
let Some(enc) = headers.get("content-encoding").and_then(|v| v.to_str().ok()) else {
return false;
};
let enc = enc.trim();
["deflate", "gzip", "zstd"]
.iter()
.any(|known| enc.eq_ignore_ascii_case(known))
}

/// Buffer the body, count its bytes, observe the `Content-Encoding` and
/// `Content-Length` headers before the decompression layer consumes them, then
/// hand the request along unchanged with the measurements riding as extensions.
async fn measure_compressed_size(req: Request, next: Next) -> Response {
let (parts, body) = req.into_parts();
let Ok(bytes) = axum::body::to_bytes(body, MAX_BUFFERED_BODY_BYTES).await else {
return StatusCode::PAYLOAD_TOO_LARGE.into_response();
};
let len = bytes.len() as u64;
let applied = decompression_applied(&parts.headers);
let declared = declared_content_length(&parts.headers);
let mut req = Request::from_parts(parts, Body::from(bytes));
req.extensions_mut().insert(CompressedLen(len));
req.extensions_mut().insert(DecompressionApplied(applied));
req.extensions_mut().insert(DeclaredContentLength(declared));
next.run(req).await
}
Loading
Loading