Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use serde::Deserialize;
use smallvec::SmallVec;
use tokio::{
select,
sync::{mpsc, oneshot},
time::{interval, interval_at},
};
use tracing::{debug, error, trace};
Expand Down Expand Up @@ -174,12 +175,43 @@ pub struct AggregateConfiguration {
/// distribution aggregation).
#[serde(flatten)]
hist_config: HistogramConfiguration,

/// Receiver for on-demand flush requests. Created via `create_handle()`.
/// Wrapped in a Mutex to allow `take()` from the `&self` `build()` method.
#[serde(skip)]
flush_rx: std::sync::Mutex<Option<mpsc::UnboundedReceiver<oneshot::Sender<()>>>>,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realistically, this should be Arc<tokio::sync::Mutex<mpsc::Receiver<...>>>.

The design of component configuration is such that we eventually want to be able to respawn components. Components that expose external control mechanisms like this (a handle to externally control behavior) have to do so in a way where another instance of this component could conceivably be spawned and then take over. What we have here currently would consume the receiver and make it unavailable to the next instance of the component. By wrapping it in Arc<tokio::sync::Mutex<...>>, we allow for reusing it in future instances, and also gracefully awaiting lock acquisition in an async-friendly way.

The change from UnboundedReceiver to Receiver is because unbounded channels apply no backpressure: if the consumer stalls or falls behind, the queue can grow without limit. We never use them in Saluki.

}

/// Handle for triggering on-demand flushes of the aggregation state.
///
/// This is useful in serverless environments (e.g., Lambda) where flushing needs to happen
/// at invocation boundaries rather than on a fixed timer interval.
///
/// The handle is `Clone`; each clone holds its own copy of the same sender, so all clones
/// submit requests to the same receiving end.
#[derive(Clone)]
pub struct AggregatorHandle {
/// Sending half of the flush-request channel. Every request carries a one-shot sender
/// that the receiving side uses to acknowledge the request once it has been handled.
flush_tx: mpsc::UnboundedSender<oneshot::Sender<()>>,
}

impl AggregatorHandle {
    /// Requests an immediate, out-of-band flush and waits for it to be acknowledged.
    ///
    /// A one-shot acknowledgement channel is created per call: its sending half is handed to
    /// the aggregator over the request channel, and this method then awaits the receiving half.
    /// Flushed data continues downstream through the normal pipeline (encoder, forwarder, etc.),
    /// so completion here means the flush was dispatched, not that HTTP shipping has finished.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be queued because the receiving side of the
    /// request channel is gone, or if the acknowledgement sender is dropped before replying.
    pub async fn flush(&self) -> Result<(), GenericError> {
        let (ack_tx, ack_rx) = oneshot::channel();

        // Queueing only fails when nobody is left to receive the request.
        if self.flush_tx.send(ack_tx).is_err() {
            return Err(saluki_error::generic_error!("Aggregator has been shut down"));
        }

        // A receive error means the acknowledgement sender was dropped without replying.
        match ack_rx.await {
            Ok(()) => Ok(()),
            Err(_) => Err(saluki_error::generic_error!("Aggregator dropped flush response")),
        }
    }
}

impl AggregateConfiguration {
/// Creates a new `AggregateConfiguration` from the given configuration.
///
/// The on-demand flush receiver is never read from `config` (the field is marked
/// `#[serde(skip)]`); it always starts out unset and is only populated by a later call to
/// `create_handle()`.
///
/// # Errors
///
/// Returns an error if `config.as_typed()` fails to produce an `AggregateConfiguration`.
pub fn from_configuration(config: &GenericConfiguration) -> Result<Self, GenericError> {
    let mut cfg: Self = config.as_typed()?;
    // Make the "no handle created yet" starting state explicit rather than relying solely on
    // the skipped field's default value.
    cfg.flush_rx = std::sync::Mutex::new(None);
    Ok(cfg)
}

/// Creates a new `AggregateConfiguration` with default values.
Expand All @@ -193,8 +225,19 @@ impl AggregateConfiguration {
passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(),
passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(),
hist_config: HistogramConfiguration::default(),
flush_rx: std::sync::Mutex::new(None),
}
}

/// Creates a handle for triggering on-demand flushes from outside the topology.
///
/// Must be called before the configuration is used to build the transform, because building
/// takes the stored receiver out of the configuration. The returned handle can be used to
/// trigger flushes at arbitrary times (e.g., Lambda invocation boundaries).
///
/// Calling this more than once replaces the previously stored receiver; handles returned by
/// earlier calls are then disconnected and their flush requests will fail.
pub fn create_handle(&mut self) -> AggregatorHandle {
    let (flush_tx, flush_rx) = mpsc::unbounded_channel();

    // `&mut self` already guarantees exclusive access, so no locking is required. A poisoned
    // mutex is safe to recover from here because the stored value is overwritten wholesale.
    let slot = self
        .flush_rx
        .get_mut()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
    *slot = Some(flush_rx);

    AggregatorHandle { flush_tx }
}
}

#[async_trait]
Expand Down Expand Up @@ -225,6 +268,7 @@ impl TransformBuilder for AggregateConfiguration {
flush_open_windows: self.flush_open_windows,
passthrough_batcher,
passthrough_timestamped_metrics: self.passthrough_timestamped_metrics,
flush_rx: self.flush_rx.lock().unwrap().take(),
}))
}

Expand Down Expand Up @@ -274,6 +318,7 @@ pub struct Aggregate {
flush_open_windows: bool,
passthrough_batcher: PassthroughBatcher,
passthrough_timestamped_metrics: bool,
flush_rx: Option<mpsc::UnboundedReceiver<oneshot::Sender<()>>>,
Comment thread
duncanista marked this conversation as resolved.
Outdated
}

#[async_trait]
Expand All @@ -289,6 +334,9 @@ impl Transform for Aggregate {

let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL);

// Channel for receiving on-demand flush requests (from AggregatorHandle).
let mut flush_rx = self.flush_rx.take();

health.mark_ready();
debug!("Aggregation transform started.");

Expand All @@ -297,6 +345,30 @@ impl Transform for Aggregate {
loop {
select! {
_ = health.live() => continue,
Some(respond_to) = async {
match flush_rx.as_mut() {
Some(rx) => rx.recv().await,
None => std::future::pending().await,
}
} => {
// On-demand flush triggered by AggregatorHandle.
if !self.state.is_empty() {
debug!("On-demand flush of aggregated metrics...");

let mut dispatcher = context.dispatcher().buffered().expect("default output should always exist");
if let Err(e) = self.state.flush(get_unix_timestamp(), false, &mut dispatcher).await {
error!(error = %e, "Failed to flush aggregation state on demand.");
}

self.telemetry.increment_flushes();

match dispatcher.flush().await {
Ok(aggregated_events) => debug!(aggregated_events, "Dispatched on-demand flushed events."),
Err(e) => error!(error = %e, "Failed to flush on-demand aggregated events."),
}
}
let _ = respond_to.send(());
},
_ = primary_flush.tick() => {
// We've reached the end of the current window. Flush our aggregation state and forward the metrics
// onwards. Regardless of whether any metrics were aggregated, we always update the aggregation
Expand Down
Loading