Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions lib/ddsketch/src/agent/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,36 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
mod tests {
use super::*;

// BUG (RED) — antithesis-research property `ddsketch-no-nan-poison`.
//
// This test asserts the DESIRED invariant: a non-finite sample must not corrupt the sketch's
// `sum`/`avg`. It currently FAILS (red), demonstrating the bug: `adjust_basic_stats` does
// `self.sum += v * n` and the running `avg` update with no finiteness check, so a single NaN
// permanently poisons `sum` and `avg` (NaN is sticky), while `count`/`min`/`max` stay valid — a
// silent corruption. Finiteness is enforced only per-source (the DSD codec); a non-DSD producer
// (e.g. a `checks_ipc` Histogram NaN reaching the metrics encoder's `insert_n`) bypasses it. Make
// green by rejecting/sanitizing non-finite values at the sketch boundary. Do not delete.
#[test]
fn bug_nan_sample_poisons_sum_and_avg() {
let mut sketch = DDSketch::default();
sketch.insert(1.0);
assert_eq!(sketch.sum(), Some(1.0));

// A single NaN sample reaches the sketch boundary.
sketch.insert(f64::NAN);

// DESIRED: sum/avg remain finite (NaN rejected/sanitized at the boundary). Today they are
// NaN, so these assertions fail (red).
assert!(
sketch.sum().expect("non-empty").is_finite(),
"sum must stay finite after a non-finite sample (it is silently poisoned to NaN today)"
);
assert!(
sketch.avg().expect("non-empty").is_finite(),
"avg must stay finite after a non-finite sample (it is silently poisoned to NaN today)"
);
}

// Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k.
fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> {
pairs.iter().map(|&(k, n)| Bin { k, n }).collect()
Expand Down
64 changes: 64 additions & 0 deletions lib/saluki-components/src/sources/dogstatsd/replay/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,70 @@ mod tests {
assert!(reader.read_next().expect("clean EOF on truncation").is_none());
}

// BUG (RED) — antithesis-research property `replay-corruption-not-silent-eof`.
//
// This test asserts the DESIRED invariant: a corrupt/oversized length prefix mid-stream must be
// surfaced as an error, NOT silently treated as a clean end-of-stream. It currently FAILS (red),
// demonstrating the bug: `read_next` returns `Ok(None)` for any prefix that would overrun the
// buffer, so a single corrupt prefix silently truncates the entire remaining record stream —
// every well-formed record after it is dropped with no error and no diagnostic (false replay
// fidelity / silent data loss). Make green by surfacing an error (or otherwise distinguishing
// corruption from a clean trailer/EOF). Do not delete.
//
// Stream: [header][len=N][record1][corrupt oversized prefix][len=M][record2]. Today the reader
// yields record1, then returns Ok(None) at the corrupt prefix, never reaching the well-formed
// record2.
#[test]
fn bug_corrupt_length_prefix_silently_drops_following_records() {
let rec1 = UnixDogstatsdMsg {
payload: b"metric.a:1|c".to_vec(),
..Default::default()
}
.encode_to_vec();
let rec2 = UnixDogstatsdMsg {
payload: b"metric.b:2|c".to_vec(),
..Default::default()
}
.encode_to_vec();
assert!(!rec1.is_empty() && !rec2.is_empty());

let mut bytes = Vec::new();
// Valid capture header at file version 3 (VERSION_INDEX == 4).
let mut header = DATADOG_HEADER;
header[4] |= 3;
bytes.extend_from_slice(&header);
// record 1: 4-byte little-endian length prefix + protobuf body.
bytes.extend_from_slice(&(rec1.len() as u32).to_le_bytes());
bytes.extend_from_slice(&rec1);
// A corrupt, oversized length prefix mid-stream (claims far more bytes than remain).
bytes.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes());
// A perfectly well-formed record that follows -- this is what gets silently dropped.
bytes.extend_from_slice(&(rec2.len() as u32).to_le_bytes());
bytes.extend_from_slice(&rec2);

let path = unique_path("reader-corrupt-prefix");
fs::write(&path, &bytes).expect("write corrupt capture");

let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open");

let first = reader
.read_next()
.expect("record 1 reads without error")
.expect("record 1 is present");
assert_eq!(first.payload, b"metric.a:1|c");

// DESIRED: the corrupt/oversized length prefix is surfaced as an error rather than a clean
// end-of-stream, so the trailing well-formed record is not silently dropped. Today
// `read_next` returns `Ok(None)` here, so this assertion fails (red).
let second = reader.read_next();
let _ = fs::remove_file(&path);
assert!(
second.is_err(),
"a corrupt/oversized length prefix must surface an error, not a silent clean EOF that \
drops the following well-formed record"
);
}

#[test]
fn bad_header_is_rejected() {
let tmp = unique_path("reader-bad-header");
Expand Down
74 changes: 74 additions & 0 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,80 @@ mod tests {
assert_eq!(input_metrics[3].context(), flushed_metrics[1].context());
}

// BUG (RED) — antithesis-research property `aggregate-no-panic-any-window`.
//
// This test asserts the DESIRED invariant: a sub-second `aggregate_window_duration` must be
// handled without crashing. It currently FAILS (red), demonstrating the bug:
// `AggregationState::new` stores `bucket_width_secs = bucket_width.as_secs()`, which truncates any
// sub-second window (e.g. 500ms) to 0, and `align_to_bucket_start` then panics on `timestamp % 0`
// at the first insert (the flush path has the same hazard via `step_by(0)`). No config validation
// rejects sub-second windows. Make this green by validating/clamping the window (>= 1s) or
// supporting true sub-second bucketing. Do not delete; do not weaken to `#[should_panic]`.
#[tokio::test]
async fn bug_sub_second_aggregate_window_panics_on_insert() {
let mut state = AggregationState::new(
Duration::from_millis(500), // sub-second window -> bucket_width_secs == 0 (the defect)
100,
COUNTER_EXPIRE,
HistogramConfiguration::default(),
Telemetry::noop(),
);

// DESIRED: the insert is handled without panicking. Today this line panics inside
// `align_to_bucket_start` on `timestamp % 0`, so the test fails (red).
let accepted = state.insert(insert_ts(1), Metric::counter("metric1", 1.0));
assert!(
accepted,
"a sub-second aggregate window must be handled without panicking"
);
}

// BUG (RED) — antithesis-research property `aggregate-clock-skew-stable`.
//
// This test asserts the DESIRED invariant: a forward wall-clock jump must not flood — one idle
// counter should emit a bounded number of points regardless of the jump size. It currently FAILS
// (red), demonstrating the bug: bucket alignment uses the wall clock while the flush cadence uses
// a monotonic timer, and `flush` has no guard that `current_time >= last_flush`, so it builds
// `zero_value_buckets` over the entire `(align(last_flush)..current_time)` interval (O(jump) work
// and allocation) and floods each kept-alive counter with that many backfilled points. Make green
// by bounding zero-value emission / guarding against non-monotonic time. Do not delete.
#[tokio::test]
async fn bug_forward_clock_jump_floods_zero_value_points() {
const JUMP_WINDOWS: u64 = 1000;
// A bounded number of points we'd accept from one idle counter regardless of the jump size.
const MAX_REASONABLE_POINTS: usize = 8;
// Expiry large enough that the backfilled zero-value buckets are retained across the jump.
let big_expire = Some(Duration::from_secs(JUMP_WINDOWS * BUCKET_WIDTH_SECS * 2));
let mut state = AggregationState::new(
BUCKET_WIDTH,
100,
big_expire,
HistogramConfiguration::default(),
Telemetry::noop(),
);

// One counter, one real value; initial flush establishes `last_flush`.
assert!(state.insert(insert_ts(1), Metric::counter("metric1", 1.0)));
let _ = get_flushed_metrics(flush_ts(1), &mut state).await;

// Simulate a forward wall-clock jump of 1000 buckets, then flush.
let jumped_time = flush_ts(1) + JUMP_WINDOWS * BUCKET_WIDTH_SECS;
let flushed = get_flushed_metrics(jumped_time, &mut state).await;

assert_eq!(flushed.len(), 1, "the single idle counter should flush");
let point_count = match flushed[0].values() {
MetricValues::Counter(p) | MetricValues::Gauge(p) | MetricValues::Rate(p, _) => p.len(),
_ => panic!("unexpected flushed value type for an aggregated counter"),
};
// DESIRED: emission is bounded regardless of the jump. Today `point_count` is ~JUMP_WINDOWS,
// so this fails (red).
assert!(
point_count <= MAX_REASONABLE_POINTS,
"a forward clock jump flooded {point_count} points (expected <= {MAX_REASONABLE_POINTS}); \
aggregation must bound zero-value emission regardless of wall-clock jumps"
);
}

#[tokio::test]
async fn context_limit_with_zero_value_counters() {
// We test here to ensure that zero-value counters contribute to the context limit.
Expand Down
28 changes: 28 additions & 0 deletions lib/saluki-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,34 @@ mod tests {
));
}

// antithesis-research property `config-stall-no-deadlock`.
//
// Why/how the bug happens: when dynamic configuration is enabled, `GenericConfiguration::ready()`
// awaits the first config snapshot via a bare `ready_rx.await` with NO internal timeout. In
// production, ADP startup blocks on `ready()` waiting for the Core Agent's authoritative config;
// if the Agent never sends it (slow, crashed, or a partitioned config stream), ADP hangs forever
// with no diagnostic deadline.
//
// This test asserts the DESIRED invariant — `ready()` must not block indefinitely when no snapshot
// arrives — so it FAILS today. We enable dynamic config, hold the sender open but silent (modeling
// "the Agent never sends config"), and require `ready()` to complete within a generous bound; it
// never does. Make it pass by giving `ready()` an internal timeout/deadline. No fix applied.
#[tokio::test]
async fn bug_config_ready_hangs_forever_without_snapshot() {
let (config, sender) = ConfigurationLoader::for_tests(None, None, true).await;
// Hold the sender open (do NOT drop it): an open-but-silent stream models "the Agent never
// sends config". A dropped sender would instead unblock ready() with an error.
let _sender = sender.expect("dynamic configuration should provide a sender");

// DESIRED: ready() resolves within a bound rather than blocking startup forever. Today it has
// no internal timeout, so the outer timeout fires and this assertion fails.
let result = tokio::time::timeout(std::time::Duration::from_millis(500), config.ready()).await;
assert!(
result.is_ok(),
"ready() hung with no config snapshot; startup must not block indefinitely (ready() needs an internal timeout/deadline)"
);
}

#[tokio::test]
async fn test_dynamic_configuration() {
let (cfg, sender) = ConfigurationLoader::for_tests(
Expand Down
40 changes: 40 additions & 0 deletions lib/saluki-context/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,46 @@ mod tests {
}
}

// antithesis-research property `rss-bounded-under-cardinality` (root cause shared with
// `interner-full-bounded`).
//
// Why/how the bug happens: ADP advertises "deterministic resource usage" / bounded memory, but
// `allow_heap_allocations` defaults to `true` (`ContextResolverBuilder::build` -> `unwrap_or(true)`).
// With that default, once the fixed-size string interner is full, context strings silently spill
// to the heap (`MetaString` heap allocation) instead of the interner, and `resolve` NEVER returns
// `None`. Nothing applies backpressure, so a high-cardinality flood grows memory without bound.
//
// This test asserts the DESIRED invariant — under the default config a full interner bounds
// resolution (refuses some contexts) — so it FAILS today (all 1000 resolve via heap fallback).
// Make it pass by bounding default-config resolution / applying backpressure when the interner is
// full. No fix applied.
#[test]
fn bug_default_heap_fallback_makes_context_resolution_unbounded() {
// Names long enough that they cannot be inlined and cannot fit a 1-byte interner, forcing
// either an interner entry or (when full) a heap allocation.
let long_name = |i: usize| format!("adp.unbounded.context.name.number.{i:08}");
let no_tags: &[&str] = &[];

// DEFAULT config: `for_tests()` uses a 1-byte interner with heap allocations allowed -- the
// production default for the heap-fallback flag -- so it models default behavior exactly.
let mut resolver = ContextResolverBuilder::for_tests().build();
let mut resolved = 0usize;
for i in 0..1000 {
let name = long_name(i);
if resolver.resolve(name.as_str(), no_tags, None).is_some() {
resolved += 1;
}
}

// DESIRED: the default config bounds memory by refusing some contexts once the 1-byte interner
// is full. Today all 1000 resolve (silent heap spill, no backpressure), so this fails.
assert!(
resolved < 1000,
"default config resolved all {resolved} contexts against a 1-byte interner -> unbounded \
heap growth; a full interner must apply backpressure (refuse) rather than spill without limit"
);
}

#[test]
fn basic() {
let mut resolver = ContextResolverBuilder::for_tests().build();
Expand Down
Loading