Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions lib/ddsketch/src/agent/bin.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Sketch bin representation.

const MAX_BIN_WIDTH: u16 = u16::MAX;
const MAX_BIN_WIDTH: u32 = u32::MAX;

/// A sketch bin.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
Expand All @@ -10,7 +10,7 @@ pub struct Bin {
pub(crate) k: i16,

/// The number of observations within the bin.
pub(crate) n: u16,
pub(crate) n: u32,
}

impl Bin {
Expand All @@ -21,7 +21,7 @@ impl Bin {

/// Returns the number of observations within the bin.
pub fn count(&self) -> u32 {
self.n as u32
self.n
}

#[allow(clippy::cast_possible_truncation)]
Expand All @@ -33,8 +33,8 @@ impl Bin {
}

// SAFETY: We already know `next` is less than or equal to `MAX_BIN_WIDTH` if we got here, and `MAX_BIN_WIDTH`
// is u16, so next can't possibly be larger than a u16.
self.n = next as u16;
// is u32, so next can't possibly be larger than a u32.
self.n = next as u32;
0
}
}
173 changes: 148 additions & 25 deletions lib/ddsketch/src/agent/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static SKETCH_CONFIG: Config = Config::new(
DDSKETCH_CONF_NORM_MIN,
DDSKETCH_CONF_NORM_BIAS,
);
const MAX_BIN_WIDTH: u16 = u16::MAX;
const MAX_BIN_WIDTH: u32 = u32::MAX;

/// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent].
///
Expand Down Expand Up @@ -451,7 +451,7 @@ impl DDSketch {
/// Used only for unit testing so that we can create a sketch with an exact layout, which allows testing around the
/// resulting bins when feeding in specific values, as well as generating explicitly bad layouts for testing.
#[allow(dead_code)]
pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u16) {
pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u32) {
let v = SKETCH_CONFIG.bin_lower_bound(k);
self.adjust_basic_stats(v, u64::from(n));
self.bins.push(Bin { k, n });
Expand Down Expand Up @@ -557,7 +557,7 @@ impl DDSketch {

for bin in &self.bins {
k.push(i32::from(bin.k));
n.push(u32::from(bin.n));
n.push(bin.n);
}

dogsketch.set_k(k);
Expand Down Expand Up @@ -620,7 +620,6 @@ impl TryFrom<Dogsketch> for DDSketch {

for (k, n) in k.into_iter().zip(n.into_iter()) {
let k = i16::try_from(k).map_err(|_| "bin key overflows i16")?;
let n = u16::try_from(n).map_err(|_| "bin count overflows u16")?;

sketch.bins.push(Bin { k, n });
}
Expand Down Expand Up @@ -654,56 +653,68 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
// We won't ever support Vector running on anything other than a 32-bit platform and above, I imagine, so this
// should always be safe.
let bin_limit = bin_limit as usize;
if bin_limit == 0 || bins.len() < bin_limit {
if bin_limit == 0 || bins.len() <= bin_limit {
return;
}

let num_to_remove = bins.len() - bin_limit;
let mut missing = 0;
let mut overflow = SmallVec::<[Bin; 4]>::new();
let mut missing: u64 = 0;

// Sum all mass from the bins being removed. Per CollapsingLowestDenseStore in sketches-go,
// all removed mass collapses into the first kept bin (the new minimum index). We accumulate
// here without creating intermediate bins so that the overflow key is correct below.
for bin in bins.iter().take(num_to_remove) {
missing += u64::from(bin.n);

if missing > u64::from(MAX_BIN_WIDTH) {
overflow.push(Bin {
k: bin.k,
n: MAX_BIN_WIDTH,
});

missing -= u64::from(MAX_BIN_WIDTH);
}
}

// Fold the accumulated mass into the first kept bin, matching Go's `bins[newMinIndex] += n`.
let bin_remove = &mut bins[num_to_remove];
let first_kept_k = bin_remove.k;
missing = bin_remove.increment(missing);

// Any mass that overflows the first kept bin's u32 counter generates additional bins at
// first_kept_k — the same key as the first kept bin, not the keys of the removed bins.
let mut overflow = SmallVec::<[Bin; 4]>::new();
if missing > 0 {
generate_bins(&mut overflow, bin_remove.k, missing);
generate_bins(&mut overflow, first_kept_k, missing);
}

let overflow_len = overflow.len();
let (_, bins_end) = bins.split_at(num_to_remove);
overflow.reserve(bins_end.len());
overflow.extend_from_slice(bins_end);

// I still don't yet understand how this works, since you'd think bin limit should be the overall limit of the
// number of bins, but we're allowing more than that.. :thinkies:
overflow.truncate(bin_limit + overflow_len);
// Cap at `bin_limit` by dropping from the front so we keep the suffix (higher keys in `bins_end`).
//
// This can make `sum(bin.n)` smaller than the sketch's logical `count` (inserts still update `count`, min/max,
// sum, avg). `quantile` ranks against `count` while walking bin masses, so results are approximate when those
// diverge — the same class of issue as any hard cap that drops bins without rewriting aggregate stats.
//
// Even though we fold all removed mass into first_kept_k above, `generate_bins` may require more than
// `bin_limit` bins for a single key when total weight is huge (u32 per bin), so "preserve all mass in-bounds"
// is not always achievable; a plain prefix drop keeps the cap and favors retaining the tail keys.
//
// As of April 2026, this is an intentional divergence from the Datadog Agent implementation,
// which does not truncate bins to stay under a limit.
if overflow.len() > bin_limit {
let drop_len = overflow.len() - bin_limit;
overflow.drain(0..drop_len);
}

mem::swap(bins, &mut overflow);
}

#[allow(clippy::cast_possible_truncation)]
fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
if n < u64::from(MAX_BIN_WIDTH) {
// SAFETY: Cannot truncate `n`, as it's less than a u16 value.
bins.push(Bin { k, n: n as u16 });
// SAFETY: Cannot truncate `n`, as it's less than a u32 value.
bins.push(Bin { k, n: n as u32 });
} else {
let overflow = n % u64::from(MAX_BIN_WIDTH);
if overflow != 0 {
bins.push(Bin {
k,
// SAFETY: Cannot truncate `overflow`, as it's modulo'd by a u16 value.
n: overflow as u16,
// SAFETY: Cannot truncate `overflow`, as it's modulo'd by a u32 value.
n: overflow as u32,
});
}

Expand All @@ -712,3 +723,115 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

// Helper: build a SmallVec<[Bin; 4]> from (k, n) pairs. Assumes input is already sorted by k.
fn make_bins(pairs: &[(i16, u32)]) -> SmallVec<[Bin; 4]> {
pairs.iter().map(|&(k, n)| Bin { k, n }).collect()
}

// Helper: extract (k, n) pairs from a SmallVec for easy assertion.
fn to_pairs(bins: &SmallVec<[Bin; 4]>) -> Vec<(i16, u32)> {
bins.iter().map(|b| (b.k, b.n)).collect()
}

/// Basic collapse: when bins exceed the limit, the mass from removed bins is merged
/// into the first kept bin (lowest surviving key). This mirrors the CollapsingLowestDenseStore
/// semantics from sketches-go: all bins with index < (maxIndex - limit + 1) collapse into
/// the bin at (maxIndex - limit + 1).
///
/// Input: [(0,2), (1,3), (2,4), (3,5)] limit=2 → remove 2 bins
/// missing = n[0] + n[1] = 2 + 3 = 5
/// first kept bin: k=2, n=4 → increment by 5 → n=9
/// Result: [(2,9), (3,5)]
#[test]
fn trim_left_collapses_removed_mass_into_first_kept_bin() {
let mut bins = make_bins(&[(0, 2), (1, 3), (2, 4), (3, 5)]);
trim_left(&mut bins, 2);
assert_eq!(to_pairs(&bins), vec![(2, 9), (3, 5)]);
}

/// Total count is preserved exactly when the collapse fits within a single u32 bin.
///
/// Input: [(10,5), (20,3), (30,7)] limit=2 → remove 1 bin
/// missing = 5; first kept bin k=20, n=3 → n=8
/// Result: [(20,8), (30,7)], total=15 == 5+3+7
#[test]
fn trim_left_preserves_total_count_when_no_overflow() {
let mut bins = make_bins(&[(10, 5), (20, 3), (30, 7)]);
let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
trim_left(&mut bins, 2);
let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
assert_eq!(to_pairs(&bins), vec![(20, 8), (30, 7)]);
assert_eq!(total_before, total_after);
}

/// With u32 bin counts, collapsed mass from multiple removed bins fits in a single bin
/// without saturation for typical weights, fully preserving the total count.
///
/// Input: [(0,50000), (1,50000), (2,1)] limit=1 → remove 2 bins
/// missing = 100000; bins[2].increment(100000): 100001 < u32::MAX → n=100001, returns 0
/// No overflow bins generated. Final: [(2,100001)], all mass preserved.
///
/// With the old u16 layout, this same input would have saturated at 65535 and discarded
/// 34466 observations. u32 eliminates that loss for any per-bin count below ~4.3 billion.
#[test]
fn trim_left_preserves_exact_count_with_u32_bins() {
let mut bins = make_bins(&[(0, 50000), (1, 50000), (2, 1)]);
let total_before: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
trim_left(&mut bins, 1);
let total_after: u64 = bins.iter().map(|b| u64::from(b.n)).sum();
assert_eq!(bins.len(), 1);
assert_eq!(bins[0].k, 2);
assert_eq!(bins[0].n, 100001);
assert_eq!(total_before, total_after, "all mass must be preserved with u32 bins");
}

/// When already at or under the limit, trim_left is a no-op.
#[test]
fn trim_left_no_op_when_within_limit() {
let original = make_bins(&[(5, 10), (6, 20)]);
let mut bins = original.clone();
trim_left(&mut bins, 2);
assert_eq!(to_pairs(&bins), to_pairs(&original));
trim_left(&mut bins, 3);
assert_eq!(to_pairs(&bins), to_pairs(&original));
}

/// Regression test for trim_left bin count with large per-sample weights.
///
/// With the old u16 layout, a sample weight of ~260M would generate ceil(260M / 65535) = 3969
/// bins per key, causing bin count explosion and an encoder panic. With u32, the same weight
/// fits in a single bin (260M < u32::MAX ≈ 4.3B), so one insert_n call produces exactly one
/// bin per key and the bin limit is trivially respected.
///
/// This test inserts several values with a weight representative of what ADP receives when
/// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample),
/// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT.
#[test]
fn trim_left_respects_bin_limit_with_large_weights() {
// Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024).
// With u32 bins, this fits in a single bin per key (260_078_024 < u32::MAX).
let weight: u64 = 260_078_024;
let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT);

let mut sketch = DDSketch::default();

// Insert enough distinct values to repeatedly trigger trim_left. Ten values is more
// than sufficient; two already exceed the bin limit without the fix.
for i in 1..=10_i32 {
sketch.insert_n(f64::from(i), weight);
assert!(
sketch.bins().len() <= bin_limit,
"bin count {} exceeded limit {} after inserting {} value(s) at weight {}",
sketch.bins().len(),
bin_limit,
i,
weight,
);
}
}
}
4 changes: 2 additions & 2 deletions lib/ddsketch/tests/one_thousand_batched_points_ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn test_one_thousand_single_points_ddsketch() {
let stats = dhat::HeapStats::get();

dhat::assert_eq!(stats.total_blocks, 21);
dhat::assert_eq!(stats.total_bytes, 8080);
dhat::assert_eq!(stats.total_bytes, 10096);
dhat::assert_eq!(stats.max_blocks, 3);
dhat::assert_eq!(stats.max_bytes, 3072);
dhat::assert_eq!(stats.max_bytes, 4096);
dhat::assert_eq!(stats.curr_blocks, 0);
dhat::assert_eq!(stats.curr_bytes, 0);
}
4 changes: 2 additions & 2 deletions lib/ddsketch/tests/one_thousand_single_points_ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn test_one_thousand_single_points_ddsketch() {
let stats = dhat::HeapStats::get();

dhat::assert_eq!(stats.total_blocks, 20);
dhat::assert_eq!(stats.total_bytes, 6080);
dhat::assert_eq!(stats.total_bytes, 8096);
dhat::assert_eq!(stats.max_blocks, 3);
dhat::assert_eq!(stats.max_bytes, 3072);
dhat::assert_eq!(stats.max_bytes, 4096);
dhat::assert_eq!(stats.curr_blocks, 0);
dhat::assert_eq!(stats.curr_bytes, 0);
}
4 changes: 2 additions & 2 deletions lib/ddsketch/tests/ten_batched_points_ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn test_ten_batched_points_ddsketch() {
let stats = dhat::HeapStats::get();

dhat::assert_eq!(stats.total_blocks, 9);
dhat::assert_eq!(stats.total_bytes, 340);
dhat::assert_eq!(stats.total_bytes, 436);
dhat::assert_eq!(stats.max_blocks, 3);
dhat::assert_eq!(stats.max_bytes, 192);
dhat::assert_eq!(stats.max_bytes, 256);
dhat::assert_eq!(stats.curr_blocks, 0);
dhat::assert_eq!(stats.curr_bytes, 0);
}
4 changes: 2 additions & 2 deletions lib/ddsketch/tests/ten_single_points_ddsketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ fn test_ten_single_points_ddsketch() {
let stats = dhat::HeapStats::get();

dhat::assert_eq!(stats.total_blocks, 8);
dhat::assert_eq!(stats.total_bytes, 320);
dhat::assert_eq!(stats.total_bytes, 416);
dhat::assert_eq!(stats.max_blocks, 3);
dhat::assert_eq!(stats.max_bytes, 192);
dhat::assert_eq!(stats.max_bytes, 256);
dhat::assert_eq!(stats.curr_blocks, 0);
dhat::assert_eq!(stats.curr_bytes, 0);
}
53 changes: 53 additions & 0 deletions lib/saluki-components/src/common/datadog/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ where
encoded_len: usize,
uncompressed_len_limit: usize,
},
#[snafu(display(
"input encoded size ({} bytes) exceeds compressed payload limit ({} bytes) and can never fit",
encoded_len,
compressed_len_limit
))]
InputExceedsCompressedSizeLimit {
encoded_len: usize,
compressed_len_limit: usize,
},
#[snafu(display("input was invalid for request builder: {:?}'", input))]
InvalidInput { input: E::Input },
#[snafu(display("failed to encode/write payload: {}", source))]
Expand Down Expand Up @@ -333,6 +342,16 @@ where
});
}

// If the input's encoded size already exceeds the compressed payload limit, it can never fit regardless of
// what else is in the payload, so there is no point in asking the caller to flush and retry. Return an error
// so the input is dropped rather than looping forever.
if self.scratch_buf.len() > self.compressed_len_limit {
return Err(RequestBuilderError::InputExceedsCompressedSizeLimit {
encoded_len: self.scratch_buf.len(),
compressed_len_limit: self.compressed_len_limit,
});
}
Comment on lines +345 to +353
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually... isn't right, now that I think about it.

The whole thing is that we really can't know if we've exceed the compressed size limit or not until we actually attempt compressing the request payload. I think what's going on, more likely than not, is that we're triggering the "can't encode this metric" pathway (Ok(false)) because CompressionEstimator::would_write_exceed_threshold returns true: naively, it will end up checking if encoded_len is greater than self.compressed_len_limit, and return true if so. That's why your "fix" appears to work, because it's preventing us from reaching that faulty logic.

I think we would actually just want to remove the conditional I mentioned from CompressionEstimator::would_write_exceed_threshold since, realistically, it shouldn't be able to estimate overflow without any compressed data have been written out yet. I don't know why I added that check initially but it's feeling wrong in hindsight.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a further thought, any sort of pathological metric like this would cause problems because even if we allow it in, and then detect an oversized payload during flush and so we split and try to do the two separate halves... the half with the pathological metric will still fail to produce a compressed payload that is within the limit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take another look, I'm wondering whether we actually need to bail this out ahead of time or whether we could instead just not panic if we have a buffer with size 1 and still can't flush

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious whether we expect any legit payloads to ever actually be this big (>3MB)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing the compression estimator is definitely something we should do: its purpose is to act purely as an optimization hint to flush before we grow payloads so large that they need to be split... but in this case, it's causing a logical error (instructing the caller to flush when there's no actual payload) so that's just straight-up wrong.

If we do that, then what will happen is that we'll keep letting encoded metrics through until the compressor finally does an intermediate flush (because it filled its own internal buffers) and then we'll quickly realize whether or not the current compressed size is over the limit or not... and if not, then the actual estimator logic will kick in. Finally, the check during flush protects us from oversized payloads.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious whether we expect any legit payloads to ever actually be this big (>3MB)?

Well, in the case of our weird Go tracer behavior reporting allocation statistics with crazy high sample rates... yes? That's uncompressed, though. All of those nearly identical bins should then compress down fairly well.

With the v3 intake protocol (#1175), they'll compress down really well because of the delta encoding that we do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those don't get so large once the trimming is actually working though, right? Shouldn't they be in the 10s of KBs?


// If the input can't fit into the current request payload based on the uncompressed size limit, or isn't likely
// to fit into the current request payload based on the estimated compressed size limit, then return it to the
// caller: this indicates that a flush must happen before trying to encode the same input again.
Expand Down Expand Up @@ -947,6 +966,40 @@ mod tests {
// size limits.
}

#[tokio::test]
async fn input_exceeds_compressed_size_limit() {
// Regression test: when a single input's encoded size exceeds the compressed payload limit, the request
// builder previously returned Ok(Some(input)) (signalling "flush and retry"), but since the builder
// was empty there was nothing to flush. The caller (run_request_builder in the metrics encoder) would
// then panic because flush() returned an empty vec on a supposedly non-empty builder.
//
// The fix returns Err(InputExceedsCompressedSizeLimit) instead, letting the caller drop the metric
// and continue rather than entering an unresolvable flush loop.
let compressed_limit = 64;
let encoder = TestEncoder::new(compressed_limit, usize::MAX, "/submit");
let mut request_builder = create_no_compression_request_builder(encoder).await;

// This input encodes to more bytes than the compressed limit, so it can never fit.
let oversized_input = "x".repeat(compressed_limit + 1);

match request_builder.encode(oversized_input).await {
Err(RequestBuilderError::InputExceedsCompressedSizeLimit {
encoded_len,
compressed_len_limit,
}) => {
assert_eq!(encoded_len, compressed_limit + 1);
assert_eq!(compressed_len_limit, compressed_limit);
}
other => panic!("expected InputExceedsCompressedSizeLimit, got: {:?}", other),
}

// The builder should still be empty and usable after the error.
assert!(request_builder.flush().await.is_empty());

let small_input = "hello".to_string();
assert_eq!(None, request_builder.encode(small_input).await.unwrap());
}

#[tokio::test]
async fn uncompressed_size_limit_too_small() {
// Make sure that we can't build a request builder with an uncompressed size limit that is smaller than the
Expand Down
Loading