Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 9 additions & 12 deletions test/antithesis/deploy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Build context is the repository root. Three named targets:
# - adp : agent-data-plane built WITH Antithesis coverage instrumentation + SDK (the SUT)
# - intake : datadog-intake mock Datadog intake (dependency)
# - workload : millstone load generator + test templates + setup-complete (the client)
# - workload : DogStatsD driver + test templates + setup-complete (the client)
#
# ADP is built native x86_64-unknown-linux-gnu (glibc), so no musl cross-compile headers are needed.

Expand Down Expand Up @@ -67,7 +67,7 @@ RUN --mount=type=cache,target=/adp/target,id=antithesis-adp-target \
echo "Instrumentation symbols present."

# ---------------------------------------------------------------------------
# Build the correctness tools (datadog-intake + millstone), uninstrumented.
# Build the correctness tools (datadog-intake) and the test-command binaries, uninstrumented.
# These are supporting harness components, not the SUT, so they need no coverage instrumentation.
# ---------------------------------------------------------------------------
FROM build-base AS tools-builder
Expand All @@ -77,11 +77,10 @@ RUN --mount=type=cache,target=/tools/target,id=antithesis-tools-target \
--mount=type=cache,target=/root/.cargo/registry,id=cargo-registry \
--mount=type=cache,target=/root/.cargo/git,id=cargo-git \
cargo build --release \
--bin datadog-intake --bin millstone \
--bin datadog-intake \
--bin parallel_driver_send_dogstatsd --bin finally_verify_delivery --bin eventually_adp_alive \
--bin first_sample_config && \
cp /tools/target/release/datadog-intake /usr/local/bin/datadog-intake && \
cp /tools/target/release/millstone /usr/local/bin/millstone && \
cp /tools/target/release/parallel_driver_send_dogstatsd /usr/local/bin/parallel_driver_send_dogstatsd && \
cp /tools/target/release/finally_verify_delivery /usr/local/bin/finally_verify_delivery && \
cp /tools/target/release/eventually_adp_alive /usr/local/bin/eventually_adp_alive && \
Expand All @@ -91,19 +90,18 @@ RUN --mount=type=cache,target=/tools/target,id=antithesis-tools-target \
# Runtime: Agent Data Plane (SUT).
# ---------------------------------------------------------------------------
FROM ${APP_IMAGE} AS adp
ENV NO_COLOR=1 \
RUST_BACKTRACE=1
ENV NO_COLOR=1
RUN apt-get update && \
apt-get install --no-install-recommends -y ca-certificates openssl && \
rm -rf /var/lib/apt/lists/*
COPY --from=adp-builder /usr/local/bin/agent-data-plane /usr/local/bin/agent-data-plane
# Expose DWARF/build-id symbols to Antithesis for symbolization (one-hop symlink to the unstripped binary).
RUN mkdir -p /symbols && ln -s /usr/local/bin/agent-data-plane /symbols/agent-data-plane
# main.rs requires the bootstrap config file to exist at the default path; ship a minimal standalone
# config as a fallback. The boot wrapper overwrites it with the per-replay config written by the
# `first_sample_config` workload command onto the shared `agent-config` volume.
# main.rs requires a config file at the default path. Ship a minimal standalone config as a
# fallback. The boot wrapper overwrites it with the per-timeline config that first_sample_config
# samples onto the shared `agent-config` volume.
COPY test/antithesis/deploy/adp/datadog.yaml /etc/datadog-agent/datadog.yaml
# Boot wrapper: waits for the drawn config sentinel, copies the config into place, then execs ADP.
# Boot wrapper waits for the config sentinel, copies the config into place, then execs ADP.
COPY --chmod=755 test/antithesis/deploy/adp/entrypoint.sh /entrypoint.sh
# ADP's control-plane secure API requires an IPC TLS cert (a single PEM holding both certificate and
# private key) that the Core Agent normally generates. In standalone mode there is no Core Agent, so
Expand All @@ -126,15 +124,14 @@ COPY --from=tools-builder /usr/local/bin/datadog-intake /usr/local/bin/datadog-i
ENTRYPOINT ["/usr/local/bin/datadog-intake"]

# ---------------------------------------------------------------------------
# Runtime: workload client (millstone load generator + test templates).
# Runtime: workload client (DogStatsD driver + test templates).
# ---------------------------------------------------------------------------
FROM ${APP_IMAGE} AS workload
ENV NO_COLOR=1
RUN test -d /usr/share/ca-certificates || ( \
apt-get update && \
apt-get install --no-install-recommends -y ca-certificates && \
rm -rf /var/lib/apt/lists/* )
COPY --from=tools-builder /usr/local/bin/millstone /usr/local/bin/millstone
# Antithesis setup-complete helper and test templates (helper files + the "main" template dir).
COPY --chmod=755 test/antithesis/deploy/workload/setup-complete.sh /opt/antithesis/setup-complete.sh
COPY test/antithesis/deploy/workload/test/ /opt/antithesis/test/
Expand Down
1 change: 0 additions & 1 deletion test/antithesis/deploy/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ services:
command: ["run"]
environment:
NO_COLOR: "1"
RUST_BACKTRACE: "1"
DD_API_KEY: "antithesis-test-api-key"
DD_DATA_PLANE_ENABLED: "true"
DD_DATA_PLANE_STANDALONE_MODE: "true"
Expand Down
2 changes: 2 additions & 0 deletions test/antithesis/harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ clap = { workspace = true, features = [
"std",
"usage",
] }
itoa = { workspace = true }
num-traits = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
ryu = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
Expand Down
10 changes: 5 additions & 5 deletions test/antithesis/harness/src/bin/first_sample_config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,17 @@ impl Distribution<DurationSeconds> for Probe {
#[derive(Debug, Clone, Copy, Serialize)]
#[serde(rename_all = "lowercase")]
pub(crate) enum LogLevel {
/// Warnings and above.
Warn,
/// Errors only.
/// Errors only — the quietest level that still logs.
Error,
/// No logs at all — the floor of the log-output budget.
Off,
}

impl Distribution<LogLevel> for StandardUniform {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> LogLevel {
match rng.random_range(0..2u8) {
0 => LogLevel::Warn,
_ => LogLevel::Error,
0 => LogLevel::Error,
_ => LogLevel::Off,
}
}
Comment thread
blt marked this conversation as resolved.
}
Expand Down
117 changes: 59 additions & 58 deletions test/antithesis/harness/src/bin/parallel_driver_send_dogstatsd.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
//! Antithesis `parallel_driver_` test command: sends a batch of `DogStatsD` metrics to ADP.
//!
//! Draws a per-timeline cardinality regime (swarm biasing) and a batch size, then sends metrics over
//! UDS. The high-cardinality regime floods distinct aggregation contexts, targeting the
//! `rss-bounded-under-cardinality` property (ADP's memory limiter is disabled by default, so RSS can
//! grow without bound under sustained high cardinality).
//! Feral `DogStatsD` load generator: pick a batch size, then fire that many
//! sampled messages (metrics, events, or service checks) at the socket and
//! exit. Antithesis runs many of these in parallel to drive concurrency and
//! push context limits.

use std::os::unix::net::UnixDatagram;
use std::path::{Path, PathBuf};
use std::thread::sleep;
use std::time::{Duration, Instant};

use antithesis_sdk::prelude::*;
use antithesis_sdk::random::AntithesisRng;
use anyhow::Context as _;
use antithesis_sdk::random::{random_choice, AntithesisRng};
use clap::Parser;
use rand::{rand_core::UnwrapErr, seq::IndexedRandom as _, RngExt as _};
use harness::payload::dogstatsd;
use rand::{rand_core::UnwrapErr, RngExt};
use serde_json::json;

#[derive(Debug, Parser)]
Expand All @@ -28,81 +25,85 @@ struct Config {
dogstatsd_socket: PathBuf,
}

#[derive(Clone, Copy, Debug)]
enum Cardinality {
Low,
Medium,
High,
/// Per-batch composition: 50% clean, 25% feral, 25% mixed.
#[derive(Clone, Copy)]
enum Batch {
Clean,
Feral,
Mixed,
}

fn main() -> anyhow::Result<()> {
antithesis_init();

let config = Config::try_parse()?;
let mut rng = UnwrapErr(AntithesisRng);
let regimes = [Cardinality::Low, Cardinality::Medium, Cardinality::High];
let regime = *regimes
.choose(&mut rng)
.context("cardinality regime choices must not be empty")?;
let regime_label = match regime {
Cardinality::Low => "low",
Cardinality::Medium => "medium",
Cardinality::High => "high",
};
let count: u64 = rng.random_range(50..=2000);

let socket = connect_with_retry(&config.dogstatsd_socket)?;
// Socket unavailable (ADP booting, or a fault). No-op exit, not a failure.
let Some(socket) = connect_with_retry(&config.dogstatsd_socket) else {
return Ok(());
};
Comment thread
blt marked this conversation as resolved.

let names = ["adp.test.foo", "adp.test.bar", "adp.test.balkajsldfkjasdlfkjasdfz"];
let metric_types = ["c", "g"];
let batch = match random_choice(&[Batch::Clean, Batch::Clean, Batch::Feral, Batch::Mixed]) {
Some(Batch::Feral) => Batch::Feral,
Some(Batch::Mixed) => Batch::Mixed,
_ => Batch::Clean,
};
let count = rng.random_range(0..=10_000u64);
let mut line: Vec<u8> = Vec::new();
let mut attempted = 0usize;
for i in 0..count {
let name = *names
.choose(&mut rng)
.context("metric name choices must not be empty")?;
let metric_type = *metric_types
.choose(&mut rng)
.context("metric type choices must not be empty")?;
let value: u64 = rng.random_range(0..=1000);
let tag = match regime {
Cardinality::Low => format!("host:h{}", rng.random_range(0..4)),
Cardinality::Medium => format!("host:h{}", rng.random_range(0..256)),
Cardinality::High => format!("uid:{i}-{}", rng.random::<u64>()),
for _ in 0..count {
let vibe = match batch {
Batch::Clean => dogstatsd::Vibe::Clean,
Batch::Feral => dogstatsd::Vibe::Feral,
Batch::Mixed => dogstatsd::sample_vibe(),
};
let line = format!("{name}:{value}|{metric_type}|#{tag}\n");
if socket.send(line.as_bytes()).is_ok() {
dogstatsd::send(&mut rng, &mut line, vibe);
if socket.send(&line).is_ok() {
attempted += 1;
}
}

assert_reachable!(
"workload sent a dogstatsd batch",
&json!({
"attempted": attempted,
"regime": regime_label,
"socket": config.dogstatsd_socket.display().to_string(),
})
"workload ran a dogstatsd batch",
&json!({ "attempted": attempted, "dogstatsd_socket": config.dogstatsd_socket.display().to_string() })
);
assert_sometimes!(
attempted > 0,
"workload delivered a dogstatsd line",
&json!({ "attempted": attempted })
);

// Confirm timelines sometimes drive a high-cardinality flood (the interesting case for memory).
assert_sometimes!(
matches!(regime, Cardinality::High),
"workload drove a high-cardinality dogstatsd flood",
attempted > 0 && matches!(batch, Batch::Clean),
"workload ran a fully clean batch",
&json!({ "attempted": attempted })
);
assert_sometimes!(
attempted > 0 && matches!(batch, Batch::Feral),
"workload ran a fully feral batch",
&json!({ "attempted": attempted })
);
assert_sometimes!(
attempted > 0 && matches!(batch, Batch::Mixed),
"workload ran a mixed batch",
&json!({ "attempted": attempted })
);

Ok(())
}

// Wait for ADP to bind the socket, intentionally naive.
fn connect_with_retry(path: &Path) -> anyhow::Result<UnixDatagram> {
/// Wait for ADP to bind the socket, retrying every 250 ms until a 30-second deadline passes, then
/// give up and return `None`. Intentionally naive.
fn connect_with_retry(path: &Path) -> Option<UnixDatagram> {
let deadline = Instant::now() + Duration::from_secs(30);
loop {
let socket = UnixDatagram::unbound()?;
match socket.connect(path) {
Ok(()) => return Ok(socket),
Err(_) if Instant::now() < deadline => sleep(Duration::from_millis(250)),
Err(e) => return Err(e).with_context(|| format!("ADP did not bind {} within 30s", path.display())),
if let Ok(socket) = UnixDatagram::unbound() {
if socket.connect(path).is_ok() {
return Some(socket);
}
}
if Instant::now() >= deadline {
return None;
}
sleep(Duration::from_millis(250));
}
}
1 change: 1 addition & 0 deletions test/antithesis/harness/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Shared helpers for the Antithesis harness, used by the `src/bin/*` test
//! commands.

pub mod payload;
pub mod rand;
3 changes: 3 additions & 0 deletions test/antithesis/harness/src/payload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Payload generators for the protocols under test.

pub mod dogstatsd;
86 changes: 86 additions & 0 deletions test/antithesis/harness/src/payload/dogstatsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! `DogStatsD` payload generation.

// Here's the basic idea.
//
// DogStatsD has three message types:
//
// * metric
// * event
// * service check
//
// # Metrics
//
// <NAME>:<VALUE>|<TYPE>|@<SAMPLE_RATE>|#<TAG>,<TAG>...|c:<CONTAINER>|T<TS>|e:<EXT>|card:<CARD>
//
// Required: <NAME>:<VALUE>|<TYPE>.
//
// * <NAME> := [^:|\n]+
// * <VALUE> := <NUMBER>(:<NUMBER>)* ':'-packed multi-value, non-set
// | [^|\n]+ raw string, set type
// * <NUMBER> := [+-]?(\d+\.?\d*|\.\d+)([eE][+-]?\d+)? | [+-]?(inf|infinity|nan)
// * <TYPE> := c|g|ms|h|s|d count gauge timer histogram set distribution
// * <SAMPLE_RATE> := @<NUMBER>
// * <TAG> := [^,|\n]+ conventionally <KEY>:<VALUE>, the ':' is not required
// * <CONTAINER> := c:[^|\n]+ e.g. ci-<id>, in-<inode>
// * <TS> := T\d+ unix seconds
// * <EXT> := e:[^|\n]+ e.g. it-,cn-,pu-
// * <CARD> := card:[^|\n]+ recognized: none|low|orchestrator|high
//
// # Events
//
// _e{<TITLE_LEN>,<TEXT_LEN>}:<TITLE>|<TEXT>|d:<TS>|h:<HOST>|k:<AGGKEY>|p:<PRIO>|s:<SRC>|t:<ALERT>|#<TAGS>
//
// Required: _e{<TITLE_LEN>,<TEXT_LEN>}:<TITLE>|<TEXT>. c: / e: / card: are valid here too.
//
// * <TITLE_LEN>,
// <TEXT_LEN> := \d+ byte length of TITLE / TEXT
// * <TITLE>,
// <TEXT> := [^\n]{LEN} length-delimited, so '|' and ':' are allowed; '\\n' -> newline
// * <TS> := d:\d+ unix seconds
// * <HOST> := h:[^|\n]+
// * <AGGKEY> := k:[^|\n]+
// * <PRIO> := p:[^|\n]+ recognized: normal|low (else default)
// * <SRC> := s:[^|\n]+
// * <ALERT> := t:[^|\n]+ recognized: error|warning|info|success (else default)
// * <TAGS> := #<TAG>(,<TAG>)*
//
// # Service checks
//
// _sc|<NAME>|<STATUS>|d:<TS>|h:<HOST>|#<TAG>,<TAG>...|m:<MESSAGE>
//
// Required: _sc|<NAME>|<STATUS>. c: / e: / card: are valid here too.
//
// * <NAME> := [^|\n]+
// * <STATUS> := [0-3] OK warning critical unknown
// * <TS> := d:\d+ unix seconds
// * <HOST> := h:[^|\n]+
// * <TAGS> := #<TAG>(,<TAG>)*
// * <MESSAGE> := m:[^|\n]+

use antithesis_sdk::random::random_choice;
use rand::Rng;

mod common;
mod events;
mod metrics;
mod service_checks;

pub use common::{sample_vibe, Vibe};

/// The three `DogStatsD` message types.
///
/// Used by `send` to decide which payload writer fills the buffer.
#[derive(Clone, Copy)]
enum Message {
/// A metric line; minimally `<NAME>:<VALUE>|<TYPE>`.
Metric,
/// An event line; starts with `_e{<TITLE_LEN>,<TEXT_LEN>}:`.
Event,
/// A service-check line; starts with `_sc|`.
ServiceCheck,
}

/// Clear `buf`, then fill it with a single `DogStatsD` message rendered at the requested `vibe`.
///
/// The message type (metric, event, or service check) is drawn with `random_choice`. If no choice
/// comes back, a metric is written.
pub fn send<R: Rng + ?Sized>(rng: &mut R, buf: &mut Vec<u8>, vibe: Vibe) {
    buf.clear();

    let kinds = [Message::Metric, Message::Event, Message::ServiceCheck];
    let picked = random_choice(&kinds).copied();

    match picked {
        // `None` falls back to a metric.
        Some(Message::Metric) | None => metrics::write(rng, buf, vibe),
        Some(Message::Event) => events::write(rng, buf, vibe),
        Some(Message::ServiceCheck) => service_checks::write(rng, buf, vibe),
    }
}
Loading
Loading