Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,38 @@ func (a *atel) transformMetricFamily(p *Profile, mfam *dto.MetricFamily) *agentm
}
}

// coalesceMetricFamilies merges compatible metric families with the same name.
//
// The regular and default telemetry registries are gathered separately. Coalescing lets profile aggregation see all
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting.

// time series together instead of later payload writes overwriting earlier ones in the sender's metric map.
func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFamily {
mergedByName := make(map[string]*telemetry.MetricFamily, len(pms))
merged := make([]*telemetry.MetricFamily, 0, len(pms))

for _, pm := range pms {
if pm == nil || pm.Name == nil || pm.Type == nil {
merged = append(merged, pm)
continue
}

name := pm.GetName()
existing := mergedByName[name]
if existing == nil {
mergedByName[name] = pm
merged = append(merged, pm)
continue
}
if existing.GetType() != pm.GetType() {
merged = append(merged, pm)
continue
}

existing.Metric = append(existing.Metric, pm.Metric...)
}

return merged
}

func (a *atel) reportAgentMetrics(session *senderSession, pms []*telemetry.MetricFamily, p *Profile) {
// If no metrics are configured nothing to report
if len(p.metricsMap) == 0 {
Expand Down Expand Up @@ -495,6 +527,8 @@ func (a *atel) loadPayloads(profiles []*Profile) (*senderSession, error) {
a.logComp.Errorf("failed to get filtered telemetry metrics: %v", err)
}

pms = coalesceMetricFamilies(pms)

// All metrics stored in the "pms" slice above must follow the format:
// <subsystem>__<metric_name>
// The "subsystem" and "name" should be concatenated with a double underscore ("__") separator,
Expand Down
42 changes: 42 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,48 @@ func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) {
assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5)
}

func TestCoalescesDefaultAndNoDefaultMetricFamiliesBeforeAggregation(t *testing.T) {
var c = `
agent_telemetry:
enabled: true
profiles:
- name: points
metric:
metrics:
- name: point.sent
aggregate_tags:
- domain
- remote_agent
`

// setup and initiate atel
tel := makeTelMock(t)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, c, s, nil, r)
require.True(t, a.enabled)

corePointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain"}, "", telemetry.Options{DefaultMetric: true})
adpPointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain", "remote_agent"}, "", telemetry.Options{DefaultMetric: false})
corePointSent.Set(5, "https://api.datadoghq.com")
adpPointSent.Set(400, "https://api.datadoghq.com", "agent-data-plane")

metrics, ok := getPayloadFilteredMetricList(a, "point.sent")
require.True(t, ok)
require.Len(t, metrics, 2)

coreMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{"domain": "https://api.datadoghq.com"})
require.True(t, ok)
assert.Equal(t, 5.0, coreMetric.Value)

adpMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{
"domain": "https://api.datadoghq.com",
"remote_agent": "agent-data-plane",
})
require.True(t, ok)
assert.Equal(t, 400.0, adpMetric.Value)
}

func TestDefaultAndNoDefaultPromRegistries(t *testing.T) {
var c = `
agent_telemetry:
Expand Down
6 changes: 6 additions & 0 deletions comp/core/agenttelemetry/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,13 @@ var defaultProfiles = `
- name: logs.auto_multi_line_default_would_truncate
- name: logs_destination.destination_workers
- name: point.sent
aggregate_tags:
- domain
- remote_agent
- name: point.dropped
aggregate_tags:
- domain
- remote_agent
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is these will change only the COAT version of the metrics to start including these two tags. Since COAT is internal, strict compatibility on the shape of these metrics is not required

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not be a problem

- name: transactions.input_count
- name: transactions.requeued
- name: transactions.retries
Expand Down
18 changes: 17 additions & 1 deletion pkg/collector/corechecks/telemetry/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,37 @@ func (c *checkImpl) Run() error {
return err
}

// Remote Agent Registry telemetry lives in the regular registry. Gather it on a best-effort basis so failures there
// do not prevent the customer-facing telemetry check from reporting Core Agent default telemetry values.
var regularMfs []*dto.MetricFamily
if gathered, err := c.telemetry.Gather(false); err != nil {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this will now call out to remote agents via RAR every 15 seconds (default interval on telemetry check). Per @tobz we do not expect this to be a significant runtime cost.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why is it important to change if it will be sent out only every 15m?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

COAT is emitted every 15m, but this PR also includes the telemetry in the point.sent / point.dropped that is sent to the customer's org where we want more frequent reporting.

log.Warnf("failed to gather regular telemetry metrics for default telemetry merge: %v", err)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails it could get pretty noisy as it would emit every 15 seconds, wondering if I should remove it or make it debug level?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would vouch for debug level

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if it is only every 15s it seems reasonable to me. The impact is that this telemetry is missing which is significant.

} else {
regularMfs = gathered
}

mergeLabelsByMetric := discoverMergeLabels(mfs, regularMfs)
mergedMetrics := collectMergeMetrics(mfs, false, mergeLabelsByMetric)
mergedMetrics.merge(collectMergeMetrics(regularMfs, true, mergeLabelsByMetric))

sender, err := c.GetSender()
if err != nil {
return err
}

sender.SetNoIndex(true)

c.sendMergedMetrics(mergedMetrics, sender)
c.handleMetricFamilies(mfs, sender)

return nil
}

func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) {
for _, mf := range mfs {
if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 {
// Merged metrics are emitted explicitly by sendMergedMetrics so overlapping regular-registry values can be included
// without changing customer-facing metric names or tags.
if mf == nil || mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isMergedMetric(mf.GetName()) {
continue
}

Expand Down
143 changes: 143 additions & 0 deletions pkg/collector/corechecks/telemetry/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,156 @@ import (
"testing"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/pkg/aggregator/mocksender"
"github.com/DataDog/datadog-agent/pkg/collector/corechecks"
)

const domainLabel = "domain"

func stringPtr(value string) *string {
return &value
}

func float64Ptr(value float64) *float64 {
return &value
}

func gaugeMetric(labels map[string]string, value float64) *dto.Metric {
metric := &dto.Metric{Gauge: &dto.Gauge{Value: float64Ptr(value)}}
for name, value := range labels {
metric.Label = append(metric.Label, &dto.LabelPair{Name: stringPtr(name), Value: stringPtr(value)})
}
return metric
}

func gaugeMetricFamily(name string, metrics ...*dto.Metric) *dto.MetricFamily {
metricType := dto.MetricType_GAUGE
return &dto.MetricFamily{
Name: stringPtr(name),
Type: &metricType,
Metric: metrics,
}
}

func counterMetricFamily(name string, value float64) *dto.MetricFamily {
metricType := dto.MetricType_COUNTER
return &dto.MetricFamily{
Name: stringPtr(name),
Type: &metricType,
Metric: []*dto.Metric{{
Counter: &dto.Counter{Value: float64Ptr(value)},
}},
}
}

func TestCollectAndMergeRegularRegistryMetrics(t *testing.T) {
defaultMfs := []*dto.MetricFamily{
gaugeMetricFamily(
"point__sent",
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 10),
gaugeMetric(map[string]string{}, 1),
),
gaugeMetricFamily(
"point__dropped",
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 2),
),
}
remoteMfs := []*dto.MetricFamily{
gaugeMetricFamily(
"point__sent",
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.com",
remoteAgentLabel: "agent-data-plane",
}, 12),
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.eu",
remoteAgentLabel: "other-remote-agent",
}, 5),
gaugeMetric(map[string]string{domainLabel: "https://api.datadoghq.com"}, 100),
),
gaugeMetricFamily(
"point__dropped",
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.com",
remoteAgentLabel: "agent-data-plane",
}, 3),
),
}

labelsByMetric := discoverMergeLabels(defaultMfs, remoteMfs)
values := collectMergeMetrics(defaultMfs, false, labelsByMetric)
values.merge(collectMergeMetrics(remoteMfs, true, labelsByMetric))

require.Equal(t, []string{domainLabel}, labelsByMetric[pointSentMetric])
require.Equal(t, []string{domainLabel}, labelsByMetric[pointDroppedMetric])

sentDefaultDomain := values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})]
require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 22}, sentDefaultDomain)

sentEmptyDomain := values[pointSentMetric][mergeKey([]string{"domain:"})]
require.Equal(t, mergeMetricSample{tags: []string{"domain:"}, value: 1}, sentEmptyDomain)

sentRemoteOnlyDomain := values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.eu"})]
require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.eu"}, value: 5}, sentRemoteOnlyDomain)

droppedDefaultDomain := values[pointDroppedMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})]
require.Equal(t, mergeMetricSample{tags: []string{"domain:https://api.datadoghq.com"}, value: 5}, droppedDefaultDomain)
}

func TestCollectMergeMetricsSkipsNonGaugeMetrics(t *testing.T) {
mfs := []*dto.MetricFamily{counterMetricFamily(pointSentMetric, 12)}

values := collectMergeMetrics(mfs, false, map[string][]string{pointSentMetric: {}})

require.Empty(t, values)
}

func TestDiscoverMergeLabelsFallsBackToRegularRegistry(t *testing.T) {
defaultMfs := []*dto.MetricFamily{}
regularMfs := []*dto.MetricFamily{
gaugeMetricFamily(
pointSentMetric,
gaugeMetric(map[string]string{
domainLabel: "https://api.datadoghq.com",
remoteAgentLabel: "agent-data-plane",
}, 12),
),
}

labelsByMetric := discoverMergeLabels(defaultMfs, regularMfs)
values := collectMergeMetrics(regularMfs, true, labelsByMetric)

require.Equal(t, []string{domainLabel}, labelsByMetric[pointSentMetric])
require.Equal(t, mergeMetricSample{
tags: []string{"domain:https://api.datadoghq.com"},
value: 12,
}, values[pointSentMetric][mergeKey([]string{"domain:https://api.datadoghq.com"})])
}

func TestSendMergedMetrics(t *testing.T) {
sm := mocksender.CreateDefaultDemultiplexer()
c := &checkImpl{CheckBase: corechecks.NewCheckBase(CheckName)}
c.Configure(sm, integration.FakeConfigHash, nil, nil, "test", "provider")

s := mocksender.NewMockSenderWithSenderManager(c.ID(), sm)
s.On("Gauge", "datadog.agent.point.sent", 22.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1)
s.On("Gauge", "datadog.agent.point.sent", 1.0, "", []string{"domain:"}).Return().Times(1)
s.On("Gauge", "datadog.agent.point.dropped", 5.0, "", []string{"domain:https://api.datadoghq.com"}).Return().Times(1)

values := newMergeMetricValues()
values.add(pointSentMetric, []string{"domain:"}, 1)
values.add(pointSentMetric, []string{"domain:https://api.datadoghq.com"}, 22)
values.add(pointDroppedMetric, []string{"domain:https://api.datadoghq.com"}, 5)

c.sendMergedMetrics(values, s)

s.AssertExpectations(t)
}

func TestCheck(t *testing.T) {
reg := prometheus.NewRegistry()

Expand Down
Loading
Loading