From 06f3965a2a00e68f03e1462655d6eb330437a3b7 Mon Sep 17 00:00:00 2001
From: Luke Steensen
Date: Thu, 23 Apr 2026 11:57:34 -0500
Subject: [PATCH] feat(aggregate): support configurable sub-10s aggregation windows

Introduces `aggregator_bucket_size_seconds` as an alias for the existing
`aggregate_window_duration` field, enabling sub-10s aggregation windows
(e.g. 1 Hz) for the aggregated pipeline.

- Adds a `DEFAULT_BUCKET_SIZE` constant and routes both code and tests
  through it to eliminate literal `10`s in aggregation code.
- Pins the pass-through batcher to `DEFAULT_BUCKET_SIZE` via a dedicated
  `build_passthrough_batcher` helper. The pass-through wire protocol
  carries no client-supplied interval, so changing it with the new knob
  would silently reinterpret every rate sample from every client using
  that path.
- Adds a flexible deserializer so the config value can be supplied as
  either an integer number of seconds or the legacy serde `Duration`
  struct form -- preserving pre-existing configs while letting the
  `_seconds`-suffixed alias behave the way its name implies.
- Adds regression tests covering: end-to-end aggregation at 1 Hz with
  retention of open-bucket values across flushes, pass-through pinning
  despite a 1 Hz config, the integer-seconds form of the new key, and
  the legacy struct form of the existing key.
--- .../src/transforms/aggregate/mod.rs | 172 ++++++++++++++++-- 1 file changed, 154 insertions(+), 18 deletions(-) diff --git a/lib/saluki-components/src/transforms/aggregate/mod.rs b/lib/saluki-components/src/transforms/aggregate/mod.rs index b1bcf814d62..9355480227e 100644 --- a/lib/saluki-components/src/transforms/aggregate/mod.rs +++ b/lib/saluki-components/src/transforms/aggregate/mod.rs @@ -19,7 +19,7 @@ use saluki_core::{ }; use saluki_error::GenericError; use saluki_metrics::MetricsBuilder; -use serde::Deserialize; +use serde::{Deserialize, Deserializer}; use smallvec::SmallVec; use tokio::{ select, @@ -35,8 +35,30 @@ use self::config::HistogramConfiguration; const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2); +const DEFAULT_BUCKET_SIZE: Duration = Duration::from_secs(10); + const fn default_window_duration() -> Duration { - Duration::from_secs(10) + DEFAULT_BUCKET_SIZE +} + +// Accepts either an integer number of seconds (e.g. `10`) or the default serde `Duration` struct form +// (e.g. `{ "secs": 10, "nanos": 0 }`). The struct form keeps existing configs working; the integer form +// lets `aggregator_bucket_size_seconds` behave the way its name implies. +fn deserialize_duration_seconds_or_struct<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum DurationSpec { + Seconds(u64), + Duration(Duration), + } + + Ok(match DurationSpec::deserialize(deserializer)? { + DurationSpec::Seconds(s) => Duration::from_secs(s), + DurationSpec::Duration(d) => d, + }) } const fn default_primary_flush_interval() -> Duration { @@ -87,7 +109,12 @@ pub struct AggregateConfiguration { /// how many data points are emitted downstream. /// /// Defaults to 10 seconds. 
- #[serde(rename = "aggregate_window_duration", default = "default_window_duration")] + #[serde( + rename = "aggregate_window_duration", + alias = "aggregator_bucket_size_seconds", + default = "default_window_duration", + deserialize_with = "deserialize_duration_seconds_or_struct" + )] window_duration: Duration, /// How often to flush buckets. @@ -195,6 +222,14 @@ impl AggregateConfiguration { hist_config: HistogramConfiguration::default(), } } + + async fn build_passthrough_batcher(&self, telemetry: Telemetry) -> PassthroughBatcher { + // Pass-through metrics arrive with client-supplied timestamps over a wire protocol that does not carry an + // interval. The interval used here is an agent-side convention, not a property of the incoming data, so it + // must not follow the configured bucket size -- doing so would silently reinterpret every rate sample from + // every client using this path. + PassthroughBatcher::new(self.passthrough_idle_flush_timeout, DEFAULT_BUCKET_SIZE, telemetry).await + } } #[async_trait] @@ -211,12 +246,7 @@ impl TransformBuilder for AggregateConfiguration { telemetry.clone(), ); - let passthrough_batcher = PassthroughBatcher::new( - self.passthrough_idle_flush_timeout, - self.window_duration, - telemetry.clone(), - ) - .await; + let passthrough_batcher = self.build_passthrough_batcher(telemetry.clone()).await; Ok(Box::new(Aggregate { state, @@ -443,11 +473,10 @@ impl PassthroughBatcher { } async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) { - // Convert counters to rates before we batch them up. - // - // This involves specifying the rate interval as the bucket width of the aggregate transform itself, which when - // you say it out loud is sort of confusing and nonsensical since the whole point is that these are - // _pre-aggregated_ metrics but we have to match the behavior of the Datadog Agent. ¯\_(ツ)_/¯ + // Convert counters to rates before we batch them up. 
The rate interval here is an agent-side convention -- + // the wire protocol carries no interval for pre-aggregated metrics -- and matches the Datadog Agent's fixed + // default, not the aggregate transform's configurable bucket width. See + // `AggregateConfiguration::build_passthrough_batcher` for the construction-site invariant. let (context, values, metadata) = metric.into_parts(); let adjusted_values = counter_values_to_rate(values, self.bucket_width); let metric = Metric::from_parts(context, adjusted_values, metadata); @@ -901,12 +930,18 @@ mod tests { } async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec { + get_flushed_metrics_at(timestamp, true, state).await + } + + async fn get_flushed_metrics_at( + timestamp: u64, flush_open_buckets: bool, state: &mut AggregationState, + ) -> Vec { let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher(); let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist"); // Flush the metrics to an event buffer. state - .flush(timestamp, true, &mut buffered_dispatcher) + .flush(timestamp, flush_open_buckets, &mut buffered_dispatcher) .await .expect("should not fail to flush aggregation state"); @@ -1350,10 +1385,10 @@ mod tests { async fn preaggregated_counters_to_rate() { let counter_value = 42.0; let timestamp = 123456; - let bucket_width = BUCKET_WIDTH; // Create a basic passthrough batcher and forwarder. - let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await; + let mut batcher = + PassthroughBatcher::new(Duration::from_nanos(1), DEFAULT_BUCKET_SIZE, Telemetry::noop()).await; let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher(); // Create a simple pre-aggregated counter, and batch it. 
@@ -1367,11 +1402,112 @@ mod tests { let mut flushed_metrics = dispatcher_receiver.collect_next(); assert_eq!(flushed_metrics.len(), 1); assert_eq!( - Metric::rate("metric1", (timestamp, counter_value), bucket_width), + Metric::rate("metric1", (timestamp, counter_value), DEFAULT_BUCKET_SIZE), + flushed_metrics.remove(0) + ); + } + + #[tokio::test] + async fn aggregated_counters_at_1s_bucket_width() { + // T1 from 1HZ_SPEC: three samples of a single logical series into distinct 1s buckets (two closed, one open + // at the flush cutoff). Assert exactly one series is emitted with points at the expected bucket starts and + // `interval = 1s`; then re-flush to prove the open-bucket value was retained (not silently dropped). + // + // Counter expiration is disabled so the second flush doesn't also emit zero-value filler. + let bucket_width = Duration::from_secs(1); + + let mut state = AggregationState::new( + bucket_width, + 10, + None, + HistogramConfiguration::default(), + Telemetry::noop(), + ); + + let input = Metric::counter("metric_a", 0.0); + assert!(state.insert(100, Metric::counter("metric_a", 1.0))); + assert!(state.insert(101, Metric::counter("metric_a", 2.0))); + assert!(state.insert(102, Metric::counter("metric_a", 3.0))); + + // First flush: cutoff t=102 closes buckets 100 and 101, bucket 102 is still open. + let flushed = get_flushed_metrics_at(102, false, &mut state).await; + assert_eq!(flushed.len(), 1, "expected exactly one logical series"); + assert_flushed_scalar_metric!(&input, &flushed[0], [100 => 1.0, 101 => 2.0]); + match flushed[0].values() { + MetricValues::Rate(_, interval) => assert_eq!(*interval, Duration::from_secs(1)), + other => panic!("expected rate, got {}", other.as_str()), + } + + // Second flush at t=103 closes bucket 102. The previously-open value must emerge now; if it had been + // silently dropped on the first flush, this assertion fails. 
+ let flushed = get_flushed_metrics_at(103, false, &mut state).await; + assert_eq!(flushed.len(), 1, "open-bucket value should be retained across flushes"); + assert_flushed_scalar_metric!(&input, &flushed[0], [102 => 3.0]); + } + + #[tokio::test] + async fn passthrough_pinned_to_default_despite_1hz_config() { + // Central regression pin for the 1 Hz aggregation spec: setting `window_duration = 1s` must not change the + // pass-through batcher's emitted rate interval. The invariant lives in + // `AggregateConfiguration::build_passthrough_batcher`, which is the batcher construction site used by + // `build()`. + let mut cfg = AggregateConfiguration::with_defaults(); + cfg.window_duration = Duration::from_secs(1); + cfg.passthrough_idle_flush_timeout = Duration::from_nanos(1); + + let mut batcher = cfg.build_passthrough_batcher(Telemetry::noop()).await; + + let counter_value = 42.0; + let timestamp = 123456; + let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher(); + + let input_metric = Metric::counter("metric1", (timestamp, counter_value)); + batcher.push_metric(input_metric.clone(), &dispatcher).await; + batcher.try_flush(&dispatcher).await; + + let mut flushed_metrics = dispatcher_receiver.collect_next(); + assert_eq!(flushed_metrics.len(), 1); + assert_eq!( + Metric::rate("metric1", (timestamp, counter_value), DEFAULT_BUCKET_SIZE), flushed_metrics.remove(0) ); } + #[tokio::test] + async fn aliased_bucket_size_config_key_deserializes_as_integer_seconds() { + use saluki_config::ConfigurationLoader; + + let (cfg, _) = ConfigurationLoader::for_tests( + Some(serde_json::json!({ "aggregator_bucket_size_seconds": 1 })), + None, + false, + ) + .await; + + let agg_config = AggregateConfiguration::from_configuration(&cfg).expect("should deserialize via alias key"); + assert_eq!(agg_config.window_duration, Duration::from_secs(1)); + } + + #[tokio::test] + async fn legacy_window_duration_struct_form_still_deserializes() { + // Pre-existing configs set 
`aggregate_window_duration` as the default serde Duration struct form. The new + // flexible deserializer must not break them. + use saluki_config::ConfigurationLoader; + + let (cfg, _) = ConfigurationLoader::for_tests( + Some(serde_json::json!({ + "aggregate_window_duration": { "secs": 7, "nanos": 0 } + })), + None, + false, + ) + .await; + + let agg_config = + AggregateConfiguration::from_configuration(&cfg).expect("legacy struct-form config must still work"); + assert_eq!(agg_config.window_duration, Duration::from_secs(7)); + } + #[tokio::test] async fn telemetry() { // TODO: We don't check `component_events_dropped_total` or `aggregate_passthrough_metrics_total` here as