From 88c2f26d645602ec8e451bc74209daa1b1ea6df1 Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Wed, 13 May 2026 15:02:53 +0200 Subject: [PATCH 1/7] DADP-71 Add remote point telemetry to Agent telemetry --- comp/core/agenttelemetry/impl/config.go | 6 + pkg/collector/corechecks/telemetry/check.go | 114 +++++++++++++++++- .../corechecks/telemetry/check_test.go | 94 +++++++++++++++ 3 files changed, 213 insertions(+), 1 deletion(-) diff --git a/comp/core/agenttelemetry/impl/config.go b/comp/core/agenttelemetry/impl/config.go index 19b79be975ca..c32a56330e1d 100644 --- a/comp/core/agenttelemetry/impl/config.go +++ b/comp/core/agenttelemetry/impl/config.go @@ -260,7 +260,13 @@ var defaultProfiles = ` - name: logs.auto_multi_line_default_would_truncate - name: logs_destination.destination_workers - name: point.sent + aggregate_tags: + - domain + - remote_agent - name: point.dropped + aggregate_tags: + - domain + - remote_agent - name: transactions.input_count - name: transactions.requeued - name: transactions.retries diff --git a/pkg/collector/corechecks/telemetry/check.go b/pkg/collector/corechecks/telemetry/check.go index c63195708064..db67099b6bc6 100644 --- a/pkg/collector/corechecks/telemetry/check.go +++ b/pkg/collector/corechecks/telemetry/check.go @@ -25,6 +25,13 @@ const ( // CheckName is the name of the check CheckName = "telemetry" prefix = "datadog.agent." + + domainLabel = "domain" + remoteAgentLabel = "remote_agent" + pointSentName = "point_sent" + pointDroppedName = "point_dropped" + pointSentMetric = "point__sent" + pointDroppedMetric = "point__dropped" ) type checkImpl struct { @@ -38,6 +45,14 @@ func (c *checkImpl) Run() error { return err } + pointTelemetry := collectPointTelemetry(mfs, false) + remoteMfs, err := c.telemetry.Gather(false) + if err != nil { + log.Warnf("failed to gather remote agent telemetry metrics for point telemetry merge: %v", err) + } else { + mergePointTelemetry(pointTelemetry, collectPointTelemetry(remoteMfs, true)) + } + sender, err := c.GetSender() if err != nil { return err @@ -45,14 +60,111 @@ func (c *checkImpl) Run() error { sender.SetNoIndex(true) + c.sendPointTelemetry(pointTelemetry, sender) c.handleMetricFamilies(mfs, sender) return nil } +func normalizeMetricName(name string) string { + return strings.ReplaceAll(name, "__", "_") +} + +func isPointTelemetryMetric(name string) bool { + switch normalizeMetricName(name) { + case pointSentName, pointDroppedName: + return true + default: + return false + } +} + +func pointTelemetryMetricName(name string) string { + switch normalizeMetricName(name) { + case pointSentName: + return pointSentMetric + case pointDroppedName: + return pointDroppedMetric + default: + return name + } +} + +func labelValue(labels []*dto.LabelPair, name string) (string, bool) { + for _, label := range labels { + if label.GetName() == name { + return label.GetValue(), true + } + } + return "", false +} + +type pointTelemetryByDomain map[string]map[string]float64 + +func collectPointTelemetry(mfs []*dto.MetricFamily, requireRemoteAgent bool) pointTelemetryByDomain { + points := make(pointTelemetryByDomain) + + for _, mf := range mfs { + if mf == nil || mf.Name == nil || mf.Type == nil || !isPointTelemetryMetric(mf.GetName()) { + continue + } + if mf.GetType() != dto.MetricType_GAUGE { + log.Warnf("dropping point telemetry metric %q with unsupported type %s", mf.GetName(), mf.GetType()) + continue + } + + metricName := pointTelemetryMetricName(mf.GetName()) + for _, metric := range mf.Metric { + if metric == nil || metric.Gauge == nil { + continue + } + if requireRemoteAgent { + if _, ok := labelValue(metric.Label, remoteAgentLabel); !ok { + continue + } + } + domain, ok := labelValue(metric.Label, domainLabel) + if !ok { + continue + } + + byDomain := points[metricName] + if byDomain == nil { + byDomain = make(map[string]float64) + points[metricName] = byDomain + } + byDomain[domain] += metric.Gauge.GetValue() + } + } + + return points +} + +func mergePointTelemetry(dst, src pointTelemetryByDomain) { + for metricName, srcByDomain := range src { + dstByDomain := dst[metricName] + if dstByDomain == nil { + dstByDomain = make(map[string]float64) + dst[metricName] = dstByDomain + } + for domain, value := range srcByDomain { + dstByDomain[domain] += value + } + } +} + +func (c *checkImpl) sendPointTelemetry(points pointTelemetryByDomain, sender sender.Sender) { + for metricName, byDomain := range points { + name := c.buildName(metricName) + for domain, value := range byDomain { + sender.Gauge(name, value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)}) + } + } +} + func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) { for _, mf := range mfs { - if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 { + if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isPointTelemetryMetric(mf.GetName()) { continue } diff --git a/pkg/collector/corechecks/telemetry/check_test.go b/pkg/collector/corechecks/telemetry/check_test.go index fa1470ca2bbb..1d2afa4a61ec 100644 --- a/pkg/collector/corechecks/telemetry/check_test.go +++ b/pkg/collector/corechecks/telemetry/check_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" @@ -16,6 +17,99 @@ import ( "github.com/DataDog/datadog-agent/pkg/collector/corechecks" ) +func stringPtr(value string) *string { + return &value +} + +func float64Ptr(value float64) *float64 { + return &value +} + +func gaugeMetric(labels map[string]string, value float64) *dto.Metric { + metric := &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(value)}} + for name, value := range labels { + metric.Label = append(metric.Label, &dto.LabelPair{Name: stringPtr(name), Value: stringPtr(value)}) + } + return metric +} + +func gaugeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { + metricType := dto.MetricType_GAUGE + return &dto.MetricFamily{ + Name: stringPtr(name), + Type: &metricType, + Metric: metrics, + } +} + +func TestCollectAndMergePointTelemetry(t *testing.T) { + defaultMfs := []*dto.MetricFamily{ + gaugeMetricFamily( + "point__sent", + gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 10), + ), + gaugeMetricFamily( + "point__dropped", + gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 2), + ), + } + remoteMfs := []*dto.MetricFamily{ + gaugeMetricFamily( + "point__sent", + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.com", + remoteAgentLabel: "agent-data-plane", + }, 12), + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.eu", + remoteAgentLabel: "other-remote-agent", + }, 5), + gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 100), + ), + gaugeMetricFamily( + "point__dropped", + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.com", + remoteAgentLabel: "agent-data-plane", + }, 3), + ), + } + + points := collectPointTelemetry(defaultMfs, false) + mergePointTelemetry(points, collectPointTelemetry(remoteMfs, true)) + + require.Equal(t, pointTelemetryByDomain{ + pointSentMetric: { + "https://api.datadoghq.com": 22, + "https://api.datadoghq.eu": 5, + }, + pointDroppedMetric: { + "https://api.datadoghq.com": 5, + }, + }, points) +} + +func TestSendPointTelemetry(t *testing.T) { + sm := mocksender.CreateDefaultDemultiplexer() + c := &checkImpl{CheckBase: corechecks.NewCheckBase(CheckName)} + c.Configure(sm, integration.FakeConfigHash, nil, nil, "test", "provider") + + s := mocksender.NewMockSenderWithSenderManager(c.ID(), sm) + s.On("Gauge", "datadog.agent.point.sent", 22.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) + s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) + + c.sendPointTelemetry(pointTelemetryByDomain{ + pointSentMetric: { + "https://api.datadoghq.com": 22, + }, + pointDroppedMetric: { + "https://api.datadoghq.com": 5, + }, + }, s) + + s.AssertExpectations(t) +} + func TestCheck(t *testing.T) { reg := prometheus.NewRegistry() From 90975832b8b0153bfe2c21a54e8d9adfb2151815 Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Wed, 13 May 2026 16:20:35 +0200 Subject: [PATCH 2/7] DADP-71 Simplify point telemetry merge --- pkg/collector/corechecks/telemetry/check.go | 96 ++++++++----------- .../corechecks/telemetry/check_test.go | 14 ++- 2 files changed, 50 insertions(+), 60 deletions(-) diff --git a/pkg/collector/corechecks/telemetry/check.go b/pkg/collector/corechecks/telemetry/check.go index db67099b6bc6..cc1e67458385 100644 --- a/pkg/collector/corechecks/telemetry/check.go +++ b/pkg/collector/corechecks/telemetry/check.go @@ -26,10 +26,9 @@ const ( CheckName = "telemetry" prefix = "datadog.agent." - domainLabel = "domain" - remoteAgentLabel = "remote_agent" - pointSentName = "point_sent" - pointDroppedName = "point_dropped" + domainLabel = "domain" + remoteAgentLabel = "remote_agent" + pointSentMetric = "point__sent" pointDroppedMetric = "point__dropped" ) @@ -50,7 +49,7 @@ func (c *checkImpl) Run() error { if err != nil { log.Warnf("failed to gather remote agent telemetry metrics for point telemetry merge: %v", err) } else { - mergePointTelemetry(pointTelemetry, collectPointTelemetry(remoteMfs, true)) + pointTelemetry.merge(collectPointTelemetry(remoteMfs, true)) } sender, err := c.GetSender() @@ -66,27 +65,24 @@ func (c *checkImpl) Run() error { return nil } -func normalizeMetricName(name string) string { - return strings.ReplaceAll(name, "__", "_") +type pointTelemetryByDomain struct { + sent map[string]float64 + dropped map[string]float64 } -func isPointTelemetryMetric(name string) bool { - switch normalizeMetricName(name) { - case pointSentName, pointDroppedName: - return true - default: - return false +func newPointTelemetryByDomain() pointTelemetryByDomain { + return pointTelemetryByDomain{ + sent: make(map[string]float64), + dropped: make(map[string]float64), } } -func pointTelemetryMetricName(name string) string { - switch normalizeMetricName(name) { - case pointSentName: - return pointSentMetric - case pointDroppedName: - return pointDroppedMetric - default: - return name +func (p pointTelemetryByDomain) merge(other pointTelemetryByDomain) { + for domain, value := range other.sent { + p.sent[domain] += value + } + for domain, value := range other.dropped { + p.dropped[domain] += value } } @@ -99,21 +95,29 @@ func labelValue(labels []*dto.LabelPair, name string) (string, bool) { return "", false } -type pointTelemetryByDomain map[string]map[string]float64 - func collectPointTelemetry(mfs []*dto.MetricFamily, requireRemoteAgent bool) pointTelemetryByDomain { - points := make(pointTelemetryByDomain) + points := newPointTelemetryByDomain() for _, mf := range mfs { - if mf == nil || mf.Name == nil || mf.Type == nil || !isPointTelemetryMetric(mf.GetName()) { + if mf == nil || mf.Name == nil || mf.Type == nil { continue } + + var pointsByDomain map[string]float64 + switch mf.GetName() { + case pointSentMetric: + pointsByDomain = points.sent + case pointDroppedMetric: + pointsByDomain = points.dropped + default: + continue + } + if mf.GetType() != dto.MetricType_GAUGE { log.Warnf("dropping point telemetry metric %q with unsupported type %s", mf.GetName(), mf.GetType()) continue } - metricName := pointTelemetryMetricName(mf.GetName()) for _, metric := range mf.Metric { if metric == nil || metric.Gauge == nil { continue @@ -123,42 +127,20 @@ func collectPointTelemetry(mfs []*dto.MetricFamily, requireRemoteAgent bool) poi continue } } - domain, ok := labelValue(metric.Label, domainLabel) - if !ok { - continue - } - - byDomain := points[metricName] - if byDomain == nil { - byDomain = make(map[string]float64) - points[metricName] = byDomain - } - byDomain[domain] += metric.Gauge.GetValue() + domain, _ := labelValue(metric.Label, domainLabel) + pointsByDomain[domain] += metric.Gauge.GetValue() } } return points } -func mergePointTelemetry(dst, src pointTelemetryByDomain) { - for metricName, srcByDomain := range src { - dstByDomain := dst[metricName] - if dstByDomain == nil { - dstByDomain = make(map[string]float64) - dst[metricName] = dstByDomain - } - for domain, value := range srcByDomain { - dstByDomain[domain] += value - } - } -} - func (c *checkImpl) sendPointTelemetry(points pointTelemetryByDomain, sender sender.Sender) { - for metricName, byDomain := range points { - name := c.buildName(metricName) - for domain, value := range byDomain { - sender.Gauge(name, value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)}) - } + for domain, value := range points.sent { + sender.Gauge(c.buildName(pointSentMetric), value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)}) + } + for domain, value := range points.dropped { + sender.Gauge(c.buildName(pointDroppedMetric), value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)}) } } @@ -197,6 +179,10 @@ func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender. sender.Commit() } +func isPointTelemetryMetric(name string) bool { + return name == pointSentMetric || name == pointDroppedMetric +} + func (c *checkImpl) buildName(name string) string { return prefix + strings.ReplaceAll(name, "__", ".") } diff --git a/pkg/collector/corechecks/telemetry/check_test.go b/pkg/collector/corechecks/telemetry/check_test.go index 1d2afa4a61ec..2cd3fd656b71 100644 --- a/pkg/collector/corechecks/telemetry/check_test.go +++ b/pkg/collector/corechecks/telemetry/check_test.go @@ -47,6 +47,7 @@ func TestCollectAndMergePointTelemetry(t *testing.T) { gaugeMetricFamily( "point__sent", gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 10), + gaugeMetric(map[string]string{}, 1), ), gaugeMetricFamily( "point__dropped", @@ -76,14 +77,15 @@ func TestCollectAndMergePointTelemetry(t *testing.T) { } points := collectPointTelemetry(defaultMfs, false) - mergePointTelemetry(points, collectPointTelemetry(remoteMfs, true)) + points.merge(collectPointTelemetry(remoteMfs, true)) require.Equal(t, pointTelemetryByDomain{ - pointSentMetric: { + sent: map[string]float64{ + "": 1, "https://api.datadoghq.com": 22, "https://api.datadoghq.eu": 5, }, - pointDroppedMetric: { + dropped: map[string]float64{ "https://api.datadoghq.com": 5, }, }, points) @@ -96,13 +98,15 @@ func TestSendPointTelemetry(t *testing.T) { s := mocksender.NewMockSenderWithSenderManager(c.ID(), sm) s.On("Gauge", "datadog.agent.point.sent", 22.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) + s.On("Gauge", "datadog.agent.point.sent", 1.0, "", []string{"domain:"}).Return().Times(1) s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) c.sendPointTelemetry(pointTelemetryByDomain{ - pointSentMetric: { + sent: map[string]float64{ + "": 1, "https://api.datadoghq.com": 22, }, - pointDroppedMetric: { + dropped: map[string]float64{ "https://api.datadoghq.com": 5, }, }, s) From b95d0feee181ec509a3e640a06e39fe54a800524 Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Wed, 13 May 2026 16:36:26 +0200 Subject: [PATCH 3/7] DADP-71 Generalize telemetry registry merge --- pkg/collector/corechecks/telemetry/check.go | 112 +++++++++++------- .../corechecks/telemetry/check_test.go | 22 ++-- 2 files changed, 81 insertions(+), 53 deletions(-) diff --git a/pkg/collector/corechecks/telemetry/check.go b/pkg/collector/corechecks/telemetry/check.go index cc1e67458385..08d719b78e9a 100644 --- a/pkg/collector/corechecks/telemetry/check.go +++ b/pkg/collector/corechecks/telemetry/check.go @@ -33,6 +33,21 @@ const ( pointDroppedMetric = "point__dropped" ) +// regularRegistryMergeMetric describes a gauge metric from the regular telemetry registry that should be folded into +// the customer-facing default telemetry output. The metric is aggregated by groupByLabel so the emitted tag shape stays +// compatible with the existing default metric. +type regularRegistryMergeMetric struct { + name string + groupByLabel string +} + +// regularRegistryMergeMetrics is intentionally small and explicit: regular registry telemetry is internal by default, +// and only metrics listed here are merged into customer-facing datadog.agent.* telemetry. +var regularRegistryMergeMetrics = []regularRegistryMergeMetric{ + {name: pointSentMetric, groupByLabel: domainLabel}, + {name: pointDroppedMetric, groupByLabel: domainLabel}, +} + type checkImpl struct { corechecks.CheckBase telemetry telemetry.Component @@ -44,12 +59,15 @@ func (c *checkImpl) Run() error { return err } - pointTelemetry := collectPointTelemetry(mfs, false) - remoteMfs, err := c.telemetry.Gather(false) + mergedMetrics := collectMergeMetrics(mfs, false) + + // Remote Agent Registry telemetry lives in the regular registry. Gather it on a best-effort basis so failures there + // do not prevent the default customer-facing telemetry check from reporting Core Agent values. + regularMfs, err := c.telemetry.Gather(false) if err != nil { - log.Warnf("failed to gather remote agent telemetry metrics for point telemetry merge: %v", err) + log.Warnf("failed to gather regular telemetry metrics for default telemetry merge: %v", err) } else { - pointTelemetry.merge(collectPointTelemetry(remoteMfs, true)) + mergedMetrics.merge(collectMergeMetrics(regularMfs, true)) } sender, err := c.GetSender() @@ -59,30 +77,28 @@ func (c *checkImpl) Run() error { sender.SetNoIndex(true) - c.sendPointTelemetry(pointTelemetry, sender) + c.sendMergedMetrics(mergedMetrics, sender) c.handleMetricFamilies(mfs, sender) return nil } -type pointTelemetryByDomain struct { - sent map[string]float64 - dropped map[string]float64 -} +type mergeMetricValues map[string]map[string]float64 -func newPointTelemetryByDomain() pointTelemetryByDomain { - return pointTelemetryByDomain{ - sent: make(map[string]float64), - dropped: make(map[string]float64), - } +func newMergeMetricValues() mergeMetricValues { + return make(mergeMetricValues) } -func (p pointTelemetryByDomain) merge(other pointTelemetryByDomain) { - for domain, value := range other.sent { - p.sent[domain] += value - } - for domain, value := range other.dropped { - p.dropped[domain] += value +func (m mergeMetricValues) merge(other mergeMetricValues) { + for metricName, otherByGroup := range other { + byGroup := m[metricName] + if byGroup == nil { + byGroup = make(map[string]float64) + m[metricName] = byGroup + } + for groupValue, value := range otherByGroup { + byGroup[groupValue] += value + } } } @@ -95,26 +111,30 @@ func labelValue(labels []*dto.LabelPair, name string) (string, bool) { return "", false } -func collectPointTelemetry(mfs []*dto.MetricFamily, requireRemoteAgent bool) pointTelemetryByDomain { - points := newPointTelemetryByDomain() +func mergeMetricConfig(name string) (regularRegistryMergeMetric, bool) { + for _, metric := range regularRegistryMergeMetrics { + if metric.name == name { + return metric, true + } + } + return regularRegistryMergeMetric{}, false +} + +func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool) mergeMetricValues { + values := newMergeMetricValues() for _, mf := range mfs { if mf == nil || mf.Name == nil || mf.Type == nil { continue } - var pointsByDomain map[string]float64 - switch mf.GetName() { - case pointSentMetric: - pointsByDomain = points.sent - case pointDroppedMetric: - pointsByDomain = points.dropped - default: + mergeMetric, ok := mergeMetricConfig(mf.GetName()) + if !ok { continue } if mf.GetType() != dto.MetricType_GAUGE { - log.Warnf("dropping point telemetry metric %q with unsupported type %s", mf.GetName(), mf.GetType()) + log.Warnf("dropping telemetry merge metric %q with unsupported type %s", mf.GetName(), mf.GetType()) continue } @@ -127,26 +147,33 @@ func collectPointTelemetry(mfs []*dto.MetricFamily, requireRemoteAgent bool) poi continue } } - domain, _ := labelValue(metric.Label, domainLabel) - pointsByDomain[domain] += metric.Gauge.GetValue() + groupValue, _ := labelValue(metric.Label, mergeMetric.groupByLabel) + byGroup := values[mergeMetric.name] + if byGroup == nil { + byGroup = make(map[string]float64) + values[mergeMetric.name] = byGroup + } + byGroup[groupValue] += metric.Gauge.GetValue() } } - return points + return values } -func (c *checkImpl) sendPointTelemetry(points pointTelemetryByDomain, sender sender.Sender) { - for domain, value := range points.sent { - sender.Gauge(c.buildName(pointSentMetric), value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)}) - } - for domain, value := range points.dropped { - sender.Gauge(c.buildName(pointDroppedMetric), value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)}) +func (c *checkImpl) sendMergedMetrics(values mergeMetricValues, sender sender.Sender) { + for _, mergeMetric := range regularRegistryMergeMetrics { + byGroup := values[mergeMetric.name] + for groupValue, value := range byGroup { + sender.Gauge(c.buildName(mergeMetric.name), value, "", []string{fmt.Sprintf("%s:%s", mergeMetric.groupByLabel, groupValue)}) + } } } func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) { for _, mf := range mfs { - if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isPointTelemetryMetric(mf.GetName()) { + // Merged metrics are emitted explicitly by sendMergedMetrics so regular-registry values can be included without + // changing the customer-facing metric names or tags. + if mf == nil || mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isMergedMetric(mf.GetName()) { continue } @@ -179,8 +206,9 @@ func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender. sender.Commit() } -func isPointTelemetryMetric(name string) bool { - return name == pointSentMetric || name == pointDroppedMetric +func isMergedMetric(name string) bool { + _, ok := mergeMetricConfig(name) + return ok } func (c *checkImpl) buildName(name string) string { diff --git a/pkg/collector/corechecks/telemetry/check_test.go b/pkg/collector/corechecks/telemetry/check_test.go index 2cd3fd656b71..7e8db4211840 100644 --- a/pkg/collector/corechecks/telemetry/check_test.go +++ b/pkg/collector/corechecks/telemetry/check_test.go @@ -42,7 +42,7 @@ func gaugeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { } } -func TestCollectAndMergePointTelemetry(t *testing.T) { +func TestCollectAndMergeRegularRegistryMetrics(t *testing.T) { defaultMfs := []*dto.MetricFamily{ gaugeMetricFamily( "point__sent", @@ -76,22 +76,22 @@ func TestCollectAndMergePointTelemetry(t *testing.T) { ), } - points := collectPointTelemetry(defaultMfs, false) - points.merge(collectPointTelemetry(remoteMfs, true)) + values := collectMergeMetrics(defaultMfs, false) + values.merge(collectMergeMetrics(remoteMfs, true)) - require.Equal(t, pointTelemetryByDomain{ - sent: map[string]float64{ + require.Equal(t, mergeMetricValues{ + pointSentMetric: { "": 1, "https://api.datadoghq.com": 22, "https://api.datadoghq.eu": 5, }, - dropped: map[string]float64{ + pointDroppedMetric: { "https://api.datadoghq.com": 5, }, - }, points) + }, values) } -func TestSendPointTelemetry(t *testing.T) { +func TestSendMergedMetrics(t *testing.T) { sm := mocksender.CreateDefaultDemultiplexer() c := &checkImpl{CheckBase: corechecks.NewCheckBase(CheckName)} c.Configure(sm, integration.FakeConfigHash, nil, nil, "test", "provider") @@ -101,12 +101,12 @@ func TestSendPointTelemetry(t *testing.T) { s.On("Gauge", "datadog.agent.point.sent", 1.0, "", []string{"domain:"}).Return().Times(1) s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) - c.sendPointTelemetry(pointTelemetryByDomain{ - sent: map[string]float64{ + c.sendMergedMetrics(mergeMetricValues{ + pointSentMetric: { "": 1, "https://api.datadoghq.com": 22, }, - dropped: map[string]float64{ + pointDroppedMetric: { "https://api.datadoghq.com": 5, }, }, s) From 2861fbede103da5593bbc665f1364618cf674f7e Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Thu, 14 May 2026 10:22:22 +0200 Subject: [PATCH 4/7] DADP-71 Coalesce Agent telemetry metric families --- .../agenttelemetry/impl/agenttelemetry.go | 33 +++++++++++++++ .../impl/agenttelemetry_test.go | 42 +++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/comp/core/agenttelemetry/impl/agenttelemetry.go b/comp/core/agenttelemetry/impl/agenttelemetry.go index 6ed63f7dd56c..c10b8c1f878d 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry.go @@ -449,6 +449,37 @@ func (a *atel) transformMetricFamily(p *Profile, mfam *dto.MetricFamily) *agentm } } +func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFamily { + mergedByName := make(map[string]*telemetry.MetricFamily) + merged := make([]*telemetry.MetricFamily, 0, len(pms)) + + for _, pm := range pms { + if pm == nil || pm.Name == nil || pm.Type == nil { + merged = append(merged, pm) + continue + } + + name := pm.GetName() + existing := mergedByName[name] + if existing == nil { + mergedByName[name] = pm + merged = append(merged, pm) + continue + } + if existing.GetType() != pm.GetType() { + merged = append(merged, pm) + continue + } + + // The regular and default telemetry registries are gathered separately. Coalesce compatible metric families with + // the same name so profile aggregation sees all time series together instead of later payload writes overwriting + // earlier ones in the sender's metric map. + existing.Metric = append(existing.Metric, pm.Metric...) + } + + return merged +} + func (a *atel) reportAgentMetrics(session *senderSession, pms []*telemetry.MetricFamily, p *Profile) { // If no metrics are configured nothing to report if len(p.metricsMap) == 0 { @@ -495,6 +526,8 @@ func (a *atel) loadPayloads(profiles []*Profile) (*senderSession, error) { a.logComp.Errorf("failed to get filtered telemetry metrics: %v", err) } + pms = coalesceMetricFamilies(pms) + // All metrics stored in the "pms" slice above must follow the format: // __ // The "subsystem" and "name" should be concatenated with a double underscore ("__") separator, diff --git a/comp/core/agenttelemetry/impl/agenttelemetry_test.go b/comp/core/agenttelemetry/impl/agenttelemetry_test.go index df6dfa3eb52a..2158832f1bbe 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry_test.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry_test.go @@ -2109,6 +2109,48 @@ func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) { assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5) } +func TestCoalescesDefaultAndNoDefaultMetricFamiliesBeforeAggregation(t *testing.T) { + var c = ` + agent_telemetry: + enabled: true + profiles: + - name: points + metric: + metrics: + - name: point.sent + aggregate_tags: + - domain + - remote_agent + ` + + // setup and initiate atel + tel := makeTelMock(t) + s := makeSenderImpl(t, nil, c) + r := newRunnerMock() + a := getTestAtel(t, tel, c, s, nil, r) + require.True(t, a.enabled) + + corePointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain"}, "", telemetry.Options{DefaultMetric: true}) + adpPointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain", "remote_agent"}, "", telemetry.Options{DefaultMetric: false}) + corePointSent.Set(5, "https://api.datadoghq.com") + adpPointSent.Set(400, "https://api.datadoghq.com", "agent-data-plane") + + metrics, ok := getPayloadFilteredMetricList(a, "point.sent") + require.True(t, ok) + require.Len(t, metrics, 2) + + coreMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{"domain": "https://api.datadoghq.com"}) + require.True(t, ok) + assert.Equal(t, 5.0, coreMetric.Value) + + adpMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{ + "domain": "https://api.datadoghq.com", + "remote_agent": "agent-data-plane", + }) + require.True(t, ok) + assert.Equal(t, 400.0, adpMetric.Value) +} + func TestDefaultAndNoDefaultPromRegistries(t *testing.T) { var c = ` agent_telemetry: From 14c6e6ef249c6dc5ac7ea5b5c657ebb39e2ce993 Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Thu, 14 May 2026 12:16:00 +0200 Subject: [PATCH 5/7] DADP-71 Infer telemetry merge labels --- pkg/collector/corechecks/telemetry/check.go | 160 +++++++++++------- .../corechecks/telemetry/check_test.go | 68 +++++--- 2 files changed, 147 insertions(+), 81 deletions(-) diff --git a/pkg/collector/corechecks/telemetry/check.go b/pkg/collector/corechecks/telemetry/check.go index 08d719b78e9a..69b5acc41415 100644 --- a/pkg/collector/corechecks/telemetry/check.go +++ b/pkg/collector/corechecks/telemetry/check.go @@ -9,6 +9,7 @@ package telemetry import ( "fmt" + "slices" "strings" dto "github.com/prometheus/client_model/go" @@ -33,20 +34,10 @@ const ( pointDroppedMetric = "point__dropped" ) -// regularRegistryMergeMetric describes a gauge metric from the regular telemetry registry that should be folded into -// the customer-facing default telemetry output. The metric is aggregated by groupByLabel so the emitted tag shape stays -// compatible with the existing default metric. -type regularRegistryMergeMetric struct { - name string - groupByLabel string -} - -// regularRegistryMergeMetrics is intentionally small and explicit: regular registry telemetry is internal by default, -// and only metrics listed here are merged into customer-facing datadog.agent.* telemetry. -var regularRegistryMergeMetrics = []regularRegistryMergeMetric{ - {name: pointSentMetric, groupByLabel: domainLabel}, - {name: pointDroppedMetric, groupByLabel: domainLabel}, -} +// regularRegistryMergeMetrics is intentionally small and explicit: regular registry telemetry is internal by default. +// Some remote agents, such as ADP, emit telemetry that overlaps with Core Agent default telemetry; only metrics listed +// here are folded into customer-facing datadog.agent.* telemetry. +var regularRegistryMergeMetrics = []string{pointSentMetric, pointDroppedMetric} type checkImpl struct { corechecks.CheckBase @@ -59,17 +50,19 @@ func (c *checkImpl) Run() error { return err } - mergedMetrics := collectMergeMetrics(mfs, false) - // Remote Agent Registry telemetry lives in the regular registry. Gather it on a best-effort basis so failures there - // do not prevent the default customer-facing telemetry check from reporting Core Agent values. - regularMfs, err := c.telemetry.Gather(false) - if err != nil { + // do not prevent the customer-facing telemetry check from reporting Core Agent default telemetry values. + var regularMfs []*dto.MetricFamily + if gathered, err := c.telemetry.Gather(false); err != nil { log.Warnf("failed to gather regular telemetry metrics for default telemetry merge: %v", err) } else { - mergedMetrics.merge(collectMergeMetrics(regularMfs, true)) + regularMfs = gathered } + mergeLabelsByMetric := discoverMergeLabels(mfs, regularMfs) + mergedMetrics := collectMergeMetrics(mfs, false, mergeLabelsByMetric) + mergedMetrics.merge(collectMergeMetrics(regularMfs, true, mergeLabelsByMetric)) + sender, err := c.GetSender() if err != nil { return err @@ -83,21 +76,41 @@ func (c *checkImpl) Run() error { return nil } -type mergeMetricValues map[string]map[string]float64 +type mergeMetricSample struct { + tags []string + value float64 +} + +type mergeMetricValues map[string]map[string]mergeMetricSample func newMergeMetricValues() mergeMetricValues { return make(mergeMetricValues) } +func mergeKey(tags []string) string { + return strings.Join(tags, "\xff") +} + +func (m mergeMetricValues) add(metricName string, tags []string, value float64) { + byKey := m[metricName] + if byKey == nil { + byKey = make(map[string]mergeMetricSample) + m[metricName] = byKey + } + + key := mergeKey(tags) + sample := byKey[key] + if sample.tags == nil { + sample.tags = tags + } + sample.value += value + byKey[key] = sample +} + func (m mergeMetricValues) merge(other mergeMetricValues) { - for metricName, otherByGroup := range other { - byGroup := m[metricName] - if byGroup == nil { - byGroup = make(map[string]float64) - m[metricName] = byGroup - } - for groupValue, value := range otherByGroup { - byGroup[groupValue] += value + for metricName, otherByKey := range other { + for _, sample := range otherByKey { + m.add(metricName, sample.tags, sample.value) } } } @@ -111,25 +124,66 @@ func labelValue(labels []*dto.LabelPair, name string) (string, bool) { return "", false } -func mergeMetricConfig(name string) (regularRegistryMergeMetric, bool) { - for _, metric := range regularRegistryMergeMetrics { - if metric.name == name { - return metric, true +func isMergedMetric(name string) bool { + return slices.Contains(regularRegistryMergeMetrics, name) +} + +func mergeLabelNames(mfs []*dto.MetricFamily, metricName string) []string { + labelNames := make(map[string]struct{}) + for _, mf := range mfs { + if mf == nil || mf.GetName() != metricName { + continue + } + for _, metric := range mf.Metric { + if metric == nil { + continue + } + for _, label := range metric.Label { + name := label.GetName() + if name == "" || name == remoteAgentLabel { + continue + } + labelNames[name] = struct{}{} + } + } + } + + names := make([]string, 0, len(labelNames)) + for name := range labelNames { + names = append(names, name) + } + slices.Sort(names) + return names +} + +func discoverMergeLabels(defaultMfs, regularMfs []*dto.MetricFamily) map[string][]string { + labelsByMetric := make(map[string][]string, len(regularRegistryMergeMetrics)) + for _, metricName := range regularRegistryMergeMetrics { + labels := mergeLabelNames(defaultMfs, metricName) + if len(labels) == 0 { + // Prefer the default registry's label shape for compatibility. If it has no samples yet, fall back to the + // regular registry while still dropping remote_agent so customer-facing tags do not include attribution. + labels = mergeLabelNames(regularMfs, metricName) } + labelsByMetric[metricName] = labels + } + return labelsByMetric +} + +func mergeTags(labels []*dto.LabelPair, labelNames []string) []string { + tags := make([]string, 0, len(labelNames)) + for _, labelName := range labelNames { + value, _ := labelValue(labels, labelName) + tags = append(tags, fmt.Sprintf("%s:%s", labelName, value)) } - return regularRegistryMergeMetric{}, false + return tags } -func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool) mergeMetricValues { +func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool, labelsByMetric map[string][]string) mergeMetricValues { values := newMergeMetricValues() for _, mf := range mfs { - if mf == nil || mf.Name == nil || mf.Type == nil { - continue - } - - mergeMetric, ok := mergeMetricConfig(mf.GetName()) - if !ok { + if mf == nil || mf.Name == nil || mf.Type == nil || !isMergedMetric(mf.GetName()) { continue } @@ -147,13 +201,7 @@ func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool) merge continue } } - groupValue, _ := labelValue(metric.Label, mergeMetric.groupByLabel) - byGroup := values[mergeMetric.name] - if byGroup == nil { - byGroup = make(map[string]float64) - values[mergeMetric.name] = byGroup - } - byGroup[groupValue] += metric.Gauge.GetValue() + values.add(mf.GetName(), mergeTags(metric.Label, labelsByMetric[mf.GetName()]), metric.Gauge.GetValue()) } } @@ -161,18 +209,17 @@ func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool) merge } func (c *checkImpl) sendMergedMetrics(values mergeMetricValues, sender sender.Sender) { - for _, mergeMetric := range regularRegistryMergeMetrics { - byGroup := values[mergeMetric.name] - for groupValue, value := range byGroup { - sender.Gauge(c.buildName(mergeMetric.name), value, "", []string{fmt.Sprintf("%s:%s", mergeMetric.groupByLabel, groupValue)}) + for _, metricName := range regularRegistryMergeMetrics { + for _, sample := range values[metricName] { + sender.Gauge(c.buildName(metricName), sample.value, "", sample.tags) } } } func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) { for _, mf := range mfs { - // Merged metrics are emitted explicitly by sendMergedMetrics so regular-registry values can be included without - // changing the customer-facing metric names or tags. + // Merged metrics are emitted explicitly by sendMergedMetrics so overlapping regular-registry values can be included + // without changing customer-facing metric names or tags. if mf == nil || mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isMergedMetric(mf.GetName()) { continue } @@ -206,11 +253,6 @@ func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender. sender.Commit() } -func isMergedMetric(name string) bool { - _, ok := mergeMetricConfig(name) - return ok -} - func (c *checkImpl) buildName(name string) string { return prefix + strings.ReplaceAll(name, "__", ".") } diff --git a/pkg/collector/corechecks/telemetry/check_test.go b/pkg/collector/corechecks/telemetry/check_test.go index 7e8db4211840..112dfd3717ff 100644 --- a/pkg/collector/corechecks/telemetry/check_test.go +++ b/pkg/collector/corechecks/telemetry/check_test.go @@ -76,19 +76,46 @@ func TestCollectAndMergeRegularRegistryMetrics(t *testing.T) { ), } - values := collectMergeMetrics(defaultMfs, false) - values.merge(collectMergeMetrics(remoteMfs, true)) - - require.Equal(t, mergeMetricValues{ - pointSentMetric: { - "": 1, - "https://api.datadoghq.com": 22, - "https://api.datadoghq.eu": 5, - }, - pointDroppedMetric: { - "https://api.datadoghq.com": 5, - }, - }, values) + labelsByMetric := discoverMergeLabels(defaultMfs, remoteMfs) + values := collectMergeMetrics(defaultMfs, false, labelsByMetric) + values.merge(collectMergeMetrics(remoteMfs, true, labelsByMetric)) + + require.Equal(t, []string{domainLabel}, labelsByMetric[pointSentMetric]) + require.Equal(t, []string{domainLabel}, labelsByMetric[pointDroppedMetric]) + + sentDefaultDomain := values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 22}, sentDefaultDomain) + + sentEmptyDomain := values[pointSentMetric][mergeKey([]string{"domain:"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:"}, value: 1}, sentEmptyDomain) + + sentRemoteOnlyDomain := values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.eu"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.eu"}, value: 5}, sentRemoteOnlyDomain) + + droppedDefaultDomain := values[pointDroppedMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 5}, droppedDefaultDomain) +} + +func TestDiscoverMergeLabelsFallsBackToRegularRegistry(t *testing.T) { + defaultMfs := []*dto.MetricFamily{} + regularMfs := []*dto.MetricFamily{ + gaugeMetricFamily( + pointSentMetric, + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.com", + remoteAgentLabel: "agent-data-plane", + }, 12), + ), + } + + labelsByMetric := discoverMergeLabels(defaultMfs, regularMfs) + values := collectMergeMetrics(regularMfs, true, labelsByMetric) + + require.Equal(t, []string{domainLabel}, labelsByMetric[pointSentMetric]) + require.Equal(t, mergeMetricSample{ + tags: []string{"domain:https://api.datadoghq.com"}, + value: 12, + }, values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})]) } func TestSendMergedMetrics(t *testing.T) { @@ -101,15 +128,12 @@ func TestSendMergedMetrics(t *testing.T) { s.On("Gauge", "datadog.agent.point.sent", 1.0, "", []string{"domain:"}).Return().Times(1) s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) - c.sendMergedMetrics(mergeMetricValues{ - pointSentMetric: { - "": 1, - "https://api.datadoghq.com": 22, - }, - pointDroppedMetric: { - "https://api.datadoghq.com": 5, - }, - }, s) + values := newMergeMetricValues() + values.add(pointSentMetric, []string{"domain:"}, 1) + values.add(pointSentMetric, []string{"domain:https://api.datadoghq.com"}, 22) + values.add(pointDroppedMetric, []string{"domain:https://api.datadoghq.com"}, 5) + + c.sendMergedMetrics(values, s) s.AssertExpectations(t) } From a846511b6de4c689fd5ef80e4845d49c1689e77a Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Thu, 14 May 2026 12:34:20 +0200 Subject: [PATCH 6/7] DADP-71 Organize telemetry merge helpers --- .../agenttelemetry/impl/agenttelemetry.go | 7 +- pkg/collector/corechecks/telemetry/check.go | 152 --------------- .../corechecks/telemetry/check_test.go | 21 ++ pkg/collector/corechecks/telemetry/merge.go | 182 ++++++++++++++++++ 4 files changed, 207 insertions(+), 155 deletions(-) create mode 100644 pkg/collector/corechecks/telemetry/merge.go diff --git a/comp/core/agenttelemetry/impl/agenttelemetry.go b/comp/core/agenttelemetry/impl/agenttelemetry.go index c10b8c1f878d..bef9e4fd6c5f 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry.go @@ -449,6 +449,10 @@ func (a *atel) transformMetricFamily(p *Profile, mfam *dto.MetricFamily) *agentm } } +// coalesceMetricFamilies merges compatible metric families with the same name. +// +// The regular and default telemetry registries are gathered separately. Coalescing lets profile aggregation see all +// time series together instead of later payload writes overwriting earlier ones in the sender's metric map. func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFamily { mergedByName := make(map[string]*telemetry.MetricFamily) merged := make([]*telemetry.MetricFamily, 0, len(pms)) @@ -471,9 +475,6 @@ func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFa continue } - // The regular and default telemetry registries are gathered separately. Coalesce compatible metric families with - // the same name so profile aggregation sees all time series together instead of later payload writes overwriting - // earlier ones in the sender's metric map. existing.Metric = append(existing.Metric, pm.Metric...) } diff --git a/pkg/collector/corechecks/telemetry/check.go b/pkg/collector/corechecks/telemetry/check.go index 69b5acc41415..40771400c78e 100644 --- a/pkg/collector/corechecks/telemetry/check.go +++ b/pkg/collector/corechecks/telemetry/check.go @@ -9,7 +9,6 @@ package telemetry import ( "fmt" - "slices" "strings" dto "github.com/prometheus/client_model/go" @@ -26,19 +25,8 @@ const ( // CheckName is the name of the check CheckName = "telemetry" prefix = "datadog.agent." - - domainLabel = "domain" - remoteAgentLabel = "remote_agent" - - pointSentMetric = "point__sent" - pointDroppedMetric = "point__dropped" ) -// regularRegistryMergeMetrics is intentionally small and explicit: regular registry telemetry is internal by default. -// Some remote agents, such as ADP, emit telemetry that overlaps with Core Agent default telemetry; only metrics listed -// here are folded into customer-facing datadog.agent.* telemetry. -var regularRegistryMergeMetrics = []string{pointSentMetric, pointDroppedMetric} - type checkImpl struct { corechecks.CheckBase telemetry telemetry.Component @@ -76,146 +64,6 @@ func (c *checkImpl) Run() error { return nil } -type mergeMetricSample struct { - tags []string - value float64 -} - -type mergeMetricValues map[string]map[string]mergeMetricSample - -func newMergeMetricValues() mergeMetricValues { - return make(mergeMetricValues) -} - -func mergeKey(tags []string) string { - return strings.Join(tags, "\xff") -} - -func (m mergeMetricValues) add(metricName string, tags []string, value float64) { - byKey := m[metricName] - if byKey == nil { - byKey = make(map[string]mergeMetricSample) - m[metricName] = byKey - } - - key := mergeKey(tags) - sample := byKey[key] - if sample.tags == nil { - sample.tags = tags - } - sample.value += value - byKey[key] = sample -} - -func (m mergeMetricValues) merge(other mergeMetricValues) { - for metricName, otherByKey := range other { - for _, sample := range otherByKey { - m.add(metricName, sample.tags, sample.value) - } - } -} - -func labelValue(labels []*dto.LabelPair, name string) (string, bool) { - for _, label := range labels { - if label.GetName() == name { - return label.GetValue(), true - } - } - return "", false -} - -func isMergedMetric(name string) bool { - return slices.Contains(regularRegistryMergeMetrics, name) -} - -func mergeLabelNames(mfs []*dto.MetricFamily, metricName string) []string { - labelNames := make(map[string]struct{}) - for _, mf := range mfs { - if mf == nil || mf.GetName() != metricName { - continue - } - for _, metric := range mf.Metric { - if metric == nil { - continue - } - for _, label := range metric.Label { - name := label.GetName() - if name == "" || name == remoteAgentLabel { - continue - } - labelNames[name] = struct{}{} - } - } - } - - names := make([]string, 0, len(labelNames)) - for name := range labelNames { - names = append(names, name) - } - slices.Sort(names) - return names -} - -func discoverMergeLabels(defaultMfs, regularMfs []*dto.MetricFamily) map[string][]string { - labelsByMetric := make(map[string][]string, len(regularRegistryMergeMetrics)) - for _, metricName := range regularRegistryMergeMetrics { - labels := mergeLabelNames(defaultMfs, metricName) - if len(labels) == 0 { - // Prefer the default registry's label shape for compatibility. If it has no samples yet, fall back to the - // regular registry while still dropping remote_agent so customer-facing tags do not include attribution. - labels = mergeLabelNames(regularMfs, metricName) - } - labelsByMetric[metricName] = labels - } - return labelsByMetric -} - -func mergeTags(labels []*dto.LabelPair, labelNames []string) []string { - tags := make([]string, 0, len(labelNames)) - for _, labelName := range labelNames { - value, _ := labelValue(labels, labelName) - tags = append(tags, fmt.Sprintf("%s:%s", labelName, value)) - } - return tags -} - -func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool, labelsByMetric map[string][]string) mergeMetricValues { - values := newMergeMetricValues() - - for _, mf := range mfs { - if mf == nil || mf.Name == nil || mf.Type == nil || !isMergedMetric(mf.GetName()) { - continue - } - - if mf.GetType() != dto.MetricType_GAUGE { - log.Warnf("dropping telemetry merge metric %q with unsupported type %s", mf.GetName(), mf.GetType()) - continue - } - - for _, metric := range mf.Metric { - if metric == nil || metric.Gauge == nil { - continue - } - if requireRemoteAgent { - if _, ok := labelValue(metric.Label, remoteAgentLabel); !ok { - continue - } - } - values.add(mf.GetName(), mergeTags(metric.Label, labelsByMetric[mf.GetName()]), metric.Gauge.GetValue()) - } - } - - return values -} - -func (c *checkImpl) sendMergedMetrics(values mergeMetricValues, sender sender.Sender) { - for _, metricName := range regularRegistryMergeMetrics { - for _, sample := range values[metricName] { - sender.Gauge(c.buildName(metricName), sample.value, "", sample.tags) - } - } -} - func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) { for _, mf := range mfs { // Merged metrics are emitted explicitly by sendMergedMetrics so overlapping regular-registry values can be included diff --git a/pkg/collector/corechecks/telemetry/check_test.go b/pkg/collector/corechecks/telemetry/check_test.go index 112dfd3717ff..ee19d71f2dc7 100644 --- a/pkg/collector/corechecks/telemetry/check_test.go +++ b/pkg/collector/corechecks/telemetry/check_test.go @@ -17,6 +17,8 @@ import ( "github.com/DataDog/datadog-agent/pkg/collector/corechecks" ) +const domainLabel = "domain" + func stringPtr(value string) *string { return &value } @@ -42,6 +44,17 @@ func gaugeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { } } +func counterMetricFamily(name string, value float64) *dto.MetricFamily { + metricType := dto.MetricType_COUNTER + return &dto.MetricFamily{ + Name: stringPtr(name), + Type: &metricType, + Metric: []*dto.Metric{{ + Counter: &dto.Counter{Value: float64Ptr(value)}, + }}, + } +} + func TestCollectAndMergeRegularRegistryMetrics(t *testing.T) { defaultMfs := []*dto.MetricFamily{ gaugeMetricFamily( @@ -96,6 +109,14 @@ func TestCollectAndMergeRegularRegistryMetrics(t *testing.T) { require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 5}, droppedDefaultDomain) } +func TestCollectMergeMetricsSkipsNonGaugeMetrics(t *testing.T) { + mfs := []*dto.MetricFamily{counterMetricFamily(pointSentMetric, 12)} + + values := collectMergeMetrics(mfs, false, map[string][]string{pointSentMetric: {}}) + + require.Empty(t, values) +} + func TestDiscoverMergeLabelsFallsBackToRegularRegistry(t *testing.T) { defaultMfs := []*dto.MetricFamily{} regularMfs := []*dto.MetricFamily{ diff --git a/pkg/collector/corechecks/telemetry/merge.go b/pkg/collector/corechecks/telemetry/merge.go new file mode 100644 index 000000000000..6e5da9c12bc4 --- /dev/null +++ b/pkg/collector/corechecks/telemetry/merge.go @@ -0,0 +1,182 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +package telemetry + +import ( + "fmt" + "slices" + "strings" + + "github.com/DataDog/datadog-agent/pkg/aggregator/sender" + "github.com/DataDog/datadog-agent/pkg/util/log" + dto "github.com/prometheus/client_model/go" +) + +const ( + remoteAgentLabel = "remote_agent" + + pointSentMetric = "point__sent" + pointDroppedMetric = "point__dropped" +) + +// regularRegistryMergeMetrics is intentionally small and explicit: regular registry telemetry is internal by default. +// Some remote agents, such as ADP, emit telemetry that overlaps with Core Agent default telemetry; only metrics listed +// here are folded into customer-facing datadog.agent.* telemetry. +var regularRegistryMergeMetrics = []string{pointSentMetric, pointDroppedMetric} + +type mergeMetricSample struct { + tags []string + value float64 +} + +// mergeMetricValues stores merge candidates by metric name and customer-facing tag set. +type mergeMetricValues map[string]map[string]mergeMetricSample + +func newMergeMetricValues() mergeMetricValues { + return make(mergeMetricValues) +} + +// mergeKey builds a stable key for a customer-facing tag set. +func mergeKey(tags []string) string { + return strings.Join(tags, "\xff") +} + +// add accumulates a metric value into the bucket identified by metric name and tag set. +func (m mergeMetricValues) add(metricName string, tags []string, value float64) { + byKey := m[metricName] + if byKey == nil { + byKey = make(map[string]mergeMetricSample) + m[metricName] = byKey + } + + key := mergeKey(tags) + sample := byKey[key] + if sample.tags == nil { + sample.tags = tags + } + sample.value += value + byKey[key] = sample +} + +// merge folds another merge set into the receiver. +func (m mergeMetricValues) merge(other mergeMetricValues) { + for metricName, otherByKey := range other { + for _, sample := range otherByKey { + m.add(metricName, sample.tags, sample.value) + } + } +} + +func labelValue(labels []*dto.LabelPair, name string) (string, bool) { + for _, label := range labels { + if label.GetName() == name { + return label.GetValue(), true + } + } + return "", false +} + +// isMergedMetric returns true when a default-registry metric should be emitted through the explicit merge path. +func isMergedMetric(name string) bool { + return slices.Contains(regularRegistryMergeMetrics, name) +} + +// mergeLabelNames returns the sorted label names used by the given metric family, excluding remote_agent. +func mergeLabelNames(mfs []*dto.MetricFamily, metricName string) []string { + labelNames := make(map[string]struct{}) + for _, mf := range mfs { + if mf == nil || mf.GetName() != metricName { + continue + } + for _, metric := range mf.Metric { + if metric == nil { + continue + } + for _, label := range metric.Label { + name := label.GetName() + if name == "" || name == remoteAgentLabel { + continue + } + labelNames[name] = struct{}{} + } + } + } + + names := make([]string, 0, len(labelNames)) + for name := range labelNames { + names = append(names, name) + } + slices.Sort(names) + return names +} + +// discoverMergeLabels determines the customer-facing tag shape for each merged metric. +// +// The default registry is preferred because it defines the existing customer-facing metric shape. If the default +// registry has no samples yet, regular-registry labels are used as a fallback while still dropping remote_agent. +func discoverMergeLabels(defaultMfs, regularMfs []*dto.MetricFamily) map[string][]string { + labelsByMetric := make(map[string][]string, len(regularRegistryMergeMetrics)) + for _, metricName := range regularRegistryMergeMetrics { + labels := mergeLabelNames(defaultMfs, metricName) + if len(labels) == 0 { + labels = mergeLabelNames(regularMfs, metricName) + } + labelsByMetric[metricName] = labels + } + return labelsByMetric +} + +// mergeTags builds customer-facing tags from the selected label names, using an empty value for missing labels. +func mergeTags(labels []*dto.LabelPair, labelNames []string) []string { + tags := make([]string, 0, len(labelNames)) + for _, labelName := range labelNames { + value, _ := labelValue(labels, labelName) + tags = append(tags, fmt.Sprintf("%s:%s", labelName, value)) + } + return tags +} + +// collectMergeMetrics extracts allowlisted gauge metrics and aggregates them by the discovered customer-facing tags. +// +// When requireRemoteAgent is true, only series with a remote_agent label are collected. This prevents unrelated regular +// registry series from being folded into customer-facing telemetry. +func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool, labelsByMetric map[string][]string) mergeMetricValues { + values := newMergeMetricValues() + + for _, mf := range mfs { + if mf == nil || mf.Name == nil || mf.Type == nil || !isMergedMetric(mf.GetName()) { + continue + } + + if mf.GetType() != dto.MetricType_GAUGE { + log.Warnf("dropping telemetry merge metric %q with unsupported type %s", mf.GetName(), mf.GetType()) + continue + } + + for _, metric := range mf.Metric { + if metric == nil || metric.Gauge == nil { + continue + } + if requireRemoteAgent { + if _, ok := labelValue(metric.Label, remoteAgentLabel); !ok { + continue + } + } + values.add(mf.GetName(), mergeTags(metric.Label, labelsByMetric[mf.GetName()]), metric.Gauge.GetValue()) + } + } + + return values +} + +// sendMergedMetrics emits metrics that combine default-registry values with overlapping regular-registry values. +func (c *checkImpl) sendMergedMetrics(values mergeMetricValues, sender sender.Sender) { + for _, metricName := range regularRegistryMergeMetrics { + for _, sample := range values[metricName] { + sender.Gauge(c.buildName(metricName), sample.value, "", sample.tags) + } + } +} From 4537ed7625fd92f1f89d4b2dab7bf23fb98bbe59 Mon Sep 17 00:00:00 2001 From: Travis Thieman Date: Thu, 14 May 2026 12:58:13 +0200 Subject: [PATCH 7/7] DADP-71 Refine telemetry merge helpers --- comp/core/agenttelemetry/impl/agenttelemetry.go | 2 +- pkg/collector/corechecks/telemetry/merge.go | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/comp/core/agenttelemetry/impl/agenttelemetry.go b/comp/core/agenttelemetry/impl/agenttelemetry.go index bef9e4fd6c5f..731e20a436af 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry.go @@ -454,7 +454,7 @@ func (a *atel) transformMetricFamily(p *Profile, mfam *dto.MetricFamily) *agentm // The regular and default telemetry registries are gathered separately. Coalescing lets profile aggregation see all // time series together instead of later payload writes overwriting earlier ones in the sender's metric map. func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFamily { - mergedByName := make(map[string]*telemetry.MetricFamily) + mergedByName := make(map[string]*telemetry.MetricFamily, len(pms)) merged := make([]*telemetry.MetricFamily, 0, len(pms)) for _, pm := range pms { diff --git a/pkg/collector/corechecks/telemetry/merge.go b/pkg/collector/corechecks/telemetry/merge.go index 6e5da9c12bc4..3305439afbc3 100644 --- a/pkg/collector/corechecks/telemetry/merge.go +++ b/pkg/collector/corechecks/telemetry/merge.go @@ -6,7 +6,6 @@ package telemetry import ( - "fmt" "slices" "strings" @@ -36,7 +35,7 @@ type mergeMetricSample struct { type mergeMetricValues map[string]map[string]mergeMetricSample func newMergeMetricValues() mergeMetricValues { - return make(mergeMetricValues) + return make(mergeMetricValues, len(regularRegistryMergeMetrics)) } // mergeKey builds a stable key for a customer-facing tag set. @@ -134,7 +133,7 @@ func mergeTags(labels []*dto.LabelPair, labelNames []string) []string { tags := make([]string, 0, len(labelNames)) for _, labelName := range labelNames { value, _ := labelValue(labels, labelName) - tags = append(tags, fmt.Sprintf("%s:%s", labelName, value)) + tags = append(tags, labelName+":"+value) } return tags }