Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions common/tracing/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* === This file is part of ALICE O² ===
*
* Copyright 2025 CERN and copyright holders of ALICE O².
* Author: Michal Tichak <michal.tichak@cern.ch>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* In applying this license CERN does not waive the privileges and
* immunities granted to it by virtue of its status as an
* Intergovernmental Organization or submit itself to any jurisdiction.
*/

// Package tracing provides OpenTelemetry tracing initialisation for O² Control components.
package tracing

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)

// Tracer is the package-level tracer that NewSpan uses to start spans.
// It is initialised to a no-op tracer and is replaced by Run once a real
// TracerProvider has been set up.
// NOTE(review): the variable is reassigned in Run without synchronisation;
// presumably Run is called once during startup before any spans are created — confirm.
var Tracer trace.Tracer = noop.NewTracerProvider().Tracer("")

// Span pairs a span started through NewSpan with the context returned
// alongside it, so that callers can pass Ctx on to downstream calls.
type Span struct {
// Ctx is the context returned by Tracer.Start for this span.
Ctx context.Context
// span is the underlying span; it is nil in a zero-value Span, which the
// End and SetError methods treat as a no-op.
span trace.Span
}

// NewSpan starts a span called name as a child of parent using the
// package-level Tracer, forwarding opts unchanged. The returned Span holds
// both the started span and the derived context (in Ctx).
func NewSpan(parent context.Context, name string, opts ...trace.SpanStartOption) Span {
	var started Span
	started.Ctx, started.span = Tracer.Start(parent, name, opts...)
	return started
}

// End finishes the underlying span. Calling it on a Span whose underlying
// span is nil (e.g. a zero-value Span) does nothing.
func (s *Span) End() {
	if s.span == nil {
		return
	}
	s.span.End()
}

// SetError reports the outcome of the traced operation on the span.
// A non-nil err is recorded on the span and the status is set to Error with
// err's message as description; a nil err sets the status to Ok.
// It does nothing when the underlying span is nil.
func (s *Span) SetError(err error) {
	switch {
	case s.span == nil:
		return
	case err == nil:
		s.span.SetStatus(codes.Ok, "")
	default:
		s.span.RecordError(err)
		s.span.SetStatus(codes.Error, err.Error())
	}
}

// Span returns the underlying trace.Span, giving callers access to the span
// API that this wrapper does not expose itself. The result is nil for a
// zero-value Span.
func (s *Span) Span() trace.Span {
return s.span
}

// Run initialises the global TracerProvider and sets the package-level Tracer.
// It returns a shutdown function that must be called on process exit.
//
// \param ctx parent context
// \param endpoint OTel collector gRPC endpoint, e.g. "localhost:4317"
// \param serviceName OTLP service.name attribute, e.g. "aliecs"
func Run(ctx context.Context, endpoint string, serviceName string) (func(context.Context) error, error) {
exp, err := otlptracegrpc.New(ctx,
otlptracegrpc.WithEndpoint(endpoint),
otlptracegrpc.WithInsecure(),
)
if err != nil {
return nil, fmt.Errorf("tracing: failed to create OTLP exporter: %w", err)
}

r, err := resource.Merge(
resource.Default(),
resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceName(serviceName),
),
)
if err != nil {
return nil, fmt.Errorf("tracing: failed to create resource: %w", err)
}

tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(r),
)

otel.SetTracerProvider(tp)
Tracer = otel.Tracer(serviceName)

return tp.Shutdown, nil
}

// Stop invokes shutdown with ctx and returns its result. It is a convenience
// wrapper for callers that already hold the shutdown function returned by Run;
// a nil shutdown is accepted and reported as success.
func Stop(ctx context.Context, shutdown func(context.Context) error) error {
	if shutdown != nil {
		return shutdown(ctx)
	}
	return nil
}
2 changes: 2 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func setDefaults() error {
viper.SetDefault("metrics.address", getenv("LIBPROCESS_IP", "127.0.0.1"))
viper.SetDefault("metrics.port", getenvInt("PORT0", "64009"))
viper.SetDefault("metrics.path", getenv("METRICS_API_PATH", "/metrics"))
viper.SetDefault("tracingEndpoint", "")
viper.SetDefault("reposSshKey", "")
viper.SetDefault("summaryMetrics", false)
viper.SetDefault("verbose", false)
Expand Down Expand Up @@ -165,6 +166,7 @@ func setFlags() error {
pflag.String("metrics.address", viper.GetString("metrics.address"), "IP of metrics server")
pflag.Int("metrics.port", viper.GetInt("metrics.port"), "Port of metrics server (listens on server.address)")
pflag.String("metrics.path", viper.GetString("metrics.path"), "URI path to metrics endpoint")
pflag.String("tracingEndpoint", viper.GetString("tracingEndpoint"), "Endpoint of the OpenTelemetry collector gRPC service (`host:port`, tracing disabled if empty)")
pflag.String("reposSshKey", viper.GetString("reposSshKey"), "Path to a readable private ssh key for repo operations")
pflag.Bool("summaryMetrics", viper.GetBool("summaryMetrics"), "Collect summary metrics for tasks launched per-offer-cycle, offer processing time, etc.")
pflag.Bool("verbose", viper.GetBool("verbose"), "Verbose logging")
Expand Down
30 changes: 30 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/monitoring"
pb "github.com/AliceO2Group/Control/common/protos"
"github.com/AliceO2Group/Control/common/tracing"
"github.com/AliceO2Group/Control/core/the"

"github.com/AliceO2Group/Control/common/logger"
Expand Down Expand Up @@ -74,6 +75,33 @@ func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) {
}
}

// runTracing starts trace export when the "tracingEndpoint" setting is
// non-empty and returns a function that shuts tracing down again.
//
// Tracing is best-effort: if the endpoint is unset, is not a host:port pair
// with both parts present, or tracing.Run fails, the condition is logged and
// a no-op function is returned, so the caller can always defer the result.
//
// ctx is used for initialisation only; see the comment on the returned
// closure for why it is not reused at shutdown.
func runTracing(ctx context.Context) func() {
	endpoint := viper.GetString("tracingEndpoint")
	if endpoint == "" {
		log.WithField(infologger.Level, infologger.IL_Support).Info("Tracing disabled: tracingEndpoint not set")
		return func() {}
	}

	host, port, err := net.SplitHostPort(endpoint)
	if err == nil && (host == "" || port == "") {
		// SplitHostPort accepts inputs such as ":4317" or "host:" without an
		// error; synthesise one so the log entry below never carries a nil
		// "error" field.
		err = &net.AddrError{Err: "empty host or port", Addr: endpoint}
	}
	if err != nil {
		log.WithField("error", err).Errorf("Invalid tracingEndpoint %q: expected host:port format", endpoint)
		return func() {}
	}

	shutdown, err := tracing.Run(ctx, endpoint, "aliecs")
	if err != nil {
		log.WithField("error", err).Error("Failed to initialize tracing")
		return func() {}
	}

	log.WithField(infologger.Level, infologger.IL_Support).Infof("Tracing started, exporting to %s", endpoint)
	return func() {
		// The returned function is meant to run at process exit. By then the
		// startup ctx may already be cancelled, and shutting down with a
		// cancelled context would presumably abort before pending spans are
		// exported, so a fresh context is used instead.
		// NOTE(review): consider bounding this with a timeout so an
		// unreachable collector cannot delay exit — confirm the desired limit.
		if err := tracing.Stop(context.Background(), shutdown); err != nil {
			log.WithField("error", err).Error("Failed to shut down tracing")
		}
	}
}

func runMetrics() {
metricsEndpoint := viper.GetString("metricsEndpoint")
err, port, endpoint := parseMetricsEndpoint(metricsEndpoint)
Expand Down Expand Up @@ -150,6 +178,8 @@ func Run() error {

// Plugins need to start after taskman is running, because taskman provides the FID
integration.PluginsInstance().InitAll(state.taskman.GetFrameworkID())
stopTracing := runTracing(ctx)
defer stopTracing()
runMetrics()
defer golangmetrics.Stop()
defer monitoring.Stop()
Expand Down
Loading
Loading