From 636838e1734cf915dbc81280431960e9bec08cac Mon Sep 17 00:00:00 2001 From: edgarsilva Date: Wed, 6 May 2026 18:00:30 -0600 Subject: [PATCH 1/2] fix(otel): avoid replacing body streams Stop wrapping request/response body streams for size metrics so middleware no longer triggers fasthttp stream reset/close side effects on static file readers. Add regression coverage for repeated static and .well-known requests while preserving size metrics when stream length is known. --- v3/otel/fiber.go | 128 ++++++++++++++--------------------- v3/otel/fiber_stream_test.go | 66 ++++++++++++++++++ 2 files changed, 115 insertions(+), 79 deletions(-) create mode 100644 v3/otel/fiber_stream_test.go diff --git a/v3/otel/fiber.go b/v3/otel/fiber.go index 13af3f3d1..309077dbc 100644 --- a/v3/otel/fiber.go +++ b/v3/otel/fiber.go @@ -4,8 +4,6 @@ import ( "context" "io" "net/http" - "sync" - "sync/atomic" "time" "github.com/gofiber/contrib/v3/otel/internal" @@ -15,13 +13,34 @@ import ( otelcontrib "go.opentelemetry.io/contrib" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.39.0" oteltrace "go.opentelemetry.io/otel/trace" ) +func bodyStreamSize(stream io.Reader) (int64, bool) { + if stream == nil { + return 0, false + } + + if limitedReader, ok := stream.(*io.LimitedReader); ok && limitedReader.N >= 0 { + return limitedReader.N, true + } + + type lenReader interface { + Len() int + } + + if sizedReader, ok := stream.(lenReader); ok { + if remaining := sizedReader.Len(); remaining >= 0 { + return int64(remaining), true + } + } + + return 0, false +} + const ( tracerKey = "gofiber-contrib-tracer-fiber" instrumentationName = "github.com/gofiber/contrib/v3/otel" @@ -49,51 +68,6 @@ const ( UnitMilliseconds = "ms" ) -type bodyStreamSizeReader struct { - reader io.Reader - onEOF func(read int64) - read int64 - eof sync.Once -} - -func (b *bodyStreamSizeReader) Read(p []byte) (n int, err error) { - n, err = b.reader.Read(p) - if n > 0 { - atomic.AddInt64(&b.read, int64(n)) - } - if err == io.EOF && b.onEOF != nil { - read := atomic.LoadInt64(&b.read) - b.eof.Do(func() { - b.onEOF(read) - }) - } - - return n, err -} - -func (b *bodyStreamSizeReader) Close() error { - closer, ok := b.reader.(io.Closer) - if !ok { - return nil - } - - return closer.Close() -} - -func detachedMetricContext(ctx context.Context) context.Context { - detached := context.Background() - - if spanContext := oteltrace.SpanContextFromContext(ctx); spanContext.IsValid() { - detached = oteltrace.ContextWithSpanContext(detached, spanContext) - } - - if bg := baggage.FromContext(ctx); bg.Len() > 0 { - detached = baggage.ContextWithBaggage(detached, bg) - } - - return detached -} - // Middleware returns fiber handler which will trace incoming requests. func Middleware(opts ...Option) fiber.Handler { cfg := config{ @@ -171,17 +145,19 @@ func Middleware(opts ...Option) fiber.Handler { copy(responseMetricAttrs, requestMetricsAttrs) request := c.Request() - isRequestBodyStream := request.IsBodyStream() requestSize := int64(0) - var requestBodyStreamSizeReader *bodyStreamSizeReader - if isRequestBodyStream && !cfg.withoutMetrics { - requestBodyStream := request.BodyStream() - if requestBodyStream != nil { - requestBodyStreamSizeReader = &bodyStreamSizeReader{reader: requestBodyStream} - request.SetBodyStream(requestBodyStreamSizeReader, -1) + requestSizeKnown := false + if request.IsBodyStream() { + if contentLength := request.Header.ContentLength(); contentLength >= 0 { + requestSize = int64(contentLength) + requestSizeKnown = true + } else if streamSize, ok := bodyStreamSize(request.BodyStream()); ok { + requestSize = streamSize + requestSizeKnown = true } } else { requestSize = int64(len(request.Body())) + requestSizeKnown = true } reqHeader := make(http.Header) @@ -225,39 +201,33 @@ func Middleware(opts ...Option) fiber.Handler { response := c.Response() isSSE := c.GetRespHeader("Content-Type") == "text/event-stream" responseSize := int64(0) + responseSizeKnown := false isResponseBodyStream := response.IsBodyStream() - if !isResponseBodyStream && !isSSE { - responseSize = int64(len(response.Body())) - } - - if isResponseBodyStream && !isSSE && !cfg.withoutMetrics { - responseBodyStream := response.BodyStream() - if responseBodyStream != nil { - responseMetricAttrsWithResponse := append(responseMetricAttrs, responseAttrs...) - responseMetricsCtx := detachedMetricContext(savedCtx) - responseBodyStreamReader := &bodyStreamSizeReader{ - reader: responseBodyStream, - onEOF: func(read int64) { - httpServerResponseSize.Record(responseMetricsCtx, read, metric.WithAttributes(responseMetricAttrsWithResponse...)) - }, - } - response.SetBodyStream(responseBodyStreamReader, -1) - } else { - isResponseBodyStream = false + if isSSE { + // skip size calculation for SSE streams + } else if isResponseBodyStream { + if contentLength := response.Header.ContentLength(); contentLength >= 0 { + responseSize = int64(contentLength) + responseSizeKnown = true + } else if streamSize, ok := bodyStreamSize(response.BodyStream()); ok { + responseSize = streamSize + responseSizeKnown = true } + } else { + responseSize = int64(len(response.Body())) + responseSizeKnown = true } defer func() { responseMetricAttrs = append(responseMetricAttrs, responseAttrs...) - if requestBodyStreamSizeReader != nil { - requestSize = atomic.LoadInt64(&requestBodyStreamSizeReader.read) - } if !cfg.withoutMetrics { httpServerActiveRequests.Add(savedCtx, -1, metric.WithAttributes(requestMetricsAttrs...)) httpServerDuration.Record(savedCtx, time.Since(start).Seconds(), metric.WithAttributes(responseMetricAttrs...)) - httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...)) - if !isResponseBodyStream { + if requestSizeKnown { + httpServerRequestSize.Record(savedCtx, requestSize, metric.WithAttributes(responseMetricAttrs...)) + } + if responseSizeKnown { httpServerResponseSize.Record(savedCtx, responseSize, metric.WithAttributes(responseMetricAttrs...)) } } @@ -266,7 +236,7 @@ func Middleware(opts ...Option) fiber.Handler { cancel() }() - if !isResponseBodyStream { + if responseSizeKnown { span.SetAttributes(append(responseAttrs, semconv.HTTPResponseBodySizeKey.Int64(responseSize))...) } else { span.SetAttributes(responseAttrs...) diff --git a/v3/otel/fiber_stream_test.go b/v3/otel/fiber_stream_test.go new file mode 100644 index 000000000..50b0cfa3e --- /dev/null +++ b/v3/otel/fiber_stream_test.go @@ -0,0 +1,66 @@ +package otel + +import ( + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + + "github.com/gofiber/fiber/v3" + "github.com/gofiber/fiber/v3/middleware/static" + "github.com/stretchr/testify/require" +) + +func TestMiddleware_StaticAssetsDoNotHang(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + require.NoError(t, os.WriteFile(filepath.Clean(filepath.Join(dir, "repro.css")), []byte("body{font-family:sans-serif;}"), 0o644)) + require.NoError(t, os.WriteFile(filepath.Clean(filepath.Join(dir, "repro.js")), []byte("console.log('ok');"), 0o644)) + + app := fiber.New() + app.Use(Middleware()) + app.Use("/public", static.New(dir)) + + testCases := []struct { + path string + contentType string + body string + }{ + {path: "/public/repro.css", contentType: "text/css", body: "body{font-family:sans-serif;}"}, + {path: "/public/repro.js", contentType: "javascript", body: "console.log('ok');"}, + } + + for i := 0; i < 25; i++ { + for _, tc := range testCases { + resp, err := app.Test(httptest.NewRequest(http.MethodGet, tc.path, nil)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, readErr := io.ReadAll(resp.Body) + require.NoError(t, resp.Body.Close()) + require.NoError(t, readErr) + require.Equal(t, tc.body, string(body)) + require.Contains(t, resp.Header.Get("Content-Type"), tc.contentType) + } + } +} + +func TestMiddleware_NotFoundPathDoesNotHang(t *testing.T) { + t.Parallel() + + app := fiber.New() + app.Use(Middleware()) + + for i := 0; i < 25; i++ { + resp, err := app.Test(httptest.NewRequest(http.MethodGet, "/.well-known/appspecific/com.chrome.devtools.json", nil)) + require.NoError(t, err) + require.Equal(t, http.StatusNotFound, resp.StatusCode) + + _, readErr := io.ReadAll(resp.Body) + require.NoError(t, resp.Body.Close()) + require.NoError(t, readErr) + } +} From 2134901f7c9f2de330594e4be84c8e8318022668 Mon Sep 17 00:00:00 2001 From: edgarsilva Date: Wed, 6 May 2026 18:15:37 -0600 Subject: [PATCH 2/2] fix(otel): normalize SSE content type check Treat response Content-Type as media type plus optional parameters and match SSE values case-insensitively so event streams are consistently excluded from body-size calculation. --- v3/otel/fiber.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/v3/otel/fiber.go b/v3/otel/fiber.go index 309077dbc..3cbc29bd1 100644 --- a/v3/otel/fiber.go +++ b/v3/otel/fiber.go @@ -4,6 +4,7 @@ import ( "context" "io" "net/http" + "strings" "time" "github.com/gofiber/contrib/v3/otel/internal" @@ -199,7 +200,8 @@ func Middleware(opts ...Option) fiber.Handler { } response := c.Response() - isSSE := c.GetRespHeader("Content-Type") == "text/event-stream" + contentType, _, _ := strings.Cut(c.GetRespHeader("Content-Type"), ";") + isSSE := utils.EqualFold(strings.TrimSpace(contentType), "text/event-stream") responseSize := int64(0) responseSizeKnown := false isResponseBodyStream := response.IsBodyStream()