diff --git a/lib/saluki-components/src/common/datadog/request_builder.rs b/lib/saluki-components/src/common/datadog/request_builder.rs index 1e79e190ba0..83bfa17a45d 100644 --- a/lib/saluki-components/src/common/datadog/request_builder.rs +++ b/lib/saluki-components/src/common/datadog/request_builder.rs @@ -1114,4 +1114,30 @@ mod tests { prop_assert_eq!(original_inputs_len, flushed_inputs_len); } + + #[tokio::test] + async fn first_input_larger_than_compressed_limit_does_not_panic() { + // Regression test for a panic triggered when the very first metric's uncompressed encoded + // size exceeded the configured compressed size limit. + // + // Previously, `CompressionEstimator::would_write_exceed_threshold` had an early exit that + // returned `true` when `len > threshold`, even before any compressed data had been written. + // This caused `encode_inner` to return `Ok(false)` (signal to flush), but since nothing had + // been written, `flush()` returned an empty vec, hitting: + // panic!("builder told us to flush, but gave us nothing") + // + // The fix removes that early exit. When no compressed data has been written yet, the + // estimator returns `false` (allow the write), so the large input proceeds to the compressor + // rather than triggering a premature flush of an empty builder. + let large_input = "x".repeat(10_000); + + // Compressed limit set below the uncompressed input length to trigger the old bug. + let encoder = TestEncoder::new(1_000, usize::MAX, "/submit"); + let mut request_builder = create_zstd_compression_request_builder(encoder).await; + + // This must not panic. The input may produce an oversized payload on flush, but that is + // handled gracefully — the important invariant is that encode itself does not panic. + let result = request_builder.encode(large_input).await; + assert!(result.is_ok(), "encode returned an error: {:?}", result.err()); + } } diff --git a/lib/saluki-io/src/compression.rs b/lib/saluki-io/src/compression.rs index d759271cf2b..496d2b9730c 100644 --- a/lib/saluki-io/src/compression.rs +++ b/lib/saluki-io/src/compression.rs @@ -292,11 +292,6 @@ impl CompressionEstimator { /// Estimates if writing `len` bytes to the compressor would cause the final compressed size to exceed `threshold` /// bytes. pub fn would_write_exceed_threshold(&self, len: usize, threshold: usize) -> bool { - // If the length of the data to be written exceeds the threshold, then it obviously would exceed the threshold. - if len > threshold { - return true; - } - // If we have yet to see any compressed data, we can't make a meaningful estimate, and this likely means that // the compressor is still actively able to compress more data into the first block, which when eventually // written, should never exceed the compressed size limit... so we choose to not block writes in this case. @@ -362,8 +357,12 @@ mod tests { fn compression_estimator_no_output() { let estimator = CompressionEstimator::default(); + // Without any compressed data, we cannot estimate whether a write would exceed the threshold, + // so we always return false to allow the write. This includes the case where the uncompressed + // size exceeds the threshold, because many inputs compress significantly (e.g. sketches with + // many near-identical bins). assert!(!estimator.would_write_exceed_threshold(10, 100)); - assert!(estimator.would_write_exceed_threshold(100, 90)); + assert!(!estimator.would_write_exceed_threshold(100, 90)); } #[test]