Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 49 additions & 79 deletions v3/otel/fiber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"io"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/gofiber/contrib/v3/otel/internal"
Expand All @@ -15,13 +13,34 @@ import (
otelcontrib "go.opentelemetry.io/contrib"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.39.0"
oteltrace "go.opentelemetry.io/otel/trace"
)

func bodyStreamSize(stream io.Reader) (int64, bool) {
if stream == nil {
return 0, false
}

if limitedReader, ok := stream.(*io.LimitedReader); ok && limitedReader.N >= 0 {
return limitedReader.N, true
}

type lenReader interface {
Len() int
}

if sizedReader, ok := stream.(lenReader); ok {
if remaining := sizedReader.Len(); remaining >= 0 {
return int64(remaining), true
}
}

return 0, false
}

const (
tracerKey = "gofiber-contrib-tracer-fiber"
instrumentationName = "github.com/gofiber/contrib/v3/otel"
Expand Down Expand Up @@ -49,51 +68,6 @@ const (
UnitMilliseconds = "ms"
)

type bodyStreamSizeReader struct {
reader io.Reader
onEOF func(read int64)
read int64
eof sync.Once
}

func (b *bodyStreamSizeReader) Read(p []byte) (n int, err error) {
n, err = b.reader.Read(p)
if n > 0 {
atomic.AddInt64(&b.read, int64(n))
}
if err == io.EOF && b.onEOF != nil {
read := atomic.LoadInt64(&b.read)
b.eof.Do(func() {
b.onEOF(read)
})
}

return n, err
}

func (b *bodyStreamSizeReader) Close() error {
closer, ok := b.reader.(io.Closer)
if !ok {
return nil
}

return closer.Close()
}

func detachedMetricContext(ctx context.Context) context.Context {
detached := context.Background()

if spanContext := oteltrace.SpanContextFromContext(ctx); spanContext.IsValid() {
detached = oteltrace.ContextWithSpanContext(detached, spanContext)
}

if bg := baggage.FromContext(ctx); bg.Len() > 0 {
detached = baggage.ContextWithBaggage(detached, bg)
}

return detached
}

// Middleware returns fiber handler which will trace incoming requests.
func Middleware(opts ...Option) fiber.Handler {
cfg := config{
Expand Down Expand Up @@ -171,17 +145,19 @@ func Middleware(opts ...Option) fiber.Handler {
copy(responseMetricAttrs, requestMetricsAttrs)

request := c.Request()
isRequestBodyStream := request.IsBodyStream()
requestSize := int64(0)
var requestBodyStreamSizeReader *bodyStreamSizeReader
if isRequestBodyStream && !cfg.withoutMetrics {
requestBodyStream := request.BodyStream()
if requestBodyStream != nil {
requestBodyStreamSizeReader = &bodyStreamSizeReader{reader: requestBodyStream}
request.SetBodyStream(requestBodyStreamSizeReader, -1)
requestSizeKnown := false
if request.IsBodyStream() {
if contentLength := request.Header.ContentLength(); contentLength >= 0 {
requestSize = int64(contentLength)
requestSizeKnown = true
} else if streamSize, ok := bodyStreamSize(request.BodyStream()); ok {
requestSize = streamSize
requestSizeKnown = true
}
} else {
requestSize = int64(len(request.Body()))
requestSizeKnown = true
}

reqHeader := make(http.Header)
Expand Down Expand Up @@ -225,39 +201,33 @@ func Middleware(opts ...Option) fiber.Handler {
response := c.Response()
isSSE := c.GetRespHeader("Content-Type") == "text/event-stream"
Comment thread
edgarsilva marked this conversation as resolved.
Outdated
responseSize := int64(0)
responseSizeKnown := false
isResponseBodyStream := response.IsBodyStream()
if !isResponseBodyStream && !isSSE {
responseSize = int64(len(response.Body()))
}

if isResponseBodyStream && !isSSE && !cfg.withoutMetrics {
responseBodyStream := response.BodyStream()
if responseBodyStream != nil {
responseMetricAttrsWithResponse := append(responseMetricAttrs, responseAttrs...)
responseMetricsCtx := detachedMetricContext(savedCtx)
responseBodyStreamReader := &bodyStreamSizeReader{
reader: responseBodyStream,
onEOF: func(read int64) {
httpServerResponseSize.Record(responseMetricsCtx, read, metric.WithAttributes(responseMetricAttrsWithResponse...))
},
}
response.SetBodyStream(responseBodyStreamReader, -1)
} else {
isResponseBodyStream = false
if isSSE {
// skip size calculation for SSE streams
} else if isResponseBodyStream {
if contentLength := response.Header.ContentLength(); contentLength >= 0 {
responseSize = int64(contentLength)
responseSizeKnown = true
} else if streamSize, ok := bodyStreamSize(response.BodyStream()); ok {
responseSize = streamSize
responseSizeKnown = true
}
} else {
responseSize = int64(len(response.Body()))
responseSizeKnown = true
}

defer func() {
responseMetricAttrs = append(responseMetricAttrs, responseAttrs...)
if requestBodyStreamSizeReader != nil {
requestSize = atomic.LoadInt64(&requestBodyStreamSizeReader.read)
}

if !cfg.withoutMetrics {
httpServerActiveRequests.Add(savedCtx, -1, metric.WithAttributes(requestMetricsAttrs...))
httpServerDuration.Record(savedCtx, time.Since(start).Seconds(), metric.WithAttributes(responseMetricAttrs...))
httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...))
if !isResponseBodyStream {
if requestSizeKnown {
httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...))
}
if responseSizeKnown {
httpServerResponseSize.Record(savedCtx, responseSize, metric.WithAttributes(responseMetricAttrs...))
}
}
Expand All @@ -266,7 +236,7 @@ func Middleware(opts ...Option) fiber.Handler {
cancel()
}()

if !isResponseBodyStream {
if responseSizeKnown {
span.SetAttributes(append(responseAttrs, semconv.HTTPResponseBodySizeKey.Int64(responseSize))...)
} else {
span.SetAttributes(responseAttrs...)
Expand Down
66 changes: 66 additions & 0 deletions v3/otel/fiber_stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package otel

import (
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"

"github.com/gofiber/fiber/v3"
"github.com/gofiber/fiber/v3/middleware/static"
"github.com/stretchr/testify/require"
)

func TestMiddleware_StaticAssetsDoNotHang(t *testing.T) {
t.Parallel()

dir := t.TempDir()
require.NoError(t, os.WriteFile(filepath.Clean(filepath.Join(dir, "repro.css")), []byte("body{font-family:sans-serif;}"), 0o644))
require.NoError(t, os.WriteFile(filepath.Clean(filepath.Join(dir, "repro.js")), []byte("console.log('ok');"), 0o644))

app := fiber.New()
app.Use(Middleware())
app.Use("/public", static.New(dir))

testCases := []struct {
path string
contentType string
body string
}{
{path: "/public/repro.css", contentType: "text/css", body: "body{font-family:sans-serif;}"},
{path: "/public/repro.js", contentType: "javascript", body: "console.log('ok');"},
}

for i := 0; i < 25; i++ {
for _, tc := range testCases {
resp, err := app.Test(httptest.NewRequest(http.MethodGet, tc.path, nil))
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

body, readErr := io.ReadAll(resp.Body)
require.NoError(t, resp.Body.Close())
require.NoError(t, readErr)
require.Equal(t, tc.body, string(body))
require.Contains(t, resp.Header.Get("Content-Type"), tc.contentType)
}
}
}

func TestMiddleware_NotFoundPathDoesNotHang(t *testing.T) {
t.Parallel()

app := fiber.New()
app.Use(Middleware())

for i := 0; i < 25; i++ {
resp, err := app.Test(httptest.NewRequest(http.MethodGet, "/.well-known/appspecific/com.chrome.devtools.json", nil))
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode)

_, readErr := io.ReadAll(resp.Body)
require.NoError(t, resp.Body.Close())
require.NoError(t, readErr)
}
}