diff --git a/Cargo.lock b/Cargo.lock index 78aab11c87a..a626d73c49d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3534,6 +3534,7 @@ dependencies = [ "serde", "serde_with", "sha3", + "smallvec", "stringtheory", "tokio", "tokio-test", @@ -3623,6 +3624,7 @@ dependencies = [ name = "saluki-context" version = "0.1.0" dependencies = [ + "criterion", "indexmap 2.13.0", "memchr", "metrics", diff --git a/bin/agent-data-plane/src/components/ottl_filter_processor/span_context.rs b/bin/agent-data-plane/src/components/ottl_filter_processor/span_context.rs index 65bf1a58030..f1b35288eb2 100644 --- a/bin/agent-data-plane/src/components/ottl_filter_processor/span_context.rs +++ b/bin/agent-data-plane/src/components/ottl_filter_processor/span_context.rs @@ -11,7 +11,7 @@ use std::collections::HashMap; use std::sync::Arc; use ottl::{EvalContextFamily, IndexExpr, PathAccessor, PathResolverMap, Value}; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use saluki_core::data_model::event::trace::Span; /// Family type for the span filter evaluation context. @@ -34,13 +34,13 @@ pub struct SpanFilterContext<'a> { /// Reference to the span being evaluated. pub(super) span: &'a Span, /// Reference to the trace's resource-level tags. - pub(super) resource_tags: &'a SharedTagSet, + pub(super) resource_tags: &'a TagSet, } impl<'a> SpanFilterContext<'a> { /// Creates a context from references to the current span and resource tags. 
#[inline] - pub fn new(span: &'a Span, resource_tags: &'a SharedTagSet) -> Self { + pub fn new(span: &'a Span, resource_tags: &'a TagSet) -> Self { Self { span, resource_tags } } } diff --git a/bin/agent-data-plane/src/components/ottl_transform_processor/mod.rs b/bin/agent-data-plane/src/components/ottl_transform_processor/mod.rs index 3608b2ce0c7..84e8f7d0b6c 100644 --- a/bin/agent-data-plane/src/components/ottl_transform_processor/mod.rs +++ b/bin/agent-data-plane/src/components/ottl_transform_processor/mod.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use ottl::{CallbackMap, EnumMap, OttlParser}; use saluki_config::GenericConfiguration; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use saluki_core::{ components::{transforms::*, ComponentContext}, data_model::event::trace::Span, @@ -107,7 +107,7 @@ impl OttlTransform { /// Each statement is executed in order. For editor statements (e.g. `set`), the `where` /// clause is evaluated first; if it matches (or is absent), the editor function runs. /// Errors are handled according to `error_mode`. - fn transform_span(&self, span: &mut Span, resource_tags: &SharedTagSet) { + fn transform_span(&self, span: &mut Span, resource_tags: &TagSet) { let mut ctx = SpanTransformContext::new(span, resource_tags); for parser in &self.span_parsers { diff --git a/bin/agent-data-plane/src/components/ottl_transform_processor/span_context.rs b/bin/agent-data-plane/src/components/ottl_transform_processor/span_context.rs index d792af85e98..54685e4543d 100644 --- a/bin/agent-data-plane/src/components/ottl_transform_processor/span_context.rs +++ b/bin/agent-data-plane/src/components/ottl_transform_processor/span_context.rs @@ -9,13 +9,13 @@ //! immutable reference to the resource tags. //! //! `attributes` supports both read and write via [`SpanAttributesAccessor`]. -//! 
`resource.attributes` is read-only because [`SharedTagSet`] does not expose mutable access. +//! `resource.attributes` is read-only because [`TagSet`] does not expose mutable access. use std::collections::HashMap; use std::sync::Arc; use ottl::{EvalContextFamily, IndexExpr, PathAccessor, PathResolverMap, Value}; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use saluki_core::data_model::event::trace::Span; use stringtheory::MetaString; @@ -40,13 +40,13 @@ pub struct SpanTransformContext<'a> { /// Mutable reference to the span being transformed. pub(super) span: &'a mut Span, /// Reference to the trace's resource-level tags (read-only). - pub(super) resource_tags: &'a SharedTagSet, + pub(super) resource_tags: &'a TagSet, } impl<'a> SpanTransformContext<'a> { /// Creates a context from a mutable span reference and immutable resource tags. #[inline] - pub fn new(span: &'a mut Span, resource_tags: &'a SharedTagSet) -> Self { + pub fn new(span: &'a mut Span, resource_tags: &'a TagSet) -> Self { Self { span, resource_tags } } } @@ -117,7 +117,7 @@ impl PathAccessor for SpanAttributesAccessor { /// Path accessor for `resource.attributes` (trace resource tags). /// -/// Read-only: [`SharedTagSet`] does not expose mutable access, so `set` returns an error. +/// Read-only: [`TagSet`] does not expose mutable access, so `set` returns an error. #[derive(Debug)] pub struct ResourceAttributesAccessor; @@ -137,15 +137,14 @@ impl PathAccessor for ResourceAttributesAccessor { Ok(value) } - /// AZH: TODO - /// Always returns an error: `SharedTagSet` is an Arc-based immutable type and `Trace` + /// Always returns an error: `TagSet` is an Arc-based immutable type and `Trace` /// does not expose yet mutable way to access resource_tags, so there is no way to write changes /// back to the trace's resource tags. 
fn set<'a>( &self, _ctx: &mut SpanTransformContext<'a>, path: &str, _indexes: &[IndexExpr], _value: &Value, ) -> ottl::Result<()> { Err(format!( - "resource.attributes is read-only; setting path `{}` is not supported because SharedTagSet does not expose mutable access", + "resource.attributes is read-only; setting path `{}` is not supported because TagSet does not expose mutable access", path ) .into()) diff --git a/lib/saluki-common/Cargo.toml b/lib/saluki-common/Cargo.toml index ccbcd6be814..4b37ad2514f 100644 --- a/lib/saluki-common/Cargo.toml +++ b/lib/saluki-common/Cargo.toml @@ -25,6 +25,7 @@ saluki-metrics = { workspace = true } serde = { workspace = true } serde_with = { workspace = true } sha3 = { workspace = true } +smallvec = { workspace = true } stringtheory = { workspace = true } tokio = { workspace = true, features = ["rt", "io-util", "macros", "rt-multi-thread"] } tracing = { workspace = true } diff --git a/lib/saluki-common/src/collections/bitset.rs b/lib/saluki-common/src/collections/bitset.rs new file mode 100644 index 00000000000..2b13c9d6163 --- /dev/null +++ b/lib/saluki-common/src/collections/bitset.rs @@ -0,0 +1,147 @@ +use smallvec::SmallVec; + +/// A dense, contiguous bitset. +/// +/// `ContiguousBitSet` is designed for tracking set membership of dense, contiguous values, such as the indexes of +/// values in a vector. It is able to hold up to 128 values (indices 0–127) inline with no heap allocation. It is _not_ +/// suitable for sparse values (values that are far apart in value) as the size of the underlying storage will be tied to +/// the largest value in the set: roughly `((max_value / 64) + 1) * 8` bytes. +/// +/// `set`, `clear`, and `is_set` are O(1), with the exception of `set` when setting a bit that extends beyond the current +/// capacity, which will require a heap allocation. `len` and `is_empty` scan every word of the underlying storage. 
+#[derive(Clone, Debug, Default)] +pub struct ContiguousBitSet { + words: SmallVec<[u64; 2]>, +} + +impl ContiguousBitSet { + /// Creates a new, empty bitset with no bits set. + pub fn new() -> Self { + Self::default() + } + + /// Returns the number of bits that are set. + pub fn len(&self) -> usize { + self.words.iter().map(|w| w.count_ones() as usize).sum() + } + + /// Returns `false` if no bits are set. + pub fn is_empty(&self) -> bool { + self.words.iter().all(|w| *w == 0) + } + + /// Sets the bit at `index`. + pub fn set(&mut self, index: usize) { + let word = index / 64; + let bit = index % 64; + + // Grow if needed. + while self.words.len() <= word { + self.words.push(0); + } + + self.words[word] |= 1 << bit; + } + + /// Clears the bit at `index`. + /// + /// If `index` is beyond the current capacity, this is a no-op. + pub fn clear(&mut self, index: usize) { + let word = index / 64; + let bit = index % 64; + + if let Some(w) = self.words.get_mut(word) { + *w &= !(1 << bit); + } + } + + /// Returns `true` if the bit at `index` is set. 
+ pub fn is_set(&self, index: usize) -> bool { + let word = index / 64; + let bit = index % 64; + self.words.get(word).is_some_and(|w| w & (1 << bit) != 0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn empty_bitset() { + let bs = ContiguousBitSet::new(); + assert!(bs.is_empty()); + assert!(!bs.is_set(0)); + assert!(!bs.is_set(127)); + assert_eq!(bs.len(), 0); + } + + #[test] + fn set_and_check() { + let mut bs = ContiguousBitSet::new(); + bs.set(0); + bs.set(63); + bs.set(64); + bs.set(127); + + assert!(bs.is_set(0)); + assert!(bs.is_set(63)); + assert!(bs.is_set(64)); + assert!(bs.is_set(127)); + assert!(!bs.is_set(1)); + assert!(!bs.is_set(65)); + assert!(!bs.is_empty()); + assert_eq!(bs.len(), 4); + } + + #[test] + fn clear_bit() { + let mut bs = ContiguousBitSet::new(); + bs.set(10); + assert!(bs.is_set(10)); + + bs.clear(10); + assert!(!bs.is_set(10)); + assert!(bs.is_empty()); + } + + #[test] + fn clear_beyond_capacity_is_noop() { + let mut bs = ContiguousBitSet::new(); + bs.clear(999); // Should not panic or allocate. 
+ assert!(bs.is_empty()); + } + + #[test] + fn grows_beyond_inline() { + let mut bs = ContiguousBitSet::new(); + bs.set(200); + + assert!(bs.is_set(200)); + assert!(!bs.is_set(199)); + assert!(!bs.is_empty()); + assert_eq!(bs.len(), 1); + } + + #[test] + fn len_across_words() { + let mut bs = ContiguousBitSet::new(); + for i in (0..192).step_by(3) { + bs.set(i); + } + assert_eq!(bs.len(), 64); + } + + #[test] + fn clone_independence() { + let mut bs = ContiguousBitSet::new(); + bs.set(5); + let mut cloned = bs.clone(); + cloned.set(10); + + assert!(bs.is_set(5)); + assert!(!bs.is_set(10)); + assert!(cloned.is_set(5)); + assert!(cloned.is_set(10)); + } +} diff --git a/lib/saluki-common/src/collections.rs b/lib/saluki-common/src/collections/mod.rs similarity index 97% rename from lib/saluki-common/src/collections.rs rename to lib/saluki-common/src/collections/mod.rs index 7c1b9ddf7fc..b3b94eb9b08 100644 --- a/lib/saluki-common/src/collections.rs +++ b/lib/saluki-common/src/collections/mod.rs @@ -1,5 +1,8 @@ use crate::hash::{FastBuildHasher, NoopU64BuildHasher}; +mod bitset; +pub use self::bitset::ContiguousBitSet; + /// A hash set based on the standard library's ([`HashSet`][std::collections::HashSet]) using [`FastHasher`][crate::hash::FastHasher]. 
pub type FastHashSet = std::collections::HashSet; diff --git a/lib/saluki-components/src/common/otlp/util.rs b/lib/saluki-components/src/common/otlp/util.rs index b9e8917b1e3..11523bdd294 100644 --- a/lib/saluki-components/src/common/otlp/util.rs +++ b/lib/saluki-components/src/common/otlp/util.rs @@ -7,7 +7,7 @@ use std::sync::LazyLock; use opentelemetry_semantic_conventions::resource::*; use otlp_protos::opentelemetry::proto::common::v1::{self as otlp_common, any_value::Value}; use saluki_common::collections::{FastHashMap, FastHashSet}; -use saluki_context::tags::{SharedTagSet, TagSet}; +use saluki_context::tags::TagSet; // ============================================================================ // Datadog attribute key constants shared across the encoder and translator @@ -144,9 +144,9 @@ pub fn extract_container_tags_from_resource_attributes(attributes: &[otlp_common /// Extracts container tags from a resource tagset and inserts them into the provided TagSet. /// -/// This mirrors `extract_container_tags_from_resource_attributes`, but operates on a `SharedTagSet` representation of +/// This mirrors `extract_container_tags_from_resource_attributes`, but operates on a `TagSet` representation of /// the resource. -pub fn extract_container_tags_from_resource_tagset(resource_tags: &SharedTagSet, tags: &mut TagSet) { +pub fn extract_container_tags_from_resource_tagset(resource_tags: &TagSet, tags: &mut TagSet) { let mut extracted_tags = FastHashSet::default(); for tag in resource_tags { @@ -208,10 +208,10 @@ pub fn resource_to_source(resource: &otlp_protos::opentelemetry::proto::resource None } -/// Resolves the source metadata from a resource `SharedTagSet`. +/// Resolves the source metadata from a resource `TagSet`. /// /// This is equivalent to `resource_to_source`, but avoids the OTLP protobuf resource type. 
-pub fn tags_to_source(resource_tags: &SharedTagSet) -> Option { +pub fn tags_to_source(resource_tags: &TagSet) -> Option { let get = |key: &str| -> Option<&str> { resource_tags.get_single_tag(key).and_then(|t| t.value()) }; // AWS ECS Fargate diff --git a/lib/saluki-components/src/destinations/dsd_stats/mod.rs b/lib/saluki-components/src/destinations/dsd_stats/mod.rs index 5fcd35de3c5..1d174a1ccc6 100644 --- a/lib/saluki-components/src/destinations/dsd_stats/mod.rs +++ b/lib/saluki-components/src/destinations/dsd_stats/mod.rs @@ -9,7 +9,7 @@ use saluki_api::{ APIHandler, StatusCode, }; use saluki_common::time::get_coarse_unix_timestamp; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use saluki_core::{ components::{ destinations::{Destination, DestinationBuilder, DestinationContext}, @@ -195,7 +195,7 @@ impl Destination for DogStatsDStats { #[derive(Eq, Hash, PartialEq, Serialize)] struct ContextNoOrigin { name: MetaString, - tags: SharedTagSet, + tags: TagSet, } #[derive(Deserialize)] struct StatsQueryParams { diff --git a/lib/saluki-components/src/encoders/datadog/traces/mod.rs b/lib/saluki-components/src/encoders/datadog/traces/mod.rs index 0ac307279cd..0abde3977c5 100644 --- a/lib/saluki-components/src/encoders/datadog/traces/mod.rs +++ b/lib/saluki-components/src/encoders/datadog/traces/mod.rs @@ -16,7 +16,7 @@ use piecemeal::{ScratchBuffer, ScratchWriter}; use saluki_common::strings::StringBuilder; use saluki_common::task::HandleExt as _; use saluki_config::GenericConfiguration; -use saluki_context::tags::{SharedTagSet, TagSet}; +use saluki_context::tags::TagSet; use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan}; use saluki_core::topology::{EventsBuffer, PayloadsBuffer}; use saluki_core::{ @@ -688,12 +688,12 @@ fn encode_attribute_array_value( Ok(()) } -fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> { +fn 
get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> { resource_tags.get_single_tag(key).and_then(|t| t.value()) } fn resolve_hostname<'a>( - resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>, + resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>, ignore_missing_fields: bool, ) -> Option<&'a str> { let mut hostname = match source { @@ -715,7 +715,7 @@ fn resolve_hostname<'a>( hostname } -fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> { +fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> { if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) { return Some(value); } @@ -728,7 +728,7 @@ fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Opt get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY) } -fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> { +fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> { for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] { if let Some(value) = get_resource_tag_value(resource_tags, key) { return Some(value); @@ -746,7 +746,7 @@ fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option< None } -fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> { +fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> { if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) { return Some(value); } @@ -754,7 +754,7 @@ fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> { } fn resolve_container_tags( - resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool, + resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool, ) -> 
Option { // TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized // since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would diff --git a/lib/saluki-components/src/sources/otlp/logs/transform.rs b/lib/saluki-components/src/sources/otlp/logs/transform.rs index b4c3e24908b..492f4ec904b 100644 --- a/lib/saluki-components/src/sources/otlp/logs/transform.rs +++ b/lib/saluki-components/src/sources/otlp/logs/transform.rs @@ -6,7 +6,7 @@ use otlp_common::any_value::Value::{ArrayValue, BoolValue, BytesValue, DoubleVal use otlp_protos::opentelemetry::proto::common::v1 as otlp_common; use otlp_protos::opentelemetry::proto::logs::v1::LogRecord; use otlp_protos::opentelemetry::proto::resource::v1::Resource; -use saluki_context::tags::{SharedTagSet, TagSet}; +use saluki_context::tags::TagSet; use saluki_core::data_model::event::log::{Log, LogStatus}; use serde_json::Value as JsonValue; use stringtheory::MetaString; @@ -196,7 +196,7 @@ fn safe_insert(map: &mut HashMap, key: &str, value: JsonV pub fn transform_log_record( mut lr: LogRecord, resource: &Resource, scope: Option<&otlp_common::InstrumentationScope>, - record_host: Option, record_service: Option, mut tags: SharedTagSet, + record_host: Option, record_service: Option, mut tags: TagSet, ) -> Log { // Build additional properties map with resource, scope and record attributes let mut additional_properties = HashMap::new(); @@ -265,14 +265,12 @@ pub fn transform_log_record( k if k == DDTAGS_ATTR => { if let Some(av) = kv.value.as_ref() { if let Some(OtlpStringValue(s)) = av.value.as_ref() { - let mut extra = TagSet::default(); for raw in s.split(',') { let t = raw.trim(); if !t.is_empty() { - extra.insert_tag(t); + tags.insert_tag(t); } } - tags.extend_from_shared(&extra.into_shared()); } } } diff --git a/lib/saluki-components/src/sources/otlp/logs/translator.rs 
b/lib/saluki-components/src/sources/otlp/logs/translator.rs index 0bc40e6633c..be31459cb77 100644 --- a/lib/saluki-components/src/sources/otlp/logs/translator.rs +++ b/lib/saluki-components/src/sources/otlp/logs/translator.rs @@ -5,7 +5,7 @@ use otlp_protos::opentelemetry::proto::common::v1::InstrumentationScope; use otlp_protos::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs as OtlpResourceLogs, ScopeLogs}; use otlp_protos::opentelemetry::proto::resource::v1::Resource; use saluki_context::origin::OriginTagsResolver; -use saluki_context::tags::{SharedTagSet, Tag}; +use saluki_context::tags::{Tag, TagSet}; use saluki_core::data_model::event::Event; use stringtheory::MetaString; @@ -22,7 +22,7 @@ pub struct OtlpLogsTranslator { resource: Resource, host: Option, service: Option, - attribute_tags: SharedTagSet, + attribute_tags: TagSet, scope_logs: IntoIter, current_scope_logs: Option<(Option, IntoIter)>, } @@ -43,18 +43,17 @@ impl OtlpLogsTranslator { let service = get_string_attribute(&resource.attributes, SERVICE_NAME).map(MetaString::from); let mut attribute_tags = tags_from_attributes(&resource.attributes); attribute_tags.insert_tag(OTEL_SOURCE_TAG.clone()); - let mut shared_attribute_tags = attribute_tags.into_shared(); let origin = raw_origin_from_attributes(&resource.attributes); if let Some(resolver) = origin_tag_resolver { let origin_tags = resolver.resolve_origin_tags(origin); - shared_attribute_tags.extend_from_shared(&origin_tags); + attribute_tags.merge_shared(&origin_tags); } Self { resource, host, service, - attribute_tags: shared_attribute_tags, + attribute_tags, scope_logs: resource_logs.scope_logs.into_iter(), current_scope_logs: None, } diff --git a/lib/saluki-components/src/transforms/apm_stats/mod.rs b/lib/saluki-components/src/transforms/apm_stats/mod.rs index a647ad2b533..35cfde8674b 100644 --- a/lib/saluki-components/src/transforms/apm_stats/mod.rs +++ b/lib/saluki-components/src/transforms/apm_stats/mod.rs @@ -11,10 +11,7 @@ use 
async_trait::async_trait; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID}; use saluki_config::GenericConfiguration; -use saluki_context::{ - origin::OriginTagCardinality, - tags::{SharedTagSet, TagSet}, -}; +use saluki_context::{origin::OriginTagCardinality, tags::TagSet}; use saluki_core::{ components::{transforms::*, ComponentContext}, data_model::event::{ @@ -184,7 +181,7 @@ impl ApmStats { let resource_tags = trace.resource_tags(); let container_id = resolve_container_id(resource_tags); let mut container_tags = if container_id.is_empty() { - SharedTagSet::default() + TagSet::default() } else { extract_container_tags(resource_tags) }; @@ -194,7 +191,7 @@ impl ApmStats { if let Some(workload_provider) = &self.workload_provider { let entity_id = EntityId::Container(container_id.clone()); if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) { - container_tags.extend_from_shared(&tags); + container_tags.merge_shared(&tags); } } } @@ -425,7 +422,7 @@ fn now_nanos() -> u64 { } /// Resolves container ID from OTLP resource tags. -fn resolve_container_id(resource_tags: &SharedTagSet) -> MetaString { +fn resolve_container_id(resource_tags: &TagSet) -> MetaString { for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] { if let Some(tag) = resource_tags.get_single_tag(key) { if let Some(value) = tag.value() { @@ -440,11 +437,11 @@ fn resolve_container_id(resource_tags: &SharedTagSet) -> MetaString { } /// Extracts container tags from OTLP resource tags. -fn extract_container_tags(resource_tags: &SharedTagSet) -> SharedTagSet { +fn extract_container_tags(resource_tags: &TagSet) -> TagSet { let mut container_tags_set = TagSet::default(); extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set); - container_tags_set.into_shared() + container_tags_set } /// Extracts process tags from trace. 
diff --git a/lib/saluki-components/src/transforms/apm_stats/span_concentrator.rs b/lib/saluki-components/src/transforms/apm_stats/span_concentrator.rs index 9bb4274a5e4..6eb81170154 100644 --- a/lib/saluki-components/src/transforms/apm_stats/span_concentrator.rs +++ b/lib/saluki-components/src/transforms/apm_stats/span_concentrator.rs @@ -1,7 +1,7 @@ //! Span concentrator for APM stats computation. use saluki_common::collections::FastHashMap; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use saluki_core::data_model::event::trace::Span; use saluki_core::data_model::event::trace_stats::{ClientStatsBucket, ClientStatsPayload}; use stringtheory::MetaString; @@ -70,7 +70,7 @@ pub struct InfraTags { /// Container ID from the tracer payload. pub container_id: MetaString, /// Container tags resolved from the container runtime. - pub container_tags: SharedTagSet, + pub container_tags: TagSet, /// Hash of the process tags string. pub process_tags_hash: u64, /// Process tags string from the tracer payload. 
@@ -79,7 +79,7 @@ pub struct InfraTags { impl InfraTags { pub fn new( - container_id: impl Into, container_tags: SharedTagSet, process_tags: impl Into, + container_id: impl Into, container_tags: TagSet, process_tags: impl Into, ) -> Self { let process_tags: MetaString = process_tags.into(); let process_tags_hash = process_tags_hash(process_tags.as_ref()); @@ -173,7 +173,7 @@ impl SpanConcentrator { pub fn flush(&mut self, now: u64, force: bool) -> Vec { let mut m = FastHashMap::>::default(); - let mut container_tags_by_id = FastHashMap::::default(); + let mut container_tags_by_id = FastHashMap::::default(); let mut process_tags_by_hash = FastHashMap::::default(); let timestamps: Vec = self.buckets.keys().copied().collect(); diff --git a/lib/saluki-components/src/transforms/apm_stats/statsraw.rs b/lib/saluki-components/src/transforms/apm_stats/statsraw.rs index 3ae7a51033c..258cb1d1bdb 100644 --- a/lib/saluki-components/src/transforms/apm_stats/statsraw.rs +++ b/lib/saluki-components/src/transforms/apm_stats/statsraw.rs @@ -6,7 +6,7 @@ use ddsketch::canonical::PositiveOnlyDDSketch; use protobuf::Message; use rand::Rng as _; use saluki_common::collections::{FastHashMap, PrehashedHashMap}; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use saluki_core::data_model::event::trace_stats::{ClientGroupedStats, ClientStatsBucket}; use stringtheory::MetaString; use tracing::error; @@ -97,7 +97,7 @@ pub struct RawBucket { /// The full aggregation keys are stored in the shared `AggregationRegistry`. data: FastHashMap, /// Map of container ID to container tags. - pub(super) container_tags_by_id: FastHashMap, + pub(super) container_tags_by_id: FastHashMap, /// Map of process tags hash to process tags. pub(super) process_tags_by_hash: PrehashedHashMap, } @@ -119,13 +119,13 @@ impl RawBucket { } /// Store container tags for later export. 
- pub fn set_container_tags(&mut self, container_id: MetaString, tags: impl Into) { + pub fn set_container_tags(&mut self, container_id: MetaString, tags: impl Into) { self.container_tags_by_id.insert(container_id, tags.into()); } /// Get container tags by container ID. #[allow(unused)] - pub fn get_container_tags(&self, container_id: &MetaString) -> Option<&SharedTagSet> { + pub fn get_container_tags(&self, container_id: &MetaString) -> Option<&TagSet> { self.container_tags_by_id.get(container_id) } @@ -217,7 +217,7 @@ impl RawBucket { pub struct ExportedBucket { pub data: FastHashMap, - pub container_tags_by_id: FastHashMap, + pub container_tags_by_id: FastHashMap, pub process_tags_by_hash: PrehashedHashMap, } diff --git a/lib/saluki-components/src/transforms/host_tags/mod.rs b/lib/saluki-components/src/transforms/host_tags/mod.rs index 188a56556cc..6c6244091e6 100644 --- a/lib/saluki-components/src/transforms/host_tags/mod.rs +++ b/lib/saluki-components/src/transforms/host_tags/mod.rs @@ -1,21 +1,16 @@ use std::{ - num::NonZeroUsize, sync::Arc, time::{Duration, Instant}, }; use async_trait::async_trait; -use bytesize::ByteSize; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use saluki_config::GenericConfiguration; -use saluki_context::{ - tags::{SharedTagSet, Tag}, - ContextResolver, ContextResolverBuilder, -}; +use saluki_context::tags::{SharedTagSet, Tag}; use saluki_core::{components::transforms::*, topology::EventsBuffer}; use saluki_core::{components::ComponentContext, data_model::event::metric::Metric}; use saluki_env::helpers::remote_agent::RemoteAgentClient; -use saluki_error::{generic_error, GenericError}; +use saluki_error::GenericError; use stringtheory::MetaString; /// Host Tags synchronous transform. @@ -24,12 +19,10 @@ use stringtheory::MetaString; /// preventing gaps in queryability until the backend starts adding these tags automatically. 
pub struct HostTagsConfiguration { client: RemoteAgentClient, - host_tags_context_string_interner_bytes: ByteSize, expected_tags_duration: u64, } const DEFAULT_EXPECTED_TAGS_DURATION: u64 = 0; -const DEFAULT_HOST_TAGS_CONTEXT_STRING_INTERNER_BYTES: ByteSize = ByteSize::kib(64); impl HostTagsConfiguration { /// Creates a new `HostTagsConfiguration` from the given configuration. @@ -38,13 +31,9 @@ impl HostTagsConfiguration { let expected_tags_duration = config .try_get_typed::("expected_tags_duration")? .unwrap_or(DEFAULT_EXPECTED_TAGS_DURATION); - let host_tags_context_string_interner_bytes = config - .try_get_typed::("host_tags_context_string_interner_bytes")? - .unwrap_or(DEFAULT_HOST_TAGS_CONTEXT_STRING_INTERNER_BYTES); Ok(Self { client, - host_tags_context_string_interner_bytes, expected_tags_duration, }) } @@ -52,7 +41,7 @@ impl HostTagsConfiguration { #[async_trait] impl SynchronousTransformBuilder for HostTagsConfiguration { - async fn build(&self, context: ComponentContext) -> Result, GenericError> { + async fn build(&self, _context: ComponentContext) -> Result, GenericError> { // Make an initial request of the host tags from the Datadog Agent. // // We only pay attention to the "system" tags, as the "google_cloud_platform" tags are not relevant here. 
@@ -65,19 +54,8 @@ impl SynchronousTransformBuilder for HostTagsConfiguration { .map(Tag::from) .collect::(); - let context_string_interner_size = - NonZeroUsize::new(self.host_tags_context_string_interner_bytes.as_u64() as usize) - .ok_or_else(|| generic_error!("host_tags_context_string_interner_bytes must be greater than 0")) - .unwrap(); - let context_resolver = - ContextResolverBuilder::from_name(format!("{}/host_tags/primary", context.component_id())) - .expect("resolver name is not empty") - .with_interner_capacity_bytes(context_string_interner_size) - .with_idle_context_expiration(Duration::from_secs(30)) - .build(); Ok(Box::new(HostTagsEnrichment { start: Instant::now(), - context_resolver: Some(context_resolver), expected_tags_duration: Duration::from_secs(self.expected_tags_duration), host_tags: Some(host_tags), })) @@ -87,41 +65,25 @@ impl SynchronousTransformBuilder for HostTagsConfiguration { impl MemoryBounds for HostTagsConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { builder - // Capture the size of the heap allocation when the component is built. .minimum() - .with_single_value::("component struct") - // We also allocate the backing storage for the string interner up front, which is used by our context - // resolver. - .with_fixed_amount( - "string interner", - self.host_tags_context_string_interner_bytes.as_u64() as usize, - ); + .with_single_value::("component struct"); } } pub struct HostTagsEnrichment { start: Instant, - context_resolver: Option, expected_tags_duration: Duration, host_tags: Option, } impl HostTagsEnrichment { fn enrich_metric(&mut self, metric: &mut Metric) { - // Get our context resolver and host tags. - // - // If they're not available, then we skip adding host tags. 
- let (resolver, host_tags) = match (self.context_resolver.as_mut(), self.host_tags.as_ref()) { - (Some(resolver), Some(host_tags)) => (resolver, host_tags), - _ => return, + let host_tags = match self.host_tags.as_ref() { + Some(host_tags) => host_tags, + None => return, }; - let tags = metric.context().tags().into_iter().chain(host_tags); - let origin_tags = metric.context().origin_tags().clone(); - - if let Some(context) = resolver.resolve_with_origin_tags(metric.context().name(), tags, origin_tags) { - *metric.context_mut() = context; - } + metric.context_mut().with_tags_mut(|tags| tags.merge_shared(host_tags)); } } @@ -129,7 +91,6 @@ impl SynchronousTransform for HostTagsEnrichment { fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) { // Skip adding host tags if duration has elapsed. if self.start.elapsed() >= self.expected_tags_duration { - self.context_resolver = None; self.host_tags = None; return; } @@ -146,18 +107,16 @@ impl SynchronousTransform for HostTagsEnrichment { mod tests { use std::time::{Duration, Instant}; - use saluki_context::{Context, ContextResolverBuilder}; + use saluki_context::Context; use saluki_core::data_model::event::metric::Metric; use super::*; #[test] fn basic() { - let context_resolver = ContextResolverBuilder::for_tests().build(); let host_tags = SharedTagSet::from_iter(vec![Tag::from("hosttag1"), Tag::from("hosttag2")]); let mut host_tags_enrichment = HostTagsEnrichment { start: Instant::now(), - context_resolver: Some(context_resolver), expected_tags_duration: Duration::from_secs(30), host_tags: Some(host_tags.clone()), }; @@ -170,8 +129,7 @@ mod tests { assert!(metric1.context().tags().has_tag(tag)); } - // Simulate exceeding our configured enrichment duration by clearing the context resolver and host tags. - host_tags_enrichment.context_resolver = None; + // Simulate exceeding our configured enrichment duration by clearing host tags. 
host_tags_enrichment.host_tags = None; // We should no longer enrich the metric with host tags. diff --git a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs index 73e8981dd2d..1efcfa0de94 100644 --- a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs +++ b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs @@ -8,17 +8,12 @@ mod telemetry; -use std::{num::NonZeroUsize, time::Duration}; - use async_trait::async_trait; use foldhash::fast::RandomState as FoldHashState; use hashbrown::{HashMap, HashSet}; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use saluki_config::GenericConfiguration; -use saluki_context::{ - tags::{SharedTagSet, TagSet}, - ContextResolver, ContextResolverBuilder, -}; +use saluki_context::tags::{Tag, TagSet}; use saluki_core::{ components::{ transforms::{Transform, TransformBuilder, TransformContext}, @@ -60,11 +55,6 @@ pub struct MetricTagFilterEntry { /// Compiled filter table: metric name → (is_exclude, set of tag key names). pub type CompiledFilters = HashMap), FoldHashState>; -struct FilteredTagSet { - tags: TagSet, - removed: usize, -} - #[derive(Clone, Copy, Debug, Eq, PartialEq)] /// Outcome of attempting to apply `metric_tag_filterlist` rules to a metric. pub enum FilterMetricTagsOutcome { @@ -114,14 +104,6 @@ pub fn compile_filters(entries: &[MetricTagFilterEntry]) -> CompiledFilters { filters } -/// Default number of filtered contexts to retain in the local resolver cache. -fn default_context_resolver_limit() -> usize { - 5000 -} - -const DEFAULT_TAG_FILTERLIST_INTERNER_CAPACITY_BYTES: usize = 64 * 1024; -const TAG_FILTERLIST_RESOLVER_IDLE_EXPIRATION: Duration = Duration::from_secs(30); - /// Metric Tag Filterlist transform. /// /// Removes or retains specific tags from distribution metrics based on per-metric configuration. 
@@ -131,13 +113,6 @@ pub struct TagFilterlistConfiguration { #[serde(default, rename = "metric_tag_filterlist")] entries: Vec, - /// Maximum number of filtered contexts to cache in the local resolver. - /// - /// This should generally match `aggregate_context_limit` so the resolver can retain one - /// filtered context per distinct aggregated context. Defaults to 5000. - #[serde(rename = "aggregate_context_limit", default = "default_context_resolver_limit")] - context_resolver_limit: usize, - #[serde(skip)] configuration: Option, } @@ -164,17 +139,6 @@ impl TransformBuilder for TagFilterlistConfiguration { async fn build(&self, context: ComponentContext) -> Result, GenericError> { let metrics_builder = MetricsBuilder::from_component_context(&context); - let context_resolver_limit = - NonZeroUsize::new(self.context_resolver_limit.max(1)).expect("context_resolver_limit is always >= 1"); - let context_resolver = - ContextResolverBuilder::from_name(format!("{}/tag_filterlist/primary", context.component_id()))? 
- .with_cached_contexts_limit(context_resolver_limit.get()) - .with_idle_context_expiration(TAG_FILTERLIST_RESOLVER_IDLE_EXPIRATION) - .with_interner_capacity_bytes( - NonZeroUsize::new(DEFAULT_TAG_FILTERLIST_INTERNER_CAPACITY_BYTES) - .expect("tag filter interner capacity is non-zero"), - ) - .build(); Ok(Box::new(TagFilterlist { filters: compile_filters(&self.entries), @@ -183,20 +147,13 @@ impl TransformBuilder for TagFilterlistConfiguration { .clone() .expect("configuration must be set via from_configuration"), telemetry: Telemetry::new(&metrics_builder), - context_resolver, })) } } impl MemoryBounds for TagFilterlistConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { - builder - .minimum() - .with_single_value::("component struct") - .with_fixed_amount( - "tag filter context resolver string interner", - DEFAULT_TAG_FILTERLIST_INTERNER_CAPACITY_BYTES, - ); + builder.minimum().with_single_value::("component struct"); } } @@ -204,7 +161,6 @@ struct TagFilterlist { filters: CompiledFilters, configuration: GenericConfiguration, telemetry: Telemetry, - context_resolver: ContextResolver, } #[async_trait] @@ -225,11 +181,7 @@ impl Transform for TagFilterlist { for event in &mut events { if let Some(metric) = event.try_as_metric_mut() { if metric.values().is_sketch() { - let outcome = filter_metric_tags_with_resolver( - metric, - &self.filters, - &mut self.context_resolver, - ); + let outcome = filter_metric_tags(metric, &self.filters); self.telemetry.record(outcome); } } @@ -253,123 +205,14 @@ impl Transform for TagFilterlist { } } -/// Applies a tag filter to a shared tag set, returning `Some(TagSet)` if any tags were -/// filtered out, or `None` if the result would be identical to the source. -/// -/// Tags whose key is in `names` are excluded when `is_exclude` is true, or kept when false. -/// Constructs a fresh `TagSet` without mutating the source, preserving isolation for -/// metrics that share the same underlying `Arc`. 
#[inline] -fn apply_tag_filter( - tags: &SharedTagSet, is_exclude: bool, names: &HashSet, -) -> Option { - let estimated_capacity = if is_exclude { - tags.len().saturating_sub(names.len()) - } else { - names.len().min(tags.len()) - }; - let mut out: Option = None; - let mut removed = 0; - - for (idx, tag) in tags.into_iter().enumerate() { - if is_exclude != names.contains(tag.name()) { - if let Some(out) = out.as_mut() { - out.extend([tag.clone()]); - } - continue; - } - - removed += 1; - if out.is_none() { - // Delay allocating the output until we know filtering actually changes the tag set. - let mut filtered = TagSet::with_capacity(estimated_capacity.max(idx)); - for existing in tags.into_iter().take(idx) { - filtered.extend([existing.clone()]); - } - out = Some(filtered); - } - } - - out.map(|tags| FilteredTagSet { tags, removed }) +fn should_keep_tag(tag: &Tag, is_exclude: bool, names: &HashSet) -> bool { + is_exclude != names.contains(tag.as_borrowed().name()) } -/// Filters the tags of a sketch metric and resolves modified outputs through a local context -/// resolver so repeated filtered contexts can be reused across metrics. 
-fn filter_metric_tags_with_resolver( - metric: &mut Metric, filters: &CompiledFilters, context_resolver: &mut ContextResolver, -) -> FilterMetricTagsOutcome { - let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else { - return FilterMetricTagsOutcome::RuleMiss; - }; - - let new_tags = apply_tag_filter(metric.context().tags(), *is_exclude, tag_names); - - if metric.context().origin_tags().is_empty() { - match new_tags { - Some(filtered) => { - let removed_tags = filtered.removed; - if let Some(context) = context_resolver.resolve_with_origin_tags( - metric.context().name(), - &filtered.tags, - metric.context().origin_tags().clone(), - ) { - *metric.context_mut() = context; - } else { - *metric.context_mut() = metric.context().with_tags(filtered.tags.into_shared()); - } - FilterMetricTagsOutcome::Modified { removed_tags } - } - None => FilterMetricTagsOutcome::NoChange, - } - } else { - let new_origin = apply_tag_filter(metric.context().origin_tags(), *is_exclude, tag_names); - match (new_tags, new_origin) { - (None, None) => FilterMetricTagsOutcome::NoChange, - (Some(tags), None) => { - let removed_tags = tags.removed; - if let Some(context) = context_resolver.resolve_with_origin_tags( - metric.context().name(), - &tags.tags, - metric.context().origin_tags().clone(), - ) { - *metric.context_mut() = context; - } else { - *metric.context_mut() = metric.context().with_tags(tags.tags.into_shared()); - } - FilterMetricTagsOutcome::Modified { removed_tags } - } - (None, Some(origin)) => { - let removed_tags = origin.removed; - let filtered_origin = origin.tags.into_shared(); - if let Some(context) = context_resolver.resolve_with_origin_tags( - metric.context().name(), - metric.context().tags(), - filtered_origin.clone(), - ) { - *metric.context_mut() = context; - } else { - *metric.context_mut() = metric.context().with_origin_tags(filtered_origin); - } - FilterMetricTagsOutcome::Modified { removed_tags } - } - (Some(tags), Some(origin)) => 
{ - let removed_tags = tags.removed + origin.removed; - let filtered_origin = origin.tags.into_shared(); - if let Some(context) = context_resolver.resolve_with_origin_tags( - metric.context().name(), - &tags.tags, - filtered_origin.clone(), - ) { - *metric.context_mut() = context; - } else { - *metric.context_mut() = metric - .context() - .with_tags_and_origin_tags(tags.tags.into_shared(), filtered_origin); - } - FilterMetricTagsOutcome::Modified { removed_tags } - } - } - } +#[inline] +fn has_removable_tags(tags: &TagSet, is_exclude: bool, names: &HashSet) -> bool { + tags.into_iter().any(|tag| !should_keep_tag(tag, is_exclude, names)) } /// Filter the tags of a distribution metric according to the compiled filter table. @@ -383,47 +226,41 @@ pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) -> Fil return FilterMetricTagsOutcome::RuleMiss; }; - let new_tags = apply_tag_filter(metric.context().tags(), *is_exclude, tag_names); + let is_exclude = *is_exclude; - if metric.context().origin_tags().is_empty() { - if let Some(filtered) = new_tags { - let removed_tags = filtered.removed; - *metric.context_mut() = metric.context().with_tags(filtered.tags.into_shared()); - FilterMetricTagsOutcome::Modified { removed_tags } - } else { - FilterMetricTagsOutcome::NoChange + // Quick read-only check: avoids CoW clone + hash_context recomputation when nothing changes. + // any() short-circuits on the first removable tag. + let has_tag_removals = has_removable_tags(metric.context().tags(), is_exclude, tag_names); + let has_origin_removals = has_removable_tags(metric.context().origin_tags(), is_exclude, tag_names); + + if !has_tag_removals && !has_origin_removals { + return FilterMetricTagsOutcome::NoChange; + } + + // Single-pass mutation: use len() before/after retain() to count removals. 
+ let mut total_removed = 0; + metric.context_mut().with_tag_sets_mut(|tags, origin_tags| { + if has_tag_removals { + let before = tags.len(); + tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names)); + total_removed += before - tags.len(); } - } else { - let new_origin = apply_tag_filter(metric.context().origin_tags(), *is_exclude, tag_names); - match (new_tags, new_origin) { - (None, None) => FilterMetricTagsOutcome::NoChange, - (Some(tags), None) => { - let removed_tags = tags.removed; - *metric.context_mut() = metric.context().with_tags(tags.tags.into_shared()); - FilterMetricTagsOutcome::Modified { removed_tags } - } - (None, Some(origin)) => { - let removed_tags = origin.removed; - *metric.context_mut() = metric.context().with_origin_tags(origin.tags.into_shared()); - FilterMetricTagsOutcome::Modified { removed_tags } - } - (Some(tags), Some(origin)) => { - let removed_tags = tags.removed + origin.removed; - *metric.context_mut() = metric - .context() - .with_tags_and_origin_tags(tags.tags.into_shared(), origin.tags.into_shared()); - FilterMetricTagsOutcome::Modified { removed_tags } - } + if has_origin_removals { + let before = origin_tags.len(); + origin_tags.retain(|tag| should_keep_tag(tag, is_exclude, tag_names)); + total_removed += before - origin_tags.len(); } + }); + + FilterMetricTagsOutcome::Modified { + removed_tags: total_removed, } } #[cfg(test)] mod tests { - use std::sync::Arc; - use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; - use saluki_context::{tags::Tag, Context, ContextResolverBuilder}; + use saluki_context::{tags::Tag, Context}; use saluki_core::data_model::event::metric::Metric; use saluki_metrics::{test::TestRecorder, MetricsBuilder}; @@ -940,56 +777,50 @@ mod tests { assert_eq!(tag_names(&metric), vec!["service:web"]); } - // --- Resolver behavior tests --- + // --- Mutation-path tests --- #[test] - fn resolver_hit_reuses_filtered_context() { + fn modified_filter_marks_tagsets_as_modified() { let entries = 
vec![MetricTagFilterEntry { metric_name: "my.dist".to_string(), action: FilterAction::Exclude, tags: vec!["host".to_string()], }]; let filters = compile_filters(&entries); - let mut context_resolver = ContextResolverBuilder::for_tests().build(); - let mut metric1 = distribution_metric("my.dist", &["env:prod", "host:h1"]); - let mut metric2 = distribution_metric("my.dist", &["env:prod", "host:h1"]); + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1"]); - let outcome1 = filter_metric_tags_with_resolver(&mut metric1, &filters, &mut context_resolver); - let outcome2 = filter_metric_tags_with_resolver(&mut metric2, &filters, &mut context_resolver); + let outcome = filter_metric_tags(&mut metric, &filters); - assert_eq!(outcome1, FilterMetricTagsOutcome::Modified { removed_tags: 1 }); - assert_eq!(outcome2, FilterMetricTagsOutcome::Modified { removed_tags: 1 }); - assert!(metric1 - .context() - .tags() - .as_tag_sets() - .iter() - .zip(metric2.context().tags().as_tag_sets().iter()) - .all(|(a, b)| Arc::ptr_eq(a, b))); + assert_eq!(outcome, FilterMetricTagsOutcome::Modified { removed_tags: 1 }); + assert_eq!(tag_names(&metric), vec!["env:prod"]); + assert!(metric.context().tags().is_modified()); } #[test] - fn resolver_no_change_preserves_original_context_tags() { + fn no_change_does_not_mark_tagsets_as_modified() { let entries = vec![MetricTagFilterEntry { metric_name: "my.dist".to_string(), action: FilterAction::Exclude, tags: vec!["region".to_string()], }]; let filters = compile_filters(&entries); - let mut context_resolver = ContextResolverBuilder::for_tests().build(); - let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1"]); - let original_tags = metric.context().tags().clone(); + let shared_tags = ["env:prod", "host:h1"] + .into_iter() + .map(Tag::from) + .collect::() + .into_shared(); + let context = Context::from_parts("my.dist", shared_tags); + let mut metric = Metric::distribution(context, 1.0); + + 
assert!(!metric.context().tags().is_modified()); + assert!(!metric.context().origin_tags().is_modified()); - let outcome = filter_metric_tags_with_resolver(&mut metric, &filters, &mut context_resolver); + let outcome = filter_metric_tags(&mut metric, &filters); assert_eq!(outcome, FilterMetricTagsOutcome::NoChange); - assert!(metric - .context() - .tags() - .as_tag_sets() - .iter() - .zip(original_tags.as_tag_sets().iter()) - .all(|(a, b)| Arc::ptr_eq(a, b))); + assert_eq!(tag_names(&metric), vec!["env:prod", "host:h1"]); + assert!(!metric.context().tags().is_modified()); + assert!(!metric.context().origin_tags().is_modified()); } } diff --git a/lib/saluki-context/Cargo.toml b/lib/saluki-context/Cargo.toml index d25400da908..d2e008d2303 100644 --- a/lib/saluki-context/Cargo.toml +++ b/lib/saluki-context/Cargo.toml @@ -22,5 +22,10 @@ tokio = { workspace = true, features = ["rt", "time"] } tracing = { workspace = true } [dev-dependencies] +criterion = { workspace = true } metrics-util = { workspace = true, features = ["debugging"] } proptest = { workspace = true } + +[[bench]] +name = "tagset" +harness = false diff --git a/lib/saluki-context/benches/tagset.rs b/lib/saluki-context/benches/tagset.rs new file mode 100644 index 00000000000..106680fedf6 --- /dev/null +++ b/lib/saluki-context/benches/tagset.rs @@ -0,0 +1,248 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use saluki_context::tags::{SharedTagSet, Tag, TagSet}; + +/// Build a SharedTagSet with the given number of tags. +fn make_base(n: usize) -> SharedTagSet { + let ts: TagSet = (0..n).map(|i| Tag::from(format!("key{}:value{}", i, i))).collect(); + ts.into_shared() +} + +/// Build a SharedTagSet with two chained groups. 
+fn make_chained_base(n: usize) -> SharedTagSet { + let half = n / 2; + let ts1: TagSet = (0..half).map(|i| Tag::from(format!("key{}:value{}", i, i))).collect(); + let ts2: TagSet = (half..n).map(|i| Tag::from(format!("key{}:value{}", i, i))).collect(); + let mut shared = ts1.into_shared(); + shared.extend_from_shared(&ts2.into_shared()); + shared +} + +fn bench_insert_fresh(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/insert_fresh"); + + for &size in &sizes { + group.throughput(Throughput::Elements(size as u64)); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| { + b.iter(|| { + let mut ts = TagSet::with_capacity(size); + for i in 0..size { + ts.insert_tag(Tag::from(format!("key{}:value{}", i, i))); + } + ts + }); + }); + } + + group.finish(); +} + +fn bench_insert_over_base(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/insert_over_base"); + + for &size in &sizes { + let base = make_base(size); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| { + b.iter(|| { + let mut ts = TagSet::from(base.clone()); + // Insert a tag that shadows the middle tag in the base. 
+ ts.insert_tag(Tag::from(format!("key{}:new_value", size / 2))); + ts + }); + }); + } + + group.finish(); +} + +fn bench_remove_from_base(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/remove_from_base"); + + for &size in &sizes { + let base = make_base(size); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| { + b.iter(|| { + let mut ts = TagSet::from(base.clone()); + ts.remove_tags(format!("key{}", size / 2)); + ts + }); + }); + } + + group.finish(); +} + +fn bench_remove_from_additions(c: &mut Criterion) { + let mut group = c.benchmark_group("TagSet/remove_from_additions"); + + for &size in &[10, 25, 50] { + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, &size| { + b.iter(|| { + let mut ts = TagSet::with_capacity(size); + for i in 0..size { + ts.insert_tag(Tag::from(format!("key{}:value{}", i, i))); + } + ts.remove_tags(format!("key{}", size / 2)); + ts + }); + }); + } + + group.finish(); +} + +fn bench_get_single_tag_in_base(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/get_single_tag_in_base"); + + for &size in &sizes { + let base = make_base(size); + let ts = TagSet::from(base); + let key = format!("key{}", size / 2); + + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| ts.get_single_tag(&key)); + }); + } + + group.finish(); +} + +fn bench_get_single_tag_in_additions(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/get_single_tag_in_additions"); + + for &size in &sizes { + let base = make_base(size); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("lookup_target:found")); + + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| ts.get_single_tag("lookup_target")); + }); + } + + group.finish(); +} + +fn bench_iterate_unmodified(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = 
c.benchmark_group("TagSet/iterate_unmodified"); + + for &size in &sizes { + let base = make_base(size); + let ts = TagSet::from(base); + + group.throughput(Throughput::Elements(size as u64)); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| { + let mut count = 0u32; + for _ in &ts { + count += 1; + } + count + }); + }); + } + + group.finish(); +} + +fn bench_iterate_with_removals(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/iterate_with_removals"); + + for &size in &sizes { + let base = make_base(size); + let mut ts = TagSet::from(base); + // Remove ~20% of tags. + for i in (0..size).step_by(5) { + ts.remove_tags(format!("key{}", i)); + } + + group.throughput(Throughput::Elements(ts.len() as u64)); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| { + let mut count = 0u32; + for _ in &ts { + count += 1; + } + count + }); + }); + } + + group.finish(); +} + +fn bench_into_shared_unmodified(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/into_shared_unmodified"); + + for &size in &sizes { + let base = make_base(size); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| { + let ts = TagSet::from(base.clone()); + ts.into_shared() + }); + }); + } + + group.finish(); +} + +fn bench_into_shared_modified(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/into_shared_modified"); + + for &size in &sizes { + let base = make_base(size); + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| { + let mut ts = TagSet::from(base.clone()); + ts.insert_tag(Tag::from("extra:tag")); + ts.remove_tags("key0"); + ts.into_shared() + }); + }); + } + + group.finish(); +} + +fn bench_clone(c: &mut Criterion) { + let sizes = [10, 25, 50]; + let mut group = c.benchmark_group("TagSet/clone"); + + for &size in &sizes { + 
let base = make_chained_base(size); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("extra:tag")); + ts.remove_tags("key0"); + + group.bench_with_input(BenchmarkId::from_parameter(size), &size, |b, _| { + b.iter(|| ts.clone()); + }); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_insert_fresh, + bench_insert_over_base, + bench_remove_from_base, + bench_remove_from_additions, + bench_get_single_tag_in_base, + bench_get_single_tag_in_additions, + bench_iterate_unmodified, + bench_iterate_with_removals, + bench_into_shared_unmodified, + bench_into_shared_modified, + bench_clone, +); +criterion_main!(benches); diff --git a/lib/saluki-context/proptest-regressions/tags/tagset/owned.txt b/lib/saluki-context/proptest-regressions/tags/tagset/owned.txt new file mode 100644 index 00000000000..35617f1b6ac --- /dev/null +++ b/lib/saluki-context/proptest-regressions/tags/tagset/owned.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc ba61ffa8cbde327be9e3437206cbc326c4f37534af68b0b21ef4fc7c7493313f # shrinks to base_groups = [["p:0"], ["p:0"]], ops = [] diff --git a/lib/saluki-context/src/context.rs b/lib/saluki-context/src/context.rs index fca64c49f0f..3aba7740116 100644 --- a/lib/saluki-context/src/context.rs +++ b/lib/saluki-context/src/context.rs @@ -5,7 +5,7 @@ use stringtheory::MetaString; use crate::{ hash::{hash_context, ContextKey}, - tags::{SharedTagSet, TagSet}, + tags::TagSet, }; const BASE_CONTEXT_SIZE: usize = std::mem::size_of::() + std::mem::size_of::(); @@ -19,8 +19,8 @@ pub struct Context { impl Context { /// Creates a new `Context` from the given static name. 
pub fn from_static_name(name: &'static str) -> Self { - let tags = SharedTagSet::default(); - let origin_tags = SharedTagSet::default(); + let tags = TagSet::default(); + let origin_tags = TagSet::default(); let (key, _) = hash_context(name, &tags, &origin_tags); Self { @@ -41,13 +41,13 @@ impl Context { tag_set.insert_tag(MetaString::from_static(tag)); } - let origin_tags = SharedTagSet::default(); + let origin_tags = TagSet::default(); let (key, _) = hash_context(name, tags, &origin_tags); Self { inner: Arc::new(ContextInner { name: MetaString::from_static(name), - tags: tag_set.into_shared(), + tags: tag_set, origin_tags, key, active_count: Gauge::noop(), @@ -56,9 +56,10 @@ impl Context { } /// Creates a new `Context` from the given name and given tags. - pub fn from_parts>(name: S, tags: SharedTagSet) -> Self { + pub fn from_parts>(name: S, tags: impl Into) -> Self { let name = name.into(); - let origin_tags = SharedTagSet::default(); + let tags = tags.into(); + let origin_tags = TagSet::default(); let (key, _) = hash_context(&name, &tags, &origin_tags); Self { inner: Arc::new(ContextInner { @@ -93,8 +94,9 @@ impl Context { /// Clones this context, and uses the given tags for the cloned context. /// /// The name and origin tags of this context are preserved. - pub fn with_tags(&self, tags: SharedTagSet) -> Self { + pub fn with_tags(&self, tags: impl Into) -> Self { let name = self.inner.name.clone(); + let tags = tags.into(); let origin_tags = self.inner.origin_tags.clone(); let (key, _) = hash_context(&name, &tags, &origin_tags); @@ -112,9 +114,10 @@ impl Context { /// Clones this context, and uses the given origin tags for the cloned context. /// /// The name and instrumented tags of this context are preserved. 
- pub fn with_origin_tags(&self, origin_tags: SharedTagSet) -> Self { + pub fn with_origin_tags(&self, origin_tags: impl Into) -> Self { let name = self.inner.name.clone(); let tags = self.inner.tags.clone(); + let origin_tags = origin_tags.into(); let (key, _) = hash_context(&name, &tags, &origin_tags); Self { @@ -132,8 +135,10 @@ impl Context { /// /// Preferred over two separate `with_tags` / `with_origin_tags` calls when both sets need to /// be replaced, as it halves the number of `Arc` allocations. - pub fn with_tags_and_origin_tags(&self, tags: SharedTagSet, origin_tags: SharedTagSet) -> Self { + pub fn with_tags_and_origin_tags(&self, tags: impl Into, origin_tags: impl Into) -> Self { let name = self.inner.name.clone(); + let tags = tags.into(); + let origin_tags = origin_tags.into(); let (key, _) = hash_context(&name, &tags, &origin_tags); Self { @@ -162,15 +167,55 @@ impl Context { } /// Returns the instrumented tags of this context. - pub fn tags(&self) -> &SharedTagSet { + pub fn tags(&self) -> &TagSet { &self.inner.tags } /// Returns the origin tags of this context. - pub fn origin_tags(&self) -> &SharedTagSet { + pub fn origin_tags(&self) -> &TagSet { &self.inner.origin_tags } + /// Mutates the instrumented tags of this context via a closure. + /// + /// Uses copy-on-write semantics: if this context shares its inner data with other clones, the + /// inner data is cloned first so that mutations do not affect other holders. If this context is + /// the sole owner, the mutation happens in place. + /// + /// The context key is automatically recomputed after the closure returns. + pub fn with_tags_mut(&mut self, f: impl FnOnce(&mut TagSet)) { + self.mutate_inner(|inner| f(&mut inner.tags)); + } + + /// Mutates the origin tags of this context via a closure. + /// + /// Uses copy-on-write semantics: if this context shares its inner data with other clones, the + /// inner data is cloned first so that mutations do not affect other holders. 
If this context is + /// the sole owner, the mutation happens in place. + /// + /// The context key is automatically recomputed after the closure returns. + pub fn with_origin_tags_mut(&mut self, f: impl FnOnce(&mut TagSet)) { + self.mutate_inner(|inner| f(&mut inner.origin_tags)); + } + + /// Mutates both instrumented tags and origin tags via a single closure. + /// + /// Uses copy-on-write semantics: if this context shares its inner data with other clones, the + /// inner data is cloned first so that mutations do not affect other holders. If this context is + /// the sole owner, the mutation happens in place. + /// + /// The context key is recomputed once after the closure returns. + pub fn with_tag_sets_mut(&mut self, f: impl FnOnce(&mut TagSet, &mut TagSet)) { + self.mutate_inner(|inner| f(&mut inner.tags, &mut inner.origin_tags)); + } + + fn mutate_inner(&mut self, f: impl FnOnce(&mut ContextInner)) { + let inner = Arc::make_mut(&mut self.inner); + f(inner); + let (key, _) = hash_context(&inner.name, &inner.tags, &inner.origin_tags); + inner.key = key; + } + /// Returns the size of this context in bytes. 
/// /// A context's size is the sum of the sizes of its fields and the size of the `Context` struct itself, and @@ -233,14 +278,14 @@ impl fmt::Display for Context { pub(super) struct ContextInner { key: ContextKey, name: MetaString, - tags: SharedTagSet, - origin_tags: SharedTagSet, + tags: TagSet, + origin_tags: TagSet, active_count: Gauge, } impl ContextInner { pub fn from_parts( - key: ContextKey, name: MetaString, tags: SharedTagSet, origin_tags: SharedTagSet, active_count: Gauge, + key: ContextKey, name: MetaString, tags: TagSet, origin_tags: TagSet, active_count: Gauge, ) -> Self { Self { key, @@ -309,8 +354,8 @@ mod tests { const SIZE_OF_CONTEXT_TAGS: &[&str] = &["size_of_test_tag1", "size_of_test_tag2"]; const SIZE_OF_CONTEXT_ORIGIN_TAGS: &[&str] = &["size_of_test_origin_tag1", "size_of_test_origin_tag2"]; - fn tag_set(tags: &[&str]) -> SharedTagSet { - tags.iter().map(|s| Tag::from(*s)).collect::().into_shared() + fn tag_set(tags: &[&str]) -> TagSet { + tags.iter().map(|s| Tag::from(*s)).collect::() } #[test] @@ -381,4 +426,72 @@ mod tests { BASE_CONTEXT_SIZE + SIZE_OF_CONTEXT_NAME.len() + tags.size_of() + origin_tags.size_of() ); } + + #[test] + fn with_tags_mut_clones_shared_context() { + let original = Context::from_static_parts("metric", &["env:prod"]); + let mut mutated = original.clone(); + + // They share the same Arc before mutation. + assert!(original.ptr_eq(&mutated)); + + mutated.with_tags_mut(|tags| { + tags.insert_tag(Tag::from("service:web")); + }); + + // After mutation, they no longer share the same inner. + assert!(!original.ptr_eq(&mutated)); + } + + #[test] + fn with_tags_mut_does_not_affect_original() { + let original = Context::from_static_parts("metric", &["env:prod"]); + let mut mutated = original.clone(); + + mutated.with_tags_mut(|tags| { + tags.insert_tag(Tag::from("service:web")); + }); + + // Original is unchanged. 
+ assert_eq!(original.tags().len(), 1); + assert!(original.tags().has_tag("env:prod")); + assert!(!original.tags().has_tag("service:web")); + + // Mutated has both tags. + assert_eq!(mutated.tags().len(), 2); + assert!(mutated.tags().has_tag("env:prod")); + assert!(mutated.tags().has_tag("service:web")); + } + + #[test] + fn with_tags_mut_rehashes() { + // Build a context and mutate it to add a tag. + let mut mutated = Context::from_static_parts("metric", &["env:prod"]); + mutated.with_tags_mut(|tags| { + tags.insert_tag(Tag::from("service:web")); + }); + + // Build an equivalent context from scratch with both tags. + let expected = Context::from_static_parts("metric", &["env:prod", "service:web"]); + + // The recomputed key should match a freshly-constructed context with the same state. + assert_eq!(mutated, expected); + } + + #[test] + fn with_origin_tags_mut_clones_shared_context() { + let original = Context::from_static_name("metric"); + let mut mutated = original.clone(); + + assert!(original.ptr_eq(&mutated)); + + mutated.with_origin_tags_mut(|tags| { + tags.insert_tag(Tag::from("origin:tag")); + }); + + assert!(!original.ptr_eq(&mutated)); + assert!(original.origin_tags().is_empty()); + assert_eq!(mutated.origin_tags().len(), 1); + assert!(mutated.origin_tags().has_tag("origin:tag")); + } } diff --git a/lib/saluki-context/src/resolver.rs b/lib/saluki-context/src/resolver.rs index 8146bfbbb3d..f1dc779a948 100644 --- a/lib/saluki-context/src/resolver.rs +++ b/lib/saluki-context/src/resolver.rs @@ -378,8 +378,8 @@ impl ContextResolver { Some(Context::from_inner(ContextInner::from_parts( key, context_name, - context_tags, - origin_tags, + context_tags.into(), + origin_tags.into(), self.telemetry.active_contexts().clone(), ))) } @@ -422,13 +422,15 @@ impl ContextResolver { /// This method is intended primarily to allow for resolving contexts in a consistent way while _reusing_ the origin /// tags from another context, such as when remapping the name and/or 
instrumented tags of a given metric, while /// maintaining its origin association. - pub fn resolve_with_origin_tags(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option + pub fn resolve_with_origin_tags( + &mut self, name: N, tags: I, origin_tags: impl Into, + ) -> Option where N: AsRef + CheapMetaString, I: IntoIterator + Clone, T: AsRef + CheapMetaString, { - self.resolve_inner(name, tags, origin_tags) + self.resolve_inner(name, tags, origin_tags.into()) } fn resolve_inner(&mut self, name: N, tags: I, origin_tags: SharedTagSet) -> Option diff --git a/lib/saluki-context/src/tags/tagset/frozen.rs b/lib/saluki-context/src/tags/tagset/frozen.rs new file mode 100644 index 00000000000..da4af4af795 --- /dev/null +++ b/lib/saluki-context/src/tags/tagset/frozen.rs @@ -0,0 +1,85 @@ +use std::fmt; + +use crate::tags::Tag; + +/// Flat, immutable tag storage. +/// +/// This is a private type used as the backing storage inside `SharedTagSet`. It holds a simple +/// `Vec` and provides read-only access. External callers work with `TagSet` (mutable) and +/// `SharedTagSet` (shared/frozen) instead. +#[derive(Clone, Debug, Default)] +pub(crate) struct FrozenTagSet(Vec); + +impl FrozenTagSet { + /// Creates a new `FrozenTagSet` from the given vector of tags. + pub(crate) fn new(tags: Vec) -> Self { + Self(tags) + } + + /// Returns `true` if the tag set is empty. + pub(crate) fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Returns the number of tags in the set. + pub(crate) fn len(&self) -> usize { + self.0.len() + } + + /// Returns `true` if the given tag is contained in the set. + /// + /// This matches the complete tag, rather than just the name. + pub(crate) fn has_tag(&self, tag: &str) -> bool { + self.0.iter().any(|existing| existing.as_str() == tag) + } + + /// Gets a single tag, by name, from the set. + /// + /// If multiple tags are present with the same name, the first tag with a matching name will be + /// returned. 
If no tag in the set matches, `None` is returned. + pub(crate) fn get_single_tag(&self, tag_name: &str) -> Option<&Tag> { + self.0.iter().find(|tag| tag.name() == tag_name) + } + + /// Returns the size of the tag set, in bytes. + pub(crate) fn size_of(&self) -> usize { + (self.len() * std::mem::size_of::()) + self.0.iter().map(|tag| tag.len()).sum::() + } +} + +impl<'a> IntoIterator for &'a FrozenTagSet { + type Item = &'a Tag; + type IntoIter = std::slice::Iter<'a, Tag>; + + fn into_iter(self) -> Self::IntoIter { + self.0.iter() + } +} + +impl FromIterator for FrozenTagSet { + fn from_iter>(iter: I) -> Self { + Self(iter.into_iter().collect()) + } +} + +impl From for FrozenTagSet { + fn from(tag: Tag) -> Self { + Self(vec![tag]) + } +} + +impl fmt::Display for FrozenTagSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "[")?; + + for (i, tag) in self.0.iter().enumerate() { + if i > 0 { + write!(f, ",")?; + } + + write!(f, "{}", tag.as_str())?; + } + + write!(f, "]") + } +} diff --git a/lib/saluki-context/src/tags/tagset/mod.rs b/lib/saluki-context/src/tags/tagset/mod.rs index ba7a095767a..84a23076615 100644 --- a/lib/saluki-context/src/tags/tagset/mod.rs +++ b/lib/saluki-context/src/tags/tagset/mod.rs @@ -1,5 +1,7 @@ +pub(crate) mod frozen; + mod owned; pub use self::owned::TagSet; mod shared; -pub use self::shared::SharedTagSet; +pub use self::shared::{SharedTagSet, SharedTagSetIterator}; diff --git a/lib/saluki-context/src/tags/tagset/owned.rs b/lib/saluki-context/src/tags/tagset/owned.rs index c9b85e1dbd7..4a15a80064b 100644 --- a/lib/saluki-context/src/tags/tagset/owned.rs +++ b/lib/saluki-context/src/tags/tagset/owned.rs @@ -1,74 +1,146 @@ -use std::{fmt, ops::Deref as _}; +use std::{fmt, hash}; -use serde::{Deserialize, Serialize}; +use saluki_common::collections::ContiguousBitSet; +use serde::{ser::SerializeSeq as _, Serialize}; +use smallvec::SmallVec; use super::SharedTagSet; use crate::tags::Tag; -/// A set of tags. 
-#[derive(Clone, Debug, Default, Deserialize, Serialize)] -#[serde(transparent)] -pub struct TagSet(Vec); +/// Heap-allocated overlay for mutation state, only created when a `TagSet` is actually mutated. +#[derive(Clone, Debug)] +struct TagSetOverlay { + /// Tags added or used to replace base tags. + additions: SmallVec<[Tag; 4]>, + /// Bitset of flattened base indices that have been removed or shadowed. + removals: ContiguousBitSet, +} + +/// A mutable set of tags, optionally backed by a [`SharedTagSet`] base. +/// +/// `TagSet` supports efficient mutation through an overlay approach: it wraps an immutable [`SharedTagSet`] base with a +/// small set of additions and a bitset tracking which base tags have been removed. This avoids full materialization +/// when only a few tags need to change. +/// +/// # Memory layout +/// +/// When no mutations have been made, `TagSet` is compact (base pointer + `None` overlay). The mutation state +/// (additions and removals) is heap-allocated on demand only when `insert_tag`, `remove_tags`, or similar mutating +/// methods are called. +/// +/// # Construction +/// +/// A `TagSet` can be created from scratch (empty base, tags go into additions) or by wrapping an existing +/// [`SharedTagSet`] for mutation. When constructed from scratch, the usage pattern is the same as before: call +/// [`insert_tag`][TagSet::insert_tag] in a loop, then [`into_shared`][TagSet::into_shared] to freeze. +/// +/// # Conversion +/// +/// When converting back to [`SharedTagSet`] via [`into_shared`][TagSet::into_shared], if no mutations have been made, +/// the base is returned as-is with zero cost. Otherwise, the effective tags are materialized into a new +/// [`SharedTagSet`]. +#[derive(Clone, Debug, Default)] +pub struct TagSet { + /// Immutable base (structurally shared via Arc, never modified). + base: SharedTagSet, + /// Lazily allocated mutation overlay. `None` when no mutations have been made. 
+ overlay: Option>, +} impl TagSet { - /// Creates a new, empty tag set with the given capacity. + /// Creates a new, empty `TagSet` with the given capacity for additions. pub fn with_capacity(capacity: usize) -> Self { - Self(Vec::with_capacity(capacity)) + Self { + base: SharedTagSet::default(), + overlay: Some(Box::new(TagSetOverlay { + additions: SmallVec::with_capacity(capacity), + removals: ContiguousBitSet::new(), + })), + } } /// Returns `true` if the tag set is empty. pub fn is_empty(&self) -> bool { - self.0.is_empty() + match &self.overlay { + None => self.base.is_empty(), + Some(overlay) => overlay.additions.is_empty() && self.base_len_minus_removals() == 0, + } } /// Returns the number of tags in the set. pub fn len(&self) -> usize { - self.0.len() + match &self.overlay { + None => self.base.len(), + Some(overlay) => self.base_len_minus_removals() + overlay.additions.len(), + } } /// Inserts a tag into the set. /// - /// If the tag is already present in the set, this does nothing. + /// If the tag is already present in the set, this does nothing. Presence is checked by exact + /// tag value (both name and value), not just by name — multiple tags with the same name but + /// different values can coexist. pub fn insert_tag(&mut self, tag: T) where T: Into, { let tag = tag.into(); - if !self.0.iter().any(|existing| existing == &tag) { - self.0.push(tag); + + // Check if the exact tag already exists in additions. + if let Some(overlay) = &self.overlay { + if overlay.additions.iter().any(|existing| existing == &tag) { + return; + } } - } - /// Inserts a tag into the set. - /// - /// If the tag is already present in the set, this does nothing. - fn insert_tag_borrowed(&mut self, tag: &Tag) { - if !self.0.iter().any(|existing| existing == tag) { - self.0.push(tag.clone()); + // Check if the exact tag exists in the base (and is not removed). 
+ let found_in_base = base_indexed_iter(&self.base) + .any(|(idx, base_tag)| !is_overlay_removed(&self.overlay, idx) && base_tag == &tag); + if found_in_base { + return; } + + self.ensure_overlay().additions.push(tag); } - /// Removes a tag, by name, from the set. + /// Removes all tags with the given name from the set. + /// + /// Returns the removed tags, or `None` if no tags matched. pub fn remove_tags(&mut self, tag_name: T) -> Option> where T: AsRef, { let tag_name = tag_name.as_ref(); + let mut removed = Vec::new(); + + // Remove from additions. + if let Some(overlay) = &mut self.overlay { + let mut i = 0; + while i < overlay.additions.len() { + if overlay.additions[i].name() == tag_name { + removed.push(overlay.additions.remove(i)); + } else { + i += 1; + } + } + } - let mut tags = Vec::new(); - let mut idx = 0; - while idx < self.0.len() { - if tag_has_name(&self.0[idx], tag_name) { - tags.push(self.0.swap_remove(idx)); - } else { - idx += 1; + // Mark matching base tags as removed: collect indices first. + let base_matches: SmallVec<[usize; 4]> = base_indexed_iter(&self.base) + .filter(|&(idx, base_tag)| !is_overlay_removed(&self.overlay, idx) && base_tag.name() == tag_name) + .map(|(idx, _)| idx) + .collect(); + for &idx in &base_matches { + if let Some(tag) = self.base.get_by_flat_index(idx) { + removed.push(tag.clone()); } + self.ensure_overlay().removals.set(idx); } - if tags.is_empty() { + if removed.is_empty() { None } else { - Some(tags) + Some(removed) } } @@ -80,77 +152,157 @@ impl TagSet { T: AsRef, { let tag = tag.as_ref(); - self.0.iter().any(|existing| existing.0.as_ref() == tag) + + // Check additions first. + if let Some(overlay) = &self.overlay { + if overlay.additions.iter().any(|t| t.as_str() == tag) { + return true; + } + } + + // Check base, skipping removed. 
+ for (idx, base_tag) in base_indexed_iter(&self.base) { + if !is_overlay_removed(&self.overlay, idx) && base_tag.as_str() == tag { + return true; + } + } + + false } /// Gets a single tag, by name, from the set. /// - /// If multiple tags are present with the same name, the first tag with a matching name will be returned. If no tag - /// in the set matches, `None` is returned. + /// If multiple tags are present with the same name, the first tag with a matching name will be + /// returned. Additions are checked before the base. If no tag in the set matches, `None` is + /// returned. pub fn get_single_tag(&self, tag_name: T) -> Option<&Tag> where T: AsRef, { let tag_name = tag_name.as_ref(); - self.0.iter().find(|tag| tag_has_name(tag, tag_name)) + + // Check additions first (they shadow base tags). + if let Some(overlay) = &self.overlay { + if let Some(tag) = overlay.additions.iter().find(|t| t.name() == tag_name) { + return Some(tag); + } + } + + // Check base, skipping removed. + for (idx, base_tag) in base_indexed_iter(&self.base) { + if !is_overlay_removed(&self.overlay, idx) && base_tag.name() == tag_name { + return Some(base_tag); + } + } + + None } /// Retains only the tags specified by the predicate. /// - /// In other words, remove all tags `t` for which `f(&t)` returns `false`. This method operates in place, visiting - /// each element exactly once in the original order, and preserves the order of the retained tags. + /// In other words, remove all tags `t` for which `f(&t)` returns `false`. pub fn retain(&mut self, mut f: F) where F: FnMut(&Tag) -> bool, { - self.0.retain(|tag| f(tag)); + // Filter additions in-place. + if let Some(overlay) = &mut self.overlay { + overlay.additions.retain(|tag| f(tag)); + } + + // Set removal bits for rejected base tags: collect indices first. 
+ let to_remove: SmallVec<[usize; 4]> = base_indexed_iter(&self.base) + .filter(|&(idx, base_tag)| !is_overlay_removed(&self.overlay, idx) && !f(base_tag)) + .map(|(idx, _)| idx) + .collect(); + for idx in to_remove { + self.ensure_overlay().removals.set(idx); + } } /// Merges the tags from another set into this set. /// /// If a tag from `other` is already present in this set, it will not be added. pub fn merge_missing(&mut self, other: Self) { - for tag in other.0 { + for tag in other { self.insert_tag(tag); } } - /// Merges the tags from another shared set into this set. + /// Merges the tags from a shared set into this set. /// /// If a tag from `other` is already present in this set, it will not be added. pub fn merge_missing_shared(&mut self, other: &SharedTagSet) { for tag in other { - self.insert_tag_borrowed(tag); + if !self.has_tag(tag.as_str()) { + // We know this tag doesn't exist, so we can push directly. + self.ensure_overlay().additions.push(tag.clone()); + } } } + /// Merges the tags from a shared set into this set. + /// + /// This method does not attempt to avoid adding duplicate tags. + pub fn merge_shared(&mut self, other: &SharedTagSet) { + self.base.extend_from_shared(other); + } + /// Consumes this `TagSet` and returns a shared, read-only version of it. + /// + /// If no mutations have been made (no additions, no removals), the base `SharedTagSet` is + /// returned as-is with zero cost. pub fn into_shared(self) -> SharedTagSet { - SharedTagSet::from(self) + if self.overlay.is_none() { + return self.base; + } + + // Materialize: collect effective tags into a new SharedTagSet. + let effective: Vec = self.into_iter().collect(); + SharedTagSet::from_tags(effective) } - /// Returns the size of the tag set, in bytes. + /// Returns the estimated size of the tag set, in bytes. /// - /// This includes the size of the vector holding the tags as well as each individual tag. 
- /// - /// Additionally, the value returned by this method does not compensate for externalities such as whether or not - /// tags are are inlined, interned, or heap allocated. This means that the value returned is essentially the - /// worst-case usage, and should be used as a rough estimate. - pub(crate) fn size_of(&self) -> usize { - (self.len() * std::mem::size_of::()) + self.0.iter().map(|tag| tag.len()).sum::() + /// This includes the size of the base `SharedTagSet`, additions, and removal tracking. The value returned is a rough + /// estimate and does not compensate for inlined, interned, or heap-allocated tags. + pub fn size_of(&self) -> usize { + let additions_size = self + .overlay + .as_ref() + .map_or(0, |o| o.additions.iter().map(|t| t.len()).sum::()); + self.base.size_of() + additions_size + } + + /// Returns `true` if this `TagSet` has been modified from its base. + pub fn is_modified(&self) -> bool { + self.overlay.is_some() + } + + fn ensure_overlay(&mut self) -> &mut TagSetOverlay { + self.overlay.get_or_insert_with(|| { + Box::new(TagSetOverlay { + additions: SmallVec::new(), + removals: ContiguousBitSet::new(), + }) + }) + } + + /// Returns the number of base tags minus the number of removed ones. + fn base_len_minus_removals(&self) -> usize { + let removal_count = self.overlay.as_ref().map_or(0, |o| o.removals.len()); + self.base.len() - removal_count } } impl PartialEq for TagSet { fn eq(&self, other: &TagSet) -> bool { - // NOTE: We could try storing tags in sorted order internally, which would make this moot... but for now, we'll - // avoid the sort (which lets us avoid an allocation) and just do the naive per-item comparison. 
- if self.0.len() != other.0.len() { + if self.len() != other.len() { return false; } - for other_tag in &other.0 { - if !self.0.iter().any(|tag| tag == other_tag) { + for other_tag in other { + if !self.has_tag(other_tag.as_str()) { return false; } } @@ -159,39 +311,179 @@ impl PartialEq for TagSet { } } +impl Eq for TagSet {} + +impl hash::Hash for TagSet { + fn hash(&self, state: &mut H) { + for tag in self { + tag.hash(state); + } + } +} + +impl Serialize for TagSet { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.len()))?; + for tag in self { + seq.serialize_element(tag)?; + } + seq.end() + } +} + impl IntoIterator for TagSet { type Item = Tag; - type IntoIter = std::vec::IntoIter; + type IntoIter = TagSetIntoIter; fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() + let (removals, additions) = match self.overlay { + Some(overlay) => (Some(overlay.removals), overlay.additions.into_iter()), + None => (None, SmallVec::new().into_iter()), + }; + TagSetIntoIter { + base: self.base, + removals, + base_index: 0, + base_phase_done: false, + additions, + } + } +} + +/// Owned iterator over a `TagSet`. +pub struct TagSetIntoIter { + base: SharedTagSet, + removals: Option, + base_index: usize, + base_phase_done: bool, + additions: smallvec::IntoIter<[Tag; 4]>, +} + +impl Iterator for TagSetIntoIter { + type Item = Tag; + + fn next(&mut self) -> Option { + // First yield non-removed base tags. + if !self.base_phase_done { + loop { + let idx = self.base_index; + if let Some(tag) = self.base.get_by_flat_index(idx) { + self.base_index += 1; + let removed = self.removals.as_ref().is_some_and(|r| r.is_set(idx)); + if !removed { + return Some(tag.clone()); + } + } else { + self.base_phase_done = true; + break; + } + } + } + + // Then yield additions. 
+ self.additions.next() } } impl<'a> IntoIterator for &'a TagSet { type Item = &'a Tag; - type IntoIter = std::slice::Iter<'a, Tag>; + type IntoIter = TagSetIter<'a>; fn into_iter(self) -> Self::IntoIter { - self.0.iter() + TagSetIter { + base_iter: self.base.into_iter(), + base_index: 0, + overlay: self.overlay.as_deref(), + additions_iter: self.overlay.as_ref().map_or([].iter(), |o| o.additions.iter()), + base_phase_done: false, + } + } +} + +/// Borrowing iterator over a `TagSet`. +pub struct TagSetIter<'a> { + base_iter: super::SharedTagSetIterator<'a>, + base_index: usize, + overlay: Option<&'a TagSetOverlay>, + additions_iter: std::slice::Iter<'a, Tag>, + base_phase_done: bool, +} + +impl<'a> Iterator for TagSetIter<'a> { + type Item = &'a Tag; + + fn next(&mut self) -> Option { + // First yield non-removed base tags. + if !self.base_phase_done { + loop { + if let Some(tag) = self.base_iter.next() { + let idx = self.base_index; + self.base_index += 1; + let removed = self.overlay.is_some_and(|o| o.removals.is_set(idx)); + if !removed { + return Some(tag); + } + } else { + self.base_phase_done = true; + break; + } + } + } + + // Then yield additions. 
+ self.additions_iter.next() } } impl FromIterator for TagSet { fn from_iter>(iter: I) -> Self { - Self(iter.into_iter().collect()) + let additions: SmallVec<[Tag; 4]> = iter.into_iter().collect(); + if additions.is_empty() { + Self { + base: SharedTagSet::default(), + overlay: None, + } + } else { + Self { + base: SharedTagSet::default(), + overlay: Some(Box::new(TagSetOverlay { + additions, + removals: ContiguousBitSet::new(), + })), + } + } } } impl Extend for TagSet { fn extend>(&mut self, iter: T) { - self.0.extend(iter) + self.ensure_overlay().additions.extend(iter) } } impl From for TagSet { fn from(tag: Tag) -> Self { - Self(vec![tag]) + let mut additions = SmallVec::new(); + additions.push(tag); + Self { + base: SharedTagSet::default(), + overlay: Some(Box::new(TagSetOverlay { + additions, + removals: ContiguousBitSet::new(), + })), + } + } +} + +impl From for TagSet { + fn from(shared: SharedTagSet) -> Self { + Self { + base: shared, + overlay: None, + } } } @@ -199,11 +491,12 @@ impl fmt::Display for TagSet { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "[")?; - for (i, tag) in self.0.iter().enumerate() { - if i > 0 { + let mut first = true; + for tag in self { + if !first { write!(f, ",")?; } - + first = false; write!(f, "{}", tag.as_str())?; } @@ -211,11 +504,504 @@ impl fmt::Display for TagSet { } } -fn tag_has_name(tag: &Tag, tag_name: &str) -> bool { - // Try matching it as a key-value pair (e.g., `env:production`) first, and then just try matching it as a bare tag - // (e.g., `production`). - let tag_str = tag.0.deref(); - tag_str - .split_once(':') - .map_or_else(|| tag_str == tag_name, |(name, _)| name == tag_name) +/// Creates an indexed iterator over the tags in a `SharedTagSet`. +fn base_indexed_iter(base: &SharedTagSet) -> BaseIndexIter<'_> { + BaseIndexIter { + inner: base.into_iter(), + index: 0, + } +} + +/// Checks if a given flattened index is marked as removed in the overlay's bitset. 
+fn is_overlay_removed(overlay: &Option>, index: usize) -> bool { + overlay.as_ref().is_some_and(|o| o.removals.is_set(index)) +} + +/// Helper iterator that pairs base tags with their flattened index. +struct BaseIndexIter<'a> { + inner: super::SharedTagSetIterator<'a>, + index: usize, +} + +impl<'a> Iterator for BaseIndexIter<'a> { + type Item = (usize, &'a Tag); + + fn next(&mut self) -> Option { + self.inner.next().map(|tag| { + let idx = self.index; + self.index += 1; + (idx, tag) + }) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeSet, HashSet}; + + use proptest::{prelude::*, prop_oneof}; + + use super::*; + + /// Helper: create a SharedTagSet from static tag strings. + fn shared_from(tags: &[&str]) -> SharedTagSet { + let ts: TagSet = tags.iter().map(|s| Tag::from(*s)).collect(); + ts.into_shared() + } + + /// Helper: create a SharedTagSet with multiple chained FrozenTagSets. + fn chained_shared(groups: &[&[&str]]) -> SharedTagSet { + let mut shared = shared_from(groups[0]); + for group in &groups[1..] { + shared.extend_from_shared(&shared_from(group)); + } + shared + } + + /// Collect all tags from a TagSet as sorted strings for comparison. 
+ fn collect_sorted(ts: &TagSet) -> Vec { + let mut tags: Vec = ts.into_iter().map(|t| t.as_str().to_string()).collect(); + tags.sort(); + tags + } + + // --- Size --- + + #[test] + fn tagset_size_is_compact() { + assert_eq!(std::mem::size_of::(), 32); + } + + // --- Construction --- + + #[test] + fn construction_from_scratch() { + let mut ts = TagSet::with_capacity(3); + ts.insert_tag(Tag::from("env:prod")); + ts.insert_tag(Tag::from("service:web")); + ts.insert_tag(Tag::from("bare_tag")); + + assert_eq!(ts.len(), 3); + assert!(!ts.is_empty()); + assert!(ts.has_tag("env:prod")); + assert!(ts.has_tag("service:web")); + assert!(ts.has_tag("bare_tag")); + } + + #[test] + fn construction_empty() { + let ts = TagSet::default(); + assert!(ts.is_empty()); + assert_eq!(ts.len(), 0); + } + + // --- Round-trip --- + + #[test] + fn round_trip_no_mutations() { + let original = shared_from(&["a:1", "b:2", "c:3"]); + let ts = TagSet::from(original.clone()); + + // No mutations — into_shared should return the base as-is. + assert!(!ts.is_modified()); + let result = ts.into_shared(); + assert_eq!(result, original); + } + + #[test] + fn round_trip_with_mutations() { + let original = shared_from(&["a:1", "b:2"]); + let mut ts = TagSet::from(original); + ts.insert_tag(Tag::from("c:3")); + + let result = ts.into_shared(); + assert_eq!(result.len(), 3); + assert!(result.has_tag("a:1")); + assert!(result.has_tag("b:2")); + assert!(result.has_tag("c:3")); + } + + // --- Insert --- + + #[test] + fn insert_duplicate_is_noop() { + let mut ts = TagSet::default(); + ts.insert_tag(Tag::from("a:1")); + ts.insert_tag(Tag::from("a:1")); + assert_eq!(ts.len(), 1); + } + + #[test] + fn insert_duplicate_in_base_is_noop() { + let base = shared_from(&["a:1", "b:2"]); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("a:1")); + // Should not be modified since the exact tag exists in the base. 
+ assert!(!ts.is_modified()); + assert_eq!(ts.len(), 2); + } + + #[test] + fn insert_same_name_different_value_keeps_both() { + let base = shared_from(&["env:staging", "service:web"]); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("env:production")); + + // Both env tags coexist — insert only deduplicates by exact value, not by name. + assert_eq!(ts.len(), 3); + assert!(ts.has_tag("env:staging")); + assert!(ts.has_tag("env:production")); + assert!(ts.has_tag("service:web")); + } + + #[test] + fn insert_same_name_different_value_in_additions() { + let mut ts = TagSet::default(); + ts.insert_tag(Tag::from("env:staging")); + ts.insert_tag(Tag::from("env:production")); + + // Both coexist. + assert_eq!(ts.len(), 2); + assert!(ts.has_tag("env:staging")); + assert!(ts.has_tag("env:production")); + } + + // --- Remove --- + + #[test] + fn remove_from_base() { + let base = shared_from(&["a:1", "b:2", "c:3"]); + let mut ts = TagSet::from(base); + + let removed = ts.remove_tags("b").unwrap(); + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].as_str(), "b:2"); + + assert_eq!(ts.len(), 2); + assert!(ts.has_tag("a:1")); + assert!(!ts.has_tag("b:2")); + assert!(ts.has_tag("c:3")); + } + + #[test] + fn remove_from_additions() { + let mut ts = TagSet::default(); + ts.insert_tag(Tag::from("a:1")); + ts.insert_tag(Tag::from("b:2")); + + let removed = ts.remove_tags("a").unwrap(); + assert_eq!(removed.len(), 1); + assert_eq!(removed[0].as_str(), "a:1"); + assert_eq!(ts.len(), 1); + assert!(!ts.has_tag("a:1")); + } + + #[test] + fn remove_nonexistent_returns_none() { + let base = shared_from(&["a:1"]); + let mut ts = TagSet::from(base); + assert!(ts.remove_tags("z").is_none()); + } + + #[test] + fn remove_then_reinsert() { + let base = shared_from(&["env:staging"]); + let mut ts = TagSet::from(base); + + ts.remove_tags("env"); + assert!(!ts.has_tag("env:staging")); + + ts.insert_tag(Tag::from("env:production")); + assert_eq!(ts.len(), 1); + 
assert_eq!(ts.get_single_tag("env").unwrap().as_str(), "env:production"); + assert!(!ts.has_tag("env:staging")); + } + + // --- Iteration --- + + #[test] + fn iteration_order_base_then_additions() { + let base = shared_from(&["a:1", "b:2"]); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("c:3")); + + let tags: Vec<&str> = (&ts).into_iter().map(|t| t.as_str()).collect(); + assert_eq!(tags, vec!["a:1", "b:2", "c:3"]); + } + + #[test] + fn iteration_skips_removed() { + let base = shared_from(&["a:1", "b:2", "c:3"]); + let mut ts = TagSet::from(base); + ts.remove_tags("b"); + + let tags: Vec<&str> = (&ts).into_iter().map(|t| t.as_str()).collect(); + assert_eq!(tags, vec!["a:1", "c:3"]); + } + + // --- Chained bases --- + + #[test] + fn remove_from_second_chain() { + let base = chained_shared(&[&["a:1", "b:2"], &["c:3", "d:4"]]); + assert_eq!(base.len(), 4); + + let mut ts = TagSet::from(base); + ts.remove_tags("c"); + + assert_eq!(ts.len(), 3); + assert_eq!(collect_sorted(&ts), vec!["a:1", "b:2", "d:4"]); + } + + #[test] + fn insert_same_name_in_second_chain_keeps_both() { + let base = chained_shared(&[&["a:1"], &["b:2"]]); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("b:other")); + + // Both b tags coexist. + assert_eq!(ts.len(), 3); + assert!(ts.has_tag("b:2")); + assert!(ts.has_tag("b:other")); + } + + // --- Clone independence --- + + #[test] + fn clone_is_independent() { + let base = shared_from(&["a:1", "b:2"]); + let mut ts = TagSet::from(base); + let mut cloned = ts.clone(); + + ts.insert_tag(Tag::from("c:3")); + cloned.remove_tags("a"); + + assert_eq!(ts.len(), 3); + assert_eq!(cloned.len(), 1); + assert!(ts.has_tag("a:1")); + assert!(!cloned.has_tag("a:1")); + } + + // --- into_shared optimization --- + + #[test] + fn into_shared_unmodified_returns_base() { + let base = shared_from(&["a:1", "b:2"]); + let ts = TagSet::from(base.clone()); + let result = ts.into_shared(); + // They should be equal. 
+ assert_eq!(result, base); + } + + // --- len correctness --- + + #[test] + fn len_accounts_for_all_layers() { + let base = shared_from(&["a:1", "b:2", "c:3"]); + let mut ts = TagSet::from(base); + + assert_eq!(ts.len(), 3); + + ts.remove_tags("b"); + assert_eq!(ts.len(), 2); + + ts.insert_tag(Tag::from("d:4")); + assert_eq!(ts.len(), 3); + + ts.insert_tag(Tag::from("a:other")); // Coexists with base a:1 + assert_eq!(ts.len(), 4); + } + + // --- Retain --- + + #[test] + fn retain_filters_base_and_additions() { + let base = shared_from(&["a:1", "b:2"]); + let mut ts = TagSet::from(base); + ts.insert_tag(Tag::from("c:3")); + + // Keep only tags whose name starts with 'a' or 'c'. + ts.retain(|tag| { + let name = tag.name(); + name.starts_with('a') || name.starts_with('c') + }); + + assert_eq!(ts.len(), 2); + assert_eq!(collect_sorted(&ts), vec!["a:1", "c:3"]); + } + + // --- Large base (bitset growth) --- + + #[test] + fn large_base_beyond_128_tags() { + let tags: Vec = (0..200).map(|i| format!("tag{}:{}", i, i)).collect(); + let tag_strs: Vec<&str> = tags.iter().map(|s| s.as_str()).collect(); + let base = shared_from(&tag_strs); + + let mut ts = TagSet::from(base); + // Remove a tag near the end (index > 128, requiring bitset growth). 
+ ts.remove_tags("tag199"); + assert_eq!(ts.len(), 199); + assert!(!ts.has_tag("tag199:199")); + assert!(ts.has_tag("tag0:0")); + assert!(ts.has_tag("tag150:150")); + } + + // --- Merge --- + + #[test] + fn merge_missing_skips_duplicates() { + let base = shared_from(&["a:1", "b:2"]); + let mut ts = TagSet::from(base); + + let other: TagSet = vec![Tag::from("b:2"), Tag::from("c:3")].into_iter().collect(); + ts.merge_missing(other); + + assert_eq!(ts.len(), 3); + assert_eq!(collect_sorted(&ts), vec!["a:1", "b:2", "c:3"]); + } + + #[test] + fn merge_missing_shared() { + let base = shared_from(&["a:1"]); + let mut ts = TagSet::from(base); + + let other = shared_from(&["a:1", "b:2"]); + ts.merge_missing_shared(&other); + + assert_eq!(ts.len(), 2); + assert_eq!(collect_sorted(&ts), vec!["a:1", "b:2"]); + } + + // --- to_mutable convenience --- + + #[test] + fn shared_to_mutable() { + let shared = shared_from(&["a:1", "b:2"]); + let mut ts = shared.to_mutable(); + ts.insert_tag(Tag::from("c:3")); + assert_eq!(ts.len(), 3); + } + + /// Operations we can apply to a TagSet. + #[derive(Clone, Debug)] + enum Op { + Insert(String), + RemoveByName(String), + } + + /// Strategy for generating a tag string like "key:value". + fn arb_tag() -> impl Strategy { + ("[a-z]{1,4}", "[a-z0-9]{1,4}").prop_map(|(k, v)| format!("{k}:{v}")) + } + + /// Strategy for generating a tag key name. + fn arb_key() -> impl Strategy { + "[a-z]{1,4}".prop_map(|s| s) + } + + /// Strategy for generating a random operation. + fn arb_op() -> impl Strategy { + prop_oneof![arb_tag().prop_map(Op::Insert), arb_key().prop_map(Op::RemoveByName)] + } + + /// Strategy for generating a group of tags (for one FrozenTagSet in the chain). + fn arb_tag_group() -> impl Strategy> { + proptest::collection::vec(arb_tag(), 0..10) + } + + /// Strategy for generating a base with 1-3 chained tag groups. 
+ fn arb_base_groups() -> impl Strategy>> { + proptest::collection::vec(arb_tag_group(), 1..4) + } + + /// Build a SharedTagSet from multiple groups (each becomes a chained FrozenTagSet). + /// Deduplicates exact tags across groups to avoid cross-chain duplicates. + fn build_chained_base(groups: &[Vec]) -> SharedTagSet { + let mut seen_tags = HashSet::new(); + + let mut shared = { + let ts: TagSet = groups[0] + .iter() + .filter(|s| seen_tags.insert(s.to_string())) + .map(|s| Tag::from(s.as_str())) + .collect(); + ts.into_shared() + }; + for group in &groups[1..] { + let ts: TagSet = group + .iter() + .filter(|s| seen_tags.insert(s.to_string())) + .map(|s| Tag::from(s.as_str())) + .collect(); + shared.extend_from_shared(&ts.into_shared()); + } + shared + } + + /// Reference model: a Vec of full tag strings. + /// Insert adds if exact tag not present. Remove-by-name removes all with matching name. + fn apply_to_reference(reference: &mut Vec, op: &Op) { + match op { + Op::Insert(tag) => { + if !reference.contains(tag) { + reference.push(tag.clone()); + } + } + Op::RemoveByName(name) => { + reference.retain(|t| { + let tag_name = t.split_once(':').map_or(t.as_str(), |(n, _)| n); + tag_name != name.as_str() + }); + } + } + } + + /// Apply operations to a TagSet. + fn apply_to_tagset(ts: &mut TagSet, op: &Op) { + match op { + Op::Insert(tag) => { + ts.insert_tag(Tag::from(tag.as_str())); + } + Op::RemoveByName(name) => { + ts.remove_tags(name.as_str()); + } + } + } + + proptest! { + #![proptest_config(ProptestConfig::with_cases(10000))] + + #[test] + #[cfg_attr(miri, ignore)] + fn property_test_overlay_matches_reference( + base_groups in arb_base_groups(), + ops in prop::collection::vec(arb_op(), 0..20), + ) { + let base = build_chained_base(&base_groups); + + // Build the reference model from the base. + let mut reference: Vec = base.into_iter().map(|t| t.as_str().to_string()).collect(); + + // Build the TagSet from the base. 
+ let mut ts = TagSet::from(base); + + // Apply operations to both. + for op in &ops { + apply_to_reference(&mut reference, op); + apply_to_tagset(&mut ts, op); + } + + // Verify len matches before consuming. + prop_assert_eq!(ts.len(), reference.len(), + "len mismatch: TagSet={}, reference={}", ts.len(), reference.len()); + + // Compare as sorted sets of tag strings. + let ref_set: BTreeSet = reference.iter().cloned().collect(); + let ts_set: BTreeSet = ts.into_iter().map(|t| t.as_str().to_string()).collect(); + + prop_assert_eq!(&ref_set, &ts_set, + "TagSet and reference diverged after ops: {:?}", ops); + } + } } diff --git a/lib/saluki-context/src/tags/tagset/shared.rs b/lib/saluki-context/src/tags/tagset/shared.rs index deff88e4a31..a3749fb8961 100644 --- a/lib/saluki-context/src/tags/tagset/shared.rs +++ b/lib/saluki-context/src/tags/tagset/shared.rs @@ -3,7 +3,7 @@ use std::{fmt, ops::Deref as _, sync::Arc}; use serde::{ser::SerializeSeq as _, Serialize}; use smallvec::SmallVec; -use super::TagSet; +use super::{frozen::FrozenTagSet, TagSet}; use crate::tags::Tag; /// A shared, read-only set of tags. @@ -12,14 +12,14 @@ use crate::tags::Tag; /// /// In many cases, it is useful to extend a set of tags with additional tags, without needing to clone the additional /// tags or re-allocate the underlying storage to fit the entire set of tags. `SharedTagSet` supports this by utilizing -/// "structural sharing", where `SharedTagSet` is internally represented by a set of smart pointers to `TagSet`. +/// "structural sharing", where `SharedTagSet` is internally represented by a set of smart pointers to `FrozenTagSet`. /// /// This allows `SharedTagSet` to be cheaply extended with additional `SharedTagSet` instances, without needing to /// allocate enough underlying storage to hold all of the individual tags. 
Extending a `SharedTagSet` will allocate a /// small amount of memory (8 bytes) for each additional `SharedTagSet` that is chained after the first additional one: /// this means that all new `SharedTagSet` instances can be extended once with no allocations whatsoever. #[derive(Clone, Debug, Default)] -pub struct SharedTagSet(SmallVec<[Arc; 2]>); +pub struct SharedTagSet(SmallVec<[Arc; 2]>); impl SharedTagSet { /// Returns `true` if the tag set is empty. @@ -55,11 +55,11 @@ impl SharedTagSet { /// Extends `self` with the tags from the `other`. /// - /// If any of the individual `TagSet` instances in `other` are already present in `self`, they will not be added + /// If any of the individual `FrozenTagSet` instances in `other` are already present in `self`, they will not be added /// again. This method does not avoid duplicates across different `SharedTagSet` instances, so if the same tag is /// present in both `self` and `other`, it will be present when querying the resulting `SharedTagSet`. pub fn extend_from_shared(&mut self, other: &SharedTagSet) { - // For each underlying `TagSet` in the other `SharedTagSet`, check if it is already present in this one, and if + // For each underlying `FrozenTagSet` in the other `SharedTagSet`, check if it is already present in this one, and if // not, add it. for tag_set in &other.0 { if !self.0.iter().any(|ts| Arc::ptr_eq(ts, tag_set)) { @@ -68,9 +68,35 @@ impl SharedTagSet { } } - /// Returns a reference to the underlying `TagSet` instances. - pub fn as_tag_sets(&self) -> &[Arc] { - &self.0 + /// Creates a `SharedTagSet` from a vector of tags. + pub(super) fn from_tags(tags: Vec) -> Self { + if tags.is_empty() { + return Self::default(); + } + let mut inner = SmallVec::new(); + inner.push(Arc::new(FrozenTagSet::new(tags))); + Self(inner) + } + + /// Gets a tag by its flattened index across all chained tag sets. 
+ pub(super) fn get_by_flat_index(&self, index: usize) -> Option<&Tag> { + let mut remaining = index; + for ts in &self.0 { + let len = ts.len(); + if remaining < len { + return ts.into_iter().nth(remaining); + } + remaining -= len; + } + None + } + + /// Creates a mutable `TagSet` from this `SharedTagSet`. + /// + /// This clones the `SharedTagSet` (cheap — just Arc pointer copies) and wraps it as the base + /// of a new `TagSet` that can be mutated. + pub fn to_mutable(&self) -> TagSet { + TagSet::from(self.clone()) } /// Returns the size of the tag set, in bytes. @@ -82,15 +108,13 @@ impl SharedTagSet { /// worst-case usage, and should be used as a rough estimate. pub fn size_of(&self) -> usize { // Calculate the size of the SharedTagSet, which includes the size of the SmallVec and the size of each Arc. - (self.0.len() * std::mem::size_of::>()) + self.0.iter().map(|ts| ts.size_of()).sum::() + (self.0.len() * std::mem::size_of::>()) + self.0.iter().map(|ts| ts.size_of()).sum::() } } impl From for SharedTagSet { fn from(tag_set: TagSet) -> Self { - let mut inner = SmallVec::new(); - inner.push(Arc::new(tag_set)); - Self(inner) + tag_set.into_shared() } } @@ -196,7 +220,7 @@ impl FromIterator for SharedTagSet { /// Iterator over the tags in a `SharedTagSet`. 
#[derive(Clone)] pub struct SharedTagSetIterator<'a> { - inner: std::slice::Iter<'a, Arc>, + inner: std::slice::Iter<'a, Arc>, current: Option>, } diff --git a/lib/saluki-core/src/data_model/event/eventd/mod.rs b/lib/saluki-core/src/data_model/event/eventd/mod.rs index a32bc20261a..4979773429b 100644 --- a/lib/saluki-core/src/data_model/event/eventd/mod.rs +++ b/lib/saluki-core/src/data_model/event/eventd/mod.rs @@ -2,7 +2,7 @@ use std::{fmt, num::NonZeroU64}; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use serde::{Serialize, Serializer}; use stringtheory::MetaString; @@ -132,8 +132,8 @@ pub struct EventD { #[serde(skip_serializing_if = "MetaString::is_empty")] source_type_name: MetaString, alert_type: Option, - tags: SharedTagSet, - origin_tags: SharedTagSet, + tags: TagSet, + origin_tags: TagSet, } impl EventD { @@ -192,12 +192,12 @@ impl EventD { } /// Returns the tags associated with the event. - pub fn tags(&self) -> &SharedTagSet { + pub fn tags(&self) -> &TagSet { &self.tags } /// Returns the origin tags associated with the event. - pub fn origin_tags(&self) -> &SharedTagSet { + pub fn origin_tags(&self) -> &TagSet { &self.origin_tags } @@ -308,21 +308,21 @@ impl EventD { /// Set the tags of the event. /// /// This variant is specifically for use in builder-style APIs. - pub fn with_tags(mut self, tags: impl Into) -> Self { + pub fn with_tags(mut self, tags: impl Into) -> Self { self.tags = tags.into(); self } /// Set the tags of the event. - pub fn set_tags(&mut self, tags: impl Into>) { + pub fn set_tags(&mut self, tags: impl Into>) { self.tags = tags.into().unwrap_or_default(); } /// Set the origin tags of the event. /// /// This variant is specifically for use in builder-style APIs. 
- pub fn with_origin_tags(mut self, origin_tags: SharedTagSet) -> Self { - self.origin_tags = origin_tags; + pub fn with_origin_tags(mut self, origin_tags: impl Into) -> Self { + self.origin_tags = origin_tags.into(); self } @@ -339,8 +339,8 @@ impl EventD { priority: Some(Priority::Normal), source_type_name: MetaString::empty(), alert_type: Some(AlertType::Info), - tags: SharedTagSet::default(), - origin_tags: SharedTagSet::default(), + tags: TagSet::default(), + origin_tags: TagSet::default(), } } } diff --git a/lib/saluki-core/src/data_model/event/log/mod.rs b/lib/saluki-core/src/data_model/event/log/mod.rs index 63fbe147137..c7d73eb43ed 100644 --- a/lib/saluki-core/src/data_model/event/log/mod.rs +++ b/lib/saluki-core/src/data_model/event/log/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use serde_json::Value as JsonValue; use stringtheory::MetaString; @@ -20,7 +20,7 @@ pub struct Log { /// Service associated with the log. service: MetaString, /// Tags of the log. - tags: SharedTagSet, + tags: TagSet, /// Additional properties of the log. additional_properties: HashMap, } @@ -74,7 +74,7 @@ impl Log { source: None, hostname: MetaString::empty(), service: MetaString::empty(), - tags: SharedTagSet::default(), + tags: TagSet::default(), additional_properties: HashMap::new(), } } @@ -104,8 +104,8 @@ impl Log { } /// Sets the tags string. - pub fn with_tags(mut self, tags: impl Into>) -> Self { - self.tags = tags.into().unwrap_or_else(SharedTagSet::default); + pub fn with_tags(mut self, tags: impl Into>) -> Self { + self.tags = tags.into().unwrap_or_else(TagSet::default); self } @@ -143,7 +143,7 @@ impl Log { } /// Returns the tags, if set. 
- pub fn tags(&self) -> &SharedTagSet { + pub fn tags(&self) -> &TagSet { &self.tags } diff --git a/lib/saluki-core/src/data_model/event/service_check/mod.rs b/lib/saluki-core/src/data_model/event/service_check/mod.rs index 1b6b4441c40..cac27b436fa 100644 --- a/lib/saluki-core/src/data_model/event/service_check/mod.rs +++ b/lib/saluki-core/src/data_model/event/service_check/mod.rs @@ -1,7 +1,7 @@ //! Service checks. use saluki_common::iter::ReusableDeduplicator; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use serde::{ser::SerializeMap as _, Serialize, Serializer}; use stringtheory::MetaString; @@ -32,8 +32,8 @@ pub struct ServiceCheck { timestamp: Option, hostname: MetaString, message: MetaString, - tags: SharedTagSet, - origin_tags: SharedTagSet, + tags: TagSet, + origin_tags: TagSet, } impl ServiceCheck { @@ -73,12 +73,12 @@ impl ServiceCheck { } /// Returns the tags associated with the check. - pub fn tags(&self) -> &SharedTagSet { + pub fn tags(&self) -> &TagSet { &self.tags } /// Returns the origin tags associated with the check. - pub fn origin_tags(&self) -> &SharedTagSet { + pub fn origin_tags(&self) -> &TagSet { &self.origin_tags } @@ -90,8 +90,8 @@ impl ServiceCheck { timestamp: None, hostname: MetaString::empty(), message: MetaString::empty(), - tags: SharedTagSet::default(), - origin_tags: SharedTagSet::default(), + tags: TagSet::default(), + origin_tags: TagSet::default(), } } @@ -119,7 +119,7 @@ impl ServiceCheck { /// Set the tags of the service check /// /// This variant is specifically for use in builder-style APIs. - pub fn with_tags(mut self, tags: impl Into) -> Self { + pub fn with_tags(mut self, tags: impl Into) -> Self { self.tags = tags.into(); self } @@ -138,8 +138,8 @@ impl ServiceCheck { /// Set the origin tags of the service check /// /// This variant is specifically for use in builder-style APIs. 
- pub fn with_origin_tags(mut self, origin_tags: SharedTagSet) -> Self { - self.origin_tags = origin_tags; + pub fn with_origin_tags(mut self, origin_tags: impl Into) -> Self { + self.origin_tags = origin_tags.into(); self } } @@ -221,8 +221,8 @@ impl TryFrom for CheckStatus { // Helper type to let us serialize deduplicated tags. struct DeduplicatedTagsSerializable<'a> { - tags: &'a SharedTagSet, - origin_tags: &'a SharedTagSet, + tags: &'a TagSet, + origin_tags: &'a TagSet, } impl<'a> Serialize for DeduplicatedTagsSerializable<'a> { diff --git a/lib/saluki-core/src/data_model/event/trace/mod.rs b/lib/saluki-core/src/data_model/event/trace/mod.rs index 74ab404c4b4..cd2214238f4 100644 --- a/lib/saluki-core/src/data_model/event/trace/mod.rs +++ b/lib/saluki-core/src/data_model/event/trace/mod.rs @@ -1,7 +1,7 @@ //! Traces. use saluki_common::collections::FastHashMap; -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use stringtheory::MetaString; /// Trace-level sampling metadata. @@ -61,7 +61,7 @@ pub struct Trace { /// Resource-level tags associated with this trace. /// /// This is derived from the resource of the spans and used to construct the tracer payload. - resource_tags: SharedTagSet, + resource_tags: TagSet, /// Trace-level sampling metadata. /// /// This field contains sampling decision information (priority, decision maker, rates) @@ -72,7 +72,7 @@ pub struct Trace { impl Trace { /// Creates a new `Trace` with the given spans. - pub fn new(spans: Vec, resource_tags: impl Into) -> Self { + pub fn new(spans: Vec, resource_tags: impl Into) -> Self { Self { spans, resource_tags: resource_tags.into(), @@ -142,7 +142,7 @@ impl Trace { } /// Returns the resource-level tags associated with this trace. 
- pub fn resource_tags(&self) -> &SharedTagSet { + pub fn resource_tags(&self) -> &TagSet { &self.resource_tags } diff --git a/lib/saluki-core/src/data_model/event/trace_stats/mod.rs b/lib/saluki-core/src/data_model/event/trace_stats/mod.rs index 03ae2913caa..d51f0db4bf6 100644 --- a/lib/saluki-core/src/data_model/event/trace_stats/mod.rs +++ b/lib/saluki-core/src/data_model/event/trace_stats/mod.rs @@ -1,6 +1,6 @@ //! Trace stats. -use saluki_context::tags::SharedTagSet; +use saluki_context::tags::TagSet; use stringtheory::MetaString; /// Trace statistics output from the APM Stats transform. @@ -47,7 +47,7 @@ pub struct ClientStatsPayload { agent_aggregation: MetaString, service: MetaString, container_id: MetaString, - tags: SharedTagSet, + tags: TagSet, git_commit_sha: MetaString, image_tag: MetaString, process_tags_hash: u64, @@ -114,7 +114,7 @@ impl ClientStatsPayload { } /// Sets the orchestrator tags. - pub fn with_tags(mut self, tags: impl Into) -> Self { + pub fn with_tags(mut self, tags: impl Into) -> Self { self.tags = tags.into(); self } @@ -199,7 +199,7 @@ impl ClientStatsPayload { } /// Returns the orchestrator tags. - pub fn tags(&self) -> &SharedTagSet { + pub fn tags(&self) -> &TagSet { &self.tags }