Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion lib/saluki-components/src/encoders/datadog/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,12 @@ fn write_metric_to_v3(writer: &mut v3::V3Writer, metric: &Metric, additional_tag
}
}

if metric_type != v3::V3MetricType::Sketch {
if let Some(unit) = metric.metadata().unit() {
builder.set_unit(unit);
}
}

// Points based on metric type
match metric.values() {
MetricValues::Counter(points) | MetricValues::Gauge(points) => {
Expand Down Expand Up @@ -1102,7 +1108,12 @@ async fn create_v3_request(

#[cfg(test)]
mod tests {
use saluki_core::data_model::{event::Event, payload::Payload};
use saluki_context::Context;
use saluki_core::data_model::{
event::{metric::MetricMetadata, Event},
payload::Payload,
};
use stringtheory::MetaString;
use tokio::time::timeout;

use super::*;
Expand Down Expand Up @@ -1153,6 +1164,65 @@ serializer_experimental_use_v3_api:
assert_eq!("/api/intake/metrics/custom/series", request.uri());
}

#[test]
fn v3_series_metric_unit_refs_are_encoded_sparsely() {
let context = Context::from_static_parts("my.timer.avg", &[]);
let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
let gauge = Metric::from_parts(context, MetricValues::gauge([1.0_f64]), metadata);
let context = Context::from_static_parts("my.counter", &[]);
let no_unit = Metric::from_parts(context, MetricValues::gauge([2.0_f64]), MetricMetadata::default());
let context = Context::from_static_parts("my.timer.max", &[]);
let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
let same_unit = Metric::from_parts(context, MetricValues::gauge([3.0_f64]), metadata);

let payload = encode_v3_metrics_batch(&[gauge, no_unit, same_unit], &SharedTagSet::default())
.expect("V3 metric should encode successfully");

let expected_unit_dict = [
0xca, 0x01, // field 25, length-delimited.
0x0c, // field payload length: varint string length + string bytes.
0x0b, b'm', b'i', b'l', b'l', b'i', b's', b'e', b'c', b'o', b'n', b'd',
];
assert!(
payload
.windows(expected_unit_dict.len())
.any(|window| window == expected_unit_dict),
"V3 payload should contain DictUnitStr field for 'millisecond', got bytes: {:?}",
payload
);

let expected_unit_ref = [
0xd2, 0x01, // field 26, length-delimited.
0x02, // packed field payload length.
0x02, 0x00, // sparse unit refs for metrics 1 and 3 only: refs [1, 1] -> deltas [1, 0].
];
assert!(
payload
.windows(expected_unit_ref.len())
.any(|window| window == expected_unit_ref),
"V3 payload should contain UnitRef field for 'millisecond', got bytes: {:?}",
payload
);
}

#[test]
fn v3_sketch_metric_unit_not_encoded() {
let context = Context::from_static_parts("my.histogram", &[]);
let metadata = MetricMetadata::default().with_unit(MetaString::from_static("millisecond"));
let histogram = Metric::from_parts(context, MetricValues::histogram([1.0_f64]), metadata);

let payload = encode_v3_metrics_batch(&[histogram], &SharedTagSet::default())
.expect("V3 sketch metric should encode successfully");

assert!(
!payload
.windows(b"millisecond".len())
.any(|window| window == b"millisecond"),
"V3 sketch payload should not contain unit bytes, matching the Agent V3 sketch builder: {:?}",
payload
);
}

#[tokio::test]
async fn validation_split_flush_assigns_batch_id_to_carried_metric() {
let v2_endpoint_config = EndpointConfiguration::new(CompressionScheme::noop(), 1, None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ pub const SKETCH_BIN_KEYS_FIELD_NUMBER: u32 = 21;
pub const SKETCH_BIN_CNTS_FIELD_NUMBER: u32 = 22;
pub const SOURCE_TYPE_NAME_FIELD_NUMBER: u32 = 23;
pub const ORIGIN_INFO_FIELD_NUMBER: u32 = 24;
pub const DICT_UNIT_STR_FIELD_NUMBER: u32 = 25;
pub const UNIT_REFS_FIELD_NUMBER: u32 = 26;
81 changes: 79 additions & 2 deletions lib/saluki-components/src/encoders/datadog/metrics/v3/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::interner::Interner;
use super::types::{value_type_for_values, V3MetricType, V3ValueType};

const FLAG_NO_INDEX: u64 = 0x100;
const FLAG_HAS_UNIT: u64 = 0x200;

/// Encoded V3 payload data ready for protobuf serialization.
///
Expand All @@ -27,8 +28,9 @@ struct V3EncodedData {
pub dict_resource_name: Vec<i64>,
pub dict_source_type_bytes: Vec<u8>,
pub dict_origin_info: Vec<i32>,
pub dict_unit_bytes: Vec<u8>,

// Per-metric columns (one entry per metric)
// Per-metric columns (one entry per metric, except conditional columns)
pub types: Vec<u64>,
pub names: Vec<i64>,
pub tags: Vec<i64>,
Expand All @@ -37,6 +39,7 @@ struct V3EncodedData {
pub num_points: Vec<u64>,
pub source_type_names: Vec<i64>,
pub origin_infos: Vec<i64>,
pub unit_refs: Vec<i64>, // Present only for metrics with FLAG_HAS_UNIT set.

// Point data (varies per metric based on num_points)
pub timestamps: Vec<i64>,
Expand Down Expand Up @@ -65,6 +68,7 @@ pub struct V3Writer {
resource_interner: Interner<Vec<(i64, i64)>>,
source_type_interner: Interner<String>,
origin_interner: Interner<(i32, i32, i32)>,
unit_interner: Interner<String>,

// Dictionary encoded bytes
dict_name_bytes: Vec<u8>,
Expand All @@ -76,8 +80,9 @@ pub struct V3Writer {
dict_resource_name: Vec<i64>,
dict_source_type_bytes: Vec<u8>,
dict_origin_info: Vec<i32>,
dict_unit_bytes: Vec<u8>,

// Per-metric columns
// Per-metric columns (one entry per metric, except conditional columns)
types: Vec<u64>,
names: Vec<i64>,
tags: Vec<i64>,
Expand All @@ -86,6 +91,7 @@ pub struct V3Writer {
num_points: Vec<u64>,
source_type_names: Vec<i64>,
origin_infos: Vec<i64>,
unit_refs: Vec<i64>, // Present only for metrics with FLAG_HAS_UNIT set.

// Point data
timestamps: Vec<i64>,
Expand Down Expand Up @@ -134,6 +140,7 @@ impl V3Writer {
point_start_idx,
sint64_start_idx,
metric_idx,
unit_ref_idx: None,
}
}

Expand All @@ -144,6 +151,7 @@ impl V3Writer {
delta_encode(&mut self.resources);
delta_encode(&mut self.source_type_names);
delta_encode(&mut self.origin_infos);
delta_encode(&mut self.unit_refs);
delta_encode(&mut self.timestamps);

V3EncodedData {
Expand All @@ -156,6 +164,7 @@ impl V3Writer {
dict_resource_name: self.dict_resource_name,
dict_source_type_bytes: self.dict_source_type_bytes,
dict_origin_info: self.dict_origin_info,
dict_unit_bytes: self.dict_unit_bytes,
types: self.types,
names: self.names,
tags: self.tags,
Expand All @@ -164,6 +173,7 @@ impl V3Writer {
num_points: self.num_points,
source_type_names: self.source_type_names,
origin_infos: self.origin_infos,
unit_refs: self.unit_refs,
timestamps: self.timestamps,
vals_sint64: self.vals_sint64,
vals_float32: self.vals_float32,
Expand Down Expand Up @@ -207,6 +217,9 @@ impl V3Writer {
}

os.write_repeated_packed_int32(DICT_ORIGIN_INFO_FIELD_NUMBER, &data.dict_origin_info)?;
if !data.dict_unit_bytes.is_empty() {
os.write_bytes(DICT_UNIT_STR_FIELD_NUMBER, &data.dict_unit_bytes)?;
}

// Per-metric columns
os.write_repeated_packed_uint64(TYPES_FIELD_NUMBER, &data.types)?;
Expand All @@ -217,6 +230,7 @@ impl V3Writer {
os.write_repeated_packed_uint64(NUM_POINTS_FIELD_NUMBER, &data.num_points)?;
os.write_repeated_packed_sint64(SOURCE_TYPE_NAME_FIELD_NUMBER, &data.source_type_names)?;
os.write_repeated_packed_sint64(ORIGIN_INFO_FIELD_NUMBER, &data.origin_infos)?;
os.write_repeated_packed_sint64(UNIT_REFS_FIELD_NUMBER, &data.unit_refs)?;

// Point data
os.write_repeated_packed_sint64(TIMESTAMPS_FIELD_NUMBER, &data.timestamps)?;
Expand Down Expand Up @@ -363,6 +377,17 @@ impl V3Writer {
}
id
}

fn intern_unit(&mut self, unit: &str) -> i64 {
if unit.is_empty() {
return 0;
}
let (id, is_new) = self.unit_interner.get_or_insert(unit);
if is_new {
append_len_str(&mut self.dict_unit_bytes, unit);
}
id
}
}

/// Builder for a single metric within a V3 payload.
Expand All @@ -374,6 +399,7 @@ pub struct V3MetricBuilder<'a> {
point_start_idx: usize,
sint64_start_idx: usize,
metric_idx: usize,
unit_ref_idx: Option<usize>,
}

impl<'a> V3MetricBuilder<'a> {
Expand Down Expand Up @@ -423,6 +449,26 @@ impl<'a> V3MetricBuilder<'a> {
}
}

/// Sets the unit for this metric.
pub fn set_unit(&mut self, unit: &str) {
if unit.is_empty() {
self.writer.types[self.metric_idx] &= !FLAG_HAS_UNIT;
if let Some(unit_ref_idx) = self.unit_ref_idx.take() {
self.writer.unit_refs.remove(unit_ref_idx);
}
return;
}

let id = self.writer.intern_unit(unit);
if let Some(unit_ref_idx) = self.unit_ref_idx {
self.writer.unit_refs[unit_ref_idx] = id;
} else {
self.unit_ref_idx = Some(self.writer.unit_refs.len());
self.writer.unit_refs.push(id);
}
self.writer.types[self.metric_idx] |= FLAG_HAS_UNIT;
}

/// Adds a data point to this metric.
pub fn add_point(&mut self, timestamp: i64, value: f64) {
self.writer.timestamps.push(timestamp);
Expand Down Expand Up @@ -608,6 +654,37 @@ mod tests {
assert_eq!(data.timestamps.len(), 2);
}

#[test]
fn test_writer_unit() {
let mut writer = V3Writer::new();

{
let mut metric = writer.write(V3MetricType::Gauge, "has.unit");
metric.set_unit("millisecond");
metric.add_point(1000, 42.0);
metric.close();
}
{
let mut metric = writer.write(V3MetricType::Gauge, "no.unit");
metric.add_point(1000, 43.0);
metric.close();
}
{
let mut metric = writer.write(V3MetricType::Gauge, "same.unit");
metric.set_unit("millisecond");
metric.add_point(1000, 44.0);
metric.close();
}

let data = writer.finalize_inner();

assert_eq!(data.unit_refs, vec![1, 0]);
assert_eq!(data.dict_unit_bytes, b"\x0bmillisecond");
assert_eq!(data.types[0] & FLAG_HAS_UNIT, FLAG_HAS_UNIT);
assert_eq!(data.types[1] & FLAG_HAS_UNIT, 0);
assert_eq!(data.types[2] & FLAG_HAS_UNIT, FLAG_HAS_UNIT);
}

#[test]
fn test_writer_multiple_metrics() {
let mut writer = V3Writer::new();
Expand Down
Loading