From 935bba59d33e7364dd9ceea9bf3014ca02d07fa6 Mon Sep 17 00:00:00 2001 From: Raymond Zhao Date: Thu, 28 May 2026 12:04:37 -0400 Subject: [PATCH] encode metric units in v3 payloads --- .../src/encoders/datadog/metrics/mod.rs | 72 ++++++++++++++++- .../encoders/datadog/metrics/v3/constants.rs | 2 + .../src/encoders/datadog/metrics/v3/writer.rs | 81 ++++++++++++++++++- 3 files changed, 152 insertions(+), 3 deletions(-) diff --git a/lib/saluki-components/src/encoders/datadog/metrics/mod.rs b/lib/saluki-components/src/encoders/datadog/metrics/mod.rs index 209d2eb8d09..cd7cb1c267c 100644 --- a/lib/saluki-components/src/encoders/datadog/metrics/mod.rs +++ b/lib/saluki-components/src/encoders/datadog/metrics/mod.rs @@ -979,6 +979,12 @@ fn write_metric_to_v3(writer: &mut v3::V3Writer, metric: &Metric, additional_tag } } + if metric_type != v3::V3MetricType::Sketch { + if let Some(unit) = metric.metadata().unit() { + builder.set_unit(unit); + } + } + // Points based on metric type match metric.values() { MetricValues::Counter(points) | MetricValues::Gauge(points) => { @@ -1102,7 +1108,12 @@ async fn create_v3_request( #[cfg(test)] mod tests { - use saluki_core::data_model::{event::Event, payload::Payload}; + use saluki_context::Context; + use saluki_core::data_model::{ + event::{metric::MetricMetadata, Event}, + payload::Payload, + }; + use stringtheory::MetaString; use tokio::time::timeout; use super::*; @@ -1153,6 +1164,65 @@ serializer_experimental_use_v3_api: assert_eq!("/api/intake/metrics/custom/series", request.uri()); } + #[test] + fn v3_series_metric_unit_refs_are_encoded_sparsely() { + let context = Context::from_static_parts("my.timer.avg", &[]); + let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond")); + let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata); + let context = Context::from_static_parts("my.counter", &[]); + let no_unit = Metric::from_parts(context, MetricValues::gauge([2.0_f64]), MetricMetadata::default()); + let context = Context::from_static_parts("my.timer.max", &[]); + let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond")); + let same_unit = Metric::from_parts(context, MetricValues::gauge([3.0_f64]), metadata); + + let payload = encode_v3_metrics_batch(&[gauge, no_unit, same_unit], &SharedTagSet::default()) + .expect("V3 metric should encode successfully"); + + let expected_unit_dict = [ + 0xca, 0x01, // field 25, length-delimited. + 0x0c, // field payload length: varint string length + string bytes. + 0x0b, b'm', b'i', b'l', b'l', b'i', b's', b'e', b'c', b'o', b'n', b'd', + ]; + assert!( + payload + .windows(expected_unit_dict.len()) + .any(|window| window == expected_unit_dict), + "V3 payload should contain DictUnitStr field for 'millisecond', got bytes: {:?}", + payload + ); + + let expected_unit_ref = [ + 0xd2, 0x01, // field 26, length-delimited. + 0x02, // packed field payload length. + 0x02, 0x00, // sparse unit refs for metrics 1 and 3 only: refs [1, 1] -> deltas [1, 0]. + ]; + assert!( + payload + .windows(expected_unit_ref.len()) + .any(|window| window == expected_unit_ref), + "V3 payload should contain UnitRef field for 'millisecond', got bytes: {:?}", + payload + ); + } + + #[test] + fn v3_sketch_metric_unit_not_encoded() { + let context = Context::from_static_parts("my.histogram", &[]); + let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond")); + let histogram = Metric::from_parts(context, MetricValues::histogram([1.0_f64]), metadata); + + let payload = encode_v3_metrics_batch(&[histogram], &SharedTagSet::default()) + .expect("V3 sketch metric should encode successfully"); + + assert!( + !payload + .windows(b"millisecond".len()) + .any(|window| window == b"millisecond"), + "V3 sketch payload should not contain unit bytes, matching the Agent V3 sketch builder: {:?}", + payload + ); + } + #[tokio::test] async fn validation_split_flush_assigns_batch_id_to_carried_metric() { let v2_endpoint_config = EndpointConfiguration::new(CompressionScheme::noop(), 1, None); diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/constants.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/constants.rs index 5f606bfcdee..6f54ea38af4 100644 --- a/lib/saluki-components/src/encoders/datadog/metrics/v3/constants.rs +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/constants.rs @@ -25,3 +25,5 @@ pub const SKETCH_BIN_KEYS_FIELD_NUMBER: u32 = 21; pub const SKETCH_BIN_CNTS_FIELD_NUMBER: u32 = 22; pub const SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 23; pub const ORIGIN_INFO_FIELD_NUMBER: u32 = 24; +pub const DICT_UNIT_STR_FIELD_NUMBER: u32 = 25; +pub const UNIT_REFS_FIELD_NUMBER: u32 = 26; diff --git a/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs b/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs index 07d48b14234..71ccd96e8f8 100644 --- a/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs +++ b/lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs @@ -11,6 +11,7 @@ use super::interner::Interner; use super::types::{value_type_for_values, V3MetricType, V3ValueType}; const FLAG_NO_INDEX: u64 = 0x100; +const FLAG_HAS_UNIT: u64 = 0x200; /// Encoded V3 payload data ready for protobuf serialization. /// @@ -27,8 +28,9 @@ struct V3EncodedData { pub dict_resource_name: Vec, pub dict_source_type_bytes: Vec, pub dict_origin_info: Vec, + pub dict_unit_bytes: Vec, - // Per-metric columns (one entry per metric) + // Per-metric columns (one entry per metric, except conditional columns) pub types: Vec, pub names: Vec, pub tags: Vec, @@ -37,6 +39,7 @@ struct V3EncodedData { pub num_points: Vec, pub source_type_names: Vec, pub origin_infos: Vec, + pub unit_refs: Vec, // Present only for metrics with FLAG_HAS_UNIT set. // Point data (varies per metric based on num_points) pub timestamps: Vec, @@ -65,6 +68,7 @@ pub struct V3Writer { resource_interner: Interner>, source_type_interner: Interner, origin_interner: Interner<(i32, i32, i32)>, + unit_interner: Interner, // Dictionary encoded bytes dict_name_bytes: Vec, @@ -76,8 +80,9 @@ pub struct V3Writer { dict_resource_name: Vec, dict_source_type_bytes: Vec, dict_origin_info: Vec, + dict_unit_bytes: Vec, - // Per-metric columns + // Per-metric columns (one entry per metric, except conditional columns) types: Vec, names: Vec, tags: Vec, @@ -86,6 +91,7 @@ pub struct V3Writer { num_points: Vec, source_type_names: Vec, origin_infos: Vec, + unit_refs: Vec, // Present only for metrics with FLAG_HAS_UNIT set. // Point data timestamps: Vec, @@ -134,6 +140,7 @@ impl V3Writer { point_start_idx, sint64_start_idx, metric_idx, + unit_ref_idx: None, } } @@ -144,6 +151,7 @@ impl V3Writer { delta_encode(&mut self.resources); delta_encode(&mut self.source_type_names); delta_encode(&mut self.origin_infos); + delta_encode(&mut self.unit_refs); delta_encode(&mut self.timestamps); V3EncodedData { @@ -156,6 +164,7 @@ impl V3Writer { dict_resource_name: self.dict_resource_name, dict_source_type_bytes: self.dict_source_type_bytes, dict_origin_info: self.dict_origin_info, + dict_unit_bytes: self.dict_unit_bytes, types: self.types, names: self.names, tags: self.tags, @@ -164,6 +173,7 @@ impl V3Writer { num_points: self.num_points, source_type_names: self.source_type_names, origin_infos: self.origin_infos, + unit_refs: self.unit_refs, timestamps: self.timestamps, vals_sint64: self.vals_sint64, vals_float32: self.vals_float32, @@ -207,6 +217,9 @@ impl V3Writer { } os.write_repeated_packed_int32(DICT_ORIGIN_INFO_FIELD_NUMBER, &data.dict_origin_info)?; + if !data.dict_unit_bytes.is_empty() { + os.write_bytes(DICT_UNIT_STR_FIELD_NUMBER, &data.dict_unit_bytes)?; + } // Per-metric columns os.write_repeated_packed_uint64(TYPES_FIELD_NUMBER, &data.types)?; @@ -217,6 +230,7 @@ impl V3Writer { os.write_repeated_packed_uint64(NUM_POINTS_FIELD_NUMBER, &data.num_points)?; os.write_repeated_packed_sint64(SOURCE_TYPE_NAME_FIELD_NUMBER, &data.source_type_names)?; os.write_repeated_packed_sint64(ORIGIN_INFO_FIELD_NUMBER, &data.origin_infos)?; + os.write_repeated_packed_sint64(UNIT_REFS_FIELD_NUMBER, &data.unit_refs)?; // Point data os.write_repeated_packed_sint64(TIMESTAMPS_FIELD_NUMBER, &data.timestamps)?; @@ -363,6 +377,17 @@ impl V3Writer { } id } + + fn intern_unit(&mut self, unit: &str) -> i64 { + if unit.is_empty() { + return 0; + } + let (id, is_new) = self.unit_interner.get_or_insert(unit); + if is_new { + append_len_str(&mut self.dict_unit_bytes, unit); + } + id + } } /// Builder for a single metric within a V3 payload. @@ -374,6 +399,7 @@ pub struct V3MetricBuilder<'a> { point_start_idx: usize, sint64_start_idx: usize, metric_idx: usize, + unit_ref_idx: Option, } impl<'a> V3MetricBuilder<'a> { @@ -423,6 +449,26 @@ impl<'a> V3MetricBuilder<'a> { } } + /// Sets the unit for this metric. + pub fn set_unit(&mut self, unit: &str) { + if unit.is_empty() { + self.writer.types[self.metric_idx] &= !FLAG_HAS_UNIT; + if let Some(unit_ref_idx) = self.unit_ref_idx.take() { + self.writer.unit_refs.remove(unit_ref_idx); + } + return; + } + + let id = self.writer.intern_unit(unit); + if let Some(unit_ref_idx) = self.unit_ref_idx { + self.writer.unit_refs[unit_ref_idx] = id; + } else { + self.unit_ref_idx = Some(self.writer.unit_refs.len()); + self.writer.unit_refs.push(id); + } + self.writer.types[self.metric_idx] |= FLAG_HAS_UNIT; + } + /// Adds a data point to this metric. pub fn add_point(&mut self, timestamp: i64, value: f64) { self.writer.timestamps.push(timestamp); @@ -608,6 +654,37 @@ mod tests { assert_eq!(data.timestamps.len(), 2); } + #[test] + fn test_writer_unit() { + let mut writer = V3Writer::new(); + + { + let mut metric = writer.write(V3MetricType::Gauge, "has.unit"); + metric.set_unit("millisecond"); + metric.add_point(1000, 42.0); + metric.close(); + } + { + let mut metric = writer.write(V3MetricType::Gauge, "no.unit"); + metric.add_point(1000, 43.0); + metric.close(); + } + { + let mut metric = writer.write(V3MetricType::Gauge, "same.unit"); + metric.set_unit("millisecond"); + metric.add_point(1000, 44.0); + metric.close(); + } + + let data = writer.finalize_inner(); + + assert_eq!(data.unit_refs, vec![1, 0]); + assert_eq!(data.dict_unit_bytes, b"\x0bmillisecond"); + assert_eq!(data.types[0] & FLAG_HAS_UNIT, FLAG_HAS_UNIT); + assert_eq!(data.types[1] & FLAG_HAS_UNIT, 0); + assert_eq!(data.types[2] & FLAG_HAS_UNIT, FLAG_HAS_UNIT); + } + #[test] fn test_writer_multiple_metrics() { let mut writer = V3Writer::new();