diff --git a/lib/ddsketch/src/agent/bin.rs b/lib/ddsketch/src/agent/bin.rs index 25d3dd3cc35..27a6438f856 100644 --- a/lib/ddsketch/src/agent/bin.rs +++ b/lib/ddsketch/src/agent/bin.rs @@ -1,6 +1,6 @@ //! Sketch bin representation. -const MAX_BIN_WIDTH: u16 = u16::MAX; +const MAX_BIN_WIDTH: u32 = u32::MAX; /// A sketch bin. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -10,7 +10,7 @@ pub struct Bin { pub(crate) k: i16, /// The number of observations within the bin. - pub(crate) n: u16, + pub(crate) n: u32, } impl Bin { @@ -21,7 +21,7 @@ impl Bin { /// Returns the number of observations within the bin. pub fn count(&self) -> u32 { - self.n as u32 + self.n } #[allow(clippy::cast_possible_truncation)] @@ -33,8 +33,8 @@ impl Bin { } // SAFETY: We already know `next` is less than or equal to `MAX_BIN_WIDTH` if we got here, and `MAX_BIN_WIDTH` - // is u16, so next can't possibly be larger than a u16. - self.n = next as u16; + // is u32, so next can't possibly be larger than a u32. + self.n = next as u32; 0 } } diff --git a/lib/ddsketch/src/agent/sketch.rs b/lib/ddsketch/src/agent/sketch.rs index d52d3d433b2..9dc69830c0a 100644 --- a/lib/ddsketch/src/agent/sketch.rs +++ b/lib/ddsketch/src/agent/sketch.rs @@ -21,7 +21,7 @@ static SKETCH_CONFIG: Config = Config::new( DDSKETCH_CONF_NORM_MIN, DDSKETCH_CONF_NORM_BIAS, ); -const MAX_BIN_WIDTH: u16 = u16::MAX; +const MAX_BIN_WIDTH: u32 = u32::MAX; /// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent]. /// @@ -451,7 +451,7 @@ impl DDSketch { /// Used only for unit testing so that we can create a sketch with an exact layout, which allows testing around the /// resulting bins when feeding in specific values, as well as generating explicitly bad layouts for testing. #[allow(dead_code)] - pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u16) { + pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u32) { let v = SKETCH_CONFIG.bin_lower_bound(k); self.adjust_basic_stats(v, u64::from(n)); self.bins.push(Bin { k, n }); @@ -557,7 +557,7 @@ impl DDSketch { for bin in &self.bins { k.push(i32::from(bin.k)); - n.push(u32::from(bin.n)); + n.push(bin.n); } dogsketch.set_k(k); @@ -620,7 +620,6 @@ impl TryFrom for DDSketch { for (k, n) in k.into_iter().zip(n.into_iter()) { let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?; - let n = u16::try_from(n).map_err(|_| "bin count overflows u16")?; sketch.bins.push(Bin { k, n }); } @@ -654,40 +653,52 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) { // We won't ever support Vector running on anything other than a 32-bit platform and above, I imagine, so this // should always be safe. let bin_limit = bin_limit as usize; - if bin_limit == 0 || bins.len() < bin_limit { + if bin_limit == 0 || bins.len() <= bin_limit { return; } let num_to_remove = bins.len() - bin_limit; - let mut missing = 0; - let mut overflow = SmallVec::<[Bin; 4]>::new(); + let mut missing: u64 = 0; + // Sum all mass from the bins being removed. Per CollapsingLowestDenseStore in sketches-go, + // all removed mass collapses into the first kept bin (the new minimum index). We accumulate + // here without creating intermediate bins so that the overflow key is correct below. for bin in bins.iter().take(num_to_remove) { missing += u64::from(bin.n); - - if missing > u64::from(MAX_BIN_WIDTH) { - overflow.push(Bin { - k: bin.k, - n: MAX_BIN_WIDTH, - }); - - missing -= u64::from(MAX_BIN_WIDTH); - } } + // Fold the accumulated mass into the first kept bin, matching Go's `bins[newMinIndex] += n`. let bin_remove = &mut bins[num_to_remove]; + let first_kept_k = bin_remove.k; missing = bin_remove.increment(missing); + + // Any mass that overflows the first kept bin's u32 counter generates additional bins at + // first_kept_k — the same key as the first kept bin, not the keys of the removed bins. + let mut overflow = SmallVec::<[Bin; 4]>::new(); if missing > 0 { - generate_bins(&mut overflow, bin_remove.k, missing); + generate_bins(&mut overflow, first_kept_k, missing); } - let overflow_len = overflow.len(); let (_, bins_end) = bins.split_at(num_to_remove); + overflow.reserve(bins_end.len()); overflow.extend_from_slice(bins_end); - // I still don't yet understand how this works, since you'd think bin limit should be the overall limit of the - // number of bins, but we're allowing more than that.. :thinkies: - overflow.truncate(bin_limit + overflow_len); + // Cap at `bin_limit` by dropping from the front so we keep the suffix (higher keys in `bins_end`). + // + // This can make `sum(bin.n)` smaller than the sketch's logical `count` (inserts still update `count`, min/max, + // sum, avg). `quantile` ranks against `count` while walking bin masses, so results are approximate when those + // diverge — the same class of issue as any hard cap that drops bins without rewriting aggregate stats. + // + // Even though we fold all removed mass into first_kept_k above, `generate_bins` may require more than + // `bin_limit` bins for a single key when total weight is huge (u32 per bin), so "preserve all mass in-bounds" + // is not always achievable; a plain prefix drop keeps the cap and favors retaining the tail keys. + // + // As of April 2026, this is an intentional divergence from the Datadog Agent implementation, + // which does not truncate bins to stay under a limit. + if overflow.len() > bin_limit { + let drop_len = overflow.len() - bin_limit; + overflow.drain(0..drop_len); + } mem::swap(bins, &mut overflow); } @@ -695,15 +706,15 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) { #[allow(clippy::cast_possible_truncation)] fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) { if n < u64::from(MAX_BIN_WIDTH) { - // SAFETY: Cannot truncate `n`, as it's less than a u16 value. - bins.push(Bin { k, n: n as u16 }); + // SAFETY: Cannot truncate `n`, as it's less than a u32 value. + bins.push(Bin { k, n: n as u32 }); } else { let overflow = n % u64::from(MAX_BIN_WIDTH); if overflow != 0 { bins.push(Bin { k, - // SAFETY: Cannot truncate `overflow`, as it's modulo'd by a u16 value. - n: overflow as u16, + // SAFETY: Cannot truncate `overflow`, as it's modulo'd by a u32 value. + n: overflow as u32, }); } @@ -712,3 +723,115 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) { } } } + +#[cfg(test)] +mod tests { + use super::*; + + // Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k. + fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> { + pairs.iter().map(|&(k, n)| Bin { k, n }).collect() + } + + // Helper: extract (k, n) pairs from a SmallVec for easy assertion. + fn to_pairs(bins: &SmallVec<[Bin; 4]>) -> Vec<(i16, u32)> { + bins.iter().map(|b| (b.k, b.n)).collect() + } + + /// Basic collapse: when bins exceed the limit, the mass from removed bins is merged + /// into the first kept bin (lowest surviving key). This mirrors the CollapsingLowestDenseStore + /// semantics from sketches-go: all bins with index < (maxIndex - limit + 1) collapse into + /// the bin at (maxIndex - limit + 1). + /// + /// Input: [(0,2), (1,3), (2,4), (3,5)] limit=2 → remove 2 bins + /// missing = n[0] + n[1] = 2 + 3 = 5 + /// first kept bin: k=2, n=4 → increment by 5 → n=9 + /// Result: [(2,9), (3,5)] + #[test] + fn trim_left_collapses_removed_mass_into_first_kept_bin() { + let mut bins = make_bins(&[(0, 2), (1, 3), (2, 4), (3, 5)]); + trim_left(&mut bins, 2); + assert_eq!(to_pairs(&bins), vec![(2, 9), (3, 5)]); + } + + /// Total count is preserved exactly when the collapse fits within a single u32 bin. + /// + /// Input: [(10,5), (20,3), (30,7)] limit=2 → remove 1 bin + /// missing = 5; first kept bin k=20, n=3 → n=8 + /// Result: [(20,8), (30,7)], total=15 == 5+3+7 + #[test] + fn trim_left_preserves_total_count_when_no_overflow() { + let mut bins = make_bins(&[(10, 5), (20, 3), (30, 7)]); + let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum(); + trim_left(&mut bins, 2); + let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum(); + assert_eq!(to_pairs(&bins), vec![(20, 8), (30, 7)]); + assert_eq!(total_before, total_after); + } + + /// With u32 bin counts, collapsed mass from multiple removed bins fits in a single bin + /// without saturation for typical weights, fully preserving the total count. + /// + /// Input: [(0,50000), (1,50000), (2,1)] limit=1 → remove 2 bins + /// missing = 100000; bins[2].increment(100000): 100001 < u32::MAX → n=100001, returns 0 + /// No overflow bins generated. Final: [(2,100001)], all mass preserved. + /// + /// With the old u16 layout, this same input would have saturated at 65535 and discarded + /// 34466 observations. u32 eliminates that loss for any per-bin count below ~4.3 billion. + #[test] + fn trim_left_preserves_exact_count_with_u32_bins() { + let mut bins = make_bins(&[(0, 50000), (1, 50000), (2, 1)]); + let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum(); + trim_left(&mut bins, 1); + let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum(); + assert_eq!(bins.len(), 1); + assert_eq!(bins[0].k, 2); + assert_eq!(bins[0].n, 100001); + assert_eq!(total_before, total_after, "all mass must be preserved with u32 bins"); + } + + /// When already at or under the limit, trim_left is a no-op. + #[test] + fn trim_left_no_op_when_within_limit() { + let original = make_bins(&[(5, 10), (6, 20)]); + let mut bins = original.clone(); + trim_left(&mut bins, 2); + assert_eq!(to_pairs(&bins), to_pairs(&original)); + trim_left(&mut bins, 3); + assert_eq!(to_pairs(&bins), to_pairs(&original)); + } + + /// Regression test for trim_left bin count with large per-sample weights. + /// + /// With the old u16 layout, a sample weight of ~260M would generate ceil(260M / 65535) = 3969 + /// bins per key, causing bin count explosion and an encoder panic. With u32, the same weight + /// fits in a single bin (260M < u32::MAX ≈ 4.3B), so one insert_n call produces exactly one + /// bin per key and the bin limit is trivially respected. + /// + /// This test inserts several values with a weight representative of what ADP receives when + /// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample), + /// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT. + #[test] + fn trim_left_respects_bin_limit_with_large_weights() { + // Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024). + // With u32 bins, this fits in a single bin per key (260_078_024 < u32::MAX). + let weight: u64 = 260_078_024; + let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT); + + let mut sketch = DDSketch::default(); + + // Insert enough distinct values to repeatedly trigger trim_left. Ten values is more + // than sufficient; two already exceed the bin limit without the fix. + for i in 1..=10_i32 { + sketch.insert_n(f64::from(i), weight); + assert!( + sketch.bins().len() <= bin_limit, + "bin count {} exceeded limit {} after inserting {} value(s) at weight {}", + sketch.bins().len(), + bin_limit, + i, + weight, + ); + } + } +} diff --git a/lib/ddsketch/tests/one_thousand_batched_points_ddsketch.rs b/lib/ddsketch/tests/one_thousand_batched_points_ddsketch.rs index 3c95646def4..c160b3ec406 100644 --- a/lib/ddsketch/tests/one_thousand_batched_points_ddsketch.rs +++ b/lib/ddsketch/tests/one_thousand_batched_points_ddsketch.rs @@ -20,9 +20,9 @@ fn test_one_thousand_single_points_ddsketch() { let stats = dhat::HeapStats::get(); dhat::assert_eq!(stats.total_blocks, 21); - dhat::assert_eq!(stats.total_bytes, 8080); + dhat::assert_eq!(stats.total_bytes, 10096); dhat::assert_eq!(stats.max_blocks, 3); - dhat::assert_eq!(stats.max_bytes, 3072); + dhat::assert_eq!(stats.max_bytes, 4096); dhat::assert_eq!(stats.curr_blocks, 0); dhat::assert_eq!(stats.curr_bytes, 0); } diff --git a/lib/ddsketch/tests/one_thousand_single_points_ddsketch.rs b/lib/ddsketch/tests/one_thousand_single_points_ddsketch.rs index 4f84a5125de..ec39e9f11e7 100644 --- a/lib/ddsketch/tests/one_thousand_single_points_ddsketch.rs +++ b/lib/ddsketch/tests/one_thousand_single_points_ddsketch.rs @@ -20,9 +20,9 @@ fn test_one_thousand_single_points_ddsketch() { let stats = dhat::HeapStats::get(); dhat::assert_eq!(stats.total_blocks, 20); - dhat::assert_eq!(stats.total_bytes, 6080); + dhat::assert_eq!(stats.total_bytes, 8096); dhat::assert_eq!(stats.max_blocks, 3); - dhat::assert_eq!(stats.max_bytes, 3072); + dhat::assert_eq!(stats.max_bytes, 4096); dhat::assert_eq!(stats.curr_blocks, 0); dhat::assert_eq!(stats.curr_bytes, 0); } diff --git a/lib/ddsketch/tests/ten_batched_points_ddsketch.rs b/lib/ddsketch/tests/ten_batched_points_ddsketch.rs index faa8b89ffc0..410e6e1e35c 100644 --- a/lib/ddsketch/tests/ten_batched_points_ddsketch.rs +++ b/lib/ddsketch/tests/ten_batched_points_ddsketch.rs @@ -20,9 +20,9 @@ fn test_ten_batched_points_ddsketch() { let stats = dhat::HeapStats::get(); dhat::assert_eq!(stats.total_blocks, 9); - dhat::assert_eq!(stats.total_bytes, 340); + dhat::assert_eq!(stats.total_bytes, 436); dhat::assert_eq!(stats.max_blocks, 3); - dhat::assert_eq!(stats.max_bytes, 192); + dhat::assert_eq!(stats.max_bytes, 256); dhat::assert_eq!(stats.curr_blocks, 0); dhat::assert_eq!(stats.curr_bytes, 0); } diff --git a/lib/ddsketch/tests/ten_single_points_ddsketch.rs b/lib/ddsketch/tests/ten_single_points_ddsketch.rs index 705be5309e3..ae3ad0b1434 100644 --- a/lib/ddsketch/tests/ten_single_points_ddsketch.rs +++ b/lib/ddsketch/tests/ten_single_points_ddsketch.rs @@ -20,9 +20,9 @@ fn test_ten_single_points_ddsketch() { let stats = dhat::HeapStats::get(); dhat::assert_eq!(stats.total_blocks, 8); - dhat::assert_eq!(stats.total_bytes, 320); + dhat::assert_eq!(stats.total_bytes, 416); dhat::assert_eq!(stats.max_blocks, 3); - dhat::assert_eq!(stats.max_bytes, 192); + dhat::assert_eq!(stats.max_bytes, 256); dhat::assert_eq!(stats.curr_blocks, 0); dhat::assert_eq!(stats.curr_bytes, 0); } diff --git a/lib/saluki-components/src/common/datadog/request_builder.rs b/lib/saluki-components/src/common/datadog/request_builder.rs index 1e79e190ba0..c3852d4b3a1 100644 --- a/lib/saluki-components/src/common/datadog/request_builder.rs +++ b/lib/saluki-components/src/common/datadog/request_builder.rs @@ -104,6 +104,15 @@ where encoded_len: usize, uncompressed_len_limit: usize, }, + #[snafu(display( + "input encoded size ({} bytes) exceeds compressed payload limit ({} bytes) and can never fit", + encoded_len, + compressed_len_limit + ))] + InputExceedsCompressedSizeLimit { + encoded_len: usize, + compressed_len_limit: usize, + }, #[snafu(display("input was invalid for request builder: {:?}'", input))] InvalidInput { input: E::Input }, #[snafu(display("failed to encode/write payload: {}", source))] @@ -333,6 +342,16 @@ where }); } + // If the input's encoded size already exceeds the compressed payload limit, it can never fit regardless of + // what else is in the payload, so there is no point in asking the caller to flush and retry. Return an error + // so the input is dropped rather than looping forever. + if self.scratch_buf.len() > self.compressed_len_limit { + return Err(RequestBuilderError::InputExceedsCompressedSizeLimit { + encoded_len: self.scratch_buf.len(), + compressed_len_limit: self.compressed_len_limit, + }); + } + // If the input can't fit into the current request payload based on the uncompressed size limit, or isn't likely // to fit into the current request payload based on the estimated compressed size limit, then return it to the // caller: this indicates that a flush must happen before trying to encode the same input again. @@ -947,6 +966,40 @@ mod tests { // size limits. } + #[tokio::test] + async fn input_exceeds_compressed_size_limit() { + // Regression test: when a single input's encoded size exceeds the compressed payload limit, the request + // builder previously returned Ok(Some(input)) (signalling "flush and retry"), but since the builder + // was empty there was nothing to flush. The caller (run_request_builder in the metrics encoder) would + // then panic because flush() returned an empty vec on a supposedly non-empty builder. + // + // The fix returns Err(InputExceedsCompressedSizeLimit) instead, letting the caller drop the metric + // and continue rather than entering an unresolvable flush loop. + let compressed_limit = 64; + let encoder = TestEncoder::new(compressed_limit, usize::MAX, "/submit"); + let mut request_builder = create_no_compression_request_builder(encoder).await; + + // This input encodes to more bytes than the compressed limit, so it can never fit. + let oversized_input = "x".repeat(compressed_limit + 1); + + match request_builder.encode(oversized_input).await { + Err(RequestBuilderError::InputExceedsCompressedSizeLimit { + encoded_len, + compressed_len_limit, + }) => { + assert_eq!(encoded_len, compressed_limit + 1); + assert_eq!(compressed_len_limit, compressed_limit); + } + other => panic!("expected InputExceedsCompressedSizeLimit, got: {:?}", other), + } + + // The builder should still be empty and usable after the error. + assert!(request_builder.flush().await.is_empty()); + + let small_input = "hello".to_string(); + assert_eq!(None, request_builder.encode(small_input).await.unwrap()); + } + #[tokio::test] async fn uncompressed_size_limit_too_small() { // Make sure that we can't build a request builder with an uncompressed size limit that is smaller than the