Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
num::NonZeroU64,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -17,12 +18,13 @@ use saluki_core::{
topology::{interconnect::BufferedDispatcher, OutputDefinition},
topology::{EventsBuffer, EventsDispatcher},
};
use saluki_error::GenericError;
use saluki_error::{generic_error, GenericError};
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{
select,
sync::{mpsc, oneshot},
time::{interval, interval_at},
};
use tracing::{debug, error, trace};
Expand Down Expand Up @@ -174,12 +176,46 @@ pub struct AggregateConfiguration {
/// distribution aggregation).
#[serde(flatten)]
hist_config: HistogramConfiguration,

/// Receiver for on-demand flush requests. Created via `create_handle()`.
///
/// Wrapped in `Arc<tokio::sync::Mutex<...>>` to allow sharing across component respawns
/// and to enable async-friendly lock acquisition.
#[serde(skip)]
flush_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<oneshot::Sender<()>>>>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have been more clear.

We should always create the flush channel, basically: make this into (Arc<Mutex<Receiver<...>>>, Sender<...>). From there, we would also make flush_rx in Aggregate simply be Arc<Mutex<Receiver<...>>>, and then we get to avoid the code around checking if it's set or not: we just always acquire the lock/receiver. This would also mean that AggregateConfiguration::create_handle just ends up cloning instead of creating anything new.

It's a little less efficient than falling back to the `std::future::pending()` branch when the flush channel isn't in use, but I prefer the clarity of just assuming we always need to check/care about the flush channel.

}

/// Handle for triggering on-demand flushes of the aggregation state.
///
/// This is useful in serverless environments (e.g., Lambda) where flushing needs to happen
/// at invocation boundaries rather than on a fixed timer interval.
///
/// The handle is the sending half of a request channel: every flush request carries a
/// `oneshot::Sender<()>` on which the requester is notified once the request has been handled.
#[derive(Clone)]
pub struct AggregatorHandle {
// Sending half of the flush request channel. Each message is the oneshot sender on which
// the acknowledgement for that particular request is delivered.
flush_tx: mpsc::Sender<oneshot::Sender<()>>,
}

impl AggregatorHandle {
    /// Triggers an immediate flush of all closed aggregation windows and waits for the
    /// acknowledgement.
    ///
    /// A fresh oneshot channel is created for every call: its sender is queued on the flush
    /// request channel, and its receiver is awaited for the acknowledgement. The flush is
    /// dispatched downstream through the normal pipeline (encoder, forwarder, etc.), so this
    /// returns when the flush has been dispatched, not necessarily when HTTP shipping completes.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be queued because the receiving side of the
    /// request channel is gone, or if the acknowledgement sender is dropped before a response
    /// is sent.
    pub async fn flush(&self) -> Result<(), GenericError> {
        let (ack_tx, ack_rx) = oneshot::channel();

        // Queue the request; a send failure means nobody is listening for flush requests anymore.
        if self.flush_tx.send(ack_tx).await.is_err() {
            return Err(generic_error!("Aggregator has been shut down"));
        }

        // Wait for the acknowledgement of this specific request.
        match ack_rx.await {
            Ok(()) => Ok(()),
            Err(_) => Err(generic_error!("Aggregator dropped flush response")),
        }
    }
}

impl AggregateConfiguration {
/// Creates a new `AggregateConfiguration` from the given configuration.
///
/// The typed settings are read from `config` via `as_typed()`; any error from that
/// conversion is propagated to the caller. The resulting configuration carries no on-demand
/// flush receiver: one is only attached by calling `create_handle()` afterwards.
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
Ok(config.as_typed()?)
let mut cfg: Self = config.as_typed()?;
// `flush_rx` is `#[serde(skip)]`, so it is never populated from `config`; it is reset
// explicitly here so a freshly loaded configuration always starts without a receiver.
cfg.flush_rx = None;
Ok(cfg)
}

/// Creates a new `AggregateConfiguration` with default values.
Expand All @@ -193,8 +229,22 @@ impl AggregateConfiguration {
passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
hist_config: HistogramConfiguration::default(),
flush_rx: None,
}
}

/// Creates a handle for triggering on-demand flushes from outside the topology.
///
/// Must be called before the configuration is used to build the transform, since the built
/// transform receives a clone of whatever receiver is stored at that point. The returned
/// handle can be used to trigger flushes at arbitrary times (e.g., Lambda invocation
/// boundaries).
///
/// The receiving half is stored on the configuration behind `Arc<Mutex<...>>` so it can be
/// reused if the component is respawned. Each call creates a new channel and replaces any
/// receiver stored by an earlier call.
pub fn create_handle(&mut self) -> AggregatorHandle {
    let (flush_tx, request_rx) = mpsc::channel(64);

    // Keep the receiver on the configuration so every transform built from it shares it.
    let shared_rx = Arc::new(tokio::sync::Mutex::new(request_rx));
    self.flush_rx = Some(shared_rx);

    AggregatorHandle { flush_tx }
}
}

#[async_trait]
Expand Down Expand Up @@ -225,6 +275,7 @@ impl TransformBuilder for AggregateConfiguration {
flush_open_windows: self.flush_open_windows,
passthrough_batcher,
passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
flush_rx: self.flush_rx.clone(),
}))
}

Expand Down Expand Up @@ -274,6 +325,7 @@ pub struct Aggregate {
flush_open_windows: bool,
passthrough_batcher: PassthroughBatcher,
passthrough_timestamped_metrics: bool,
flush_rx: Option<Arc<tokio::sync::Mutex<mpsc::Receiver<oneshot::Sender<()>>>>>,
}

#[async_trait]
Expand All @@ -289,6 +341,14 @@ impl Transform for Aggregate {

let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

// Acquire the on-demand flush receiver if one was configured.
// Uses OwnedMutexGuard so the lock is held for the lifetime of this component
// instance, and released if the component is respawned.
let mut flush_rx = match &self.flush_rx {
Some(rx) => Some(rx.clone().lock_owned().await),
None => None,
};

health.mark_ready();
debug!("Aggregation transform started.");

Expand All @@ -297,6 +357,30 @@ impl Transform for Aggregate {
loop {
select! {
_ = health.live() => continue,
Some(respond_to) = async {
match flush_rx.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
// On-demand flush triggered by AggregatorHandle.
if !self.state.is_empty() {
debug!("On-demand flush of aggregated metrics...");

let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
if let Err(e) = self.state.flush(get_unix_timestamp(), false, &mut dispatcher).await {
error!(error = %e, "Failed to flush aggregation state on demand.");
}

self.telemetry.increment_flushes();

match dispatcher.flush().await {
Ok(aggregated_events) => debug!(aggregated_events, "Dispatched on-demand flushed events."),
Err(e) => error!(error = %e, "Failed to flush on-demand aggregated events."),
}
}
let _ = respond_to.send(());
},
_ = primary_flush.tick() => {
// We've reached the end of the current window. Flush our aggregation state and forward the metrics
// onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
Expand Down
Loading