From 1beb445f061c5a4f62708c264f574fb9894453b5 Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 29 May 2026 17:26:56 +0000 Subject: [PATCH] chore(agent-data-plane): failing repros for six discovered bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Six TDD-style tests that each assert the behavior agent-data-plane *should* have and currently fail, demonstrating a real defect. No production code changes — these are the failing tests a fix would turn green. Root-cause notes for each are in `test/antithesis/scratchbook/bug-ledger.md`. - aggregate: a sub-second `aggregate_window_duration` truncates to a 0-second bucket and panics on `timestamp % 0` at the first insert. - aggregate: a forward wall-clock jump backfills zero-value points across the whole jump (O(jump) work and allocation), flooding output. - ddsketch: a single non-finite sample silently poisons `sum`/`avg` (no finiteness guard). - dogstatsd replay: a corrupt length prefix is read as a clean EOF, silently dropping the records after it. - context resolver: with the default heap fallback, a full interner never refuses, so resolution is unbounded under high cardinality. - config: `ready()` waits for the first dynamic snapshot with no timeout, so startup hangs forever if it never arrives. ## Change Type - [ ] Bug fix - [ ] New feature - [x] Non-functional (chore, refactoring, docs) - [ ] Performance ## How did you test this PR? `cargo nextest run` on the six tests — all six fail, each for its intended reason (panic / NaN / silent `Ok(None)` / point flood / unbounded resolution / `ready()` timeout). `cargo fmt --check` is clean. CI note: because these are intentionally-failing repros, the `unit-tests` jobs (`make test`) will be **red** until the underlying bugs are fixed. Decide how to land them — e.g. `#[ignore]` with a tracking issue per bug, or keep them red as known-failing demonstrations. ## References - Root-cause analysis and the full ledger: `test/antithesis/scratchbook/bug-ledger.md` (research PR). - The harness that can exercise some of these under fault injection: the harness PR in this stack. --- lib/ddsketch/src/agent/sketch.rs | 30 ++++++++ .../src/sources/dogstatsd/replay/reader.rs | 64 ++++++++++++++++ .../src/transforms/aggregate/mod.rs | 74 +++++++++++++++++++ lib/saluki-config/src/lib.rs | 28 +++++++ lib/saluki-context/src/resolver.rs | 40 ++++++++++ 5 files changed, 236 insertions(+) diff --git a/lib/ddsketch/src/agent/sketch.rs b/lib/ddsketch/src/agent/sketch.rs index 10d3e5e5b1b..eb142b8b9fa 100644 --- a/lib/ddsketch/src/agent/sketch.rs +++ b/lib/ddsketch/src/agent/sketch.rs @@ -738,6 +738,36 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) { mod tests { use super::*; + // BUG (RED) — antithesis-research property `ddsketch-no-nan-poison`. + // + // This test asserts the DESIRED invariant: a non-finite sample must not corrupt the sketch's + // `sum`/`avg`. It currently FAILS (red), demonstrating the bug: `adjust_basic_stats` does + // `self.sum += v * n` and the running `avg` update with no finiteness check, so a single NaN + // permanently poisons `sum` and `avg` (NaN is sticky), while `count`/`min`/`max` stay valid — a + // silent corruption. Finiteness is enforced only per-source (the DSD codec); a non-DSD producer + // (e.g. a `checks_ipc` Histogram NaN reaching the metrics encoder's `insert_n`) bypasses it. Make + // green by rejecting/sanitizing non-finite values at the sketch boundary. Do not delete. + #[test] + fn bug_nan_sample_poisons_sum_and_avg() { + let mut sketch = DDSketch::default(); + sketch.insert(1.0); + assert_eq!(sketch.sum(), Some(1.0)); + + // A single NaN sample reaches the sketch boundary. + sketch.insert(f64::NAN); + + // DESIRED: sum/avg remain finite (NaN rejected/sanitized at the boundary). Today they are + // NaN, so these assertions fail (red). + assert!( + sketch.sum().expect("non-empty").is_finite(), + "sum must stay finite after a non-finite sample (it is silently poisoned to NaN today)" + ); + assert!( + sketch.avg().expect("non-empty").is_finite(), + "avg must stay finite after a non-finite sample (it is silently poisoned to NaN today)" + ); + } + // Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k. fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> { pairs.iter().map(|&(k, n)| Bin { k, n }).collect() diff --git a/lib/saluki-components/src/sources/dogstatsd/replay/reader.rs b/lib/saluki-components/src/sources/dogstatsd/replay/reader.rs index 1c8682bdf96..8edb160b7b6 100644 --- a/lib/saluki-components/src/sources/dogstatsd/replay/reader.rs +++ b/lib/saluki-components/src/sources/dogstatsd/replay/reader.rs @@ -256,6 +256,70 @@ mod tests { assert!(reader.read_next().expect("clean EOF on truncation").is_none()); } + // BUG (RED) — antithesis-research property `replay-corruption-not-silent-eof`. + // + // This test asserts the DESIRED invariant: a corrupt/oversized length prefix mid-stream must be + // surfaced as an error, NOT silently treated as a clean end-of-stream. It currently FAILS (red), + // demonstrating the bug: `read_next` returns `Ok(None)` for any prefix that would overrun the + // buffer, so a single corrupt prefix silently truncates the entire remaining record stream — + // every well-formed record after it is dropped with no error and no diagnostic (false replay + // fidelity / silent data loss). Make green by surfacing an error (or otherwise distinguishing + // corruption from a clean trailer/EOF). Do not delete. + // + // Stream: [header][len=N][record1][corrupt oversized prefix][len=M][record2]. Today the reader + // yields record1, then returns Ok(None) at the corrupt prefix, never reaching the well-formed + // record2. + #[test] + fn bug_corrupt_length_prefix_silently_drops_following_records() { + let rec1 = UnixDogstatsdMsg { + payload: b"metric.a:1|c".to_vec(), + ..Default::default() + } + .encode_to_vec(); + let rec2 = UnixDogstatsdMsg { + payload: b"metric.b:2|c".to_vec(), + ..Default::default() + } + .encode_to_vec(); + assert!(!rec1.is_empty() && !rec2.is_empty()); + + let mut bytes = Vec::new(); + // Valid capture header at file version 3 (VERSION_INDEX == 4). + let mut header = DATADOG_HEADER; + header[4] |= 3; + bytes.extend_from_slice(&header); + // record 1: 4-byte little-endian length prefix + protobuf body. + bytes.extend_from_slice(&(rec1.len() as u32).to_le_bytes()); + bytes.extend_from_slice(&rec1); + // A corrupt, oversized length prefix mid-stream (claims far more bytes than remain). + bytes.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes()); + // A perfectly well-formed record that follows -- this is what gets silently dropped. + bytes.extend_from_slice(&(rec2.len() as u32).to_le_bytes()); + bytes.extend_from_slice(&rec2); + + let path = unique_path("reader-corrupt-prefix"); + fs::write(&path, &bytes).expect("write corrupt capture"); + + let mut reader = TrafficCaptureReader::from_path(&path).expect("reader should open"); + + let first = reader + .read_next() + .expect("record 1 reads without error") + .expect("record 1 is present"); + assert_eq!(first.payload, b"metric.a:1|c"); + + // DESIRED: the corrupt/oversized length prefix is surfaced as an error rather than a clean + // end-of-stream, so the trailing well-formed record is not silently dropped. Today + // `read_next` returns `Ok(None)` here, so this assertion fails (red). + let second = reader.read_next(); + let _ = fs::remove_file(&path); + assert!( + second.is_err(), + "a corrupt/oversized length prefix must surface an error, not a silent clean EOF that \ + drops the following well-formed record" + ); + } + #[test] fn bad_header_is_rejected() { let tmp = unique_path("reader-bad-header"); diff --git a/lib/saluki-components/src/transforms/aggregate/mod.rs b/lib/saluki-components/src/transforms/aggregate/mod.rs index 22d3ad6b388..2f384d5e9a4 100644 --- a/lib/saluki-components/src/transforms/aggregate/mod.rs +++ b/lib/saluki-components/src/transforms/aggregate/mod.rs @@ -1106,6 +1106,80 @@ mod tests { assert_eq!(input_metrics[3].context(), flushed_metrics[1].context()); } + // BUG (RED) — antithesis-research property `aggregate-no-panic-any-window`. + // + // This test asserts the DESIRED invariant: a sub-second `aggregate_window_duration` must be + // handled without crashing. It currently FAILS (red), demonstrating the bug: + // `AggregationState::new` stores `bucket_width_secs = bucket_width.as_secs()`, which truncates any + // sub-second window (e.g. 500ms) to 0, and `align_to_bucket_start` then panics on `timestamp % 0` + // at the first insert (the flush path has the same hazard via `step_by(0)`). No config validation + // rejects sub-second windows. Make this green by validating/clamping the window (>= 1s) or + // supporting true sub-second bucketing. Do not delete; do not weaken to `#[should_panic]`. + #[tokio::test] + async fn bug_sub_second_aggregate_window_panics_on_insert() { + let mut state = AggregationState::new( + Duration::from_millis(500), // sub-second window -> bucket_width_secs == 0 (the defect) + 100, + COUNTER_EXPIRE, + HistogramConfiguration::default(), + Telemetry::noop(), + ); + + // DESIRED: the insert is handled without panicking. Today this line panics inside + // `align_to_bucket_start` on `timestamp % 0`, so the test fails (red). + let accepted = state.insert(insert_ts(1), Metric::counter("metric1", 1.0)); + assert!( + accepted, + "a sub-second aggregate window must be handled without panicking" + ); + } + + // BUG (RED) — antithesis-research property `aggregate-clock-skew-stable`. + // + // This test asserts the DESIRED invariant: a forward wall-clock jump must not flood — one idle + // counter should emit a bounded number of points regardless of the jump size. It currently FAILS + // (red), demonstrating the bug: bucket alignment uses the wall clock while the flush cadence uses + // a monotonic timer, and `flush` has no guard that `current_time >= last_flush`, so it builds + // `zero_value_buckets` over the entire `(align(last_flush)..current_time)` interval (O(jump) work + // and allocation) and floods each kept-alive counter with that many backfilled points. Make green + // by bounding zero-value emission / guarding against non-monotonic time. Do not delete. + #[tokio::test] + async fn bug_forward_clock_jump_floods_zero_value_points() { + const JUMP_WINDOWS: u64 = 1000; + // A bounded number of points we'd accept from one idle counter regardless of the jump size. + const MAX_REASONABLE_POINTS: usize = 8; + // Expiry large enough that the backfilled zero-value buckets are retained across the jump. + let big_expire = Some(Duration::from_secs(JUMP_WINDOWS * BUCKET_WIDTH_SECS * 2)); + let mut state = AggregationState::new( + BUCKET_WIDTH, + 100, + big_expire, + HistogramConfiguration::default(), + Telemetry::noop(), + ); + + // One counter, one real value; initial flush establishes `last_flush`. + assert!(state.insert(insert_ts(1), Metric::counter("metric1", 1.0))); + let _ = get_flushed_metrics(flush_ts(1), &mut state).await; + + // Simulate a forward wall-clock jump of 1000 buckets, then flush. + let jumped_time = flush_ts(1) + JUMP_WINDOWS * BUCKET_WIDTH_SECS; + let flushed = get_flushed_metrics(jumped_time, &mut state).await; + + assert_eq!(flushed.len(), 1, "the single idle counter should flush"); + let point_count = match flushed[0].values() { + MetricValues::Counter(p) | MetricValues::Gauge(p) | MetricValues::Rate(p, _) => p.len(), + _ => panic!("unexpected flushed value type for an aggregated counter"), + }; + // DESIRED: emission is bounded regardless of the jump. Today `point_count` is ~JUMP_WINDOWS, + // so this fails (red). + assert!( + point_count <= MAX_REASONABLE_POINTS, + "a forward clock jump flooded {point_count} points (expected <= {MAX_REASONABLE_POINTS}); \ + aggregation must bound zero-value emission regardless of wall-clock jumps" + ); + } + #[tokio::test] async fn context_limit_with_zero_value_counters() { // We test here to ensure that zero-value counters contribute to the context limit. diff --git a/lib/saluki-config/src/lib.rs b/lib/saluki-config/src/lib.rs index a0f4416b151..6982cc58192 100644 --- a/lib/saluki-config/src/lib.rs +++ b/lib/saluki-config/src/lib.rs @@ -895,6 +895,34 @@ mod tests { )); } + // antithesis-research property `config-stall-no-deadlock`. + // + // Why/how the bug happens: when dynamic configuration is enabled, `GenericConfiguration::ready()` + // awaits the first config snapshot via a bare `ready_rx.await` with NO internal timeout. In + // production, ADP startup blocks on `ready()` waiting for the Core Agent's authoritative config; + // if the Agent never sends it (slow, crashed, or a partitioned config stream), ADP hangs forever + // with no diagnostic deadline. + // + // This test asserts the DESIRED invariant — `ready()` must not block indefinitely when no snapshot + // arrives — so it FAILS today. We enable dynamic config, hold the sender open but silent (modeling + // "the Agent never sends config"), and require `ready()` to complete within a generous bound; it + // never does. Make it pass by giving `ready()` an internal timeout/deadline. No fix applied. + #[tokio::test] + async fn bug_config_ready_hangs_forever_without_snapshot() { + let (config, sender) = ConfigurationLoader::for_tests(None, None, true).await; + // Hold the sender open (do NOT drop it): an open-but-silent stream models "the Agent never + // sends config". A dropped sender would instead unblock ready() with an error. + let _sender = sender.expect("dynamic configuration should provide a sender"); + + // DESIRED: ready() resolves within a bound rather than blocking startup forever. Today it has + // no internal timeout, so the outer timeout fires and this assertion fails. + let result = tokio::time::timeout(std::time::Duration::from_millis(500), config.ready()).await; + assert!( + result.is_ok(), + "ready() hung with no config snapshot; startup must not block indefinitely (ready() needs an internal timeout/deadline)" + ); + } + #[tokio::test] async fn test_dynamic_configuration() { let (cfg, sender) = ConfigurationLoader::for_tests( diff --git a/lib/saluki-context/src/resolver.rs b/lib/saluki-context/src/resolver.rs index 68db83ce3bd..2a7ec7349cb 100644 --- a/lib/saluki-context/src/resolver.rs +++ b/lib/saluki-context/src/resolver.rs @@ -821,6 +821,46 @@ mod tests { } } + // antithesis-research property `rss-bounded-under-cardinality` (root cause shared with + // `interner-full-bounded`). + // + // Why/how the bug happens: ADP advertises "deterministic resource usage" / bounded memory, but + // `allow_heap_allocations` defaults to `true` (`ContextResolverBuilder::build` -> `unwrap_or(true)`). + // With that default, once the fixed-size string interner is full, context strings silently spill + // to the heap (`MetaString` heap allocation) instead of the interner, and `resolve` NEVER returns + // `None`. Nothing applies backpressure, so a high-cardinality flood grows memory without bound. + // + // This test asserts the DESIRED invariant — under the default config a full interner bounds + // resolution (refuses some contexts) — so it FAILS today (all 1000 resolve via heap fallback). + // Make it pass by bounding default-config resolution / applying backpressure when the interner is + // full. No fix applied. + #[test] + fn bug_default_heap_fallback_makes_context_resolution_unbounded() { + // Names long enough that they cannot be inlined and cannot fit a 1-byte interner, forcing + // either an interner entry or (when full) a heap allocation. + let long_name = |i: usize| format!("adp.unbounded.context.name.number.{i:08}"); + let no_tags: &[&str] = &[]; + + // DEFAULT config: `for_tests()` uses a 1-byte interner with heap allocations allowed -- the + // production default for the heap-fallback flag -- so it models default behavior exactly. + let mut resolver = ContextResolverBuilder::for_tests().build(); + let mut resolved = 0usize; + for i in 0..1000 { + let name = long_name(i); + if resolver.resolve(name.as_str(), no_tags, None).is_some() { + resolved += 1; + } + } + + // DESIRED: the default config bounds memory by refusing some contexts once the 1-byte interner + // is full. Today all 1000 resolve (silent heap spill, no backpressure), so this fails. + assert!( + resolved < 1000, + "default config resolved all {resolved} contexts against a 1-byte interner -> unbounded \ + heap growth; a full interner must apply backpressure (refuse) rather than spill without limit" + ); + } + #[test] fn basic() { let mut resolver = ContextResolverBuilder::for_tests().build();