Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 154 additions & 18 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use saluki_core::{
};
use saluki_error::GenericError;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use serde::{Deserialize, Deserializer};
use smallvec::SmallVec;
use tokio::{
select,
Expand All @@ -35,8 +35,30 @@ use self::config::HistogramConfiguration;

const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2);

/// Fixed 10-second interval. It is the value returned by `default_window_duration`, and it is also the rate
/// interval always handed to the pass-through batcher, independent of the configured window duration.
const DEFAULT_BUCKET_SIZE: Duration = Duration::from_secs(10);

/// Serde `default` provider for the `window_duration` field: yields the 10-second default bucket size when the
/// configuration does not set the field.
// NOTE(review): both the literal and the named constant appear in this body, which looks like the removed and
// added lines of a diff shown together -- presumably only `DEFAULT_BUCKET_SIZE` remains in the real file; confirm.
const fn default_window_duration() -> Duration {
Duration::from_secs(10)
DEFAULT_BUCKET_SIZE
}

/// Deserializes a [`Duration`] from either of two shapes:
///
/// - a bare unsigned integer, read as whole seconds (e.g. `10`), or
/// - the default serde `Duration` struct form (e.g. `{ "secs": 10, "nanos": 0 }`).
///
/// The struct form keeps existing configs working; the integer form lets `aggregator_bucket_size_seconds` behave
/// the way its name implies.
fn deserialize_duration_seconds_or_struct<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
    D: Deserializer<'de>,
{
    // Untagged, so the input's shape alone selects the variant; the integer form is listed (and tried) first.
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum DurationSpec {
        Seconds(u64),
        Duration(Duration),
    }

    DurationSpec::deserialize(deserializer).map(|spec| match spec {
        DurationSpec::Seconds(secs) => Duration::from_secs(secs),
        DurationSpec::Duration(duration) => duration,
    })
}

const fn default_primary_flush_interval() -> Duration {
Expand Down Expand Up @@ -87,7 +109,12 @@ pub struct AggregateConfiguration {
/// how many data points are emitted downstream.
///
/// Defaults to 10 seconds.
#[serde(rename = "aggregate_window_duration", default = "default_window_duration")]
#[serde(
rename = "aggregate_window_duration",
alias = "aggregator_bucket_size_seconds",
default = "default_window_duration",
deserialize_with = "deserialize_duration_seconds_or_struct"
)]
window_duration: Duration,

/// How often to flush buckets.
Expand Down Expand Up @@ -195,6 +222,14 @@ impl AggregateConfiguration {
hist_config: HistogramConfiguration::default(),
}
}

/// Builds the pass-through batcher for this configuration.
///
/// The batcher takes the configured idle flush timeout, but its rate interval is deliberately fixed at
/// `DEFAULT_BUCKET_SIZE` rather than following `window_duration`.
async fn build_passthrough_batcher(&self, telemetry: Telemetry) -> PassthroughBatcher {
    let idle_flush_timeout = self.passthrough_idle_flush_timeout;

    // Pass-through metrics carry client-supplied timestamps, and the wire protocol has no field for an interval.
    // The interval is therefore an agent-side convention rather than a property of the data: tying it to the
    // configured bucket size would silently reinterpret every rate sample from every client on this path.
    let rate_interval = DEFAULT_BUCKET_SIZE;

    PassthroughBatcher::new(idle_flush_timeout, rate_interval, telemetry).await
}
}

#[async_trait]
Expand All @@ -211,12 +246,7 @@ impl TransformBuilder for AggregateConfiguration {
telemetry.clone(),
);

let passthrough_batcher = PassthroughBatcher::new(
self.passthrough_idle_flush_timeout,
self.window_duration,
telemetry.clone(),
)
.await;
let passthrough_batcher = self.build_passthrough_batcher(telemetry.clone()).await;

Ok(Box::new(Aggregate {
state,
Expand Down Expand Up @@ -443,11 +473,10 @@ impl PassthroughBatcher {
}

async fn push_metric(&mut self, metric: Metric, dispatcher: &EventsDispatcher) {
// Convert counters to rates before we batch them up.
//
// This involves specifying the rate interval as the bucket width of the aggregate transform itself, which when
// you say it out loud is sort of confusing and nonsensical since the whole point is that these are
// _pre-aggregated_ metrics but we have to match the behavior of the Datadog Agent. ¯\_(ツ)_/¯
// Convert counters to rates before we batch them up. The rate interval here is an agent-side convention --
// the wire protocol carries no interval for pre-aggregated metrics -- and matches the Datadog Agent's fixed
// default, not the aggregate transform's configurable bucket width. See
// `AggregateConfiguration::build_passthrough_batcher` for the construction-site invariant.
let (context, values, metadata) = metric.into_parts();
let adjusted_values = counter_values_to_rate(values, self.bucket_width);
let metric = Metric::from_parts(context, adjusted_values, metadata);
Expand Down Expand Up @@ -901,12 +930,18 @@ mod tests {
}

/// Flushes `state` at `timestamp` with open buckets included, returning whatever `get_flushed_metrics_at` collects.
async fn get_flushed_metrics(timestamp: u64, state: &mut AggregationState) -> Vec<Metric> {
    let flush_open_buckets = true;
    get_flushed_metrics_at(timestamp, flush_open_buckets, state).await
}

async fn get_flushed_metrics_at(
timestamp: u64, flush_open_buckets: bool, state: &mut AggregationState,
) -> Vec<Metric> {
let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();
let mut buffered_dispatcher = dispatcher.buffered().expect("default output should always exist");

// Flush the metrics to an event buffer.
state
.flush(timestamp, true, &mut buffered_dispatcher)
.flush(timestamp, flush_open_buckets, &mut buffered_dispatcher)
.await
.expect("should not fail to flush aggregation state");

Expand Down Expand Up @@ -1350,10 +1385,10 @@ mod tests {
async fn preaggregated_counters_to_rate() {
let counter_value = 42.0;
let timestamp = 123456;
let bucket_width = BUCKET_WIDTH;

// Create a basic passthrough batcher and forwarder.
let mut batcher = PassthroughBatcher::new(Duration::from_nanos(1), bucket_width, Telemetry::noop()).await;
let mut batcher =
PassthroughBatcher::new(Duration::from_nanos(1), DEFAULT_BUCKET_SIZE, Telemetry::noop()).await;
let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();

// Create a simple pre-aggregated counter, and batch it.
Expand All @@ -1367,11 +1402,112 @@ mod tests {
let mut flushed_metrics = dispatcher_receiver.collect_next();
assert_eq!(flushed_metrics.len(), 1);
assert_eq!(
Metric::rate("metric1", (timestamp, counter_value), bucket_width),
Metric::rate("metric1", (timestamp, counter_value), DEFAULT_BUCKET_SIZE),
flushed_metrics.remove(0)
);
}

#[tokio::test]
async fn aggregated_counters_at_1s_bucket_width() {
    // Scenario T1 from 1HZ_SPEC. One logical series receives three samples that land in three distinct 1s
    // buckets; at the first flush cutoff two of those buckets are closed and one is still open. We expect a
    // single emitted series whose points sit at the closed bucket starts with `interval = 1s`, and a follow-up
    // flush must surface the open-bucket value to show it was retained rather than silently dropped.
    //
    // Counter expiration is disabled so the second flush doesn't also emit zero-value filler.
    let bucket_width = Duration::from_secs(1);
    let hist_config = HistogramConfiguration::default();
    let mut state = AggregationState::new(bucket_width, 10, None, hist_config, Telemetry::noop());

    // Template metric the flushed series is compared against.
    let expected_series = Metric::counter("metric_a", 0.0);

    assert!(state.insert(100, Metric::counter("metric_a", 1.0)));
    assert!(state.insert(101, Metric::counter("metric_a", 2.0)));
    assert!(state.insert(102, Metric::counter("metric_a", 3.0)));

    // Flush #1 with cutoff t=102: buckets 100 and 101 are closed, bucket 102 remains open.
    let first_flush = get_flushed_metrics_at(102, false, &mut state).await;
    assert_eq!(first_flush.len(), 1, "expected exactly one logical series");
    assert_flushed_scalar_metric!(&expected_series, &first_flush[0], [100 => 1.0, 101 => 2.0]);

    let interval = match first_flush[0].values() {
        MetricValues::Rate(_, interval) => *interval,
        other => panic!("expected rate, got {}", other.as_str()),
    };
    assert_eq!(interval, bucket_width);

    // Flush #2 with cutoff t=103: bucket 102 is now closed, so its value has to show up here. Had the first
    // flush discarded it, this would fail.
    let second_flush = get_flushed_metrics_at(103, false, &mut state).await;
    assert_eq!(second_flush.len(), 1, "open-bucket value should be retained across flushes");
    assert_flushed_scalar_metric!(&expected_series, &second_flush[0], [102 => 3.0]);
}

#[tokio::test]
async fn passthrough_pinned_to_default_despite_1hz_config() {
    // Regression pin for the 1 Hz aggregation spec: configuring `window_duration = 1s` must leave the rate
    // interval emitted by the pass-through batcher untouched. The batcher is obtained through
    // `AggregateConfiguration::build_passthrough_batcher` -- the same construction site `build()` uses -- so the
    // invariant is exercised where it actually lives.
    let counter_value = 42.0;
    let timestamp = 123456;

    let mut config = AggregateConfiguration::with_defaults();
    config.window_duration = Duration::from_secs(1);
    config.passthrough_idle_flush_timeout = Duration::from_nanos(1);

    let mut batcher = config.build_passthrough_batcher(Telemetry::noop()).await;
    let (dispatcher, mut dispatcher_receiver) = build_basic_dispatcher();

    // Push one pre-aggregated counter through the batcher and flush it.
    let counter = Metric::counter("metric1", (timestamp, counter_value));
    batcher.push_metric(counter, &dispatcher).await;
    batcher.try_flush(&dispatcher).await;

    // Exactly one metric should come out, as a rate over the fixed default interval rather than 1s.
    let mut flushed_metrics = dispatcher_receiver.collect_next();
    assert_eq!(flushed_metrics.len(), 1);

    let expected = Metric::rate("metric1", (timestamp, counter_value), DEFAULT_BUCKET_SIZE);
    assert_eq!(expected, flushed_metrics.remove(0));
}

#[tokio::test]
async fn aliased_bucket_size_config_key_deserializes_as_integer_seconds() {
    use saluki_config::ConfigurationLoader;

    // Only the alias key is set, and its value is a bare integer number of seconds.
    let raw_config = serde_json::json!({ "aggregator_bucket_size_seconds": 1 });
    let (config, _) = ConfigurationLoader::for_tests(Some(raw_config), None, false).await;

    let aggregate = AggregateConfiguration::from_configuration(&config).expect("should deserialize via alias key");
    assert_eq!(aggregate.window_duration, Duration::from_secs(1));
}

#[tokio::test]
async fn legacy_window_duration_struct_form_still_deserializes() {
    use saluki_config::ConfigurationLoader;

    // Existing configs express `aggregate_window_duration` in the default serde `Duration` struct form; the
    // flexible deserializer has to keep accepting that shape.
    let raw_config = serde_json::json!({
        "aggregate_window_duration": { "secs": 7, "nanos": 0 }
    });
    let (config, _) = ConfigurationLoader::for_tests(Some(raw_config), None, false).await;

    let aggregate =
        AggregateConfiguration::from_configuration(&config).expect("legacy struct-form config must still work");
    assert_eq!(aggregate.window_duration, Duration::from_secs(7));
}

#[tokio::test]
async fn telemetry() {
// TODO: We don't check `component_events_dropped_total` or `aggregate_passthrough_metrics_total` here as
Expand Down
Loading