diff --git a/comp/core/agenttelemetry/impl/agenttelemetry.go b/comp/core/agenttelemetry/impl/agenttelemetry.go index 6ed63f7dd56c..731e20a436af 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry.go @@ -449,6 +449,38 @@ func (a *atel) transformMetricFamily(p *Profile, mfam *dto.MetricFamily) *agentm } } +// coalesceMetricFamilies merges compatible metric families with the same name. +// +// The regular and default telemetry registries are gathered separately. Coalescing lets profile aggregation see all +// time series together instead of later payload writes overwriting earlier ones in the sender's metric map. +func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFamily { + mergedByName := make(map[string]*telemetry.MetricFamily, len(pms)) + merged := make([]*telemetry.MetricFamily, 0, len(pms)) + + for _, pm := range pms { + if pm == nil || pm.Name == nil || pm.Type == nil { + merged = append(merged, pm) + continue + } + + name := pm.GetName() + existing := mergedByName[name] + if existing == nil { + mergedByName[name] = pm + merged = append(merged, pm) + continue + } + if existing.GetType() != pm.GetType() { + merged = append(merged, pm) + continue + } + + existing.Metric = append(existing.Metric, pm.Metric...) 
+ } + + return merged +} + func (a *atel) reportAgentMetrics(session *senderSession, pms []*telemetry.MetricFamily, p *Profile) { // If no metrics are configured nothing to report if len(p.metricsMap) == 0 { @@ -495,6 +527,8 @@ func (a *atel) loadPayloads(profiles []*Profile) (*senderSession, error) { a.logComp.Errorf("failed to get filtered telemetry metrics: %v", err) } + pms = coalesceMetricFamilies(pms) + // All metrics stored in the "pms" slice above must follow the format: // <subsystem>__<name> // The "subsystem" and "name" should be concatenated with a double underscore ("__") separator, diff --git a/comp/core/agenttelemetry/impl/agenttelemetry_test.go b/comp/core/agenttelemetry/impl/agenttelemetry_test.go index df6dfa3eb52a..2158832f1bbe 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry_test.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry_test.go @@ -2109,6 +2109,48 @@ func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) { assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5) } +func TestCoalescesDefaultAndNoDefaultMetricFamiliesBeforeAggregation(t *testing.T) { + var c = ` + agent_telemetry: + enabled: true + profiles: + - name: points + metric: + metrics: + - name: point.sent + aggregate_tags: + - domain + - remote_agent + ` + + // setup and initiate atel + tel := makeTelMock(t) + s := makeSenderImpl(t, nil, c) + r := newRunnerMock() + a := getTestAtel(t, tel, c, s, nil, r) + require.True(t, a.enabled) + + corePointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain"}, "", telemetry.Options{DefaultMetric: true}) + adpPointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain", "remote_agent"}, "", telemetry.Options{DefaultMetric: false}) + corePointSent.Set(5, "https://api.datadoghq.com") + adpPointSent.Set(400, "https://api.datadoghq.com", "agent-data-plane") + + metrics, ok := getPayloadFilteredMetricList(a, "point.sent") + require.True(t, ok) + require.Len(t, metrics, 2) + + coreMetric, ok := 
getPayloadMetricByTagValues(metrics, map[string]interface{}{"domain": "https://api.datadoghq.com"}) + require.True(t, ok) + assert.Equal(t, 5.0, coreMetric.Value) + + adpMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{ + "domain": "https://api.datadoghq.com", + "remote_agent": "agent-data-plane", + }) + require.True(t, ok) + assert.Equal(t, 400.0, adpMetric.Value) +} + func TestDefaultAndNoDefaultPromRegistries(t *testing.T) { var c = ` agent_telemetry: diff --git a/comp/core/agenttelemetry/impl/config.go b/comp/core/agenttelemetry/impl/config.go index 19b79be975ca..c32a56330e1d 100644 --- a/comp/core/agenttelemetry/impl/config.go +++ b/comp/core/agenttelemetry/impl/config.go @@ -260,7 +260,13 @@ var defaultProfiles = ` - name: logs.auto_multi_line_default_would_truncate - name: logs_destination.destination_workers - name: point.sent + aggregate_tags: + - domain + - remote_agent - name: point.dropped + aggregate_tags: + - domain + - remote_agent - name: transactions.input_count - name: transactions.requeued - name: transactions.retries diff --git a/pkg/collector/corechecks/telemetry/check.go b/pkg/collector/corechecks/telemetry/check.go index c63195708064..40771400c78e 100644 --- a/pkg/collector/corechecks/telemetry/check.go +++ b/pkg/collector/corechecks/telemetry/check.go @@ -38,6 +38,19 @@ func (c *checkImpl) Run() error { return err } + // Remote Agent Registry telemetry lives in the regular registry. Gather it on a best-effort basis so failures there + // do not prevent the customer-facing telemetry check from reporting Core Agent default telemetry values. 
+ var regularMfs []*dto.MetricFamily + if gathered, err := c.telemetry.Gather(false); err != nil { + log.Warnf("failed to gather regular telemetry metrics for default telemetry merge: %v", err) + } else { + regularMfs = gathered + } + + mergeLabelsByMetric := discoverMergeLabels(mfs, regularMfs) + mergedMetrics := collectMergeMetrics(mfs, false, mergeLabelsByMetric) + mergedMetrics.merge(collectMergeMetrics(regularMfs, true, mergeLabelsByMetric)) + sender, err := c.GetSender() if err != nil { return err @@ -45,6 +58,7 @@ func (c *checkImpl) Run() error { sender.SetNoIndex(true) + c.sendMergedMetrics(mergedMetrics, sender) c.handleMetricFamilies(mfs, sender) return nil @@ -52,7 +66,9 @@ func (c *checkImpl) Run() error { func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) { for _, mf := range mfs { - if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 { + // Merged metrics are emitted explicitly by sendMergedMetrics so overlapping regular-registry values can be included + // without changing customer-facing metric names or tags. 
+ if mf == nil || mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isMergedMetric(mf.GetName()) { continue } diff --git a/pkg/collector/corechecks/telemetry/check_test.go b/pkg/collector/corechecks/telemetry/check_test.go index fa1470ca2bbb..ee19d71f2dc7 100644 --- a/pkg/collector/corechecks/telemetry/check_test.go +++ b/pkg/collector/corechecks/telemetry/check_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration" @@ -16,6 +17,148 @@ import ( "github.com/DataDog/datadog-agent/pkg/collector/corechecks" ) +const domainLabel = "domain" + +func stringPtr(value string) *string { + return &value +} + +func float64Ptr(value float64) *float64 { + return &value +} + +func gaugeMetric(labels map[string]string, value float64) *dto.Metric { + metric := &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(value)}} + for name, value := range labels { + metric.Label = append(metric.Label, &dto.LabelPair{Name: stringPtr(name), Value: stringPtr(value)}) + } + return metric +} + +func gaugeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily { + metricType := dto.MetricType_GAUGE + return &dto.MetricFamily{ + Name: stringPtr(name), + Type: &metricType, + Metric: metrics, + } +} + +func counterMetricFamily(name string, value float64) *dto.MetricFamily { + metricType := dto.MetricType_COUNTER + return &dto.MetricFamily{ + Name: stringPtr(name), + Type: &metricType, + Metric: []*dto.Metric{{ + Counter: &dto.Counter{Value: float64Ptr(value)}, + }}, + } +} + +func TestCollectAndMergeRegularRegistryMetrics(t *testing.T) { + defaultMfs := []*dto.MetricFamily{ + gaugeMetricFamily( + "point__sent", + gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 10), + gaugeMetric(map[string]string{}, 1), + ), + gaugeMetricFamily( + "point__dropped", + 
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 2), + ), + } + remoteMfs := []*dto.MetricFamily{ + gaugeMetricFamily( + "point__sent", + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.com", + remoteAgentLabel: "agent-data-plane", + }, 12), + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.eu", + remoteAgentLabel: "other-remote-agent", + }, 5), + gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 100), + ), + gaugeMetricFamily( + "point__dropped", + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.com", + remoteAgentLabel: "agent-data-plane", + }, 3), + ), + } + + labelsByMetric := discoverMergeLabels(defaultMfs, remoteMfs) + values := collectMergeMetrics(defaultMfs, false, labelsByMetric) + values.merge(collectMergeMetrics(remoteMfs, true, labelsByMetric)) + + require.Equal(t, []string{domainLabel}, labelsByMetric[pointSentMetric]) + require.Equal(t, []string{domainLabel}, labelsByMetric[pointDroppedMetric]) + + sentDefaultDomain := values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 22}, sentDefaultDomain) + + sentEmptyDomain := values[pointSentMetric][mergeKey([]string{"domain:"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:"}, value: 1}, sentEmptyDomain) + + sentRemoteOnlyDomain := values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.eu"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.eu"}, value: 5}, sentRemoteOnlyDomain) + + droppedDefaultDomain := values[pointDroppedMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})] + require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 5}, droppedDefaultDomain) +} + +func TestCollectMergeMetricsSkipsNonGaugeMetrics(t *testing.T) { + mfs := 
[]*dto.MetricFamily{counterMetricFamily(pointSentMetric, 12)} + + values := collectMergeMetrics(mfs, false, map[string][]string{pointSentMetric: {}}) + + require.Empty(t, values) +} + +func TestDiscoverMergeLabelsFallsBackToRegularRegistry(t *testing.T) { + defaultMfs := []*dto.MetricFamily{} + regularMfs := []*dto.MetricFamily{ + gaugeMetricFamily( + pointSentMetric, + gaugeMetric(map[string]string{ + domainLabel: "https://api.datadoghq.com", + remoteAgentLabel: "agent-data-plane", + }, 12), + ), + } + + labelsByMetric := discoverMergeLabels(defaultMfs, regularMfs) + values := collectMergeMetrics(regularMfs, true, labelsByMetric) + + require.Equal(t, []string{domainLabel}, labelsByMetric[pointSentMetric]) + require.Equal(t, mergeMetricSample{ + tags: []string{"domain:https://api.datadoghq.com"}, + value: 12, + }, values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})]) +} + +func TestSendMergedMetrics(t *testing.T) { + sm := mocksender.CreateDefaultDemultiplexer() + c := &checkImpl{CheckBase: corechecks.NewCheckBase(CheckName)} + c.Configure(sm, integration.FakeConfigHash, nil, nil, "test", "provider") + + s := mocksender.NewMockSenderWithSenderManager(c.ID(), sm) + s.On("Gauge", "datadog.agent.point.sent", 22.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) + s.On("Gauge", "datadog.agent.point.sent", 1.0, "", []string{"domain:"}).Return().Times(1) + s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1) + + values := newMergeMetricValues() + values.add(pointSentMetric, []string{"domain:"}, 1) + values.add(pointSentMetric, []string{"domain:https://api.datadoghq.com"}, 22) + values.add(pointDroppedMetric, []string{"domain:https://api.datadoghq.com"}, 5) + + c.sendMergedMetrics(values, s) + + s.AssertExpectations(t) +} + func TestCheck(t *testing.T) { reg := prometheus.NewRegistry() diff --git a/pkg/collector/corechecks/telemetry/merge.go 
b/pkg/collector/corechecks/telemetry/merge.go
new file mode 100644
index 000000000000..3305439afbc3
--- /dev/null
+++ b/pkg/collector/corechecks/telemetry/merge.go
@@ -0,0 +1,181 @@
+// Unless explicitly stated otherwise all files in this repository are licensed
+// under the Apache License Version 2.0.
+// This product includes software developed at Datadog (https://www.datadoghq.com/).
+// Copyright 2016-present Datadog, Inc.
+
+package telemetry
+
+import (
+	"slices"
+	"strings"
+
+	"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
+	"github.com/DataDog/datadog-agent/pkg/util/log"
+	dto "github.com/prometheus/client_model/go"
+)
+
+const (
+	remoteAgentLabel = "remote_agent" // label whose presence selects regular-registry series for merging; excluded from emitted tags
+
+	pointSentMetric    = "point__sent"    // family name routed through the merge path
+	pointDroppedMetric = "point__dropped" // family name routed through the merge path
+)
+
+// regularRegistryMergeMetrics is the allowlist of metric family names handled by the merge path. It is intentionally
+// small and explicit: regular registry telemetry is internal by default. Some remote agents, such as ADP, emit telemetry
+// that overlaps with Core Agent default telemetry; only metrics listed here are folded into customer-facing datadog.agent.* telemetry.
+var regularRegistryMergeMetrics = []string{pointSentMetric, pointDroppedMetric}
+
+type mergeMetricSample struct {
+	tags  []string // customer-facing tags, already rendered as "name:value"
+	value float64  // running sum of every value folded into this tag set
+}
+
+// mergeMetricValues stores merge candidates keyed first by metric family name, then by the mergeKey of the customer-facing tag set.
+type mergeMetricValues map[string]map[string]mergeMetricSample
+
+func newMergeMetricValues() mergeMetricValues {
+	return make(mergeMetricValues, len(regularRegistryMergeMetrics)) // sized for the allowlist; inner maps are created lazily by add
+}
+
+// mergeKey builds a key for a customer-facing tag set by joining the tags, in the order given, with a 0xff byte; it does not sort.
+func mergeKey(tags []string) string {
+	return strings.Join(tags, "\xff")
+}
+
+// add accumulates a metric value into the bucket identified by metric name and tag set, creating the bucket on first use.
+func (m mergeMetricValues) add(metricName string, tags []string, value float64) {
+	byKey := m[metricName]
+	if byKey == nil { // first sample for this metric: create its per-tag-set map lazily
+		byKey = make(map[string]mergeMetricSample)
+		m[metricName] = byKey
+	}
+
+	key := mergeKey(tags)
+	sample := byKey[key] // zero-value sample when the key is not present yet
+	if sample.tags == nil {
+		sample.tags = tags // the caller's slice is kept as-is, not copied
+	}
+	sample.value += value
+	byKey[key] = sample // map elements are copies, so the updated sample is stored back
+}
+
+// merge folds another merge set into the receiver by re-adding each of its samples, so values sharing a metric name and tag key are summed.
+func (m mergeMetricValues) merge(other mergeMetricValues) {
+	for metricName, otherByKey := range other {
+		for _, sample := range otherByKey {
+			m.add(metricName, sample.tags, sample.value)
+		}
+	}
+}
+
+func labelValue(labels []*dto.LabelPair, name string) (string, bool) {
+	for _, label := range labels {
+		if label.GetName() == name {
+			return label.GetValue(), true // first label with a matching name wins
+		}
+	}
+	return "", false // no label carries that name
+}
+
+// isMergedMetric reports whether name is listed in regularRegistryMergeMetrics, i.e. whether the metric is emitted through the explicit merge path.
+func isMergedMetric(name string) bool {
+	return slices.Contains(regularRegistryMergeMetrics, name)
+}
+
+// mergeLabelNames returns the sorted, de-duplicated label names seen on any series of the families named metricName, excluding remote_agent and empty names.
+func mergeLabelNames(mfs []*dto.MetricFamily, metricName string) []string {
+	labelNames := make(map[string]struct{})
+	for _, mf := range mfs {
+		if mf == nil || mf.GetName() != metricName {
+			continue
+		}
+		for _, metric := range mf.Metric {
+			if metric == nil {
+				continue
+			}
+			for _, label := range metric.Label {
+				name := label.GetName()
+				if name == "" || name == remoteAgentLabel {
+					continue
+				}
+				labelNames[name] = struct{}{}
+			}
+		}
+	}
+
+	names := make([]string, 0, len(labelNames))
+	for name := range labelNames {
+		names = append(names, name)
+	}
+	slices.Sort(names) // map iteration order is random; sorting makes the tag shape deterministic
+	return names
+}
+
+// discoverMergeLabels determines the customer-facing tag shape (the ordered label names) for each allowlisted metric.
+//
+// The default registry is preferred because it defines the existing customer-facing metric shape. If it yields no label
+// names (family absent, no series, or only unlabelled series), regular-registry label names are used instead; remote_agent is dropped either way.
+func discoverMergeLabels(defaultMfs, regularMfs []*dto.MetricFamily) map[string][]string {
+	labelsByMetric := make(map[string][]string, len(regularRegistryMergeMetrics))
+	for _, metricName := range regularRegistryMergeMetrics {
+		labels := mergeLabelNames(defaultMfs, metricName)
+		if len(labels) == 0 {
+			labels = mergeLabelNames(regularMfs, metricName) // fallback scans every regular series of the family, with or without remote_agent
+		}
+		labelsByMetric[metricName] = labels
+	}
+	return labelsByMetric
+}
+
+// mergeTags builds customer-facing "name:value" tags, one per entry of labelNames and in that order, using an empty value for missing labels.
+func mergeTags(labels []*dto.LabelPair, labelNames []string) []string {
+	tags := make([]string, 0, len(labelNames))
+	for _, labelName := range labelNames {
+		value, _ := labelValue(labels, labelName) // found-flag ignored on purpose: a missing label yields "" and therefore a "name:" tag
+		tags = append(tags, labelName+":"+value)
+	}
+	return tags
+}
+
+// collectMergeMetrics extracts allowlisted gauge metrics and aggregates (sums) them by the discovered customer-facing tags.
+//
+// When requireRemoteAgent is true, only series with a remote_agent label are collected. This prevents unrelated regular
+// registry series from being folded into customer-facing telemetry.
+func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool, labelsByMetric map[string][]string) mergeMetricValues {
+	values := newMergeMetricValues()
+
+	for _, mf := range mfs {
+		if mf == nil || mf.Name == nil || mf.Type == nil || !isMergedMetric(mf.GetName()) {
+			continue // only well-formed, allowlisted families take the merge path
+		}
+
+		if mf.GetType() != dto.MetricType_GAUGE {
+			log.Warnf("dropping telemetry merge metric %q with unsupported type %s", mf.GetName(), mf.GetType())
+			continue // NOTE(review): handleMetricFamilies also skips allowlisted names whatever their type, so such a family appears to be reported by neither path — confirm this is intended
+		}
+
+		for _, metric := range mf.Metric {
+			if metric == nil || metric.Gauge == nil {
+				continue
+			}
+			if requireRemoteAgent {
+				if _, ok := labelValue(metric.Label, remoteAgentLabel); !ok { // presence is enough; the label's value is not inspected
+					continue
+				}
+			}
+			values.add(mf.GetName(), mergeTags(metric.Label, labelsByMetric[mf.GetName()]), metric.Gauge.GetValue())
+		}
+	}
+
+	return values
+}
+
+// sendMergedMetrics emits one gauge per collected tag set for every allowlisted metric, combining default-registry values with overlapping regular-registry values.
+func (c *checkImpl) sendMergedMetrics(values mergeMetricValues, sender sender.Sender) {
+	for _, metricName := range regularRegistryMergeMetrics {
+		for _, sample := range values[metricName] { // map iteration: emission order across tag sets is unspecified
+			sender.Gauge(c.buildName(metricName), sample.value, "", sample.tags)
+		}
+	}
+}