diff --git a/apps/search/src/instrumentations/otel-node.ts b/apps/search/src/instrumentations/otel-node.ts index 54cbe87da..38df3ca0c 100644 --- a/apps/search/src/instrumentations/otel-node.ts +++ b/apps/search/src/instrumentations/otel-node.ts @@ -4,15 +4,31 @@ import { ATTR_SERVICE_INSTANCE_ID, } from "@opentelemetry/semantic-conventions/incubating"; import { createBatchSpanProcessor } from "@saleor/apps-otel/src/batch-span-processor-factory"; +import { DeferredSampler } from "@saleor/apps-otel/src/deferred-sampler"; import { createHttpInstrumentation } from "@saleor/apps-otel/src/http-instrumentation-factory"; import { ObservabilityAttributes } from "@saleor/apps-otel/src/observability-attributes"; import { createServiceInstanceId } from "@saleor/apps-otel/src/service-instance-id-factory"; +import { TraceBufferingTailSamplingProcessor } from "@saleor/apps-otel/src/trace-buffering-tail-sampling-processor"; import { registerOTel } from "@vercel/otel"; import pkg from "../../package.json"; +const batchProcessor = createBatchSpanProcessor({ + accessToken: process.env.OTEL_ACCESS_TOKEN, +}); + +const tailSamplingProcessor = new TraceBufferingTailSamplingProcessor({ + processor: batchProcessor, + slowThresholdMs: 5000, + bufferTimeoutMs: 55000, // Under Vercel's 60s timeout + exportErrors: true, + exportSlowSpans: true, +}); + registerOTel({ serviceName: process.env.OTEL_SERVICE_NAME, + // Note: DeferredSampler + TraceBufferingTailSamplingProcessor must be used together + traceSampler: new DeferredSampler(), attributes: { [ATTR_SERVICE_VERSION]: pkg.version, [ATTR_DEPLOYMENT_ENVIRONMENT_NAME]: process.env.ENV, @@ -23,10 +39,6 @@ registerOTel({ env: undefined, [ObservabilityAttributes.VERCEL_ENV]: process.env.VERCEL_ENV, }, - spanProcessors: [ - createBatchSpanProcessor({ - accessToken: process.env.OTEL_ACCESS_TOKEN, - }), - ], - instrumentations: [createHttpInstrumentation()], + spanProcessors: [tailSamplingProcessor], + instrumentations: [createHttpInstrumentation({ usingDeferredSpanProcessor: true })], }); diff --git a/cspell.config.js b/cspell.config.js index a19e69414..0d8ad29ab 100644 --- a/cspell.config.js +++ b/cspell.config.js @@ -79,6 +79,7 @@ export default { "claude", "SEPA", "mailpit", + "traceparent" ], language: "en-US", useGitignore: true, @@ -88,5 +89,8 @@ export default { "**/schema.graphql", "**/generated/types.ts", "**/*.test.ts", + // Copied from @vercel/otel - not our code to spellcheck + "packages/otel/src/fetch-instrumentation.ts", + "packages/otel/src/vercel-resource-attributes.ts", ], }; diff --git a/packages/otel/package.json b/packages/otel/package.json index 2260c0866..3c4356036 100644 --- a/packages/otel/package.json +++ b/packages/otel/package.json @@ -5,7 +5,9 @@ "scripts": { "check-types": "tsc", "lint": "eslint .", - "lint:fix": "eslint --fix ." + "lint:fix": "eslint --fix .", + "test": "vitest", + "test:ci": "vitest run --coverage" }, "dependencies": { "@opentelemetry/exporter-metrics-otlp-http": "catalog:", @@ -18,6 +20,7 @@ }, "devDependencies": { "@opentelemetry/api": "catalog:", + "@opentelemetry/instrumentation": "catalog:", "@opentelemetry/sdk-metrics": "catalog:", "@opentelemetry/sdk-trace-node": "catalog:", "@opentelemetry/semantic-conventions": "catalog:", @@ -25,17 +28,23 @@ "@saleor/eslint-config-apps": "workspace:*", "@saleor/typescript-config-apps": "workspace:*", "@types/node": "catalog:", + "@vercel/otel": "catalog:", + "@vitest/coverage-v8": "catalog:", "eslint": "catalog:", "next": "catalog:", "typescript": "catalog:", - "urql": "catalog:" + "urql": "catalog:", + "vite": "catalog:", + "vitest": "catalog:" }, "peerDependencies": { "@opentelemetry/api": "catalog:", + "@opentelemetry/instrumentation": "catalog:", "@opentelemetry/sdk-metrics": "catalog:", "@opentelemetry/sdk-trace-node": "catalog:", "@opentelemetry/semantic-conventions": "catalog:", "@saleor/app-sdk": "catalog:", + "@vercel/otel": "catalog:", "next": "catalog:", "urql": "catalog:" } diff --git a/packages/otel/src/deferred-sampler.test.ts b/packages/otel/src/deferred-sampler.test.ts new file mode 100644 index 000000000..1a080fd12 --- /dev/null +++ b/packages/otel/src/deferred-sampler.test.ts @@ -0,0 +1,79 @@ +import { ROOT_CONTEXT, SpanKind, trace, TraceFlags } from "@opentelemetry/api"; +import { SamplingDecision } from "@opentelemetry/sdk-trace-node"; +import { describe, expect, it } from "vitest"; + +import { DeferredSampler, SALEOR_SAMPLING_DECISION_ATTR } from "./deferred-sampler"; + +describe("DeferredSampler", () => { + const traceId = "0af7651916cd43dd8448eb211c80319c"; + const spanName = "test-span"; + const spanKind = SpanKind.SERVER; + const attributes = {}; + const links: never[] = []; + + describe("when parent is sampled", () => { + it("should return RECORD_AND_SAMPLED decision", () => { + const sampler = new DeferredSampler(); + const parentContext = trace.setSpanContext(ROOT_CONTEXT, { + traceId, + spanId: "b7ad6b7169203331", + traceFlags: TraceFlags.SAMPLED, + isRemote: true, + }); + + const result = sampler.shouldSample( + parentContext, + traceId, + spanName, + spanKind, + attributes, + links, + ); + + expect(result.decision).toBe(SamplingDecision.RECORD_AND_SAMPLED); + expect(result.attributes?.[SALEOR_SAMPLING_DECISION_ATTR]).toBe("sampled"); + }); + }); + + describe("when parent is not sampled", () => { + it("should return RECORD decision (defer to TailSamplingProcessor)", () => { + const sampler = new DeferredSampler(); + const parentContext = trace.setSpanContext(ROOT_CONTEXT, { + traceId, + spanId: "b7ad6b7169203331", + traceFlags: TraceFlags.NONE, + isRemote: true, + }); + + const result = sampler.shouldSample( + parentContext, + traceId, + spanName, + spanKind, + attributes, + links, + ); + + expect(result.decision).toBe(SamplingDecision.RECORD); + expect(result.attributes?.[SALEOR_SAMPLING_DECISION_ATTR]).toBe("not_sampled"); + }); + }); + + describe("when there is no parent (root span)", () => { + it("should return RECORD decision (defer to TailSamplingProcessor)", () => { + const sampler = new DeferredSampler(); + + const result = sampler.shouldSample( + ROOT_CONTEXT, + traceId, + spanName, + spanKind, + attributes, + links, + ); + + expect(result.decision).toBe(SamplingDecision.RECORD); + expect(result.attributes?.[SALEOR_SAMPLING_DECISION_ATTR]).toBe("none"); + }); + }); +}); diff --git a/packages/otel/src/deferred-sampler.ts b/packages/otel/src/deferred-sampler.ts new file mode 100644 index 000000000..ae845c028 --- /dev/null +++ b/packages/otel/src/deferred-sampler.ts @@ -0,0 +1,66 @@ +import { Attributes, Context, Link, SpanKind, trace, TraceFlags } from "@opentelemetry/api"; +import { Sampler, SamplingDecision, SamplingResult } from "@opentelemetry/sdk-trace-node"; + +/** + * Attribute key to store Saleor's original sampling decision. + * Used by TailSamplingProcessor to know if Saleor wanted this trace. + */ +export const SALEOR_SAMPLING_DECISION_ATTR = "saleor.sampling.decision"; + +/** + * Check if SAMPLED flag is set in traceFlags bitmask. + * Uses bitwise AND operation according to OTEL spec + */ +function isSampled(traceFlags: number): boolean { + return (traceFlags & TraceFlags.SAMPLED) !== 0; +} + +/** + * A sampler that defers the final sampling decision to span end. + * + * - When parent is SAMPLED → return RECORD_AND_SAMPLED (respects parent) + * - When parent is NOT SAMPLED or NO parent → return RECORD (defer decision to TailSamplingProcessor) + * + * This allows the TailSamplingProcessor to make the final decision + * based on error status or latency at span end. + * Without setting `RECORD` we wouldn't store any span data during runtime + */ +export class DeferredSampler implements Sampler { + // eslint-disable-next-line @typescript-eslint/max-params -- Required by OpenTelemetry Sampler interface + shouldSample( + context: Context, + _traceId: string, + _spanName: string, + _spanKind: SpanKind, + _attributes: Attributes, + _links: Link[], + ): SamplingResult { + const parentSpanContext = trace.getSpanContext(context); + const parentSampled = parentSpanContext && isSampled(parentSpanContext.traceFlags); + + if (parentSampled) { + // Parent decided to sample - we MUST sample too + return { + decision: SamplingDecision.RECORD_AND_SAMPLED, + attributes: { + [SALEOR_SAMPLING_DECISION_ATTR]: "sampled", + }, + }; + } + + /* + * No parent OR parent decided NOT to sample. + * Record span but defer export decision to TailSamplingProcessor. + */ + return { + decision: SamplingDecision.RECORD, + attributes: { + [SALEOR_SAMPLING_DECISION_ATTR]: parentSpanContext ? "not_sampled" : "none", + }, + }; + } + + toString(): string { + return "DeferredSampler"; + } +} diff --git a/packages/otel/src/fetch-instrumentation.ts b/packages/otel/src/fetch-instrumentation.ts new file mode 100644 index 000000000..684f86f85 --- /dev/null +++ b/packages/otel/src/fetch-instrumentation.ts @@ -0,0 +1,528 @@ +/* eslint-disable turbo/no-undeclared-env-vars */ +/** + * Fetch instrumentation for tail sampling support. + * + * SOURCE: @vercel/otel v1.10.1 (commit a4a8662) + * REPO: https://github.com/vercel/otel + * + * WHY THIS FILE EXISTS: + * @vercel/otel's FetchInstrumentation has a hardcoded check that skips recording + * for non-sampled spans. For tail sampling to work, we need to record span data + * (including errors) even when the span wasn't initially sampled, so that + * TailSamplingProcessor can decide to promote error spans at span end. + * + * MODIFICATIONS (search for "SALEOR MODIFICATION"): + * 1. Renamed class to TailSamplingFetchInstrumentation + * 2. Line ~291: Removed isSampled() check from early return + * 3. Line ~296: Added isSampled() check to shouldPropagate condition + * 4. Inlined utility functions at bottom (isSampled, resolveTemplate, getVercelRequestContext) + * + * We evaluated wrapper/proxy and monkey-patching approaches but they don't work: + * + * | Approach | Why it doesn't work | + * |-----------------------|-------------------------------------------------------------| + * | Wrapper pattern | Span lifecycle is inside closure - can't intercept | + * | Monkey-patch isSampled| It's an imported module binding - can't patch after import | + * | Proxy TracerProvider | Would enable propagation for ALL spans (we want selective) | + * | SpanProcessor post-fix| Span ends with NO attributes when non-sampled | + * + * The fundamental issue: we need recording=YES for all spans (tail sampling) + * but propagation=ONLY for truly sampled spans. The original code ties both + * behaviors to the same isSampled() check, requiring code modification. + * + * ## What's copied from @vercel/otel + * + * Code is copied from @vercel/otel v1.10.1 FetchInstrumentation.enable(). + * Source: https://github.com/vercel/otel/blob/v1.10.1/packages/otel/src/instrumentations/fetch.ts + * + * Some comments to disable eslint errors were added. + * Actual code modifications are marked with "SALEOR MODIFICATION START/END" comments. + * WHEN TO UPDATE: + * If @vercel/otel updates FetchInstrumentation, copy the new file and re-apply modifications. + */ +import type { Attributes, Span, TextMapSetter, TracerProvider } from "@opentelemetry/api"; +import { + context, + propagation, + SpanKind, + SpanStatusCode, + trace as traceApi, + TraceFlags, +} from "@opentelemetry/api"; +import type { Instrumentation, InstrumentationConfig } from "@opentelemetry/instrumentation"; +import { SemanticAttributes } from "@opentelemetry/semantic-conventions"; + +/** + * Configuration for the "fetch" instrumentation. + * + * Some of this configuration can be overriden on a per-fetch call basis by + * using the `opentelemetry` property in the `RequestInit` object (requires Next 14.1.1 or above). + * This property can include: + * - `ignore`: boolean - whether to ignore the fetch call from tracing. Overrides + * `ignoreUrls`. + * - `propagateContext: boolean`: overrides `propagateContextUrls` for this call. + * - `spanName: string`: overrides the computed span name for this call. + * - `attributes: Attributes`: overrides the computed attributes for this call. + */ +export interface FetchInstrumentationConfig extends InstrumentationConfig { + /** + * A set of URL matchers (string prefix or regex) that should be ignored from tracing. + * By default all URLs are traced. + * Can be overriden by the `opentelemetry.ignore` property in the `RequestInit` object. + * + * Example: `fetch: { ignoreUrls: [/example.com/] }`. + */ + ignoreUrls?: (string | RegExp)[]; + + /** + * A set of URL matchers (string prefix or regex) for which the tracing context + * should be propagated (see [`propagators`](Configuration#propagators)). + * By default the context is propagated _only_ for the + * [deployment URLs](https://vercel.com/docs/deployments/generated-urls), all + * other URLs should be enabled explicitly. + * Can be overriden by the `opentelemetry.propagateContext` property in the `RequestInit` object. + * + * Example: `fetch: { propagateContextUrls: [ /my.api/ ] }`. + */ + propagateContextUrls?: (string | RegExp)[]; + + /** + * A set of URL matchers (string prefix or regex) for which the tracing context + * should not be propagated (see [`propagators`](Configuration#propagators)). This allows you to exclude a + * subset of URLs allowed by the [`propagateContextUrls`](FetchInstrumentationConfig#propagateContextUrls). + * Can be overriden by the `opentelemetry.propagateContext` property in the `RequestInit` object. + */ + dontPropagateContextUrls?: (string | RegExp)[]; + + /** + * A string for the "resource.name" attribute that can include attribute expressions in `{}`. + * Can be overriden by the `opentelemetry.attributes` property in the `RequestInit` object. + * + * Example: `fetch: { resourceNameTemplate: "{http.host}" }`. + */ + resourceNameTemplate?: string; + + /** + * A map of attributes that should be created from the request headers. The keys of the map are + * attribute names and the values are request header names. If a resonse header doesn't exist, no + * attribute will be created for it. + * + * Example: `fetch: { attributesFromRequestHeaders: { "attr1": "X-Attr" } }` + */ + attributesFromRequestHeaders?: Record; + + /** + * A map of attributes that should be created from the response headers. The keys of the map are + * attribute names and the values are response header names. If a resonse header doesn't exist, no + * attribute will be created for it. + * + * Example: `fetch: { attributesFromResponseHeaders: { "attr1": "X-Attr" } }` + */ + attributesFromResponseHeaders?: Record; +} + +declare global { + interface RequestInit { + opentelemetry?: { + ignore?: boolean; + propagateContext?: boolean; + spanName?: string; + attributes?: Attributes; + }; + } +} + +type InternalRequestInit = RequestInit & { + next?: { + internal: boolean; + }; +}; + +// SALEOR MODIFICATION: Renamed class from FetchInstrumentation +export class TailSamplingFetchInstrumentation implements Instrumentation { + instrumentationName = "@saleor/otel/fetch"; + instrumentationVersion = "1.0.0"; + /** @internal */ + private config: FetchInstrumentationConfig; + /** @internal */ + private originalFetch: typeof fetch | undefined; + /** @internal */ + private tracerProvider: TracerProvider | undefined; + + constructor(config: FetchInstrumentationConfig = {}) { + this.config = config; + } + + getConfig(): FetchInstrumentationConfig { + return this.config; + } + + setConfig(): void { + // Nothing. + } + + setTracerProvider(tracerProvider: TracerProvider): void { + this.tracerProvider = tracerProvider; + } + + setMeterProvider(): void { + // Nothing. + } + + public enable(): void { + this.disable(); + + const { tracerProvider } = this; + + if (!tracerProvider) { + return; + } + + const tracer = tracerProvider.getTracer(this.instrumentationName, this.instrumentationVersion); + + const ignoreUrls = this.config.ignoreUrls ?? []; + + const shouldIgnore = (url: URL, init: InternalRequestInit | undefined): boolean => { + if (init?.opentelemetry?.ignore !== undefined) { + return init.opentelemetry.ignore; + } + if (ignoreUrls.length === 0) { + return false; + } + const urlString = url.toString(); + + return ignoreUrls.some((match) => { + if (typeof match === "string") { + if (match === "*") { + return true; + } + + return urlString.startsWith(match); + } + + return match.test(urlString); + }); + }; + + const host = process.env.VERCEL_URL || process.env.NEXT_PUBLIC_VERCEL_URL || null; + const branchHost = + process.env.VERCEL_BRANCH_URL || process.env.NEXT_PUBLIC_VERCEL_BRANCH_URL || null; + const propagateContextUrls = this.config.propagateContextUrls ?? []; + const dontPropagateContextUrls = this.config.dontPropagateContextUrls ?? []; + const resourceNameTemplate = this.config.resourceNameTemplate; + const { attributesFromRequestHeaders, attributesFromResponseHeaders } = this.config; + + const shouldPropagate = (url: URL, init: InternalRequestInit | undefined): boolean => { + if (init?.opentelemetry?.propagateContext) { + return init.opentelemetry.propagateContext; + } + const urlString = url.toString(); + + if ( + dontPropagateContextUrls.length > 0 && + dontPropagateContextUrls.some((match) => { + if (typeof match === "string") { + if (match === "*") { + return true; + } + + return urlString.startsWith(match); + } + + return match.test(urlString); + }) + ) { + return false; + } + // Allow same origin. + if ( + host && + url.protocol === "https:" && + (url.host === host || + url.host === branchHost || + url.host === getVercelRequestContext()?.headers.host) + ) { + return true; + } + // Allow localhost for testing in a dev mode. + if (!host && url.protocol === "http:" && url.hostname === "localhost") { + return true; + } + + return propagateContextUrls.some((match) => { + if (typeof match === "string") { + if (match === "*") { + return true; + } + + return urlString.startsWith(match); + } + + return match.test(urlString); + }); + }; + + // Disable fetch tracing in Next.js. + process.env.NEXT_OTEL_FETCH_DISABLED = "1"; + + const originalFetch = globalThis.fetch; + + this.originalFetch = originalFetch; + + const doFetch: typeof fetch = async (input, initArg) => { + const init = initArg as InternalRequestInit | undefined; + + // Passthrough internal requests. + if (init?.next?.internal) { + return originalFetch(input, init); + } + + const req = new Request( + /* + * The input Request must be cloned to avoid the bug + * on Edge runtime where the `new Request()` eagerly + * consumes the body of the original Request. + */ + input instanceof Request ? input.clone() : input, + init, + ); + const url = new URL(req.url); + + if (shouldIgnore(url, init)) { + return originalFetch(input, init); + } + + const attrs = { + [SemanticAttributes.HTTP_METHOD]: req.method, + [SemanticAttributes.HTTP_URL]: req.url, + [SemanticAttributes.HTTP_HOST]: url.host, + [SemanticAttributes.HTTP_SCHEME]: url.protocol.replace(":", ""), + [SemanticAttributes.NET_PEER_NAME]: url.hostname, + [SemanticAttributes.NET_PEER_PORT]: url.port, + }; + const resourceName = resourceNameTemplate + ? resolveTemplate(resourceNameTemplate, attrs) + : removeSearch(req.url); + + const spanName = init?.opentelemetry?.spanName ?? `fetch ${req.method} ${req.url}`; + + const parentContext = context.active(); + + const span = tracer.startSpan( + spanName, + { + kind: SpanKind.CLIENT, + attributes: { + ...attrs, + "operation.name": `fetch.${req.method}`, + "resource.name": resourceName, + ...init?.opentelemetry?.attributes, + }, + }, + parentContext, + ); + + /* + * SALEOR MODIFICATION: Removed isSampled() check from this condition. + * Original: if (!span.isRecording() || !isSampled(span.spanContext().traceFlags)) + * This allows non-sampled spans to continue recording for tail sampling. + */ + if (!span.isRecording()) { + span.end(); + + return originalFetch(input, init); + } + + /* + * SALEOR MODIFICATION: Added isSampled() check to propagation condition. + * Original: if (shouldPropagate(url, init)) + * Only propagate trace context for sampled spans, not deferred ones. + */ + if (shouldPropagate(url, init) && isSampled(span.spanContext().traceFlags)) { + const fetchContext = traceApi.setSpan(parentContext, span); + + propagation.inject(fetchContext, req.headers, HEADERS_SETTER); + } + + if (attributesFromRequestHeaders) { + headersToAttributes(span, attributesFromRequestHeaders, req.headers); + } + + try { + const startTime = Date.now(); + + /* + * Remove "content-type" for a FormData body because undici regenerates + * a new multipart separator each time. + */ + if (init?.body && init.body instanceof FormData) { + req.headers.delete("content-type"); + } + const res = await originalFetch(input, { + ...init, + headers: req.headers, + }); + const duration = Date.now() - startTime; + + span.setAttribute(SemanticAttributes.HTTP_STATUS_CODE, res.status); + span.setAttribute("http.response_time", duration); + if (attributesFromResponseHeaders) { + headersToAttributes(span, attributesFromResponseHeaders, res.headers); + } + if (res.status >= 500) { + onError(span, `Status: ${res.status} (${res.statusText})`); + } + + // Flush body, but non-blocking. + if (res.body) { + void pipeResponse(res).then( + (byteLength) => { + if (span.isRecording()) { + span.setAttribute( + SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH_UNCOMPRESSED, + byteLength, + ); + span.end(); + } + }, + (err) => { + if (span.isRecording()) { + onError(span, err); + span.end(); + } + }, + ); + } else { + span.end(); + } + + return res; + } catch (e) { + onError(span, e); + span.end(); + throw e; + } + }; + + globalThis.fetch = doFetch; + } + + public disable(): void { + if (this.originalFetch) { + globalThis.fetch = this.originalFetch; + } + } +} + +const HEADERS_SETTER: TextMapSetter = { + set(carrier: Headers, key: string, value: string): void { + carrier.set(key, value); + }, +}; + +function removeSearch(url: string): string { + const index = url.indexOf("?"); + + return index === -1 ? url : url.substring(0, index); +} + +function pipeResponse(res: Response): Promise { + let length = 0; + const clone = res.clone(); + const reader = clone.body?.getReader(); + + if (!reader) { + return Promise.resolve(0); + } + const read = (): Promise => { + return reader.read().then(({ done, value }) => { + if (done) { + return; + } + length += value.byteLength; + + return read(); + }); + }; + + return read().then(() => length); +} + +function onError(span: Span, err: unknown): void { + if (err instanceof Error) { + span.recordException(err); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.message, + }); + } else { + const message = String(err); + + span.setStatus({ + code: SpanStatusCode.ERROR, + message, + }); + } +} + +function headersToAttributes( + span: Span, + attrsToHeadersMap: Record, + headers: Headers, +): void { + for (const [attrName, headerName] of Object.entries(attrsToHeadersMap)) { + const headerValue = headers.get(headerName); + + if (headerValue !== null) { + span.setAttribute(attrName, headerValue); + } + } +} + +/* + * ============================================================================= + * SALEOR MODIFICATION: Inlined utilities from @vercel/otel internal modules + * ============================================================================= + */ + +// From @vercel/otel v1.10.1 util/sampled.ts +function isSampled(traceFlags: number): boolean { + return (traceFlags & TraceFlags.SAMPLED) !== 0; +} + +// From @vercel/otel v1.10.1 util/template.ts +function resolveTemplate(template: string, attrs: Attributes): string { + return template.replace(/\{(?[^{}]+)\}/g, (match, key) => { + const value = attrs[key as string]; + + if (value !== undefined) { + return String(value); + } + + return match; + }); +} + +// From @vercel/otel v1.10.1 vercel-request-context/api.ts +interface VercelRequestContext { + waitUntil: (promiseOrFunc: Promise | (() => Promise)) => void; + headers: Record; + url: string; + [key: symbol]: unknown; +} + +interface Reader { + get: () => VercelRequestContext | undefined; +} + +const symbol = Symbol.for("@vercel/request-context"); + +interface GlobalWithReader { + [symbol]?: Reader; +} + +function getVercelRequestContext(): VercelRequestContext | undefined { + const reader = (globalThis as GlobalWithReader)[symbol]; + + return reader?.get(); +} diff --git a/packages/otel/src/http-instrumentation-factory.ts b/packages/otel/src/http-instrumentation-factory.ts index 29154afc7..9c8f62fa7 100644 --- a/packages/otel/src/http-instrumentation-factory.ts +++ b/packages/otel/src/http-instrumentation-factory.ts @@ -1,8 +1,19 @@ import { HttpInstrumentation } from "@opentelemetry/instrumentation-http"; -export const createHttpInstrumentation = () => { +interface HttpInstrumentationFactoryConfig { + /** + * Set to true when using DeferredSampler + TailSamplingProcessor. + * This allows root spans (requests without traceparent set by Saleor and others) + * to be sampled if app decides to sample. + */ + usingDeferredSpanProcessor?: boolean; +} + +export const createHttpInstrumentation = (config?: HttpInstrumentationFactoryConfig) => { + const { usingDeferredSpanProcessor = false } = config ?? {}; + return new HttpInstrumentation({ - requireParentforIncomingSpans: true, + requireParentforIncomingSpans: !usingDeferredSpanProcessor, requireParentforOutgoingSpans: true, ignoreOutgoingRequestHook: (request) => request.hostname?.includes("ingest.sentry.io") ?? false, }); diff --git a/packages/otel/src/tail-sampling-processor.test.ts b/packages/otel/src/tail-sampling-processor.test.ts new file mode 100644 index 000000000..e94d34fc6 --- /dev/null +++ b/packages/otel/src/tail-sampling-processor.test.ts @@ -0,0 +1,364 @@ +import { ROOT_CONTEXT, SpanKind, SpanStatusCode, TraceFlags } from "@opentelemetry/api"; +import { ReadableSpan, Span, SpanProcessor } from "@opentelemetry/sdk-trace-node"; +import { ATTR_HTTP_RESPONSE_STATUS_CODE } from "@opentelemetry/semantic-conventions"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { TAIL_SAMPLING_PROMOTED_ATTR, TailSamplingProcessor } from "./tail-sampling-processor"; + +// Helper to create a mock ReadableSpan +const createMockSpan = (options: { + traceFlags?: number; + statusCode?: SpanStatusCode; + events?: Array<{ name: string }>; + attributes?: Record; + durationMs?: number; + kind?: SpanKind; + instrumentationLibraryName?: string; +}): ReadableSpan => { + const { + traceFlags = TraceFlags.NONE, + statusCode = SpanStatusCode.UNSET, + events = [], + attributes = {}, + durationMs = 100, + kind = SpanKind.INTERNAL, + instrumentationLibraryName = "test", + } = options; + + const startTime: [number, number] = [Math.floor(Date.now() / 1000), 0]; + const durationSeconds = Math.floor(durationMs / 1000); + const durationNanos = (durationMs % 1000) * 1e6; + const endTime: [number, number] = [startTime[0] + durationSeconds, durationNanos]; + + return { + spanContext: () => ({ + traceId: "0af7651916cd43dd8448eb211c80319c", + spanId: "b7ad6b7169203331", + traceFlags, + isRemote: false, + }), + status: { code: statusCode }, + events, + attributes, + startTime, + endTime, + name: "test-span", + kind, + parentSpanId: undefined, + duration: [0, durationMs * 1e6], + ended: true, + resource: { attributes: {} }, + instrumentationLibrary: { name: instrumentationLibraryName }, + links: [], + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + } as unknown as ReadableSpan; +}; + +// Helper to create a mock SpanProcessor +const createMockProcessor = () => { + return { + onStart: vi.fn(), + onEnd: vi.fn(), + forceFlush: vi.fn().mockResolvedValue(undefined), + shutdown: vi.fn().mockResolvedValue(undefined), + } satisfies SpanProcessor; +}; + +describe("TailSamplingProcessor", () => { + let mockProcessor: ReturnType; + let tailSamplingProcessor: TailSamplingProcessor; + + beforeEach(() => { + mockProcessor = createMockProcessor(); + tailSamplingProcessor = new TailSamplingProcessor({ + processor: mockProcessor, + slowThresholdMs: 5000, + exportErrors: true, + exportSlowSpans: true, + }); + }); + + describe("onStart", () => { + it("should pass through to downstream processor", () => { + const mockSpan = {} as Span; + + tailSamplingProcessor.onStart(mockSpan, ROOT_CONTEXT); + + expect(mockProcessor.onStart).toHaveBeenCalledWith(mockSpan, ROOT_CONTEXT); + }); + }); + + describe("onEnd with already sampled spans", () => { + it("should pass through sampled spans unchanged", () => { + const span = createMockSpan({ traceFlags: TraceFlags.SAMPLED }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalledWith(span); + }); + }); + + describe("onEnd with non-sampled spans", () => { + describe("error detection", () => { + it("should export span with ERROR status", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.spanContext().traceFlags & TraceFlags.SAMPLED).toBeTruthy(); + expect(exportedSpan.attributes[TAIL_SAMPLING_PROMOTED_ATTR]).toBe(true); + }); + + it("should export span with exception event", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + events: [{ name: "exception" }], + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.spanContext().traceFlags & TraceFlags.SAMPLED).toBeTruthy(); + }); + + it("should export span with HTTP 500+ status code", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + attributes: { [ATTR_HTTP_RESPONSE_STATUS_CODE]: 500 }, + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.spanContext().traceFlags & TraceFlags.SAMPLED).toBeTruthy(); + }); + + it("should not export span with HTTP 4xx status code", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + attributes: { [ATTR_HTTP_RESPONSE_STATUS_CODE]: 404 }, + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + }); + + describe("slow span detection", () => { + it("should export spans slower than threshold", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + durationMs: 6000, // 6 seconds, above 5s threshold + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.spanContext().traceFlags & TraceFlags.SAMPLED).toBeTruthy(); + }); + + it("should not export fast spans without errors", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + durationMs: 100, // 100ms, well below threshold + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + }); + + describe("normal spans", () => { + it("should drop non-sampled spans without errors or slow response", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.OK, + durationMs: 100, + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + }); + }); + + describe("configuration options", () => { + it("should respect exportErrors=false", () => { + const processor = new TailSamplingProcessor({ + processor: mockProcessor, + exportErrors: false, + exportSlowSpans: false, + }); + + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + }); + + processor.onEnd(span); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + + it("should respect exportSlowSpans=false", () => { + const processor = new TailSamplingProcessor({ + processor: mockProcessor, + exportErrors: false, + exportSlowSpans: false, + slowThresholdMs: 5000, + }); + + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + durationMs: 6000, + }); + + processor.onEnd(span); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + + it("should use custom slowThresholdMs", () => { + const processor = new TailSamplingProcessor({ + processor: mockProcessor, + slowThresholdMs: 1000, // 1 second + }); + + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + durationMs: 1500, // 1.5 seconds + }); + + processor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + }); + }); + + describe("forceFlush", () => { + it("should delegate to downstream processor", async () => { + await tailSamplingProcessor.forceFlush(); + + expect(mockProcessor.forceFlush).toHaveBeenCalled(); + }); + }); + + describe("shutdown", () => { + it("should delegate to downstream processor", async () => { + await tailSamplingProcessor.shutdown(); + + expect(mockProcessor.shutdown).toHaveBeenCalled(); + }); + }); + + describe("resource attributes on promoted spans", () => { + it("should add operation.name and resource.name for HTTP server spans", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + kind: SpanKind.SERVER, + attributes: { + "http.method": "POST", + "http.route": "/api/webhooks", + }, + instrumentationLibraryName: "@opentelemetry/instrumentation-http", + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["operation.name"]).toBe("web.request"); + expect(exportedSpan.attributes["resource.name"]).toBe("POST /api/webhooks"); + }); + + it("should add operation.name with library and kind for non-HTTP spans", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + kind: SpanKind.CLIENT, + instrumentationLibraryName: "@opentelemetry/instrumentation-http", + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["operation.name"]).toBe( + "opentelemetry_instrumentation-http.client", + ); + }); + + it("should clean library name by removing @ and replacing . / with underscores", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + kind: SpanKind.SERVER, + instrumentationLibraryName: "@saleor/apps-otel/fetch", + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["operation.name"]).toBe("saleor_apps-otel_fetch.server"); + }); + + it("should not override existing operation.name attribute", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + kind: SpanKind.SERVER, + attributes: { + "operation.name": "custom.operation", + "http.method": "POST", + "http.route": "/api/webhooks", + }, + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["operation.name"]).toBe("custom.operation"); + expect(exportedSpan.attributes["resource.name"]).toBeUndefined(); + }); + + it("should handle INTERNAL spans with just library name", () => { + const span = createMockSpan({ + traceFlags: TraceFlags.NONE, + statusCode: SpanStatusCode.ERROR, + kind: SpanKind.INTERNAL, + instrumentationLibraryName: "my-app", + }); + + tailSamplingProcessor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["operation.name"]).toBe("my-app"); + }); + }); +}); diff --git a/packages/otel/src/tail-sampling-processor.ts b/packages/otel/src/tail-sampling-processor.ts new file mode 100644 index 000000000..f9e2695b1 --- /dev/null +++ b/packages/otel/src/tail-sampling-processor.ts @@ -0,0 +1,190 @@ +import { Context, SpanStatusCode, TraceFlags } from "@opentelemetry/api"; +import { ReadableSpan, Span, SpanProcessor } from "@opentelemetry/sdk-trace-node"; +import { ATTR_HTTP_RESPONSE_STATUS_CODE } from "@opentelemetry/semantic-conventions"; + +import { computeVercelResourceAttributes } from "./vercel-resource-attributes"; + +export interface TailSamplingProcessorConfig { + /** + * The downstream processor to send sampled spans to. + * Typically a BatchSpanProcessor. + */ + processor: SpanProcessor; + + /** + * Latency threshold in milliseconds. Spans slower than this are always exported. + * @default 5000 (5 seconds) + */ + slowThresholdMs?: number; + + /** + * Whether to always export error spans. + * @default true + */ + exportErrors?: boolean; + + /** + * Whether to always export slow spans. + * @default true + */ + exportSlowSpans?: boolean; +} + +/** + * Attribute to mark spans that were promoted by tail sampling. + * Used by OTEL collector configuration to propagate spans manually marked as promoted by the app. + */ +export const TAIL_SAMPLING_PROMOTED_ATTR = "sampling.promoted_by_tail_sampler"; + +/** + * A SpanProcessor that implements SDK-level tail sampling. + * + * For spans that were recorded but not sampled (RECORD decision), + * this processor checks at span end if the span should be exported + * based on error status or latency. + * + * If the span should be exported, it modifies the span's traceFlags + * to include the SAMPLED flag before passing to the downstream processor. + * + * For spans that were already sampled it doesn't change them + */ +export class TailSamplingProcessor implements SpanProcessor { + private readonly processor: SpanProcessor; + private readonly slowThresholdMs: number; + private readonly exportErrors: boolean; + private readonly exportSlowSpans: boolean; + + constructor(config: TailSamplingProcessorConfig) { + this.processor = config.processor; + this.slowThresholdMs = config.slowThresholdMs ?? 5000; + this.exportErrors = config.exportErrors ?? true; + this.exportSlowSpans = config.exportSlowSpans ?? true; + } + + onStart(span: Span, parentContext: Context): void { + // Pass through to downstream processor + this.processor.onStart(span, parentContext); + } + + onEnd(span: ReadableSpan): void { + const spanContext = span.spanContext(); + const isAlreadySampled = (spanContext.traceFlags & TraceFlags.SAMPLED) !== 0; + + // If already sampled, pass through directly + if (isAlreadySampled) { + this.processor.onEnd(span); + + return; + } + + // Span was recorded but not sampled - check if we should export it + const shouldExport = this.shouldExport(span); + + if (shouldExport) { + // Promote span to SAMPLED by creating a modified version + const sampledSpan = this.promoteToSampled(span); + + this.processor.onEnd(sampledSpan); + } + // If not shouldExport, span is dropped (not passed to downstream) + } + + forceFlush(): Promise { + return this.processor.forceFlush(); + } + + shutdown(): Promise { + return this.processor.shutdown(); + } + + /** + * Determine if a non-sampled span should be exported. + */ + private shouldExport(span: ReadableSpan): boolean { + // Check for errors + if (this.exportErrors && this.isError(span)) { + return true; + } + + // Check for slow spans + if (this.exportSlowSpans && this.isSlow(span)) { + return true; + } + + return false; + } + + private isError(span: ReadableSpan): boolean { + // Check span status + if (span.status.code === SpanStatusCode.ERROR) { + return true; + } + + // Check for recorded exceptions + if (span.events.some((event) => event.name === "exception")) { + return true; + } + + // Check HTTP status code attribute (using semantic convention) + const httpStatusCode = span.attributes[ATTR_HTTP_RESPONSE_STATUS_CODE]; + + if (typeof httpStatusCode === "number" && httpStatusCode >= 500) { + return true; + } + + return false; + } + + private isSlow(span: ReadableSpan): boolean { + /* + * Calculate duration in milliseconds + * span.startTime and span.endTime are [seconds, nanoseconds] tuples + */ + const startMs = span.startTime[0] * 1000 + span.startTime[1] / 1e6; + const endMs = span.endTime[0] * 1000 + span.endTime[1] / 1e6; + const durationMs = endMs - startMs; + + return durationMs >= this.slowThresholdMs; + } + + /** + * Create a modified span with SAMPLED flag set. + * + * Since spanContext is supposed to be immutable, we use Object.create() + * to create a new object that inherits from the original but overrides + * the spanContext method using Object.defineProperty. + */ + private promoteToSampled(span: ReadableSpan): ReadableSpan { + const originalContext = span.spanContext(); + + // Create modified spanContext with SAMPLED flag + const sampledContext = { + ...originalContext, + traceFlags: originalContext.traceFlags | TraceFlags.SAMPLED, + }; + + // Create a new object that inherits from span but overrides spanContext + const sampledSpan = Object.create(span) as ReadableSpan; + + // Override spanContext method using Object.defineProperty + Object.defineProperty(sampledSpan, "spanContext", { + value: () => sampledContext, + enumerable: true, + }); + + // Build new attributes with promotion marker + Vercel/DataDog resource attributes + const newAttributes = { + ...span.attributes, + [TAIL_SAMPLING_PROMOTED_ATTR]: true, + ...computeVercelResourceAttributes(span), + }; + + // Override attributes getter + Object.defineProperty(sampledSpan, "attributes", { + value: newAttributes, + enumerable: true, + }); + + return sampledSpan; + } +} diff --git a/packages/otel/src/trace-buffering-tail-sampling-processor.test.ts b/packages/otel/src/trace-buffering-tail-sampling-processor.test.ts new file mode 100644 index 000000000..6c72d25af --- /dev/null +++ b/packages/otel/src/trace-buffering-tail-sampling-processor.test.ts @@ -0,0 +1,674 @@ +import { ROOT_CONTEXT, SpanKind, SpanStatusCode, TraceFlags } from "@opentelemetry/api"; +import { ReadableSpan, Span, SpanProcessor } from "@opentelemetry/sdk-trace-node"; +import { ATTR_HTTP_RESPONSE_STATUS_CODE } from "@opentelemetry/semantic-conventions"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { + TAIL_SAMPLING_PROMOTED_ATTR, + TraceBufferingTailSamplingProcessor, +} from "./trace-buffering-tail-sampling-processor"; + +// Mock Vercel request context +const mockWaitUntil = vi.fn(); +const mockVercelRequestContext = { + waitUntil: mockWaitUntil, + headers: { + "x-vercel-id": "test-request-id", + "x-matched-path": "/api/test", + host: "example.com", + }, + url: "https://example.com/api/test", +}; + +vi.mock("./vercel-request-context", () => ({ + getVercelRequestContext: vi.fn(() => mockVercelRequestContext), + getVercelRequestContextAttributes: vi.fn(() => ({ + "vercel.request_id": "test-request-id", + "vercel.matched_path": "/api/test", + "http.host": "example.com", + })), +})); + +// Helper to create a mock Span (for onStart) +const createMockSpan = (options: { + traceId?: string; + spanId?: string; + traceFlags?: number; +}): Span => { + const { traceId = "trace-1", spanId = "span-1", traceFlags = TraceFlags.NONE } = options; + + return { + spanContext: () => ({ + traceId, + spanId, + traceFlags, + isRemote: false, + }), + } as unknown as Span; +}; + +// Helper to create a mock ReadableSpan (for onEnd) +const createMockReadableSpan = (options: { + traceId?: string; + spanId?: string; + traceFlags?: number; + statusCode?: SpanStatusCode; + events?: Array<{ name: string }>; + attributes?: Record; + durationMs?: number; + kind?: SpanKind; + instrumentationLibraryName?: string; +}): ReadableSpan => { + const { + traceId = "trace-1", + spanId = "span-1", + traceFlags = TraceFlags.NONE, + statusCode = SpanStatusCode.UNSET, + events = [], + attributes = {}, + durationMs = 100, + kind = SpanKind.INTERNAL, + instrumentationLibraryName = "test", + } = options; + + const startTime: [number, number] = [Math.floor(Date.now() / 1000), 0]; + const durationSeconds = Math.floor(durationMs / 1000); + const durationNanos = (durationMs % 1000) * 1e6; + const endTime: [number, number] = [startTime[0] + durationSeconds, durationNanos]; + + return { + spanContext: () => ({ + traceId, + spanId, + traceFlags, + isRemote: false, + }), + status: { code: statusCode }, + events, + attributes, + startTime, + endTime, + name: "test-span", + kind, + parentSpanId: undefined, + duration: [0, durationMs * 1e6], + ended: true, + resource: { attributes: {} }, + instrumentationLibrary: { name: instrumentationLibraryName }, + links: [], + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + } as unknown as ReadableSpan; +}; + +// Helper to create a mock SpanProcessor +const createMockProcessor = () => { + return { + onStart: vi.fn(), + onEnd: vi.fn(), + forceFlush: vi.fn().mockResolvedValue(undefined), + shutdown: vi.fn().mockResolvedValue(undefined), + } satisfies SpanProcessor; +}; + +describe("TraceBufferingTailSamplingProcessor", () => { + let mockProcessor: ReturnType; + let processor: TraceBufferingTailSamplingProcessor; + + beforeEach(() => { + vi.clearAllMocks(); + mockProcessor = createMockProcessor(); + processor = new TraceBufferingTailSamplingProcessor({ + processor: mockProcessor, + slowThresholdMs: 5000, + bufferTimeoutMs: 55000, + maxTraces: 1000, + maxSpansPerTrace: 100, + }); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe("onStart", () => { + it("should pass through to downstream processor", () => { + const span = createMockSpan({}); + + processor.onStart(span, ROOT_CONTEXT); + + expect(mockProcessor.onStart).toHaveBeenCalledWith(span, ROOT_CONTEXT); + }); + + it("should not create buffer for already-sampled traces", () => { + const span = createMockSpan({ traceFlags: TraceFlags.SAMPLED }); + + processor.onStart(span, ROOT_CONTEXT); + + // Verify no buffer created by trying to end the span - should pass through + const readableSpan = createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + traceFlags: TraceFlags.SAMPLED, + }); + + processor.onEnd(readableSpan); + + expect(mockProcessor.onEnd).toHaveBeenCalledWith(readableSpan); + }); + + it("should create buffer for non-sampled traces", () => { + const span = createMockSpan({ traceFlags: TraceFlags.NONE }); + + processor.onStart(span, ROOT_CONTEXT); + + // Verify buffer exists by ending a normal span - should NOT pass through immediately + const readableSpan = createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-2", // Different span, not the root + traceFlags: TraceFlags.NONE, + }); + + processor.onEnd(readableSpan); + + // Not passed through because buffered and root hasn't ended + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + }); + + describe("onEnd with already-sampled spans", () => { + it("should pass through sampled spans unchanged", () => { + const span = createMockReadableSpan({ traceFlags: TraceFlags.SAMPLED }); + + processor.onEnd(span); + + expect(mockProcessor.onEnd).toHaveBeenCalledWith(span); + }); + }); + + describe("onEnd with non-sampled spans", () => { + describe("trace buffering", () => { + it("should buffer spans until local root ends", () => { + // Start root span + const rootSpan = createMockSpan({ traceId: "trace-1", spanId: "root-span" }); + + processor.onStart(rootSpan, ROOT_CONTEXT); + + // Start child span + const childSpan = createMockSpan({ traceId: "trace-1", spanId: "child-span" }); + + processor.onStart(childSpan, ROOT_CONTEXT); + + // End child span (with error) + const childReadable = createMockReadableSpan({ + traceId: "trace-1", + spanId: "child-span", + statusCode: SpanStatusCode.ERROR, + }); + + processor.onEnd(childReadable); + + // Not exported yet - waiting for root + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + + // End root span + const rootReadable = createMockReadableSpan({ + traceId: "trace-1", + spanId: "root-span", + }); + + processor.onEnd(rootReadable); + + // Now both spans should be exported + expect(mockProcessor.onEnd).toHaveBeenCalledTimes(2); + }); + + it("should export ALL spans when any span has error", () => { + const rootSpan = createMockSpan({ traceId: "trace-1", spanId: "root" }); + + processor.onStart(rootSpan, ROOT_CONTEXT); + + // Create 3 child spans, only one has error + for (let i = 1; i <= 3; i++) { + const childSpan = createMockSpan({ traceId: "trace-1", spanId: `child-${i}` }); + + processor.onStart(childSpan, ROOT_CONTEXT); + } + + // End children - only child-2 has error + for (let i = 1; i <= 3; i++) { + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: `child-${i}`, + statusCode: i === 2 ? SpanStatusCode.ERROR : SpanStatusCode.OK, + }), + ); + } + + // End root + processor.onEnd(createMockReadableSpan({ traceId: "trace-1", spanId: "root" })); + + // All 4 spans exported + expect(mockProcessor.onEnd).toHaveBeenCalledTimes(4); + }); + + it("should drop all spans when no error/slow", () => { + const rootSpan = createMockSpan({ traceId: "trace-1", spanId: "root" }); + + processor.onStart(rootSpan, ROOT_CONTEXT); + + const childSpan = createMockSpan({ traceId: "trace-1", spanId: "child" }); + + processor.onStart(childSpan, ROOT_CONTEXT); + + // End both with OK status and fast duration + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "child", + statusCode: SpanStatusCode.OK, + durationMs: 100, + }), + ); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "root", + statusCode: SpanStatusCode.OK, + durationMs: 100, + }), + ); + + // No spans exported + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + }); + + describe("error detection", () => { + it("should detect ERROR status", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + statusCode: SpanStatusCode.ERROR, + }), + ); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.spanContext().traceFlags & TraceFlags.SAMPLED).toBeTruthy(); + expect(exportedSpan.attributes[TAIL_SAMPLING_PROMOTED_ATTR]).toBe(true); + }); + + it("should detect exception events", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + events: [{ name: "exception" }], + }), + ); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + }); + + it("should detect HTTP 500+ status (new semconv)", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + attributes: { [ATTR_HTTP_RESPONSE_STATUS_CODE]: 500 }, + }), + ); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + }); + + it("should detect HTTP 500+ status (old semconv)", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + attributes: { "http.status_code": 503 }, + }), + ); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + }); + + it("should not export on HTTP 4xx status", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + attributes: { [ATTR_HTTP_RESPONSE_STATUS_CODE]: 404 }, + }), + ); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + }); + + describe("slow span detection", () => { + it("should export when any span is slow", () => { + const rootSpan = createMockSpan({ traceId: "trace-1", spanId: "root" }); + + processor.onStart(rootSpan, ROOT_CONTEXT); + + const childSpan = createMockSpan({ traceId: "trace-1", spanId: "child" }); + + processor.onStart(childSpan, ROOT_CONTEXT); + + // Child is slow + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "child", + durationMs: 6000, // Above 5s threshold + }), + ); + + // Root is fast + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "root", + durationMs: 100, + }), + ); + + // Both exported because child was slow + expect(mockProcessor.onEnd).toHaveBeenCalledTimes(2); + }); + }); + }); + + describe("waitUntil integration", () => { + it("should call waitUntil when promoting spans", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + statusCode: SpanStatusCode.ERROR, + }), + ); + + expect(mockWaitUntil).toHaveBeenCalled(); + }); + + it("should not call waitUntil when dropping spans", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + statusCode: SpanStatusCode.OK, + durationMs: 100, + }), + ); + + expect(mockWaitUntil).not.toHaveBeenCalled(); + }); + }); + + describe("memory bounds", () => { + it("should evict oldest traces when maxTraces exceeded", () => { + const smallProcessor = new TraceBufferingTailSamplingProcessor({ + processor: mockProcessor, + maxTraces: 2, + }); + + // Create 3 traces + for (let i = 1; i <= 3; i++) { + const span = createMockSpan({ traceId: `trace-${i}`, spanId: `span-${i}` }); + + smallProcessor.onStart(span, ROOT_CONTEXT); + + // Add error to all traces + smallProcessor.onEnd( + createMockReadableSpan({ + traceId: `trace-${i}`, + spanId: `span-${i}-child`, + statusCode: SpanStatusCode.ERROR, + }), + ); + } + + // First trace should have been evicted and exported + expect(mockProcessor.onEnd).toHaveBeenCalled(); + }); + + it("should drop oldest spans when maxSpansPerTrace exceeded", () => { + const smallProcessor = new TraceBufferingTailSamplingProcessor({ + processor: mockProcessor, + maxSpansPerTrace: 2, + }); + + const rootSpan = createMockSpan({ traceId: "trace-1", spanId: "root" }); + + smallProcessor.onStart(rootSpan, ROOT_CONTEXT); + + // Create 3 child spans + for (let i = 1; i <= 3; i++) { + const childSpan = createMockSpan({ traceId: "trace-1", spanId: `child-${i}` }); + + smallProcessor.onStart(childSpan, ROOT_CONTEXT); + smallProcessor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: `child-${i}`, + statusCode: i === 3 ? SpanStatusCode.ERROR : SpanStatusCode.OK, + }), + ); + } + + // End root + smallProcessor.onEnd(createMockReadableSpan({ traceId: "trace-1", spanId: "root" })); + + // Only 2 spans exported (maxSpansPerTrace) + expect(mockProcessor.onEnd).toHaveBeenCalledTimes(2); + }); + }); + + describe("Vercel request context attributes", () => { + it("should add vercel attributes to promoted spans", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + statusCode: SpanStatusCode.ERROR, + }), + ); + + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["vercel.request_id"]).toBe("test-request-id"); + expect(exportedSpan.attributes["vercel.matched_path"]).toBe("/api/test"); + expect(exportedSpan.attributes["http.host"]).toBe("example.com"); + }); + }); + + describe("resource attributes on promoted spans", () => { + it("should add operation.name and resource.name for HTTP server spans", () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + statusCode: SpanStatusCode.ERROR, + kind: SpanKind.SERVER, + attributes: { + "http.method": "POST", + "http.route": "/api/webhooks", + }, + instrumentationLibraryName: "@opentelemetry/instrumentation-http", + }), + ); + + const exportedSpan = mockProcessor.onEnd.mock.calls[0][0]; + + expect(exportedSpan.attributes["operation.name"]).toBe("web.request"); + expect(exportedSpan.attributes["resource.name"]).toBe("POST /api/webhooks"); + }); + }); + + describe("forceFlush", () => { + it("should delegate to downstream processor", async () => { + await processor.forceFlush(); + + expect(mockProcessor.forceFlush).toHaveBeenCalled(); + }); + }); + + describe("shutdown", () => { + it("should export buffered error traces and delegate to downstream", async () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + // Add span with error but don't end root + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-2", + statusCode: SpanStatusCode.ERROR, + }), + ); + + // Nothing exported yet + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + + // Shutdown should export buffered error traces + await processor.shutdown(); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + expect(mockProcessor.shutdown).toHaveBeenCalled(); + }); + + it("should not export buffered traces without errors on shutdown", async () => { + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + processor.onStart(span, ROOT_CONTEXT); + + processor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-2", + statusCode: SpanStatusCode.OK, + }), + ); + + await processor.shutdown(); + + // Only shutdown called, no spans exported + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + expect(mockProcessor.shutdown).toHaveBeenCalled(); + }); + }); + + describe("configuration options", () => { + it("should respect exportErrors=false", () => { + const configuredProcessor = new TraceBufferingTailSamplingProcessor({ + processor: mockProcessor, + exportErrors: false, + exportSlowSpans: false, + }); + + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + configuredProcessor.onStart(span, ROOT_CONTEXT); + + configuredProcessor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + statusCode: SpanStatusCode.ERROR, + }), + ); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + + it("should respect exportSlowSpans=false", () => { + const configuredProcessor = new TraceBufferingTailSamplingProcessor({ + processor: mockProcessor, + exportErrors: false, + exportSlowSpans: false, + }); + + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + configuredProcessor.onStart(span, ROOT_CONTEXT); + + configuredProcessor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + durationMs: 10000, + }), + ); + + expect(mockProcessor.onEnd).not.toHaveBeenCalled(); + }); + + it("should use custom slowThresholdMs", () => { + const configuredProcessor = new TraceBufferingTailSamplingProcessor({ + processor: mockProcessor, + slowThresholdMs: 1000, // 1 second + }); + + const span = createMockSpan({ traceId: "trace-1", spanId: "span-1" }); + + configuredProcessor.onStart(span, ROOT_CONTEXT); + + configuredProcessor.onEnd( + createMockReadableSpan({ + traceId: "trace-1", + spanId: "span-1", + durationMs: 1500, // 1.5 seconds + }), + ); + + expect(mockProcessor.onEnd).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/otel/src/trace-buffering-tail-sampling-processor.ts b/packages/otel/src/trace-buffering-tail-sampling-processor.ts new file mode 100644 index 000000000..7559b6d4d --- /dev/null +++ b/packages/otel/src/trace-buffering-tail-sampling-processor.ts @@ -0,0 +1,397 @@ +import { Context, SpanStatusCode, TraceFlags } from "@opentelemetry/api"; +import { ReadableSpan, Span, SpanProcessor } from "@opentelemetry/sdk-trace-node"; +import { ATTR_HTTP_RESPONSE_STATUS_CODE } from "@opentelemetry/semantic-conventions"; + +import { + getVercelRequestContext, + getVercelRequestContextAttributes, +} from "./vercel-request-context"; +import { computeVercelResourceAttributes } from "./vercel-resource-attributes"; + +/** + * Attribute to mark spans that were promoted by tail sampling. + * Used by OTEL collector configuration to propagate spans manually marked as promoted by the app. + */ +export const TAIL_SAMPLING_PROMOTED_ATTR = "sampling.promoted_by_tail_sampler"; + +export interface TraceBufferingTailSamplingProcessorConfig { + /** + * The downstream processor to send sampled spans to. + * Typically a BatchSpanProcessor. + */ + processor: SpanProcessor; + + /** + * Latency threshold in milliseconds. Traces with spans slower than this are always exported. + * @default 5000 (5 seconds) + */ + slowThresholdMs?: number; + + /** + * Timeout in milliseconds for how long to buffer traces. + * Traces older than this are cleaned up (and exported if they have errors/slow spans). + * @default 55000 (55 seconds, under Vercel's 60s timeout) + */ + bufferTimeoutMs?: number; + + /** + * Maximum number of traces to buffer. Oldest traces are evicted when exceeded. + * @default 1000 + */ + maxTraces?: number; + + /** + * Maximum number of spans to buffer per trace. Oldest spans are dropped when exceeded. + * @default 100 + */ + maxSpansPerTrace?: number; + + /** + * Whether to always export error spans. + * @default true + */ + exportErrors?: boolean; + + /** + * Whether to always export slow spans. + * @default true + */ + exportSlowSpans?: boolean; +} + +interface TraceBuffer { + spans: ReadableSpan[]; + localRootSpanId: string | undefined; + localRootEnded: boolean; + createdAt: number; + hasError: boolean; + hasSlow: boolean; + /** + * Capture the request context early; by the time onEnd runs we still expect it + * to be available, but storing it avoids edge cases. + */ + vercelRequestContext?: ReturnType; +} + +/** + * Check if SAMPLED flag is set in traceFlags bitmask. + */ +function isSampled(traceFlags: number): boolean { + return (traceFlags & TraceFlags.SAMPLED) !== 0; +} + +/** + * A SpanProcessor that implements trace-level tail sampling with buffering. + * + * Unlike TailSamplingProcessor which makes per-span decisions (causing "swiss cheese" traces), + * this processor buffers all spans for a trace and makes a single export decision + * when the local root span ends. + * + * When ANY span in a trace has an error or is slow, ALL buffered spans for that trace + * are exported together. This ensures complete traces for debugging. + * + * Key behaviors: + * - Already-sampled traces pass through immediately (no buffering) + * - Non-sampled traces are buffered until local root ends + * - Export decision based on any error/slow span in the entire trace + * - Calls waitUntil() for promoted traces to ensure flush on Vercel + * + * Memory protection: + * - maxTraces: caps total buffered traces (evicts oldest) + * - maxSpansPerTrace: caps spans per trace (drops oldest spans) + * - bufferTimeoutMs: cleans up stale buffers + */ +export class TraceBufferingTailSamplingProcessor implements SpanProcessor { + private traceBuffers = new Map(); + private readonly processor: SpanProcessor; + private readonly slowThresholdMs: number; + private readonly bufferTimeoutMs: number; + private readonly maxTraces: number; + private readonly maxSpansPerTrace: number; + private readonly exportErrors: boolean; + private readonly exportSlowSpans: boolean; + + constructor(config: TraceBufferingTailSamplingProcessorConfig) { + this.processor = config.processor; + this.slowThresholdMs = config.slowThresholdMs ?? 5000; + this.bufferTimeoutMs = config.bufferTimeoutMs ?? 55000; + this.maxTraces = config.maxTraces ?? 1000; + this.maxSpansPerTrace = config.maxSpansPerTrace ?? 100; + this.exportErrors = config.exportErrors ?? true; + this.exportSlowSpans = config.exportSlowSpans ?? true; + } + + onStart(span: Span, parentContext: Context): void { + // Always delegate to downstream processor first + this.processor.onStart(span, parentContext); + + const { traceId, spanId, traceFlags } = span.spanContext(); + + // Never buffer already-sampled traces (avoid memory leaks) + if (isSampled(traceFlags)) { + return; + } + + /* + * Mirror @vercel/otel CompositeSpanProcessor root logic: + * First span we see for a traceId is treated as local root. + */ + const isFirstSpanInTrace = !this.traceBuffers.has(traceId); + + if (isFirstSpanInTrace) { + // Clean up stale buffers on access (serverless-friendly, no background intervals) + this.cleanupStaleBuffers(); + + // Evict oldest traces if at capacity + this.evictOldestTraces(); + + this.traceBuffers.set(traceId, { + spans: [], + localRootSpanId: spanId, + localRootEnded: false, + createdAt: Date.now(), + hasError: false, + hasSlow: false, + vercelRequestContext: getVercelRequestContext(), + }); + } + } + + onEnd(span: ReadableSpan): void { + const spanContext = span.spanContext(); + const { traceId, spanId, traceFlags } = spanContext; + + // If already sampled, pass through directly (no buffering) + if (isSampled(traceFlags)) { + this.processor.onEnd(span); + + return; + } + + const buffer = this.traceBuffers.get(traceId); + + // No buffer = this trace was never tracked (shouldn't happen, but be defensive) + if (!buffer) { + return; + } + + // Update error/slow flags based on this span + if (this.exportErrors && this.isError(span)) { + buffer.hasError = true; + } + + if (this.exportSlowSpans && this.isSlow(span)) { + buffer.hasSlow = true; + } + + // Add span to buffer (with cap enforcement) + this.addSpanToBuffer(buffer, span); + + // Check if this is the local root span ending + if (spanId === buffer.localRootSpanId) { + buffer.localRootEnded = true; + this.finalizeTrace(traceId); + } + } + + forceFlush(): Promise { + return this.processor.forceFlush(); + } + + shutdown(): Promise { + // Export any remaining buffered traces that have errors/slow spans + for (const [traceId, buffer] of this.traceBuffers) { + if (buffer.hasError || buffer.hasSlow) { + this.exportBuffer(buffer); + } + + this.traceBuffers.delete(traceId); + } + + return this.processor.shutdown(); + } + + /** + * Add span to buffer with maxSpansPerTrace enforcement. + * When exceeded, oldest spans are dropped. + */ + private addSpanToBuffer(buffer: TraceBuffer, span: ReadableSpan): void { + if (buffer.spans.length >= this.maxSpansPerTrace) { + // Drop oldest span + buffer.spans.shift(); + } + + buffer.spans.push(span); + } + + /** + * Finalize trace when local root ends. + * Exports all buffered spans if any had errors or were slow. + */ + private finalizeTrace(traceId: string): void { + const buffer = this.traceBuffers.get(traceId); + + if (!buffer) { + return; + } + + const shouldExport = buffer.hasError || buffer.hasSlow; + + if (shouldExport) { + this.exportBuffer(buffer); + + /* + * CRITICAL: Schedule waitUntil for flush + * This ensures promoted spans are flushed before the request ends on Vercel + */ + const vrc = buffer.vercelRequestContext ?? getVercelRequestContext(); + + vrc?.waitUntil(this.processor.forceFlush()); + } + + this.traceBuffers.delete(traceId); + } + + /** + * Export all spans from a buffer. + */ + private exportBuffer(buffer: TraceBuffer): void { + for (const span of buffer.spans) { + const sampledSpan = this.promoteToSampled(span); + + this.processor.onEnd(sampledSpan); + } + } + + /** + * Clean up stale buffers that have exceeded the timeout. + * Called on-access (serverless-friendly, no background intervals). + */ + private cleanupStaleBuffers(): void { + const now = Date.now(); + + for (const [traceId, buffer] of this.traceBuffers) { + if (now - buffer.createdAt > this.bufferTimeoutMs) { + // Export if it had errors/slow spans, otherwise just drop + if (buffer.hasError || buffer.hasSlow) { + this.exportBuffer(buffer); + + const vrc = buffer.vercelRequestContext ?? getVercelRequestContext(); + + vrc?.waitUntil(this.processor.forceFlush()); + } + + this.traceBuffers.delete(traceId); + } + } + } + + /** + * Evict oldest traces if at maxTraces capacity. + */ + private evictOldestTraces(): void { + while (this.traceBuffers.size >= this.maxTraces) { + // Map iteration order is insertion order, so first key is oldest + const oldestTraceId = this.traceBuffers.keys().next().value; + + if (oldestTraceId) { + const buffer = this.traceBuffers.get(oldestTraceId); + + // Export if it had errors/slow spans before evicting + if (buffer && (buffer.hasError || buffer.hasSlow)) { + this.exportBuffer(buffer); + } + + this.traceBuffers.delete(oldestTraceId); + } + } + } + + /** + * Determine if a span has an error. + */ + private isError(span: ReadableSpan): boolean { + // Check span status + if (span.status.code === SpanStatusCode.ERROR) { + return true; + } + + // Check for recorded exceptions + if (span.events.some((event) => event.name === "exception")) { + return true; + } + + // Check HTTP status code attribute (covers both old and new semconv) + const httpStatusCode = + span.attributes[ATTR_HTTP_RESPONSE_STATUS_CODE] ?? span.attributes["http.status_code"]; + + if (typeof httpStatusCode === "number" && httpStatusCode >= 500) { + return true; + } + + return false; + } + + /** + * Determine if a span is slow. + */ + private isSlow(span: ReadableSpan): boolean { + /* + * Calculate duration in milliseconds + * span.startTime and span.endTime are [seconds, nanoseconds] tuples + */ + const startMs = span.startTime[0] * 1000 + span.startTime[1] / 1e6; + const endMs = span.endTime[0] * 1000 + span.endTime[1] / 1e6; + const durationMs = endMs - startMs; + + return durationMs >= this.slowThresholdMs; + } + + /** + * Create a modified span with SAMPLED flag set and additional attributes. + * + * Since spanContext is supposed to be immutable, we use Object.create() + * to create a new object that inherits from the original but overrides + * the spanContext method using Object.defineProperty. + */ + private promoteToSampled(span: ReadableSpan): ReadableSpan { + const originalContext = span.spanContext(); + + // Create modified spanContext with SAMPLED flag + const sampledContext = { + ...originalContext, + traceFlags: originalContext.traceFlags | TraceFlags.SAMPLED, + }; + + /* + * Build new attributes with: + * 1. Original span attributes + * 2. Promotion marker + * 3. DataDog resource attrs (operation.name, resource.name) + * 4. Vercel request-context attrs (vercel.request_id, etc.) + */ + const newAttributes = { + ...span.attributes, + [TAIL_SAMPLING_PROMOTED_ATTR]: true, + ...computeVercelResourceAttributes(span), + ...getVercelRequestContextAttributes(), + }; + + // Create a new object that inherits from span but overrides spanContext + const sampledSpan = Object.create(span) as ReadableSpan; + + // Override spanContext method + Object.defineProperty(sampledSpan, "spanContext", { + value: () => sampledContext, + enumerable: true, + }); + + // Override attributes getter + Object.defineProperty(sampledSpan, "attributes", { + value: newAttributes, + enumerable: true, + }); + + return sampledSpan; + } +} diff --git a/packages/otel/src/vercel-request-context.ts b/packages/otel/src/vercel-request-context.ts new file mode 100644 index 000000000..754808906 --- /dev/null +++ b/packages/otel/src/vercel-request-context.ts @@ -0,0 +1,86 @@ +/** + * Vercel Request Context utilities extracted from @vercel/otel. + * + * SOURCE: @vercel/otel v1.10.1 + * REPO: https://github.com/vercel/otel + * + * WHY THIS FILE EXISTS: + * When using tail sampling with trace buffering, promoted spans need to: + * 1. Call waitUntil() to ensure they're flushed before the request ends + * 2. Include Vercel request-context attributes (vercel.request_id, etc.) + * + * @vercel/otel's CompositeSpanProcessor handles this for sampled-at-start spans, + * but promoted spans bypass that flow. We need direct access to these utilities. + */ + +const VERCEL_REQUEST_CONTEXT_SYMBOL = Symbol.for("@vercel/request-context"); + +interface VercelRequestContext { + waitUntil: (promiseOrFunc: Promise | (() => Promise)) => void; + headers: Record; + url: string; +} + +interface GlobalWithVercelRequestContext { + [VERCEL_REQUEST_CONTEXT_SYMBOL]?: { + get: () => VercelRequestContext | undefined; + }; +} + +/** + * Get the Vercel request context for the current request. + * Returns undefined when not running in Vercel environment. + */ +export function getVercelRequestContext(): VercelRequestContext | undefined { + const globalWithContext = globalThis as GlobalWithVercelRequestContext; + const reader = globalWithContext[VERCEL_REQUEST_CONTEXT_SYMBOL]; + + return reader?.get(); +} + +/** + * Extract Vercel-specific attributes from the request context. + * These attributes are normally added by @vercel/otel's CompositeSpanProcessor + * for sampled spans, but promoted spans need them added explicitly. + * + * Based on @vercel/otel vercel-request-context/attributes.ts + */ +export function getVercelRequestContextAttributes(): Record { + const vrc = getVercelRequestContext(); + + if (!vrc) { + return {}; + } + + const attrs: Record = {}; + + // x-vercel-id contains the request ID + const requestId = vrc.headers["x-vercel-id"]; + + if (requestId) { + attrs["vercel.request_id"] = requestId; + } + + // x-matched-path contains the matched route + const matchedPath = vrc.headers["x-matched-path"]; + + if (matchedPath) { + attrs["vercel.matched_path"] = matchedPath; + } + + // Host header + const host = vrc.headers["host"]; + + if (host) { + attrs["http.host"] = host; + } + + // Edge region (for edge functions) + const region = vrc.headers["x-vercel-edge-region"]; + + if (region) { + attrs["vercel.edge_region"] = region; + } + + return attrs; +} diff --git a/packages/otel/src/vercel-resource-attributes.ts b/packages/otel/src/vercel-resource-attributes.ts new file mode 100644 index 000000000..b4c9471b5 --- /dev/null +++ b/packages/otel/src/vercel-resource-attributes.ts @@ -0,0 +1,132 @@ +/** + * DataDog resource attributes extracted from @vercel/otel. + * + * SOURCE: @vercel/otel v1.10.1 (commit a4a8662) + * REPO: https://github.com/vercel/otel + * + * WHY THIS FILE EXISTS: + * @vercel/otel's CompositeSpanProcessor adds DataDog-specific attributes + * (`operation.name`, `resource.name`) in onEnd() - but ONLY for sampled spans. + * + * When using tail sampling, spans start as non-sampled (RECORD decision) and + * get promoted to SAMPLED by TailSamplingProcessor. The problem is: + * + * 1. Span ends → CompositeSpanProcessor.onEnd() runs first + * 2. CompositeSpanProcessor checks isSampled() → false → skips resource attributes + * 3. TailSamplingProcessor.onEnd() runs → promotes span to SAMPLED + * 4. Promoted span goes to BatchSpanProcessor (never back through CompositeSpanProcessor) + * + * Result: promoted spans are missing DataDog attributes that normally sampled spans have. + * + * WHY CompositeSpanProcessor RUNS FIRST + * + * registerOTel() from @vercel/otel wraps our spanProcessors inside CompositeSpanProcessor: + * + * ``` + * tracerProvider.addSpanProcessor( + * new CompositeSpanProcessor(spanProcessors, ...) // Our processors nested inside + * ) + * ``` + * + * This creates the processor chain: + * SDK → CompositeSpanProcessor → TailSamplingProcessor → BatchSpanProcessor + * SOLUTION: + * We extract getResourceAttributes() and its dependencies from @vercel/otel so that + * TailSamplingProcessor can add these attributes when promoting spans. + * + * MODIFICATIONS: + * - Removed CompositeSpanProcessor class (not needed) + * - Kept only getResourceAttributes(), toOperationName(), and SPAN_KIND_NAME + * - Renamed getResourceAttributes to computeVercelResourceAttributes and exported it + * + * WHEN TO UPDATE: + * If @vercel/otel changes getResourceAttributes() logic, copy new file and re-apply modifications. + */ +import type { Attributes } from "@opentelemetry/api"; +import { SpanKind } from "@opentelemetry/api"; +import type { ReadableSpan } from "@opentelemetry/sdk-trace-node"; + +const SPAN_KIND_NAME: { [key in SpanKind]: string } = { + [SpanKind.INTERNAL]: "internal", + [SpanKind.SERVER]: "server", + [SpanKind.CLIENT]: "client", + [SpanKind.PRODUCER]: "producer", + [SpanKind.CONSUMER]: "consumer", +}; + +// SALEOR MODIFICATION: Renamed from getResourceAttributes and exported +export function computeVercelResourceAttributes(span: ReadableSpan): Attributes | undefined { + const { kind, attributes } = span; + const { + "operation.name": operationName, + "resource.name": resourceName, + "span.type": spanTypeAttr, + "next.span_type": nextSpanType, + "http.method": httpMethod, + "http.route": httpRoute, + } = attributes; + + if (operationName) { + return undefined; + } + + const resourceNameResolved = + resourceName ?? + (httpMethod && typeof httpMethod === "string" && httpRoute && typeof httpRoute === "string" + ? `${httpMethod} ${httpRoute}` + : httpRoute); + + if ( + span.kind === SpanKind.SERVER && + httpMethod && + httpRoute && + typeof httpMethod === "string" && + typeof httpRoute === "string" + ) { + return { + "operation.name": "web.request", + "resource.name": resourceNameResolved, + }; + } + + /* + * Per https://github.com/DataDog/datadog-agent/blob/main/pkg/config/config_template.yaml, + * the default operation.name is "library name + span kind". + */ + const libraryName = span.instrumentationLibrary.name; + const spanType = nextSpanType ?? spanTypeAttr; + + if (spanType && typeof spanType === "string") { + const nextOperationName = toOperationName(libraryName, spanType); + + if (httpRoute) { + return { + "operation.name": nextOperationName, + "resource.name": resourceNameResolved, + }; + } + + return { "operation.name": nextOperationName }; + } + + return { + "operation.name": toOperationName( + libraryName, + // SALEOR MODIFICATION: Added type assertion for stricter TypeScript config + kind === SpanKind.INTERNAL ? "" : SPAN_KIND_NAME[kind as SpanKind], + ), + }; +} + +function toOperationName(libraryName: string, name: string): string { + if (!libraryName) { + return name; + } + let cleanLibraryName = libraryName.replace(/[ @./]/g, "_"); + + if (cleanLibraryName.startsWith("_")) { + cleanLibraryName = cleanLibraryName.slice(1); + } + + return name ? `${cleanLibraryName}.${name}` : cleanLibraryName; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index cfd591c6f..cef661d9d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -2434,6 +2434,9 @@ importers: '@opentelemetry/api': specifier: 'catalog:' version: 1.9.0 + '@opentelemetry/instrumentation': + specifier: 'catalog:' + version: 0.57.2(@opentelemetry/api@1.9.0) '@opentelemetry/sdk-metrics': specifier: 'catalog:' version: 1.30.1(@opentelemetry/api@1.9.0) @@ -2449,6 +2452,12 @@ importers: '@types/node': specifier: 'catalog:' version: 22.13.10 + '@vercel/otel': + specifier: 'catalog:' + version: 1.10.1(@opentelemetry/api-logs@0.57.2)(@opentelemetry/api@1.9.0)(@opentelemetry/instrumentation@0.57.2(@opentelemetry/api@1.9.0))(@opentelemetry/resources@1.30.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-logs@0.57.2(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-metrics@1.30.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.30.1(@opentelemetry/api@1.9.0)) + '@vitest/coverage-v8': + specifier: 'catalog:' + version: 3.1.1(vitest@3.1.1(@types/node@22.13.10)(jiti@2.4.2)(jsdom@20.0.3)(msw@2.10.2(@types/node@22.13.10)(typescript@5.8.2))(terser@5.18.0)(tsx@4.19.3)(yaml@2.7.0)) eslint: specifier: 'catalog:' version: 9.23.0(jiti@2.4.2) @@ -2461,6 +2470,12 @@ importers: urql: specifier: 'catalog:' version: 4.0.4(graphql@16.11.0)(react@18.2.0) + vite: + specifier: 'catalog:' + version: 6.2.4(@types/node@22.13.10)(jiti@2.4.2)(terser@5.18.0)(tsx@4.19.3)(yaml@2.7.0) + vitest: + specifier: 'catalog:' + version: 3.1.1(@types/node@22.13.10)(jiti@2.4.2)(jsdom@20.0.3)(msw@2.10.2(@types/node@22.13.10)(typescript@5.8.2))(terser@5.18.0)(tsx@4.19.3)(yaml@2.7.0) packages/react-hook-form-macaw: devDependencies: @@ -3410,11 +3425,6 @@ packages: resolution: {integrity: sha512-U5eyP/CTFPuNE3qk+WZMxFkp/4zUzdceQlfzf7DdGdhp+Fezd7HD+i8Y24ZuTMKX3wQBld449jijbGq6OdGNQg==} engines: {node: '>=6.9.0'} - '@babel/parser@7.26.5': - resolution: {integrity: sha512-SRJ4jYmXRqV1/Xc+TIVG84WjHBXKlxO9sHQnA2Pf12QQEAp1LOh6kDzNHXcUnbH1QI0FDoPPVOt+vyUDucxpaw==} - engines: {node: '>=6.0.0'} - hasBin: true - '@babel/parser@7.27.0': resolution: {integrity: sha512-iaepho73/2Pz7w2eMS0Q5f83+0RKI7i4xmiYeBmDzfRVbQtTOG7Ts0S4HzJVsTMGI9keU8rNfuZr8DKfSt7Yyg==} engines: {node: '>=6.0.0'} @@ -3630,10 +3640,6 @@ packages: resolution: {integrity: sha512-19lYZFzYVQkkHkl4Cy4WrAVcqBkgvV2YM2TU3xG6DIwO7O3ecbDPfW3yM3bjAGcqcQHi+CCtjMR3dIEHxsd6bA==} engines: {node: '>=6.9.0'} - '@babel/types@7.26.5': - resolution: {integrity: sha512-L6mZmwFDK6Cjh1nRCLXpa6no13ZIioJDz7mdkzHv399pThrTa/k0nUlNaenOeh2kWu/iaOQYElEpKPUswUa9Vg==} - engines: {node: '>=6.9.0'} - '@babel/types@7.27.0': resolution: {integrity: sha512-H45s8fVLYjbhFH62dIJ3WtmJ6RSPt/3DRO0ZcT2SUiYiQyz3BLVb9ADEnLl91m74aQPS3AzzeajZHYOalWe3bg==} engines: {node: '>=6.9.0'} @@ -13297,10 +13303,6 @@ snapshots: '@babel/template': 7.27.0 '@babel/types': 7.27.0 - '@babel/parser@7.26.5': - dependencies: - '@babel/types': 7.27.0 - '@babel/parser@7.27.0': dependencies: '@babel/types': 7.27.0 @@ -13536,11 +13538,6 @@ snapshots: transitivePeerDependencies: - supports-color - '@babel/types@7.26.5': - dependencies: - '@babel/helper-string-parser': 7.25.9 - '@babel/helper-validator-identifier': 7.25.9 - '@babel/types@7.27.0': dependencies: '@babel/helper-string-parser': 7.25.9 @@ -14822,10 +14819,10 @@ snapshots: '@graphql-tools/graphql-tag-pluck@8.3.0(graphql@16.11.0)': dependencies: '@babel/core': 7.26.10 - '@babel/parser': 7.26.5 + '@babel/parser': 7.27.0 '@babel/plugin-syntax-import-assertions': 7.23.3(@babel/core@7.26.10) '@babel/traverse': 7.23.9 - '@babel/types': 7.26.5 + '@babel/types': 7.27.0 '@graphql-tools/utils': 10.8.6(graphql@16.11.0) graphql: 16.11.0 tslib: 2.8.1 @@ -14835,10 +14832,10 @@ snapshots: '@graphql-tools/graphql-tag-pluck@8.3.0(graphql@16.7.1)': dependencies: '@babel/core': 7.26.10 - '@babel/parser': 7.26.5 + '@babel/parser': 7.27.0 '@babel/plugin-syntax-import-assertions': 7.23.3(@babel/core@7.26.10) '@babel/traverse': 7.23.9 - '@babel/types': 7.26.5 + '@babel/types': 7.27.0 '@graphql-tools/utils': 10.8.6(graphql@16.7.1) graphql: 16.7.1 tslib: 2.8.1