Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 62 additions & 70 deletions bin/agent-data-plane/src/components/tag_filterlist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use foldhash::fast::RandomState as FoldHashState;
use hashbrown::{HashMap, HashSet};
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_context::tags::{Tag, TagSet};
use saluki_context::{tags::Tag, TagSetMutViewState};
use saluki_core::{
components::{
transforms::{Transform, TransformBuilder, TransformContext},
Expand Down Expand Up @@ -170,6 +170,7 @@ impl TransformBuilder for TagFilterlistConfiguration {
.clone()
.expect("configuration must be set via from_configuration"),
telemetry: Telemetry::new(&metrics_builder),
view_state: TagSetMutViewState::new(),
}))
}
}
Expand All @@ -184,6 +185,7 @@ struct TagFilterlist {
filters: CompiledFilters,
configuration: GenericConfiguration,
telemetry: Telemetry,
view_state: TagSetMutViewState,
}

#[async_trait]
Expand All @@ -204,7 +206,7 @@ impl Transform for TagFilterlist {
for event in &mut events {
if let Some(metric) = event.try_as_metric_mut() {
if metric.values().is_sketch() {
let outcome = filter_metric_tags(metric, &self.filters);
let outcome = filter_metric_tags(metric, &self.filters, &mut self.view_state);
self.telemetry.record(outcome);
}
}
Expand Down Expand Up @@ -233,54 +235,43 @@ fn should_keep_tag(tag: &Tag, is_exclude: bool, names: &HashSet<String, FoldHash
is_exclude != names.contains(tag.as_borrowed().name())
}

#[inline]
fn has_removable_tags(tags: &TagSet, is_exclude: bool, names: &HashSet<String, FoldHashState>) -> bool {
tags.into_iter().any(|tag| !should_keep_tag(tag, is_exclude, names))
}

/// Filter the tags of a distribution metric according to the compiled filter table.
///
/// Both instrumented tags and origin tags are filtered using the same tag key list.
/// If the metric name is not present in `filters`, the metric is left unchanged.
/// If filtering would not change any tags, the metric context is left untouched (zero allocations).
///
/// Uses a lazy copy-on-write view to scan tags in a single read-only pass and defer the
/// `Arc` clone + context key recomputation until tags are actually removed.
#[inline]
pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) -> FilterMetricTagsOutcome {
pub fn filter_metric_tags(
metric: &mut Metric, filters: &CompiledFilters, view_state: &mut TagSetMutViewState,
) -> FilterMetricTagsOutcome {
let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else {
return FilterMetricTagsOutcome::RuleMiss;
};

let is_exclude = *is_exclude;

let has_tag_removals = has_removable_tags(metric.context().tags(), is_exclude, tag_names);
let has_origin_removals = has_removable_tags(metric.context().origin_tags(), is_exclude, tag_names);

if !has_tag_removals && !has_origin_removals {
return FilterMetricTagsOutcome::NoChange;
}

let mut total_removed = 0;
metric.context_mut().with_tag_sets_mut(|tags, origin_tags| {
if has_tag_removals {
let before = tags.len();
tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names));
total_removed += before - tags.len();
}
if has_origin_removals {
let before = origin_tags.len();
origin_tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names));
total_removed += before - origin_tags.len();
}
});
// Single read-only scan of both tag sets, collecting removal indices.
// No Arc clone or rehash occurs until finish() — and only if something was flagged.
let mut view = metric.context_mut().tags_mut_view(view_state);
view.retain_tags(|tag| should_keep_tag(tag, is_exclude, tag_names));
view.retain_origin_tags(|tag| should_keep_tag(tag, is_exclude, tag_names));

FilterMetricTagsOutcome::Modified {
removed_tags: total_removed,
match view.finish() {
0 => FilterMetricTagsOutcome::NoChange,
n => FilterMetricTagsOutcome::Modified { removed_tags: n },
}
}

#[cfg(test)]
mod tests {
use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader};
use saluki_context::{tags::Tag, Context};
use saluki_context::{
tags::{Tag, TagSet},
Context,
};
use saluki_core::data_model::event::metric::Metric;
use saluki_metrics::{test::TestRecorder, MetricsBuilder};
use serde_json::json;
Expand Down Expand Up @@ -326,7 +317,7 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -341,7 +332,7 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["env:prod"]);
}
Expand All @@ -356,7 +347,7 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}
Expand All @@ -369,48 +360,33 @@ mod tests {
}

#[test]
fn bare_tag_excluded_by_name() {
fn empty_tag_list_exclude_keeps_all() {
let entries = vec![MetricTagFilterEntry {
metric_name: "my.dist".to_string(),
action: FilterAction::Exclude,
tags: vec!["production".to_string()],
}];
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["production", "service:web"]);
filter_metric_tags(&mut metric, &filters);

assert_eq!(tag_names(&metric), vec!["service:web"]);
}

#[test]
fn empty_tag_list_include_removes_all() {
let entries = vec![MetricTagFilterEntry {
metric_name: "my.dist".to_string(),
action: FilterAction::Include,
tags: vec![],
}];
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert!(metric.context().tags().is_empty());
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

#[test]
fn empty_tag_list_exclude_keeps_all() {
fn empty_tag_list_include_removes_all() {
let entries = vec![MetricTagFilterEntry {
metric_name: "my.dist".to_string(),
action: FilterAction::Exclude,
action: FilterAction::Include,
tags: vec![],
}];
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
assert!(tag_names(&metric).is_empty());
}

#[test]
Expand All @@ -430,7 +406,7 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -452,7 +428,7 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}
Expand All @@ -474,16 +450,32 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

#[test]
fn bare_tag_excluded_by_name() {
let entries = vec![MetricTagFilterEntry {
metric_name: "my.dist".to_string(),
action: FilterAction::Exclude,
tags: vec!["production".to_string()],
}];
let filters = compile_filters(&entries);

// "production" is a bare tag (no colon); tag.name() returns "production".
let mut metric = distribution_metric("my.dist", &["production", "service:web"]);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["service:web"]);
}

#[test]
fn no_config_is_noop() {
let filters = compile_filters(&[]);
let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

Expand Down Expand Up @@ -563,7 +555,7 @@ mod tests {

let mut metric =
distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(origin_tag_names(&metric), vec!["service:web"]);
}
Expand All @@ -579,7 +571,7 @@ mod tests {

let mut metric =
distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(origin_tag_names(&metric), vec!["env:prod"]);
}
Expand All @@ -594,7 +586,7 @@ mod tests {
let filters = compile_filters(&entries);

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["service:web"]);
assert!(metric.context().origin_tags().is_empty());
Expand All @@ -621,7 +613,7 @@ mod tests {
}];
let filters = compile_filters(&entries);

filter_metric_tags(&mut metric1, &filters);
filter_metric_tags(&mut metric1, &filters, &mut TagSetMutViewState::new());

assert_eq!(origin_tag_names(&metric1), vec!["service:web"]);
let metric2_origin: Vec<_> = metric2
Expand Down Expand Up @@ -651,7 +643,7 @@ mod tests {
&["env:prod", "service:web", "host:h1"],
&["env:prod", "host:h1", "region:us-east-1"],
);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(tag_names(&metric), vec!["service:web"]);
assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]);
Expand Down Expand Up @@ -714,7 +706,7 @@ mod tests {
let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

Expand Down Expand Up @@ -765,7 +757,7 @@ mod tests {
let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());
assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
}

Expand Down Expand Up @@ -800,7 +792,7 @@ mod tests {
let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));

let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]);
filter_metric_tags(&mut metric, &filters);
filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());
assert_eq!(tag_names(&metric), vec!["service:web"]);
}

Expand All @@ -815,7 +807,7 @@ mod tests {

let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1"]);

let outcome = filter_metric_tags(&mut metric, &filters);
let outcome = filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(outcome, FilterMetricTagsOutcome::Modified { removed_tags: 1 });
assert_eq!(tag_names(&metric), vec!["env:prod"]);
Expand All @@ -841,7 +833,7 @@ mod tests {
assert!(!metric.context().tags().is_modified());
assert!(!metric.context().origin_tags().is_modified());

let outcome = filter_metric_tags(&mut metric, &filters);
let outcome = filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new());

assert_eq!(outcome, FilterMetricTagsOutcome::NoChange);
assert_eq!(tag_names(&metric), vec!["env:prod", "host:h1"]);
Expand Down
Loading
Loading