diff --git a/common/Makefile.am b/common/Makefile.am index 53ab345f3..e4f275de7 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -127,6 +127,15 @@ common_libswsscommon_la_SOURCES += \ common_libswsscommon_la_LIBADD += -lyang endif +if OTLP +common_libswsscommon_la_SOURCES += \ + common/component_stats_otlp.cpp + +common_libswsscommon_la_CPPFLAGS += $(OPENTELEMETRY_CFLAGS) +common_libswsscommon_la_CXXFLAGS += $(OPENTELEMETRY_CFLAGS) +common_libswsscommon_la_LIBADD += $(OPENTELEMETRY_LIBS) +endif + common_swssloglevel_SOURCES = \ common/loglevel.cpp \ common/loglevel_util.cpp diff --git a/common/component_stats_otlp.cpp b/common/component_stats_otlp.cpp new file mode 100644 index 000000000..446bbf9f8 --- /dev/null +++ b/common/component_stats_otlp.cpp @@ -0,0 +1,245 @@ +#include "common/component_stats_otlp.h" + +#include "common/logger.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace otlp_exporter = opentelemetry::exporter::otlp; +namespace sdk_common = opentelemetry::sdk::common; +namespace sdk_metrics = opentelemetry::sdk::metrics; +namespace sdk_resource = opentelemetry::sdk::resource; +namespace sdk_scope = opentelemetry::sdk::instrumentationscope; + +namespace swss { + +namespace { + +// One InstrumentationScope per process is plenty; the SDK does not require +// it to be unique per sink. +std::unique_ptr makeScope() +{ + return sdk_scope::InstrumentationScope::Create( + "swss.component_stats", "1.0", "https://opentelemetry.io/schemas/1.30.0"); +} + +opentelemetry::common::SystemTimestamp toSystemTimestamp(uint64_t unixNano) +{ + return opentelemetry::common::SystemTimestamp{ + std::chrono::system_clock::time_point{std::chrono::nanoseconds{unixNano}}}; +} + +} // namespace + +struct OtlpSink::Impl +{ + // Per-series state needed to convert cumulative counters into the delta + // points that Geneva mdm requires. Keyed by "\x1f". 
+ struct SeriesState + { + uint64_t lastValue = 0; + opentelemetry::common::SystemTimestamp lastEndTs; + bool hasLastEndTs = false; + }; + + Config config; + sdk_resource::Resource resource; + std::unique_ptr scope; + std::unique_ptr exporter; + std::unordered_map series; + opentelemetry::common::SystemTimestamp creationTs; + bool stopped = false; + + explicit Impl(Config c) + : config(std::move(c)), + resource(sdk_resource::Resource::Create({ + {"service.name", config.componentName}, + {"service.instance.id", config.serviceInstanceId}, + {"sonic.component", config.componentName}, + })), + scope(makeScope()), + creationTs(toSystemTimestamp(config.startTimeUnixNano)) + { + otlp_exporter::OtlpGrpcMetricExporterOptions opts; + opts.endpoint = config.endpoint; + opts.use_ssl_credentials = false; + opts.timeout = std::chrono::duration_cast( + config.exportTimeout); + exporter = otlp_exporter::OtlpGrpcMetricExporterFactory::Create(opts); + } + + bool exportBatch(const std::vector& points) + { + if (stopped) + { + return false; + } + if (points.empty()) + { + return true; + } + + const auto endTs = opentelemetry::common::SystemTimestamp{std::chrono::system_clock::now()}; + + // Group data points by full metric name. Each metric carries one + // PointDataAttributes per entity, which keeps "entity" as a label + // rather than as part of the metric name. + // + // For Sum points we emit DELTA temporality (Geneva mdm rejects + // CUMULATIVE), so we maintain a per-series cache of the last + // cumulative value and the last end_ts. delta = current - last; on + // counter reset (current < last) we treat current as the delta; the + // window is not restarted (start_ts is unaffected by a reset). + std::unordered_map byMetric; + + for (const auto& dp : points) + { + const std::string fullName = "sonic." + config.componentName + "." 
+ dp.metric; + const std::string seriesKey = dp.entity + "\x1f" + dp.metric; + auto& state = series[seriesKey]; + + auto it = byMetric.find(fullName); + if (it == byMetric.end()) + { + sdk_metrics::MetricData md; + md.instrument_descriptor.name_ = fullName; + md.instrument_descriptor.description_ = ""; + md.instrument_descriptor.unit_ = "1"; + md.instrument_descriptor.type_ = dp.isMonotonic + ? sdk_metrics::InstrumentType::kCounter + : sdk_metrics::InstrumentType::kGauge; + md.instrument_descriptor.value_type_ = sdk_metrics::InstrumentValueType::kLong; + // Gauge ignores temporality; Sum requires DELTA for mdm. + md.aggregation_temporality = dp.isMonotonic + ? sdk_metrics::AggregationTemporality::kDelta + : sdk_metrics::AggregationTemporality::kUnspecified; + md.start_ts = state.hasLastEndTs ? state.lastEndTs : creationTs; + md.end_ts = endTs; + it = byMetric.emplace(fullName, std::move(md)).first; + } + + sdk_metrics::PointDataAttributes pda; + pda.attributes.SetAttribute("entity", dp.entity); + + if (dp.isMonotonic) + { + const uint64_t delta = (dp.value >= state.lastValue) + ? 
(dp.value - state.lastValue) + : dp.value; // counter reset: treat current as delta + + sdk_metrics::SumPointData spd; + spd.value_ = static_cast(delta); + spd.is_monotonic_ = true; + pda.point_data = std::move(spd); + } + else + { + sdk_metrics::LastValuePointData lvd; + lvd.value_ = static_cast(dp.value); + lvd.is_lastvalue_valid_ = true; + lvd.sample_ts_ = endTs; + pda.point_data = std::move(lvd); + } + + it->second.point_data_attr_.push_back(std::move(pda)); + + state.lastValue = dp.value; + state.lastEndTs = endTs; + state.hasLastEndTs = true; + } + + sdk_metrics::ScopeMetrics scopeMetrics; + scopeMetrics.scope_ = scope.get(); + scopeMetrics.metric_data_.reserve(byMetric.size()); + for (auto& kv : byMetric) + { + scopeMetrics.metric_data_.push_back(std::move(kv.second)); + } + + sdk_metrics::ResourceMetrics rm; + rm.resource_ = &resource; + rm.scope_metric_data_.push_back(std::move(scopeMetrics)); + + try + { + const auto result = exporter->Export(rm); + if (result != sdk_common::ExportResult::kSuccess) + { + SWSS_LOG_WARN("OtlpSink: Export to %s returned %d", + config.endpoint.c_str(), static_cast(result)); + return false; + } + } + catch (const std::exception& e) + { + SWSS_LOG_WARN("OtlpSink: Export to %s threw: %s", + config.endpoint.c_str(), e.what()); + return false; + } + return true; + } + + void shutdown() + { + if (stopped) + { + return; + } + stopped = true; + + if (!exporter) + { + return; + } + + try + { + exporter->ForceFlush(config.exportTimeout); + exporter->Shutdown(config.exportTimeout); + } + catch (...) + { + // shutdown() is contractually noexcept from the caller's view. 
+ } + } +}; + +OtlpSink::OtlpSink(Config config) + : m_impl(std::make_unique(std::move(config))) +{ +} + +OtlpSink::~OtlpSink() +{ + if (m_impl) + { + m_impl->shutdown(); + } +} + +OtlpSink::OtlpSink(OtlpSink&&) noexcept = default; +OtlpSink& OtlpSink::operator=(OtlpSink&&) noexcept = default; + +bool OtlpSink::exportBatch(const std::vector& points) +{ + return m_impl ? m_impl->exportBatch(points) : false; /* a moved-from sink owns no Impl: report failure instead of dereferencing null */ +} + +void OtlpSink::shutdown() +{ + if (m_impl) { m_impl->shutdown(); } /* no-op on a moved-from sink, mirroring the destructor's guard */ +} + +} // namespace swss diff --git a/common/component_stats_otlp.h b/common/component_stats_otlp.h new file mode 100644 index 000000000..56da30677 --- /dev/null +++ b/common/component_stats_otlp.h @@ -0,0 +1,102 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace swss { + +// OtlpSink converts ComponentStats counter snapshots into OTLP/gRPC metric +// exports destined for a local OpenTelemetry Collector. +// +// This is the OTLP half of the dual-sink design described in +// doc/component-stats/component-stats-hld.md (sonic-net/SONiC#2312). Phase 1 +// (sonic-net/sonic-swss-common#1180 and sonic-net/sonic-swss#4516) provides +// the in-memory counters and the COUNTERS_DB sink. Phase 2 plugs this class +// into the ComponentStats writer thread so the same snapshot is also +// exported via OTLP. +// +// Design notes: +// * Construction does not connect; the underlying gRPC channel is created +// lazily on the first exportBatch() call. +// * Failures are non-throwing: exportBatch() returns false and writes one +// log line, so a dead Collector cannot stall the ComponentStats writer +// thread or affect the DB sink (HLD requirement R9). +// * The class uses the PIMPL idiom so that callers (notably +// ComponentStats) do not transitively include any OpenTelemetry C++ SDK +// headers. +// * The class is move-only by design; copying a sink would imply two +// gRPC channels exporting the same counters. +class OtlpSink +{ +public: + struct Config + { + // gRPC endpoint, e.g. "localhost:4317". 
Plaintext on loopback by + // design — TLS and authentication live in the local OTel Collector, + // not in the producer. + std::string endpoint = "localhost:4317"; + + // OTel resource attributes applied to every batch. + // componentName → service.name, sonic.component + // serviceInstanceId → service.instance.id + std::string componentName; + std::string serviceInstanceId; + + // Per-Export() deadline. Short by design: the writer thread runs + // every intervalSec (default 1 s) and must not block the next tick. + std::chrono::milliseconds exportTimeout{500}; + + // Wall-clock time at which this sink was constructed, used as the + // start_time of the very first delta export per (entity, metric). + // Subsequent exports use the previous export's end_time as their + // start_time, which is the OTLP-defined contract for delta + // temporality. + uint64_t startTimeUnixNano = 0; + }; + + // One counter sample contributed by the ComponentStats writer thread. + // + // Final OTel metric name is "sonic..". + // The entity is exported as a data-point attribute, never as part of + // the metric name, so dashboards can pivot freely. + // + // value is always the *cumulative* in-memory counter from + // ComponentStats. The sink converts cumulative → delta internally + // (Geneva mdm only accepts AGGREGATION_TEMPORALITY_DELTA), so callers + // never have to track per-sample state. + struct DataPoint + { + std::string entity; + std::string metric; + uint64_t value = 0; + bool isMonotonic = true; // false ⇒ exported as Gauge + }; + + explicit OtlpSink(Config config); + ~OtlpSink(); + + OtlpSink(const OtlpSink&) = delete; + OtlpSink& operator=(const OtlpSink&) = delete; + OtlpSink(OtlpSink&&) noexcept; + OtlpSink& operator=(OtlpSink&&) noexcept; + + // Convert one snapshot to OTLP and ship it. Returns true on a + // successful Export() RPC. Never throws. Safe to call from the + // ComponentStats writer thread. + // + // An empty batch is a no-op and returns true. 
+ bool exportBatch(const std::vector& points); + + // Flush in-flight batches and tear down the gRPC channel. Idempotent; + // exportBatch() after shutdown() is a no-op that returns false. + void shutdown(); + +private: + struct Impl; + std::unique_ptr m_impl; +}; + +} // namespace swss diff --git a/configure.ac b/configure.ac index 287ff2722..13c973178 100644 --- a/configure.ac +++ b/configure.ac @@ -44,9 +44,39 @@ AC_ARG_ENABLE(yangmodules, no) yangmodules=false ;; *) AC_MSG_ERROR(bad value ${enableval} for --enable-yangmodules) ;; esac],[yangmodules=true]) +AC_ARG_ENABLE(otlp, +[ --enable-otlp Build the OTLP sink (requires opentelemetry-cpp)], +[case "${enableval}" in + yes) otlp=true ;; + no) otlp=false ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-otlp) ;; +esac],[otlp=false]) AM_CONDITIONAL(DEBUG, test x$debug = xtrue) AM_CONDITIONAL(PYTHON2, test x$python2 = xtrue) AM_CONDITIONAL(YANGMODS, test x$yangmodules = xtrue) +AM_CONDITIONAL(OTLP, test x$otlp = xtrue) + +if test x$otlp = xtrue; then + PKG_CHECK_MODULES([OPENTELEMETRY], + [opentelemetry_api opentelemetry_sdk opentelemetry_exporter_otlp_grpc], + [have_otlp_pkgconfig=yes], + [have_otlp_pkgconfig=no]) + if test x$have_otlp_pkgconfig = xno; then + # opentelemetry-cpp does not ship pkg-config files in all distributions. + # Fall back to header + library probes so the build still succeeds when + # the SDK is installed under a standard prefix. + AC_LANG_PUSH([C++]) + AC_CHECK_HEADER([opentelemetry/version.h], [], + [AC_MSG_ERROR([--enable-otlp requested but opentelemetry-cpp headers were not found. 
Install opentelemetry-cpp or pass CPPFLAGS=-I/include.])]) + AC_LANG_POP([C++]) + OPENTELEMETRY_CFLAGS="" + OPENTELEMETRY_LIBS="-lopentelemetry_exporter_otlp_grpc -lopentelemetry_otlp_recordable -lopentelemetry_proto -lopentelemetry_resources -lopentelemetry_trace -lopentelemetry_metrics -lopentelemetry_common -lgrpc++ -lgrpc -lprotobuf" + fi + AC_DEFINE([HAVE_OTLP], [1], [Define to 1 if the OTLP sink is built]) +fi +AC_SUBST([OPENTELEMETRY_CFLAGS]) +AC_SUBST([OPENTELEMETRY_LIBS]) + if test x$CONFIGURED_ARCH = xarmhf && test x$CROSS_BUILD_ENVIRON = xy; then AM_CONDITIONAL(ARCH64, false) else diff --git a/debian/rules b/debian/rules index 928253b00..f2cf172a3 100755 --- a/debian/rules +++ b/debian/rules @@ -35,6 +35,16 @@ else CONFIGURE_ARGS += --enable-yangmodules endif +# Build the OTLP sink only when the 'otlp' build profile is active. The +# OTLP sink depends on opentelemetry-cpp, which is not yet packaged in +# every distribution; the default build keeps OTLP off so behaviour is +# unchanged. 
+ifneq (,$(filter otlp,$(DEB_BUILD_PROFILES))) +CONFIGURE_ARGS += --enable-otlp +else +CONFIGURE_ARGS += --disable-otlp +endif + # main packaging script %: dh $@ diff --git a/tests/Makefile.am b/tests/Makefile.am index 2724e2e3c..5b44d1f77 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -55,3 +55,9 @@ tests_tests_LDADD = $(LDADD_GTEST) -lpthread common/libswsscommon.la $(LIBNL_LIB if YANGMODS tests_tests_SOURCES += tests/defaultvalueprovider_ut.cpp endif + +if OTLP +tests_tests_SOURCES += tests/component_stats_otlp_ut.cpp +tests_tests_CPPFLAGS += $(OPENTELEMETRY_CFLAGS) +tests_tests_LDADD += $(OPENTELEMETRY_LIBS) +endif diff --git a/tests/component_stats_otlp_ut.cpp b/tests/component_stats_otlp_ut.cpp new file mode 100644 index 000000000..7f1a24456 --- /dev/null +++ b/tests/component_stats_otlp_ut.cpp @@ -0,0 +1,104 @@ +#include "common/component_stats_otlp.h" + +#include +#include +#include + +namespace swss { +namespace test { + +namespace { + +OtlpSink::Config makeConfig() +{ + OtlpSink::Config c; + // Pick a port that no Collector is listening on. The point of these + // tests is to verify the wrapper's contract — never throw, never crash — + // not to validate the on-the-wire OTLP shape, which requires an + // in-process gRPC mock server (deferred to a follow-up). 
+ c.endpoint = "127.0.0.1:14317"; + c.componentName = "swss-ut"; + c.serviceInstanceId = "ut-host"; + c.startTimeUnixNano = 1700000000000000000ULL; + c.exportTimeout = std::chrono::milliseconds(100); + return c; +} + +} // namespace + +TEST(OtlpSink, ConstructAndDestructDoesNotCrash) +{ + OtlpSink sink(makeConfig()); + SUCCEED(); +} + +TEST(OtlpSink, EmptyBatchIsNoOp) +{ + OtlpSink sink(makeConfig()); + EXPECT_TRUE(sink.exportBatch({})); +} + +TEST(OtlpSink, ExportToUnreachableCollectorDoesNotThrow) +{ + OtlpSink sink(makeConfig()); + const std::vector points = { + {"PORT_TABLE", "SET", 42, /*isMonotonic*/ true}, + {"PORT_TABLE", "DEL", 7, /*isMonotonic*/ true}, + {"PORT_TABLE", "BACKLOG", 3, /*isMonotonic*/ false}, // gauge + }; + + bool ok = true; + EXPECT_NO_THROW(ok = sink.exportBatch(points)); + // The exact return value depends on local network behaviour: in CI the + // gRPC client may report failure quickly, on a developer box it may + // succeed against a stray listener. The wrapper's contract only forbids + // throwing; the boolean is informational. + (void)ok; +} + +TEST(OtlpSink, ShutdownIsIdempotent) +{ + OtlpSink sink(makeConfig()); + sink.shutdown(); + sink.shutdown(); + EXPECT_FALSE(sink.exportBatch({{"E", "M", 1, true}})); +} + +TEST(OtlpSink, MovedFromInstanceIsHarmless) +{ + OtlpSink first(makeConfig()); + OtlpSink second(std::move(first)); + EXPECT_TRUE(second.exportBatch({})); +} + +TEST(OtlpSink, DeltaConversionDoesNotThrowAcrossMultipleExports) +{ + OtlpSink sink(makeConfig()); + // Three consecutive cumulative snapshots of the same series. The sink + // is expected to convert these to deltas (10, 5, 8) internally without + // crashing or throwing, regardless of whether the export RPC itself + // succeeds. 
+ const std::vector first = {{"PORT", "SET", 10, true}}; + const std::vector second = {{"PORT", "SET", 15, true}}; + const std::vector third = {{"PORT", "SET", 23, true}}; + + EXPECT_NO_THROW({ + (void)sink.exportBatch(first); + (void)sink.exportBatch(second); + (void)sink.exportBatch(third); + }); +} + +TEST(OtlpSink, CounterResetIsTreatedAsDelta) +{ + OtlpSink sink(makeConfig()); + // After a counter reset (current < last), the sink must not underflow + // a uint64_t. The contract is to emit the current value as the delta. + EXPECT_NO_THROW({ + (void)sink.exportBatch({{"PORT", "SET", 100, true}}); + (void)sink.exportBatch({{"PORT", "SET", 7, true}}); + }); +} + +} // namespace test +} // namespace swss