diff --git a/bin/agent-data-plane/src/components/tag_filterlist/mod.rs b/bin/agent-data-plane/src/components/tag_filterlist/mod.rs index ff33b4f3faa..542465b7a3a 100644 --- a/bin/agent-data-plane/src/components/tag_filterlist/mod.rs +++ b/bin/agent-data-plane/src/components/tag_filterlist/mod.rs @@ -13,7 +13,7 @@ use foldhash::fast::RandomState as FoldHashState; use hashbrown::{HashMap, HashSet}; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use saluki_config::GenericConfiguration; -use saluki_context::tags::{Tag, TagSet}; +use saluki_context::{tags::Tag, TagSetMutViewState}; use saluki_core::{ components::{ transforms::{Transform, TransformBuilder, TransformContext}, @@ -170,6 +170,7 @@ impl TransformBuilder for TagFilterlistConfiguration { .clone() .expect("configuration must be set via from_configuration"), telemetry: Telemetry::new(&metrics_builder), + view_state: TagSetMutViewState::new(), })) } } @@ -184,6 +185,7 @@ struct TagFilterlist { filters: CompiledFilters, configuration: GenericConfiguration, telemetry: Telemetry, + view_state: TagSetMutViewState, } #[async_trait] @@ -204,7 +206,7 @@ impl Transform for TagFilterlist { for event in &mut events { if let Some(metric) = event.try_as_metric_mut() { if metric.values().is_sketch() { - let outcome = filter_metric_tags(metric, &self.filters); + let outcome = filter_metric_tags(metric, &self.filters, &mut self.view_state); self.telemetry.record(outcome); } } @@ -233,54 +235,43 @@ fn should_keep_tag(tag: &Tag, is_exclude: bool, names: &HashSet) -> bool { - tags.into_iter().any(|tag| !should_keep_tag(tag, is_exclude, names)) -} - /// Filter the tags of a distribution metric according to the compiled filter table. /// /// Both instrumented tags and origin tags are filtered using the same tag key list. /// If the metric name is not present in `filters`, the metric is left unchanged. /// If filtering would not change any tags, the metric context is left untouched (zero allocations). +/// +/// Uses a lazy copy-on-write view to scan tags in a single read-only pass and defer the +/// `Arc` clone + context key recomputation until tags are actually removed. #[inline] -pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) -> FilterMetricTagsOutcome { +pub fn filter_metric_tags( + metric: &mut Metric, filters: &CompiledFilters, view_state: &mut TagSetMutViewState, +) -> FilterMetricTagsOutcome { let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else { return FilterMetricTagsOutcome::RuleMiss; }; let is_exclude = *is_exclude; - let has_tag_removals = has_removable_tags(metric.context().tags(), is_exclude, tag_names); - let has_origin_removals = has_removable_tags(metric.context().origin_tags(), is_exclude, tag_names); - - if !has_tag_removals && !has_origin_removals { - return FilterMetricTagsOutcome::NoChange; - } - - let mut total_removed = 0; - metric.context_mut().with_tag_sets_mut(|tags, origin_tags| { - if has_tag_removals { - let before = tags.len(); - tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names)); - total_removed += before - tags.len(); - } - if has_origin_removals { - let before = origin_tags.len(); - origin_tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names)); - total_removed += before - origin_tags.len(); - } - }); + // Single read-only scan of both tag sets, collecting removal indices. + // No Arc clone or rehash occurs until finish() — and only if something was flagged. + let mut view = metric.context_mut().tags_mut_view(view_state); + view.retain_tags(|tag| should_keep_tag(tag, is_exclude, tag_names)); + view.retain_origin_tags(|tag| should_keep_tag(tag, is_exclude, tag_names)); - FilterMetricTagsOutcome::Modified { - removed_tags: total_removed, + match view.finish() { + 0 => FilterMetricTagsOutcome::NoChange, + n => FilterMetricTagsOutcome::Modified { removed_tags: n }, } } #[cfg(test)] mod tests { use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; - use saluki_context::{tags::Tag, Context}; + use saluki_context::{ + tags::{Tag, TagSet}, + Context, + }; use saluki_core::data_model::event::metric::Metric; use saluki_metrics::{test::TestRecorder, MetricsBuilder}; use serde_json::json; @@ -326,7 +317,7 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["service:web"]); } @@ -341,7 +332,7 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod"]); } @@ -356,7 +347,7 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } @@ -369,48 +360,33 @@ mod tests { } #[test] - fn bare_tag_excluded_by_name() { + fn empty_tag_list_exclude_keeps_all() { let entries = vec![MetricTagFilterEntry { metric_name: "my.dist".to_string(), action: FilterAction::Exclude, - tags: vec!["production".to_string()], - }]; - let filters = compile_filters(&entries); - - let mut metric = distribution_metric("my.dist", &["production", "service:web"]); - filter_metric_tags(&mut metric, &filters); - - assert_eq!(tag_names(&metric), vec!["service:web"]); - } - - #[test] - fn empty_tag_list_include_removes_all() { - let entries = vec![MetricTagFilterEntry { - metric_name: "my.dist".to_string(), - action: FilterAction::Include, tags: vec![], }]; let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); - assert!(metric.context().tags().is_empty()); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } #[test] - fn empty_tag_list_exclude_keeps_all() { + fn empty_tag_list_include_removes_all() { let entries = vec![MetricTagFilterEntry { metric_name: "my.dist".to_string(), - action: FilterAction::Exclude, + action: FilterAction::Include, tags: vec![], }]; let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); - assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + assert!(tag_names(&metric).is_empty()); } #[test] @@ -430,7 +406,7 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["service:web"]); } @@ -452,7 +428,7 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } @@ -474,16 +450,32 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } + #[test] + fn bare_tag_excluded_by_name() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["production".to_string()], + }]; + let filters = compile_filters(&entries); + + // "production" is a bare tag (no colon); tag.name() returns "production". + let mut metric = distribution_metric("my.dist", &["production", "service:web"]); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + #[test] fn no_config_is_noop() { let filters = compile_filters(&[]); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } @@ -563,7 +555,7 @@ mod tests { let mut metric = distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(origin_tag_names(&metric), vec!["service:web"]); } @@ -579,7 +571,7 @@ mod tests { let mut metric = distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(origin_tag_names(&metric), vec!["env:prod"]); } @@ -594,7 +586,7 @@ mod tests { let filters = compile_filters(&entries); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["service:web"]); assert!(metric.context().origin_tags().is_empty()); @@ -621,7 +613,7 @@ mod tests { }]; let filters = compile_filters(&entries); - filter_metric_tags(&mut metric1, &filters); + filter_metric_tags(&mut metric1, &filters, &mut TagSetMutViewState::new()); assert_eq!(origin_tag_names(&metric1), vec!["service:web"]); let metric2_origin: Vec<_> = metric2 @@ -651,7 +643,7 @@ mod tests { &["env:prod", "service:web", "host:h1"], &["env:prod", "host:h1", "region:us-east-1"], ); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["service:web"]); assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]); @@ -714,7 +706,7 @@ mod tests { let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } @@ -765,7 +757,7 @@ mod tests { let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); } @@ -800,7 +792,7 @@ mod tests { let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); - filter_metric_tags(&mut metric, &filters); + filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(tag_names(&metric), vec!["service:web"]); } @@ -815,7 +807,7 @@ mod tests { let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1"]); - let outcome = filter_metric_tags(&mut metric, &filters); + let outcome = filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(outcome, FilterMetricTagsOutcome::Modified { removed_tags: 1 }); assert_eq!(tag_names(&metric), vec!["env:prod"]); @@ -841,7 +833,7 @@ mod tests { assert!(!metric.context().tags().is_modified()); assert!(!metric.context().origin_tags().is_modified()); - let outcome = filter_metric_tags(&mut metric, &filters); + let outcome = filter_metric_tags(&mut metric, &filters, &mut TagSetMutViewState::new()); assert_eq!(outcome, FilterMetricTagsOutcome::NoChange); assert_eq!(tag_names(&metric), vec!["env:prod", "host:h1"]); diff --git a/lib/saluki-context/src/context.rs b/lib/saluki-context/src/context.rs index 31877543b41..138660f7c33 100644 --- a/lib/saluki-context/src/context.rs +++ b/lib/saluki-context/src/context.rs @@ -1,11 +1,11 @@ use std::{fmt, hash, sync::Arc}; use metrics::Gauge; -use saluki_common::collections::ContiguousBitSet; +use saluki_common::collections::PrehashedHashSet; use stringtheory::MetaString; use crate::{ - hash::{hash_context, ContextKey}, + hash::{hash_context, hash_context_with_seen, ContextKey}, tags::{Tag, TagSet}, }; @@ -295,14 +295,15 @@ impl fmt::Display for Context { /// Reusable scratch space for [`TagSetMutView`] operations. /// -/// Holding a long-lived instance across calls amortizes bitset allocations. The bitsets are +/// Holding a long-lived instance across calls amortizes vector allocations. The vectors are /// cleared automatically when the associated [`TagSetMutView`] is dropped. #[derive(Debug, Default)] pub struct TagSetMutViewState { - tag_base_removals: ContiguousBitSet, - tag_addition_removals: ContiguousBitSet, - origin_base_removals: ContiguousBitSet, - origin_addition_removals: ContiguousBitSet, + tag_base_removals: Vec, + tag_addition_removals: Vec, + origin_base_removals: Vec, + origin_addition_removals: Vec, + hash_seen: PrehashedHashSet, } impl TagSetMutViewState { @@ -312,10 +313,18 @@ impl TagSetMutViewState { } fn clear(&mut self) { - self.tag_base_removals.clear_all(); - self.tag_addition_removals.clear_all(); - self.origin_base_removals.clear_all(); - self.origin_addition_removals.clear_all(); + self.tag_base_removals.clear(); + self.tag_addition_removals.clear(); + self.origin_base_removals.clear(); + self.origin_addition_removals.clear(); + self.hash_seen.clear(); + } + + fn normalize(&mut self) { + normalize_removals(&mut self.tag_base_removals); + normalize_removals(&mut self.tag_addition_removals); + normalize_removals(&mut self.origin_base_removals); + normalize_removals(&mut self.origin_addition_removals); } } @@ -359,9 +368,9 @@ impl<'a, 'b> TagSetMutView<'a, 'b> { /// If no changes were recorded, this is a no-op: no `Arc` clone, no rehash, returns 0. /// Otherwise, triggers `Arc::make_mut` on the context, applies the changes to both tag sets, /// and recomputes the context key. - /// - /// Returns the number of tags removed. pub fn finish(self) -> usize { + self.state.normalize(); + let total_tags = self.state.tag_base_removals.len() + self.state.tag_addition_removals.len(); let total_origin = self.state.origin_base_removals.len() + self.state.origin_addition_removals.len(); let total = total_tags + total_origin; @@ -383,13 +392,18 @@ impl<'a, 'b> TagSetMutView<'a, 'b> { .apply_removals(&self.state.origin_base_removals, &self.state.origin_addition_removals); } - let (key, _) = hash_context(&inner.name, &inner.tags, &inner.origin_tags); + let (key, _) = hash_context_with_seen(&inner.name, &inner.tags, &inner.origin_tags, &mut self.state.hash_seen); inner.key = key; total } } +fn normalize_removals(removals: &mut Vec) { + removals.sort_unstable(); + removals.dedup(); +} + impl Drop for TagSetMutView<'_, '_> { fn drop(&mut self) { self.state.clear(); diff --git a/lib/saluki-context/src/tags/tagset/owned.rs b/lib/saluki-context/src/tags/tagset/owned.rs index 5b0fbb5d00f..c2079f78112 100644 --- a/lib/saluki-context/src/tags/tagset/owned.rs +++ b/lib/saluki-context/src/tags/tagset/owned.rs @@ -222,31 +222,30 @@ impl TagSet { /// Scans this tag set and collects the indices of tags that should be removed according to the predicate. /// - /// Tags for which `f` returns `false` have their indices recorded in the provided bitsets. + /// Tags for which `f` returns `false` will have their indices appended to the provided vectors. /// `base_removals` receives flattened base indices. `addition_removals` receives indices into /// the additions overlay. /// /// This is a read-only scan: no mutation occurs. Use [`apply_removals`][TagSet::apply_removals] - /// to apply the collected indices. Multiple calls accumulate correctly because bitset sets are - /// idempotent; tags already flagged for removal are skipped. + /// to apply the collected indices. pub(crate) fn collect_removals( - &self, mut f: F, base_removals: &mut ContiguousBitSet, addition_removals: &mut ContiguousBitSet, + &self, mut f: F, base_removals: &mut Vec, addition_removals: &mut Vec, ) where F: FnMut(&Tag) -> bool, { - // Scan additions, skipping those already flagged. + // Scan additions. if let Some(overlay) = &self.overlay { for (i, tag) in overlay.additions.iter().enumerate() { - if !addition_removals.is_set(i) && !f(tag) { - addition_removals.set(i); + if !f(tag) { + addition_removals.push(i); } } } - // Scan base tags, skipping those already removed (by the overlay) or already flagged. + // Scan base tags. for (idx, base_tag) in base_indexed_iter(&self.base) { - if !is_overlay_removed(&self.overlay, idx) && !base_removals.is_set(idx) && !f(base_tag) { - base_removals.set(idx); + if !is_overlay_removed(&self.overlay, idx) && !f(base_tag) { + base_removals.push(idx); } } } @@ -254,21 +253,21 @@ impl TagSet { /// Applies previously collected removal indices from [`collect_removals`][TagSet::collect_removals]. /// /// `base_removals` contains flattened base indices to mark as removed. - /// `addition_removals` contains indices of additions to remove. - pub(crate) fn apply_removals(&mut self, base_removals: &ContiguousBitSet, addition_removals: &ContiguousBitSet) { - // Apply addition removals in descending order to maintain index validity. + /// `addition_removals` contains indices into the additions overlay to remove (must be sorted ascending). + pub(crate) fn apply_removals(&mut self, base_removals: &[usize], addition_removals: &[usize]) { + // Apply addition removals in reverse order to maintain index validity. if !addition_removals.is_empty() { let overlay = self.ensure_overlay(); - for i in addition_removals.into_iter().rev() { + for &i in addition_removals.iter().rev() { overlay.additions.remove(i); } } - // Apply base removals by setting the corresponding bits in the overlay. + // Apply base removals. if !base_removals.is_empty() { let overlay = self.ensure_overlay(); - for i in base_removals { - overlay.removals.set(i); + for &idx in base_removals { + overlay.removals.set(idx); } } }