diff --git a/lib/saluki-components/src/transforms/aggregate/mod.rs b/lib/saluki-components/src/transforms/aggregate/mod.rs
index b62fca1d002..d212a3adc04 100644
--- a/lib/saluki-components/src/transforms/aggregate/mod.rs
+++ b/lib/saluki-components/src/transforms/aggregate/mod.rs
@@ -1,5 +1,6 @@
 use std::{
     num::NonZeroU64,
+    sync::Arc,
     time::{Duration, Instant},
 };
 
@@ -17,12 +18,13 @@ use saluki_core::{
     topology::{interconnect::BufferedDispatcher, OutputDefinition},
     topology::{EventsBuffer, EventsDispatcher},
 };
-use saluki_error::GenericError;
+use saluki_error::{generic_error, GenericError};
 use saluki_metrics::MetricsBuilder;
 use serde::Deserialize;
 use smallvec::SmallVec;
 use tokio::{
     select,
+    sync::{mpsc, oneshot},
     time::{interval, interval_at},
 };
 use tracing::{debug, error, trace};
@@ -174,12 +176,46 @@ pub struct AggregateConfiguration {
     /// distribution aggregation).
     #[serde(flatten)]
     hist_config: HistogramConfiguration,
+
+    /// Receiver for on-demand flush requests. Created via `create_handle()`.
+    ///
+    /// Wrapped in `Arc<Mutex<...>>` to allow sharing across component respawns
+    /// and to enable async-friendly lock acquisition.
+    #[serde(skip)]
+    flush_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<oneshot::Sender<()>>>>>,
+}
+
+/// Handle for triggering on-demand flushes of the aggregation state.
+///
+/// This is useful in serverless environments (e.g., Lambda) where flushing needs to happen
+/// at invocation boundaries rather than on a fixed timer interval.
+#[derive(Clone)]
+pub struct AggregatorHandle {
+    flush_tx: mpsc::Sender<oneshot::Sender<()>>,
+}
+
+impl AggregatorHandle {
+    /// Triggers an immediate flush of all closed aggregation windows.
+    ///
+    /// The flush is dispatched downstream through the normal pipeline (encoder, forwarder, etc.).
+    /// Returns when the flush has been dispatched (not necessarily when HTTP shipping completes).
+    pub async fn flush(&self) -> Result<(), GenericError> {
+        let (tx, rx) = oneshot::channel();
+        self.flush_tx
+            .send(tx)
+            .await
+            .map_err(|_| generic_error!("Aggregator has been shut down"))?;
+        rx.await
+            .map_err(|_| generic_error!("Aggregator dropped flush response"))
+    }
 }
 
 impl AggregateConfiguration {
     /// Creates a new `AggregateConfiguration` from the given configuration.
     pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
-        Ok(config.as_typed()?)
+        let mut cfg: Self = config.as_typed()?;
+        cfg.flush_rx = None;
+        Ok(cfg)
     }
 
     /// Creates a new `AggregateConfiguration` with default values.
@@ -193,8 +229,22 @@ impl AggregateConfiguration {
             passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
             passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
             hist_config: HistogramConfiguration::default(),
+            flush_rx: None,
         }
     }
+
+    /// Creates a handle for triggering on-demand flushes from outside the topology.
+    ///
+    /// Must be called before the configuration is used to build the transform. The returned
+    /// handle can be used to trigger flushes at arbitrary times (e.g., Lambda invocation boundaries).
+    ///
+    /// The receiver is wrapped in `Arc<Mutex<...>>` so it can be reused if the component
+    /// is respawned.
+    pub fn create_handle(&mut self) -> AggregatorHandle {
+        let (tx, rx) = mpsc::channel(64);
+        self.flush_rx = Some(Arc::new(tokio::sync::Mutex::new(rx)));
+        AggregatorHandle { flush_tx: tx }
+    }
 }
 
 #[async_trait]
@@ -225,6 +275,7 @@ impl TransformBuilder for AggregateConfiguration {
             flush_open_windows: self.flush_open_windows,
             passthrough_batcher,
             passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
+            flush_rx: self.flush_rx.clone(),
         }))
     }
 
@@ -274,6 +325,7 @@ pub struct Aggregate {
     flush_open_windows: bool,
     passthrough_batcher: PassthroughBatcher,
     passthrough_timestamped_metrics: bool,
+    flush_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<oneshot::Sender<()>>>>>,
 }
 
 #[async_trait]
@@ -289,6 +341,14 @@ impl Transform for Aggregate {
 
         let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);
 
+        // Acquire the on-demand flush receiver if one was configured.
+        // Uses OwnedMutexGuard so the lock is held for the lifetime of this component
+        // instance, and released if the component is respawned.
+        let mut flush_rx = match &self.flush_rx {
+            Some(rx) => Some(rx.clone().lock_owned().await),
+            None => None,
+        };
+
         health.mark_ready();
         debug!("Aggregation transform started.");
 
@@ -297,6 +357,30 @@ impl Transform for Aggregate {
         loop {
             select! {
                 _ = health.live() => continue,
+                Some(respond_to) = async {
+                    match flush_rx.as_mut() {
+                        Some(rx) => rx.recv().await,
+                        None => std::future::pending().await,
+                    }
+                } => {
+                    // On-demand flush triggered by AggregatorHandle.
+                    if !self.state.is_empty() {
+                        debug!("On-demand flush of aggregated metrics...");
+
+                        let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
+                        if let Err(e) = self.state.flush(get_unix_timestamp(), false, &mut dispatcher).await {
+                            error!(error = %e, "Failed to flush aggregation state on demand.");
+                        }
+
+                        self.telemetry.increment_flushes();
+
+                        match dispatcher.flush().await {
+                            Ok(aggregated_events) => debug!(aggregated_events, "Dispatched on-demand flushed events."),
+                            Err(e) => error!(error = %e, "Failed to flush on-demand aggregated events."),
+                        }
+                    }
+                    let _ = respond_to.send(());
+                },
                 _ = primary_flush.tick() => {
                     // We've reached the end of the current window. Flush our aggregation state and forward the metrics
                     // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation