diff --git a/contrib/opentelemetry/tracing_interceptor_test.go b/contrib/opentelemetry/tracing_interceptor_test.go index f1ec5ab57..fef14097c 100644 --- a/contrib/opentelemetry/tracing_interceptor_test.go +++ b/contrib/opentelemetry/tracing_interceptor_test.go @@ -14,8 +14,10 @@ import ( "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.opentelemetry.io/otel/trace" + "go.temporal.io/sdk/client" "go.temporal.io/sdk/contrib/opentelemetry" "go.temporal.io/sdk/interceptor" + "go.temporal.io/sdk/internal" "go.temporal.io/sdk/internal/interceptortest" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" @@ -35,6 +37,34 @@ func TestSpanPropagation(t *testing.T) { interceptortest.AssertSpanPropagation(t, testTracer) } +func TestStandaloneActivitySpanCreation(t *testing.T) { + rec := tracetest.NewSpanRecorder() + tracer, err := opentelemetry.NewTracer(opentelemetry.TracerOptions{ + Tracer: sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)).Tracer(""), + }) + require.NoError(t, err) + + ctx := internal.NewHeaderContext(context.Background()) + outbound := interceptor.NewTracingInterceptor(tracer).InterceptClient(&testNoopClientOutbound{}) + _, _ = outbound.ExecuteActivity(ctx, &interceptor.ClientExecuteActivityInput{ + ActivityType: "test-saa", + Options: &client.StartActivityOptions{ID: "test-saa-123"}, + }) + + spans := rec.Ended() + require.Len(t, spans, 1) + assert.Equal(t, "StartActivity:test-saa", spans[0].Name()) + + var foundActivityID bool + for _, attr := range spans[0].Attributes() { + if string(attr.Key) == "temporalActivityID" { + foundActivityID = true + assert.Equal(t, "test-saa-123", attr.Value.AsString()) + } + } + require.True(t, foundActivityID, "expected activity ID span attribute") +} + type testTracer struct { interceptor.Tracer rec *tracetest.SpanRecorder @@ -235,3 +265,11 @@ func TestSpanFromWorkflowContextNoOpSpan(t *testing.T) { require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) } + +type testNoopClientOutbound struct { + interceptor.ClientOutboundInterceptorBase +} + +func (n *testNoopClientOutbound) ExecuteActivity(_ context.Context, _ *interceptor.ClientExecuteActivityInput) (client.ActivityHandle, error) { + return nil, nil +} diff --git a/interceptor/interceptor.go b/interceptor/interceptor.go index 55f497f67..9c260a116 100644 --- a/interceptor/interceptor.go +++ b/interceptor/interceptor.go @@ -302,6 +302,7 @@ type NexusCancelOperationInput = internal.NexusCancelOperationInput // // This returns a non-nil map only for contexts inside // ActivityInboundInterceptor.ExecuteActivity, +// ClientOutboundInterceptor.ExecuteActivity, // ClientOutboundInterceptor.ExecuteWorkflow, and // ClientOutboundInterceptor.SignalWithStartWorkflow. func Header(ctx context.Context) map[string]*commonpb.Payload { diff --git a/interceptor/tracing_interceptor.go b/interceptor/tracing_interceptor.go index 91fa489c0..0ed642c6a 100644 --- a/interceptor/tracing_interceptor.go +++ b/interceptor/tracing_interceptor.go @@ -394,6 +394,28 @@ func (t *tracingClientOutboundInterceptor) UpdateWithStartWorkflow( return val, err } +func (t *tracingClientOutboundInterceptor) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (client.ActivityHandle, error) { + span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{ + Operation: "StartActivity", + Name: in.ActivityType, + Tags: map[string]string{activityIDTagKey: in.Options.ID}, + ToHeader: true, + Time: time.Now(), + }, t.root.headerReader(ctx), t.root.headerWriter(ctx)) + if err != nil { + return nil, err + } + var finishOpts TracerFinishSpanOptions + defer span.Finish(&finishOpts) + + handle, err := t.Next.ExecuteActivity(ctx, in) + finishOpts.Error = err + return handle, err +} + type tracingActivityOutboundInterceptor struct { ActivityOutboundInterceptorBase root *tracingInterceptor diff --git a/internal/cmd/tools/doclink/doclink.go b/internal/cmd/tools/doclink/doclink.go index ea57440b3..50f0b02a2 100644 --- a/internal/cmd/tools/doclink/doclink.go +++ b/internal/cmd/tools/doclink/doclink.go @@ -390,7 +390,25 @@ func processInternal(cfg config, file *os.File, pairs map[string]map[string]stri trimmedNextLine = nextLine } - // Check for new doc links to add + // Track whether nextLine is inside a function or interface block. + // We update this first so the first line inside a block is not treated + // as a top-level definition. + if strings.HasPrefix(trimmedLine, "func ") { + funcSpaces = indentSize + inFunc = true + } else if inFunc && trimmedLine == "}" && funcSpaces == indentSize { + funcSpaces = -1 + inFunc = false + } + if strings.HasSuffix(trimmedLine, "interface {") { + interfaceSpaces = indentSize + inInterface = true + } else if inInterface && trimmedLine == "}" && interfaceSpaces == indentSize { + interfaceSpaces = -1 + inInterface = false + } + + // Check for new doc links to add on top-level definitions only. if !inFunc && !inInterface && isValidDefinition(trimmedNextLine, &inGroup, &inStruct) { // Find the "Exposed As" line in the doc comment var existingDoclink string @@ -451,23 +469,6 @@ func processInternal(cfg config, file *os.File, pairs map[string]map[string]stri } } - // update inFunc after we actually check for doclinks to allow us to check - // a function's definition, without checking anything inside the function - if strings.HasPrefix(trimmedLine, "func ") { - funcSpaces = indentSize - inFunc = true - } else if inFunc && trimmedLine == "}" && funcSpaces == indentSize { - funcSpaces = -1 - inFunc = false - } - if strings.HasSuffix(trimmedLine, "interface {") { - interfaceSpaces = indentSize - inInterface = true - } else if inInterface && trimmedLine == "}" && interfaceSpaces == indentSize { - interfaceSpaces = -1 - inInterface = false - } - newFile += line + "\n" } diff --git a/internal/interceptor.go b/internal/interceptor.go index dbccfb945..acefa8d98 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -419,6 +419,7 @@ type ClientOutboundInterceptor interface { DescribeWorkflow(context.Context, *ClientDescribeWorkflowInput) (*ClientDescribeWorkflowOutput, error) // ExecuteActivity intercepts client.Client.ExecuteActivity. + // interceptor.Header will return a non-nil map for this context. // // NOTE: Experimental ExecuteActivity(context.Context, *ClientExecuteActivityInput) (ClientActivityHandle, error) diff --git a/internal/interceptor_header.go b/internal/interceptor_header.go index 836924e29..4f014a7ef 100644 --- a/internal/interceptor_header.go +++ b/internal/interceptor_header.go @@ -23,6 +23,10 @@ func contextWithNewHeader(ctx context.Context) context.Context { return context.WithValue(ctx, headerKey{}, map[string]*commonpb.Payload{}) } +func NewHeaderContext(ctx context.Context) context.Context { + return contextWithNewHeader(ctx) +} + func contextWithoutHeader(ctx context.Context) context.Context { return context.WithValue(ctx, headerKey{}, nil) }