Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,37 @@ func (a *atel) transformMetricFamily(p *Profile, mfam *dto.MetricFamily) *agentm
}
}

func coalesceMetricFamilies(pms []*telemetry.MetricFamily) []*telemetry.MetricFamily {
mergedByName := make(map[string]*telemetry.MetricFamily)
merged := make([]*telemetry.MetricFamily, 0, len(pms))

for _, pm := range pms {
if pm == nil || pm.Name == nil || pm.Type == nil {
merged = append(merged, pm)
continue
}

name := pm.GetName()
existing := mergedByName[name]
if existing == nil {
mergedByName[name] = pm
merged = append(merged, pm)
continue
}
if existing.GetType() != pm.GetType() {
merged = append(merged, pm)
continue
}

// The regular and default telemetry registries are gathered separately. Coalesce compatible metric families with
// the same name so profile aggregation sees all time series together instead of later payload writes overwriting
// earlier ones in the sender's metric map.
existing.Metric = append(existing.Metric, pm.Metric...)
}

return merged
}

func (a *atel) reportAgentMetrics(session *senderSession, pms []*telemetry.MetricFamily, p *Profile) {
// If no metrics are configured nothing to report
if len(p.metricsMap) == 0 {
Expand Down Expand Up @@ -495,6 +526,8 @@ func (a *atel) loadPayloads(profiles []*Profile) (*senderSession, error) {
a.logComp.Errorf("failed to get filtered telemetry metrics: %v", err)
}

pms = coalesceMetricFamilies(pms)

// All metrics stored in the "pms" slice above must follow the format:
// <subsystem>__<metric_name>
// The "subsystem" and "name" should be concatenated with a double underscore ("__") separator,
Expand Down
42 changes: 42 additions & 0 deletions comp/core/agenttelemetry/impl/agenttelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,48 @@ func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) {
assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5)
}

func TestCoalescesDefaultAndNoDefaultMetricFamiliesBeforeAggregation(t *testing.T) {
var c = `
agent_telemetry:
enabled: true
profiles:
- name: points
metric:
metrics:
- name: point.sent
aggregate_tags:
- domain
- remote_agent
`

// setup and initiate atel
tel := makeTelMock(t)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, c, s, nil, r)
require.True(t, a.enabled)

corePointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain"}, "", telemetry.Options{DefaultMetric: true})
adpPointSent := tel.NewGaugeWithOpts("point", "sent", []string{"domain", "remote_agent"}, "", telemetry.Options{DefaultMetric: false})
corePointSent.Set(5, "https://api.datadoghq.com")
adpPointSent.Set(400, "https://api.datadoghq.com", "agent-data-plane")

metrics, ok := getPayloadFilteredMetricList(a, "point.sent")
require.True(t, ok)
require.Len(t, metrics, 2)

coreMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{"domain": "https://api.datadoghq.com"})
require.True(t, ok)
assert.Equal(t, 5.0, coreMetric.Value)

adpMetric, ok := getPayloadMetricByTagValues(metrics, map[string]interface{}{
"domain": "https://api.datadoghq.com",
"remote_agent": "agent-data-plane",
})
require.True(t, ok)
assert.Equal(t, 400.0, adpMetric.Value)
}

func TestDefaultAndNoDefaultPromRegistries(t *testing.T) {
var c = `
agent_telemetry:
Expand Down
6 changes: 6 additions & 0 deletions comp/core/agenttelemetry/impl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,13 @@ var defaultProfiles = `
- name: logs.auto_multi_line_default_would_truncate
- name: logs_destination.destination_workers
- name: point.sent
aggregate_tags:
- domain
- remote_agent
- name: point.dropped
aggregate_tags:
- domain
- remote_agent
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is these will change only the COAT version of the metrics to start including these two tags. Since COAT is internal, strict compatibility on the shape of these metrics is not required

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not be a problem

- name: transactions.input_count
- name: transactions.requeued
- name: transactions.retries
Expand Down
170 changes: 169 additions & 1 deletion pkg/collector/corechecks/telemetry/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package telemetry

import (
"fmt"
"slices"
"strings"

dto "github.com/prometheus/client_model/go"
Expand All @@ -25,8 +26,19 @@ const (
// CheckName is the name of the check
CheckName = "telemetry"
prefix = "datadog.agent."

domainLabel = "domain"
remoteAgentLabel = "remote_agent"

pointSentMetric = "point__sent"
pointDroppedMetric = "point__dropped"
)

// regularRegistryMergeMetrics is intentionally small and explicit: regular registry telemetry is internal by default.
// Some remote agents, such as ADP, emit telemetry that overlaps with Core Agent default telemetry; only metrics listed
// here are folded into customer-facing datadog.agent.* telemetry.
var regularRegistryMergeMetrics = []string{pointSentMetric, pointDroppedMetric}

type checkImpl struct {
corechecks.CheckBase
telemetry telemetry.Component
Expand All @@ -38,21 +50,177 @@ func (c *checkImpl) Run() error {
return err
}

// Remote Agent Registry telemetry lives in the regular registry. Gather it on a best-effort basis so failures there
// do not prevent the customer-facing telemetry check from reporting Core Agent default telemetry values.
var regularMfs []*dto.MetricFamily
if gathered, err := c.telemetry.Gather(false); err != nil {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this will now call out to remote agents via RAR every 15 seconds (default interval on telemetry check). Per @tobz we do not expect this to be a significant runtime cost.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why is it important to change if it will be sent out only every 15m?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

COAT is emitted every 15m, but this PR also includes the telemetry in the point.sent / point.dropped that is sent to the customer's org where we want more frequent reporting.

log.Warnf("failed to gather regular telemetry metrics for default telemetry merge: %v", err)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this fails it could get pretty noisy as it would emit every 15 seconds, wondering if I should remove it or make it debug level?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would vouch for debug level

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if it is only every 15s it seems reasonable to me. The impact is that this telemetry is missing which is significant.

} else {
regularMfs = gathered
}

mergeLabelsByMetric := discoverMergeLabels(mfs, regularMfs)
mergedMetrics := collectMergeMetrics(mfs, false, mergeLabelsByMetric)
mergedMetrics.merge(collectMergeMetrics(regularMfs, true, mergeLabelsByMetric))

sender, err := c.GetSender()
if err != nil {
return err
}

sender.SetNoIndex(true)

c.sendMergedMetrics(mergedMetrics, sender)
c.handleMetricFamilies(mfs, sender)

return nil
}

type mergeMetricSample struct {
tags []string
value float64
}

type mergeMetricValues map[string]map[string]mergeMetricSample

func newMergeMetricValues() mergeMetricValues {
return make(mergeMetricValues)
}

func mergeKey(tags []string) string {
return strings.Join(tags, "\xff")
}

func (m mergeMetricValues) add(metricName string, tags []string, value float64) {
byKey := m[metricName]
if byKey == nil {
byKey = make(map[string]mergeMetricSample)
m[metricName] = byKey
}

key := mergeKey(tags)
sample := byKey[key]
if sample.tags == nil {
sample.tags = tags
}
sample.value += value
byKey[key] = sample
}

func (m mergeMetricValues) merge(other mergeMetricValues) {
for metricName, otherByKey := range other {
for _, sample := range otherByKey {
m.add(metricName, sample.tags, sample.value)
}
}
}

func labelValue(labels []*dto.LabelPair, name string) (string, bool) {
for _, label := range labels {
if label.GetName() == name {
return label.GetValue(), true
}
}
return "", false
}

func isMergedMetric(name string) bool {
return slices.Contains(regularRegistryMergeMetrics, name)
}

func mergeLabelNames(mfs []*dto.MetricFamily, metricName string) []string {
labelNames := make(map[string]struct{})
for _, mf := range mfs {
if mf == nil || mf.GetName() != metricName {
continue
}
for _, metric := range mf.Metric {
if metric == nil {
continue
}
for _, label := range metric.Label {
name := label.GetName()
if name == "" || name == remoteAgentLabel {
continue
}
labelNames[name] = struct{}{}
}
}
}

names := make([]string, 0, len(labelNames))
for name := range labelNames {
names = append(names, name)
}
slices.Sort(names)
return names
}

func discoverMergeLabels(defaultMfs, regularMfs []*dto.MetricFamily) map[string][]string {
labelsByMetric := make(map[string][]string, len(regularRegistryMergeMetrics))
for _, metricName := range regularRegistryMergeMetrics {
labels := mergeLabelNames(defaultMfs, metricName)
if len(labels) == 0 {
// Prefer the default registry's label shape for compatibility. If it has no samples yet, fall back to the
// regular registry while still dropping remote_agent so customer-facing tags do not include attribution.
labels = mergeLabelNames(regularMfs, metricName)
}
labelsByMetric[metricName] = labels
}
return labelsByMetric
}

func mergeTags(labels []*dto.LabelPair, labelNames []string) []string {
tags := make([]string, 0, len(labelNames))
for _, labelName := range labelNames {
value, _ := labelValue(labels, labelName)
tags = append(tags, fmt.Sprintf("%s:%s", labelName, value))
}
return tags
}

func collectMergeMetrics(mfs []*dto.MetricFamily, requireRemoteAgent bool, labelsByMetric map[string][]string) mergeMetricValues {
values := newMergeMetricValues()

for _, mf := range mfs {
if mf == nil || mf.Name == nil || mf.Type == nil || !isMergedMetric(mf.GetName()) {
continue
}

if mf.GetType() != dto.MetricType_GAUGE {
log.Warnf("dropping telemetry merge metric %q with unsupported type %s", mf.GetName(), mf.GetType())
continue
}

for _, metric := range mf.Metric {
if metric == nil || metric.Gauge == nil {
continue
}
if requireRemoteAgent {
if _, ok := labelValue(metric.Label, remoteAgentLabel); !ok {
continue
}
}
values.add(mf.GetName(), mergeTags(metric.Label, labelsByMetric[mf.GetName()]), metric.Gauge.GetValue())
}
}

return values
}

func (c *checkImpl) sendMergedMetrics(values mergeMetricValues, sender sender.Sender) {
for _, metricName := range regularRegistryMergeMetrics {
for _, sample := range values[metricName] {
sender.Gauge(c.buildName(metricName), sample.value, "", sample.tags)
}
}
}

func (c *checkImpl) handleMetricFamilies(mfs []*dto.MetricFamily, sender sender.Sender) {
for _, mf := range mfs {
if mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 {
// Merged metrics are emitted explicitly by sendMergedMetrics so overlapping regular-registry values can be included
// without changing customer-facing metric names or tags.
if mf == nil || mf.Name == nil || mf.Type == nil || len(mf.Metric) == 0 || isMergedMetric(mf.GetName()) {
continue
}

Expand Down
Loading
Loading