Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions lib/ddsketch/src/agent/sketch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,13 +681,17 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
generate_bins(&mut overflow, bin_remove.k, missing);
}

let overflow_len = overflow.len();
let (_, bins_end) = bins.split_at(num_to_remove);
overflow.extend_from_slice(bins_end);

// I still don't yet understand how this works, since you'd think bin limit should be the overall limit of the
// number of bins, but we're allowing more than that.. :thinkies:
overflow.truncate(bin_limit + overflow_len);
// Truncate to bin_limit so the total bin count stays within the configured limit. Overflow bins created
// above (when collapsed counts exceed MAX_BIN_WIDTH) are prepended before bins_end, and together the
// combined slice is capped at bin_limit. This may discard some higher-key bins from bins_end when
// overflow is large, which is the expected precision trade-off for a bounded sketch.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, I think we actually want to trim the lower bins, since lowest-collapsing is the default behavior for the official DDSketch implementation, IIRC: it means you end up trading accuracy at lower percentiles where it matters far less.

As is, we would be harming the upper percentiles: p95, p99, yadda yadda.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah truncate keeps the earlier ones, so I guess this wasn't trimming left to begin with. Will fix

//
// As of April 2026, this is an intentional divergence from the Datadog Agent implementation,
// which does not truncate bins to stay under a limit.
overflow.truncate(bin_limit);

mem::swap(bins, &mut overflow);
}
Expand All @@ -712,3 +716,46 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

/// Regression test for trim_left bin count explosion with large per-sample weights.
///
/// When a sample weight exceeds MAX_BIN_WIDTH (65535), a single `insert_n` call generates
/// multiple bins with the same key (one per 65535 units of count). Before the fix,
/// `trim_left` accumulated these overflow bins into its output without truncating to
/// `bin_limit`, causing the bin count to grow without bound across insertions. After
/// enough inserts the sketch could have millions of bins rather than the expected 4096,
/// eventually producing a serialized payload that exceeded the encoder's compressed size
/// limit and triggering a panic in the request builder.
///
/// This test inserts several values with a weight representative of what ADP receives when
/// clamping an incoming sample rate of 3e-9 to its minimum of 3.845e-9 (~260M per sample),
/// then asserts the bin count never exceeds DDSKETCH_CONF_BIN_LIMIT.
#[test]
fn trim_left_respects_bin_limit_with_large_weights() {
// Weight corresponding to ADP's minimum safe sample rate (1 / 3.845e-9 ≈ 260_078_024).
// Each insert_n call with this weight generates ceil(260_078_024 / 65535) = 3969 bins
// for a single key, enough to trigger trim_left after just two distinct values.
let weight: u64 = 260_078_024;
let bin_limit = usize::from(DDSKETCH_CONF_BIN_LIMIT);

let mut sketch = DDSketch::default();

// Insert enough distinct values to repeatedly trigger trim_left. Ten values is more
// than sufficient; two already exceed the bin limit without the fix.
for i in 1..=10_i32 {
sketch.insert_n(f64::from(i), weight);
assert!(
sketch.bins().len() <= bin_limit,
"bin count {} exceeded limit {} after inserting {} value(s) at weight {}",
sketch.bins().len(),
bin_limit,
i,
weight,
);
}
}
}
53 changes: 53 additions & 0 deletions lib/saluki-components/src/common/datadog/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ where
encoded_len: usize,
uncompressed_len_limit: usize,
},
#[snafu(display(
"input encoded size ({} bytes) exceeds compressed payload limit ({} bytes) and can never fit",
encoded_len,
compressed_len_limit
))]
InputExceedsCompressedSizeLimit {
encoded_len: usize,
compressed_len_limit: usize,
},
#[snafu(display("input was invalid for request builder: {:?}'", input))]
InvalidInput { input: E::Input },
#[snafu(display("failed to encode/write payload: {}", source))]
Expand Down Expand Up @@ -333,6 +342,16 @@ where
});
}

// If the input's encoded size already exceeds the compressed payload limit, it can never fit regardless of
// what else is in the payload, so there is no point in asking the caller to flush and retry. Return an error
// so the input is dropped rather than looping forever.
if self.scratch_buf.len() > self.compressed_len_limit {
return Err(RequestBuilderError::InputExceedsCompressedSizeLimit {
encoded_len: self.scratch_buf.len(),
compressed_len_limit: self.compressed_len_limit,
});
}
Comment on lines +345 to +353
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This actually... isn't right, now that I think about it.

The whole thing is that we really can't know if we've exceed the compressed size limit or not until we actually attempt compressing the request payload. I think what's going on, more likely than not, is that we're triggering the "can't encode this metric" pathway (Ok(false)) because CompressionEstimator::would_write_exceed_threshold returns true: naively, it will end up checking if encoded_len is greater than self.compressed_len_limit, and return true if so. That's why your "fix" appears to work, because it's preventing us from reaching that faulty logic.

I think we would actually just want to remove the conditional I mentioned from CompressionEstimator::would_write_exceed_threshold since, realistically, it shouldn't be able to estimate overflow without any compressed data have been written out yet. I don't know why I added that check initially but it's feeling wrong in hindsight.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a further thought, any sort of pathological metric like this would cause problems because even if we allow it in, and then detect an oversized payload during flush and so we split and try to do the two separate halves... the half with the pathological metric will still fail to produce a compressed payload that is within the limit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take another look, I'm wondering whether we actually need to bail this out ahead of time or whether we could instead just not panic if we have a buffer with size 1 and still can't flush

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious whether we expect any legit payloads to ever actually be this big (>3MB)?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing the compression estimator is definitely something we should do: its purpose is to act purely as an optimization hint to flush before we grow payloads so large that they need to be split... but in this case, it's causing a logical error (instructing the caller to flush when there's no actual payload) so that's just straight-up wrong.

If we do that, then what will happen is that we'll keep letting encoded metrics through until the compressor finally does an intermediate flush (because it filled its own internal buffers) and then we'll quickly realize whether or not the current compressed size is over the limit or not... and if not, then the actual estimator logic will kick in. Finally, the check during flush protects us from oversized payloads.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also curious whether we expect any legit payloads to ever actually be this big (>3MB)?

Well, in the case of our weird Go tracer behavior reporting allocation statistics with crazy high sample rates... yes? That's uncompressed, though. All of those nearly identical bins should then compress down fairly well.

With the v3 intake protocol (#1175), they'll compress down really well because of the delta encoding that we do.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those don't get so large once the trimming is actually working though, right? Shouldn't they be in the 10s of KBs?


// If the input can't fit into the current request payload based on the uncompressed size limit, or isn't likely
// to fit into the current request payload based on the estimated compressed size limit, then return it to the
// caller: this indicates that a flush must happen before trying to encode the same input again.
Expand Down Expand Up @@ -947,6 +966,40 @@ mod tests {
// size limits.
}

#[tokio::test]
async fn input_exceeds_compressed_size_limit() {
// Regression test: when a single input's encoded size exceeds the compressed payload limit, the request
// builder previously returned Ok(Some(input)) (signalling "flush and retry"), but since the builder
// was empty there was nothing to flush. The caller (run_request_builder in the metrics encoder) would
// then panic because flush() returned an empty vec on a supposedly non-empty builder.
//
// The fix returns Err(InputExceedsCompressedSizeLimit) instead, letting the caller drop the metric
// and continue rather than entering an unresolvable flush loop.
let compressed_limit = 64;
let encoder = TestEncoder::new(compressed_limit, usize::MAX, "/submit");
let mut request_builder = create_no_compression_request_builder(encoder).await;

// This input encodes to more bytes than the compressed limit, so it can never fit.
let oversized_input = "x".repeat(compressed_limit + 1);

match request_builder.encode(oversized_input).await {
Err(RequestBuilderError::InputExceedsCompressedSizeLimit {
encoded_len,
compressed_len_limit,
}) => {
assert_eq!(encoded_len, compressed_limit + 1);
assert_eq!(compressed_len_limit, compressed_limit);
}
other => panic!("expected InputExceedsCompressedSizeLimit, got: {:?}", other),
}

// The builder should still be empty and usable after the error.
assert!(request_builder.flush().await.is_empty());

let small_input = "hello".to_string();
assert_eq!(None, request_builder.encode(small_input).await.unwrap());
}

#[tokio::test]
async fn uncompressed_size_limit_too_small() {
// Make sure that we can't build a request builder with an uncompressed size limit that is smaller than the
Expand Down
Loading