diff --git a/Cargo.lock b/Cargo.lock index 4d50783135d..e0d4c0d7733 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,11 +28,14 @@ dependencies = [ "colored", "comfy-table", "datadog-protos", + "foldhash 0.2.0", "futures", + "hashbrown 0.16.1", "http", "http-body-util", "hyper", "memory-accounting", + "metrics", "ottl", "papaya", "prometheus-exposition", @@ -48,6 +51,7 @@ dependencies = [ "saluki-health", "saluki-io", "saluki-metadata", + "saluki-metrics", "serde", "serde_json", "serde_yaml", diff --git a/bin/agent-data-plane/Cargo.toml b/bin/agent-data-plane/Cargo.toml index 07ede928551..63e7936dc38 100644 --- a/bin/agent-data-plane/Cargo.toml +++ b/bin/agent-data-plane/Cargo.toml @@ -20,11 +20,14 @@ chrono = { workspace = true } colored = { workspace = true } comfy-table = { workspace = true } datadog-protos = { workspace = true } +foldhash = { workspace = true } futures = { workspace = true } +hashbrown = { workspace = true } http = { workspace = true } http-body-util = { workspace = true } hyper = { workspace = true } memory-accounting = { workspace = true } +metrics = { workspace = true } ottl = { workspace = true } papaya = { workspace = true } prometheus-exposition = { workspace = true } @@ -40,6 +43,7 @@ saluki-error = { workspace = true } saluki-health = { workspace = true } saluki-io = { workspace = true } saluki-metadata = { workspace = true } +saluki-metrics = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_yaml = { workspace = true } @@ -63,3 +67,6 @@ tikv-jemallocator = { workspace = true, features = [ [build-dependencies] chrono = { workspace = true } + +[dev-dependencies] +saluki-metrics = { workspace = true, features = ["test"] } diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index 9021633a487..6cb2533a4b3 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -40,7 +40,7 @@ use tracing::{error, info, warn}; use crate::{ components::{ apm_onboarding::ApmOnboardingConfiguration, ottl_filter_processor::OttlFilterConfiguration, - ottl_transform_processor::OttlTransformConfiguration, + ottl_transform_processor::OttlTransformConfiguration, tag_filterlist::TagFilterlistConfiguration, }, internal::{create_internal_supervisor, remote_agent::RemoteAgentBootstrap}, }; @@ -463,6 +463,8 @@ async fn add_dsd_pipeline_to_blueprint( let dsd_mapper_config = DogStatsDMapperConfiguration::from_configuration(config)?; let dsd_enrich_config = ChainedConfiguration::default().with_transform_builder("dogstatsd_mapper", dsd_mapper_config); + let dsd_tag_filterlist_config = TagFilterlistConfiguration::from_configuration(config) + .error_context("Failed to configure metric tag filterlist transform.")?; let dsd_agg_config = AggregateConfiguration::from_configuration(config).error_context("Failed to configure aggregate transform.")?; let dd_events_config = DatadogEventsConfiguration::from_configuration(config) @@ -477,6 +479,7 @@ async fn add_dsd_pipeline_to_blueprint( .add_source("dsd_in", dsd_config)? .add_transform("dsd_prefix_filter", dsd_prefix_filter_configuration)? .add_transform("dsd_enrich", dsd_enrich_config)? + .add_transform("dsd_tag_filterlist", dsd_tag_filterlist_config)? .add_transform("dsd_agg", dsd_agg_config)? .add_encoder("dd_events_encode", dd_events_config)? .add_encoder("dd_service_checks_encode", dd_service_checks_config)? @@ -484,7 +487,8 @@ async fn add_dsd_pipeline_to_blueprint( // Metrics. .connect_component("dsd_prefix_filter", ["dsd_in.metrics"])? .connect_component("dsd_enrich", ["dsd_prefix_filter"])? - .connect_component("dsd_agg", ["dsd_enrich"])? + .connect_component("dsd_tag_filterlist", ["dsd_enrich"])? + .connect_component("dsd_agg", ["dsd_tag_filterlist"])? .connect_component("metrics_enrich", ["dsd_agg"])? .connect_component("dd_service_checks_encode", ["dsd_in.service_checks"])? .connect_component("dd_events_encode", ["dsd_in.events"])? diff --git a/bin/agent-data-plane/src/components/mod.rs b/bin/agent-data-plane/src/components/mod.rs index 35fb04977d3..6f9816a6811 100644 --- a/bin/agent-data-plane/src/components/mod.rs +++ b/bin/agent-data-plane/src/components/mod.rs @@ -1,3 +1,4 @@ pub mod apm_onboarding; pub mod ottl_filter_processor; pub mod ottl_transform_processor; +pub mod tag_filterlist; diff --git a/bin/agent-data-plane/src/components/tag_filterlist/mod.rs b/bin/agent-data-plane/src/components/tag_filterlist/mod.rs new file mode 100644 index 00000000000..ff33b4f3faa --- /dev/null +++ b/bin/agent-data-plane/src/components/tag_filterlist/mod.rs @@ -0,0 +1,851 @@ +//! Metric Tag Filterlist synchronous transform. +//! +//! Removes or retains specific tags from distribution metrics based on per-metric configuration. +//! Supports both "exclude" (denylist) and "include" (allowlist) modes. +//! +//! Configuration is read from the `metric_tag_filterlist` key and can be updated at runtime via +//! Remote Config. + +mod telemetry; + +use async_trait::async_trait; +use foldhash::fast::RandomState as FoldHashState; +use hashbrown::{HashMap, HashSet}; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use saluki_config::GenericConfiguration; +use saluki_context::tags::{Tag, TagSet}; +use saluki_core::{ + components::{ + transforms::{Transform, TransformBuilder, TransformContext}, + ComponentContext, + }, + data_model::event::{metric::Metric, EventType}, + observability::ComponentMetricsExt, + topology::OutputDefinition, +}; +use saluki_error::GenericError; +use saluki_metrics::MetricsBuilder; +use serde::{de::Deserializer, Deserialize}; +use tokio::select; +use tracing::{debug, warn}; + +use self::telemetry::Telemetry; + +/// Action applied to the configured tag list: keep only listed tags, or remove listed tags. +#[derive(Clone, Copy, Debug, Default, PartialEq)] +pub enum FilterAction { + /// Keep only the tags whose key appears in the configured list. + Include, + /// Remove the tags whose key appears in the configured list. + #[default] + Exclude, +} + +impl<'de> Deserialize<'de> for FilterAction { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let raw = Option::::deserialize(deserializer)?; + + match raw.as_deref() { + Some("include") => Ok(Self::Include), + Some("exclude") | None | Some("") => Ok(Self::Exclude), + Some(other) => { + warn!( + action = %other, + "`metric_tag_filterlist.*.action` should be either `include` or `exclude`; defaulting to `exclude`." + ); + Ok(Self::Exclude) + } + } + } +} + +/// A single metric tag filter entry. +#[derive(Clone, Debug, Deserialize)] +pub struct MetricTagFilterEntry { + /// The exact metric name this entry applies to. + pub metric_name: String, + /// Whether to include or exclude the listed tags. + #[serde(default)] + pub action: FilterAction, + /// Tag key names to include or exclude. + pub tags: Vec, +} + +/// Compiled filter table: metric name → (is_exclude, set of tag key names). +pub type CompiledFilters = HashMap), FoldHashState>; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +/// Outcome of attempting to apply `metric_tag_filterlist` rules to a metric. +pub enum FilterMetricTagsOutcome { + /// No rule existed for the metric name. + RuleMiss, + /// A rule existed, but applying it did not change any tags. + NoChange, + /// A rule existed and removed one or more tags. + Modified { + /// Total number of instrumented and origin tags removed. + removed_tags: usize, + }, +} + +/// Compile a slice of filter entries into an O(1)-lookup table. +/// +/// Merge rules: +/// - Same metric name + same action → union of tag key sets. +/// - Same metric name + conflicting actions → `exclude` wins. +pub fn compile_filters(entries: &[MetricTagFilterEntry]) -> CompiledFilters { + let mut filters: CompiledFilters = HashMap::with_hasher(FoldHashState::default()); + + for entry in entries { + if entry.metric_name.is_empty() { + continue; + } + + let is_exclude = entry.action == FilterAction::Exclude; + let mut tag_set = HashSet::with_capacity_and_hasher(entry.tags.len(), FoldHashState::default()); + tag_set.extend(entry.tags.iter().cloned()); + + match filters.entry(entry.metric_name.clone()) { + hashbrown::hash_map::Entry::Vacant(e) => { + e.insert((is_exclude, tag_set)); + } + hashbrown::hash_map::Entry::Occupied(mut e) => { + let (existing_is_exclude, existing_tags) = e.get_mut(); + if *existing_is_exclude == is_exclude { + existing_tags.extend(tag_set); + } else if is_exclude { + *existing_is_exclude = true; + *existing_tags = tag_set; + } + } + } + } + + filters +} + +/// Metric Tag Filterlist transform. +/// +/// Removes or retains specific tags from distribution metrics based on per-metric configuration. +/// Configuration is read from `metric_tag_filterlist` and supports runtime updates via Remote Config. +#[derive(Deserialize)] +pub struct TagFilterlistConfiguration { + #[serde(default, rename = "metric_tag_filterlist")] + entries: Vec, + + #[serde(skip)] + configuration: Option, +} + +impl TagFilterlistConfiguration { + /// Creates a new `TagFilterlistConfiguration` from the given configuration. + pub fn from_configuration(config: &GenericConfiguration) -> Result { + let mut typed: Self = config.as_typed()?; + typed.configuration = Some(config.clone()); + Ok(typed) + } +} + +#[async_trait] +impl TransformBuilder for TagFilterlistConfiguration { + fn input_event_type(&self) -> EventType { + EventType::Metric + } + + fn outputs(&self) -> &[OutputDefinition] { + static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)]; + OUTPUTS + } + + async fn build(&self, context: ComponentContext) -> Result, GenericError> { + let metrics_builder = MetricsBuilder::from_component_context(&context); + + Ok(Box::new(TagFilterlist { + filters: compile_filters(&self.entries), + configuration: self + .configuration + .clone() + .expect("configuration must be set via from_configuration"), + telemetry: Telemetry::new(&metrics_builder), + })) + } +} + +impl MemoryBounds for TagFilterlistConfiguration { + fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { + builder.minimum().with_single_value::("component struct"); + } +} + +struct TagFilterlist { + filters: CompiledFilters, + configuration: GenericConfiguration, + telemetry: Telemetry, +} + +#[async_trait] +impl Transform for TagFilterlist { + async fn run(mut self: Box, mut context: TransformContext) -> Result<(), GenericError> { + let mut health = context.take_health_handle(); + health.mark_ready(); + + let mut watcher = self.configuration.watch_for_updates("metric_tag_filterlist"); + + debug!("Metric Tag Filterlist transform started."); + + loop { + select! { + _ = health.live() => continue, + maybe_events = context.events().next() => match maybe_events { + Some(mut events) => { + for event in &mut events { + if let Some(metric) = event.try_as_metric_mut() { + if metric.values().is_sketch() { + let outcome = filter_metric_tags(metric, &self.filters); + self.telemetry.record(outcome); + } + } + } + if let Err(e) = context.dispatcher().dispatch(events).await { + tracing::error!(error = %e, "Failed to dispatch events."); + } + } + None => break, + }, + (_, new_entries) = watcher.changed::>() => { + self.filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + debug!("Updated metric tag filterlist."); + }, + } + } + + debug!("Metric Tag Filterlist transform stopped."); + + Ok(()) + } +} + +#[inline] +fn should_keep_tag(tag: &Tag, is_exclude: bool, names: &HashSet) -> bool { + is_exclude != names.contains(tag.as_borrowed().name()) +} + +#[inline] +fn has_removable_tags(tags: &TagSet, is_exclude: bool, names: &HashSet) -> bool { + tags.into_iter().any(|tag| !should_keep_tag(tag, is_exclude, names)) +} + +/// Filter the tags of a distribution metric according to the compiled filter table. +/// +/// Both instrumented tags and origin tags are filtered using the same tag key list. +/// If the metric name is not present in `filters`, the metric is left unchanged. +/// If filtering would not change any tags, the metric context is left untouched (zero allocations). +#[inline] +pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) -> FilterMetricTagsOutcome { + let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else { + return FilterMetricTagsOutcome::RuleMiss; + }; + + let is_exclude = *is_exclude; + + let has_tag_removals = has_removable_tags(metric.context().tags(), is_exclude, tag_names); + let has_origin_removals = has_removable_tags(metric.context().origin_tags(), is_exclude, tag_names); + + if !has_tag_removals && !has_origin_removals { + return FilterMetricTagsOutcome::NoChange; + } + + let mut total_removed = 0; + metric.context_mut().with_tag_sets_mut(|tags, origin_tags| { + if has_tag_removals { + let before = tags.len(); + tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names)); + total_removed += before - tags.len(); + } + if has_origin_removals { + let before = origin_tags.len(); + origin_tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names)); + total_removed += before - origin_tags.len(); + } + }); + + FilterMetricTagsOutcome::Modified { + removed_tags: total_removed, + } +} + +#[cfg(test)] +mod tests { + use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; + use saluki_context::{tags::Tag, Context}; + use saluki_core::data_model::event::metric::Metric; + use saluki_metrics::{test::TestRecorder, MetricsBuilder}; + use serde_json::json; + + use super::*; + + fn distribution_metric(name: &'static str, tags: &[&'static str]) -> Metric { + let context = Context::from_static_parts(name, tags); + Metric::distribution(context, 1.0) + } + + fn distribution_metric_with_origin_tags( + name: &'static str, tags: &[&'static str], origin_tags: &[&'static str], + ) -> Metric { + let origin_tag_set: TagSet = origin_tags.iter().map(|s| Tag::from(*s)).collect(); + let context = Context::from_static_parts(name, tags).with_origin_tags(origin_tag_set.into_shared()); + Metric::distribution(context, 1.0) + } + + fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { + let context = Context::from_static_parts(name, tags); + Metric::counter(context, 1.0) + } + + fn tag_names(metric: &Metric) -> Vec { + let mut names: Vec<_> = metric + .context() + .tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + names.sort(); + names + } + + #[test] + fn exclude_removes_listed_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn include_keeps_only_listed_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod"]); + } + + #[test] + fn non_matching_metric_unchanged() { + let entries = vec![MetricTagFilterEntry { + metric_name: "other.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn non_distribution_metric_unchanged() { + let metric = counter_metric("my.counter", &["env:prod", "service:web"]); + assert!(!metric.values().is_sketch(), "counter should not be a sketch"); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn bare_tag_excluded_by_name() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["production".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["production", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn empty_tag_list_include_removes_all() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec![], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert!(metric.context().tags().is_empty()); + } + + #[test] + fn empty_tag_list_exclude_keeps_all() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec![], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn merge_same_action_unions_tags() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn merge_conflicting_actions_exclude_wins() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn merge_conflicting_actions_exclude_first_wins() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn no_config_is_noop() { + let filters = compile_filters(&[]); + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn empty_metric_name_is_ignored() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: String::new(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + assert!(!filters.contains_key("")); + assert!(filters.contains_key("my.dist")); + } + + #[test] + fn missing_action_defaults_to_exclude() { + let entry: MetricTagFilterEntry = serde_json::from_value(json!({ + "metric_name": "my.dist", + "tags": ["env"] + })) + .unwrap(); + + assert_eq!(entry.action, FilterAction::Exclude); + } + + #[test] + fn invalid_action_defaults_to_exclude() { + let entry: MetricTagFilterEntry = serde_json::from_value(json!({ + "metric_name": "my.dist", + "action": "invalid", + "tags": ["env"] + })) + .unwrap(); + + assert_eq!(entry.action, FilterAction::Exclude); + } + + #[test] + fn origin_tags_preserved_after_filtering() { + let context = Context::from_static_parts("my.dist", &["env:prod", "host:h1"]); + let tag_set: TagSet = [Tag::from("service:web")].into_iter().collect(); + let new_context = context.with_tags(tag_set.into_shared()); + assert_eq!(new_context.name().as_ref(), "my.dist"); + assert!(new_context.origin_tags().is_empty()); + let names: Vec<_> = new_context.tags().into_iter().map(|t| t.as_str().to_owned()).collect(); + assert_eq!(names, vec!["service:web"]); + } + + fn origin_tag_names(metric: &Metric) -> Vec { + let mut names: Vec<_> = metric + .context() + .origin_tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + names.sort(); + names + } + + #[test] + fn exclude_removes_listed_origin_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = + distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(origin_tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn include_keeps_only_listed_origin_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = + distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(origin_tag_names(&metric), vec!["env:prod"]); + } + + #[test] + fn origin_tags_empty_unchanged() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + assert!(metric.context().origin_tags().is_empty()); + } + + #[test] + fn filtering_origin_tags_does_not_affect_shared_origin() { + let origin_tag_set: TagSet = ["env:prod", "host:h1", "service:web"] + .iter() + .map(|s| Tag::from(*s)) + .collect(); + let shared_origin = origin_tag_set.into_shared(); + + let ctx1 = Context::from_static_parts("my.dist", &[]).with_origin_tags(shared_origin.clone()); + let ctx2 = Context::from_static_parts("my.dist", &[]).with_origin_tags(shared_origin.clone()); + + let mut metric1 = Metric::distribution(ctx1, 1.0); + let metric2 = Metric::distribution(ctx2, 1.0); + + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + filter_metric_tags(&mut metric1, &filters); + + assert_eq!(origin_tag_names(&metric1), vec!["service:web"]); + let metric2_origin: Vec<_> = metric2 + .context() + .origin_tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + assert!( + metric2_origin.contains(&"env:prod".to_owned()), + "shared origin_tags should not be mutated" + ); + assert!(metric2_origin.contains(&"host:h1".to_owned())); + } + + #[test] + fn combined_tags_and_origin_tags_filtering() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric_with_origin_tags( + "my.dist", + &["env:prod", "service:web", "host:h1"], + &["env:prod", "host:h1", "region:us-east-1"], + ); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]); + } + + #[test] + fn telemetry_records_hits_misses_and_filtered_tags() { + let recorder = TestRecorder::default(); + let _local = metrics::set_default_local_recorder(&recorder); + + let builder = MetricsBuilder::default(); + let telemetry = Telemetry::new(&builder); + + assert_eq!(recorder.counter("tag_filterlist_rule_hits_total"), Some(0)); + assert_eq!(recorder.counter("tag_filterlist_rule_misses_total"), Some(0)); + assert_eq!(recorder.counter("tag_filterlist_noop_hits_total"), Some(0)); + assert_eq!(recorder.counter("tag_filterlist_metrics_modified_total"), Some(0)); + assert_eq!(recorder.counter("tag_filterlist_tags_filtered_total"), Some(0)); + + telemetry.record(FilterMetricTagsOutcome::RuleMiss); + telemetry.record(FilterMetricTagsOutcome::NoChange); + telemetry.record(FilterMetricTagsOutcome::Modified { removed_tags: 3 }); + + assert_eq!(recorder.counter("tag_filterlist_rule_hits_total"), Some(2)); + assert_eq!(recorder.counter("tag_filterlist_rule_misses_total"), Some(1)); + assert_eq!(recorder.counter("tag_filterlist_noop_hits_total"), Some(1)); + assert_eq!(recorder.counter("tag_filterlist_metrics_modified_total"), Some(1)); + assert_eq!(recorder.counter("tag_filterlist_tags_filtered_total"), Some(3)); + } + + #[tokio::test] + async fn dynamic_update_partial_replaces_filter() { + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + sender + .send(ConfigUpdate::Partial { + key: "metric_tag_filterlist".to_string(), + value: serde_json::json!([ + { "metric_name": "my.dist", "action": "exclude", "tags": ["host"] } + ]), + }) + .await + .unwrap(); + + let (_, new_entries) = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for metric_tag_filterlist update"); + + let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[tokio::test] + async fn dynamic_update_to_empty_clears_filter() { + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + sender + .send(ConfigUpdate::Partial { + key: "metric_tag_filterlist".to_string(), + value: serde_json::json!([ + { "metric_name": "my.dist", "action": "exclude", "tags": ["env"] } + ]), + }) + .await + .unwrap(); + + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for initial metric_tag_filterlist update"); + + sender + .send(ConfigUpdate::Partial { + key: "metric_tag_filterlist".to_string(), + value: serde_json::json!([]), + }) + .await + .unwrap(); + + let (_, new_entries) = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for cleared metric_tag_filterlist update"); + + let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[tokio::test] + async fn dynamic_update_snapshot_applies_filter() { + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({ + "metric_tag_filterlist": [ + { "metric_name": "my.dist", "action": "include", "tags": ["service"] } + ] + }))) + .await + .unwrap(); + + let (_, new_entries) = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for metric_tag_filterlist update"); + + let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn modified_filter_marks_tagsets_as_modified() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1"]); + + let outcome = filter_metric_tags(&mut metric, &filters); + + assert_eq!(outcome, FilterMetricTagsOutcome::Modified { removed_tags: 1 }); + assert_eq!(tag_names(&metric), vec!["env:prod"]); + assert!(metric.context().tags().is_modified()); + } + + #[test] + fn no_change_does_not_mark_tagsets_as_modified() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["region".to_string()], + }]; + let filters = compile_filters(&entries); + let shared_tags = ["env:prod", "host:h1"] + .into_iter() + .map(Tag::from) + .collect::() + .into_shared(); + let context = Context::from_parts("my.dist", shared_tags); + let mut metric = Metric::distribution(context, 1.0); + + assert!(!metric.context().tags().is_modified()); + assert!(!metric.context().origin_tags().is_modified()); + + let outcome = filter_metric_tags(&mut metric, &filters); + + assert_eq!(outcome, FilterMetricTagsOutcome::NoChange); + assert_eq!(tag_names(&metric), vec!["env:prod", "host:h1"]); + assert!(!metric.context().tags().is_modified()); + assert!(!metric.context().origin_tags().is_modified()); + } +} diff --git a/bin/agent-data-plane/src/components/tag_filterlist/telemetry.rs b/bin/agent-data-plane/src/components/tag_filterlist/telemetry.rs new file mode 100644 index 00000000000..97f88dc2515 --- /dev/null +++ b/bin/agent-data-plane/src/components/tag_filterlist/telemetry.rs @@ -0,0 +1,42 @@ +use metrics::Counter; +use saluki_metrics::MetricsBuilder; + +use super::FilterMetricTagsOutcome; + +#[derive(Clone)] +pub struct Telemetry { + rule_hits: Counter, + rule_misses: Counter, + noop_hits: Counter, + metrics_modified: Counter, + tags_filtered: Counter, +} + +impl Telemetry { + pub fn new(builder: &MetricsBuilder) -> Self { + Self { + rule_hits: builder.register_debug_counter("tag_filterlist_rule_hits_total"), + rule_misses: builder.register_debug_counter("tag_filterlist_rule_misses_total"), + noop_hits: builder.register_debug_counter("tag_filterlist_noop_hits_total"), + metrics_modified: builder.register_debug_counter("tag_filterlist_metrics_modified_total"), + tags_filtered: builder.register_debug_counter("tag_filterlist_tags_filtered_total"), + } + } + + pub fn record(&self, outcome: FilterMetricTagsOutcome) { + match outcome { + FilterMetricTagsOutcome::RuleMiss => { + self.rule_misses.increment(1); + } + FilterMetricTagsOutcome::NoChange => { + self.rule_hits.increment(1); + self.noop_hits.increment(1); + } + FilterMetricTagsOutcome::Modified { removed_tags } => { + self.rule_hits.increment(1); + self.metrics_modified.increment(1); + self.tags_filtered.increment(removed_tags as u64); + } + } + } +} diff --git a/lib/saluki-context/src/context.rs b/lib/saluki-context/src/context.rs index f9a0e2249eb..31877543b41 100644 --- a/lib/saluki-context/src/context.rs +++ b/lib/saluki-context/src/context.rs @@ -44,7 +44,7 @@ impl Context { let origin_tags = TagSet::default(); - let (key, _) = hash_context(name, tags, &origin_tags); + let (key, _) = hash_context(name, &tag_set, &origin_tags); Self { inner: Arc::new(ContextInner { name: MetaString::from_static(name), @@ -92,6 +92,67 @@ impl Context { } } + /// Clones this context, and uses the given tags for the cloned context. + /// + /// The name and origin tags of this context are preserved. + pub fn with_tags(&self, tags: impl Into) -> Self { + let name = self.inner.name.clone(); + let tags = tags.into(); + let origin_tags = self.inner.origin_tags.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + + /// Clones this context, and uses the given origin tags for the cloned context. + /// + /// The name and instrumented tags of this context are preserved. + pub fn with_origin_tags(&self, origin_tags: impl Into) -> Self { + let name = self.inner.name.clone(); + let tags = self.inner.tags.clone(); + let origin_tags = origin_tags.into(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + + /// Clones this context, replacing both instrumented tags and origin tags in a single allocation. + /// + /// Preferred over two separate `with_tags` / `with_origin_tags` calls when both sets need to + /// be replaced, as it halves the number of `Arc` allocations. + pub fn with_tags_and_origin_tags(&self, tags: impl Into, origin_tags: impl Into) -> Self { + let name = self.inner.name.clone(); + let tags = tags.into(); + let origin_tags = origin_tags.into(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + pub(crate) fn from_inner(inner: ContextInner) -> Self { Self { inner: Arc::new(inner) } } @@ -138,6 +199,17 @@ impl Context { self.mutate_inner(|inner| f(&mut inner.origin_tags)); } + /// Mutates both instrumented tags and origin tags via a single closure. + /// + /// Uses copy-on-write semantics: if this context shares its inner data with other clones, the + /// inner data is cloned first so that mutations do not affect other holders. If this context is + /// the sole owner, the mutation happens in place. + /// + /// The context key is recomputed once after the closure returns. + pub fn with_tag_sets_mut(&mut self, f: impl FnOnce(&mut TagSet, &mut TagSet)) { + self.mutate_inner(|inner| f(&mut inner.tags, &mut inner.origin_tags)); + } + /// Runs the given closure on the inner context data, recomputing the context key afterwards. /// /// When the inner context state is shared (we aren't the only ones with a strong reference), we clone the inner