Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions comp/core/agenttelemetry/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,13 @@ var defaultProfiles = `
- name: logs.auto_multi_line_default_would_truncate
- name: logs_destination.destination_workers
- name: point.sent
aggregate_tags:
- domain
- remote_agent
- name: point.dropped
aggregate_tags:
- domain
- remote_agent
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is these will change only the COAT version of the metrics to start including these two tags. Since COAT is internal, strict compatibility on the shape of these metrics is not required

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not be a problem

- name: transactions.input_count
- name: transactions.requeued
- name: transactions.retries
Expand Down
114 changes: 113 additions & 1 deletion pkg/collector/corechecks/telemetry/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ const (
// CheckName is the name of the check
CheckName = "telemetry"
prefix = "datadog.agent."

domainLabel = "domain"
remoteAgentLabel = "remote_agent"
pointSentName = "point_sent"
pointDroppedName = "point_dropped"
pointSentMetric = "point__sent"
pointDroppedMetric = "point__dropped"
)

type checkImpl struct {
Expand All @@ -38,21 +45,126 @@ func (c *checkImpl) Run() error {
return err
}

pointTelemetry := collectPointTelemetry(mfs, false)
remoteMfs, err := c.telemetry.Gather(false)
if err != nil {
log.Warnf("failed to gather remote agent telemetry metrics for point telemetry merge: %v", err)
} else {
mergePointTelemetry(pointTelemetry, collectPointTelemetry(remoteMfs, true))
}

sender, err := c.GetSender()
if err != nil {
return err
}

sender.SetNoIndex(true)

c.sendPointTelemetry(pointTelemetry, sender)
c.handleMetricFamilies(mfs, sender)

return nil
}

func normalizeMetricName(name string) string {
return strings.ReplaceAll(name, "__", "_")
}

func isPointTelemetryMetric(name string) bool {
switch normalizeMetricName(name) {
case pointSentName, pointDroppedName:
return true
default:
return false
}
}

func pointTelemetryMetricName(name string) string {
switch normalizeMetricName(name) {
case pointSentName:
return pointSentMetric
case pointDroppedName:
return pointDroppedMetric
default:
return name
}
}

func labelValue(labels []*dto.LabelPair, name string) (string, bool) {
for _, label := range labels {
if label.GetName() == name {
return label.GetValue(), true
}
}
return "", false
}

type pointTelemetryByDomain map[string]map[string]float64

func collectPointTelemetry(mfs []*dto.MetricFamily, requireRemoteAgent bool) pointTelemetryByDomain {
points := make(pointTelemetryByDomain)

for _, mf := range mfs {
if mf == nil || mf.Name == nil || mf.Type == nil || !isPointTelemetryMetric(mf.GetName()) {
continue
}
if mf.GetType() != dto.MetricType_GAUGE {
log.Warnf("dropping point telemetry metric %q with unsupported type %s", mf.GetName(), mf.GetType())
continue
}

metricName := pointTelemetryMetricName(mf.GetName())
for _, metric := range mf.Metric {
if metric == nil || metric.Gauge == nil {
continue
}
if requireRemoteAgent {
if _, ok := labelValue(metric.Label, remoteAgentLabel); !ok {
continue
}
}
domain, ok := labelValue(metric.Label, domainLabel)
if !ok {
continue
}

byDomain := points[metricName]
if byDomain == nil {
byDomain = make(map[string]float64)
points[metricName] = byDomain
}
byDomain[domain] += metric.Gauge.GetValue()
}
}

return points
}

func mergePointTelemetry(dst, src pointTelemetryByDomain) {
for metricName, srcByDomain := range src {
dstByDomain := dst[metricName]
if dstByDomain == nil {
dstByDomain = make(map[string]float64)
dst[metricName] = dstByDomain
}
for domain, value := range srcByDomain {
dstByDomain[domain] += value
}
}
}

func (c *checkImpl) sendPointTelemetry(points pointTelemetryByDomain, sender sender.Sender) {
for metricName, byDomain := range points {
name := c.buildName(metricName)
for domain, value := range byDomain {
sender.Gauge(name, value, "", []string{fmt.Sprintf("%s:%s", domainLabel, domain)})
}
}
}

func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) {
for _, mf := range mfs {
if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 {
if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isPointTelemetryMetric(mf.GetName()) {
continue
}

Expand Down
94 changes: 94 additions & 0 deletions pkg/collector/corechecks/telemetry/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,107 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/aggregator/mocksender"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks"
)

func stringPtr(value string) *string {
return &value
}

func float64Ptr(value float64) *float64 {
return &value
}

func gaugeMetric(labels map[string]string, value float64) *dto.Metric {
metric := &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(value)}}
for name, value := range labels {
metric.Label = append(metric.Label, &dto.LabelPair{Name: stringPtr(name), Value: stringPtr(value)})
}
return metric
}

func gaugeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily {
metricType := dto.MetricType_GAUGE
return &dto.MetricFamily{
Name: stringPtr(name),
Type: &metricType,
Metric: metrics,
}
}

func TestCollectAndMergePointTelemetry(t *testing.T) {
defaultMfs := []*dto.MetricFamily{
gaugeMetricFamily(
"point__sent",
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 10),
),
gaugeMetricFamily(
"point__dropped",
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 2),
),
}
remoteMfs := []*dto.MetricFamily{
gaugeMetricFamily(
"point__sent",
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.com",
remoteAgentLabel: "agent-data-plane",
}, 12),
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.eu",
remoteAgentLabel: "other-remote-agent",
}, 5),
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 100),
),
gaugeMetricFamily(
"point__dropped",
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.com",
remoteAgentLabel: "agent-data-plane",
}, 3),
),
}

points := collectPointTelemetry(defaultMfs, false)
mergePointTelemetry(points, collectPointTelemetry(remoteMfs, true))

require.Equal(t, pointTelemetryByDomain{
pointSentMetric: {
"https://api.datadoghq.com": 22,
"https://api.datadoghq.eu": 5,
},
pointDroppedMetric: {
"https://api.datadoghq.com": 5,
},
}, points)
}

func TestSendPointTelemetry(t *testing.T) {
sm := mocksender.CreateDefaultDemultiplexer()
c := &checkImpl{CheckBase: corechecks.NewCheckBase(CheckName)}
c.Configure(sm, integration.FakeConfigHash, nil, nil, "test", "provider")

s := mocksender.NewMockSenderWithSenderManager(c.ID(), sm)
s.On("Gauge", "datadog.agent.point.sent", 22.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1)
s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1)

c.sendPointTelemetry(pointTelemetryByDomain{
pointSentMetric: {
"https://api.datadoghq.com": 22,
},
pointDroppedMetric: {
"https://api.datadoghq.com": 5,
},
}, s)

s.AssertExpectations(t)
}

func TestCheck(t *testing.T) {
reg := prometheus.NewRegistry()

Expand Down
Loading