diff --git a/chasm/lib/nexusoperation/operation_tasks.go b/chasm/lib/nexusoperation/operation_tasks.go index 681ea278297..ddd21158075 100644 --- a/chasm/lib/nexusoperation/operation_tasks.go +++ b/chasm/lib/nexusoperation/operation_tasks.go @@ -132,7 +132,7 @@ func (h *operationInvocationTaskHandler) Execute( return fmt.Errorf("failed to build callback URL: %w", err) } - token, err := h.generateCallbackToken(args.serializedRef, args.requestID) + token, err := h.generateCallbackToken(args.serializedRef, opRef.NamespaceID, opRef.BusinessID, opRef.RunID, args.requestID) if err != nil { return err } diff --git a/chasm/lib/nexusoperation/task_handler_helpers.go b/chasm/lib/nexusoperation/task_handler_helpers.go index 060c1398812..588e1f066f0 100644 --- a/chasm/lib/nexusoperation/task_handler_helpers.go +++ b/chasm/lib/nexusoperation/task_handler_helpers.go @@ -392,12 +392,20 @@ func lookupEndpoint(ctx context.Context, registry commonnexus.EndpointRegistry, } // generateCallbackToken creates a callback token for the given operation reference. +// +// namespaceID, businessID, and runID are dual-populated alongside ComponentRef so +// legacy HSM-only callback routers can still route the completion. ComponentRef +// remains authoritative for completion handling; the HSM-shaped fields are +// consulted only by routers, never for completion semantics. func (h *operationInvocationTaskHandler) generateCallbackToken( serializedRef []byte, - requestID string, + namespaceID, businessID, runID, requestID string, ) (string, error) { token, err := h.callbackTokenGenerator.Tokenize(&tokenspb.NexusOperationCompletion{ ComponentRef: serializedRef, + NamespaceId: namespaceID, + WorkflowId: businessID, + RunId: runID, RequestId: requestID, }) if err != nil { diff --git a/common/nexus/callback_token.go b/common/nexus/callback_token.go index a2b7c6a44e5..a6b0c6c3bac 100644 --- a/common/nexus/callback_token.go +++ b/common/nexus/callback_token.go @@ -70,25 +70,19 @@ func (g *CallbackTokenGenerator) DecodeCompletion(token *CallbackToken) (*tokens func validateCompletion(completion *tokenspb.NexusOperationCompletion) error { hasCHASMRef := len(completion.GetComponentRef()) > 0 - hasHSMRef := completion.GetNamespaceId() != "" || - completion.GetWorkflowId() != "" || - completion.GetRunId() != "" || - completion.GetRef() != nil - isCompleteHSM := completion.GetNamespaceId() != "" && + hasHSMRef := completion.GetNamespaceId() != "" && completion.GetWorkflowId() != "" && completion.GetRunId() != "" && completion.GetRef() != nil - switch { - case hasCHASMRef && hasHSMRef: - return serviceerror.NewInvalidArgument("callback token contains both HSM and CHASM fields") - case hasCHASMRef: - return nil - case isCompleteHSM: + // CHASM tokens may also carry partial HSM-shaped routing fields + // (NamespaceId/WorkflowId) so that legacy HSM-only callback routers + // can still route the completion. The CHASM ComponentRef remains + // authoritative; the HSM fields are routing hints only. + if hasCHASMRef || hasHSMRef { return nil - default: - return serviceerror.NewInvalidArgument("callback token must contain either all HSM fields or a component ref") } + return serviceerror.NewInvalidArgument("callback token must contain either all HSM fields or a component ref") } // DecodeCallbackToken unmarshals the given token applying minimal data verification. diff --git a/common/nexus/callback_token_test.go b/common/nexus/callback_token_test.go index ea4de4f3fd8..1c3bcf31ce2 100644 --- a/common/nexus/callback_token_test.go +++ b/common/nexus/callback_token_test.go @@ -33,36 +33,22 @@ func TestCallbackTokenGenerator_DecodeCompletion(t *testing.T) { }, }, { - name: "mixed with namespace id", + // CHASM tokens may carry HSM-shaped routing hints alongside ComponentRef + // so legacy HSM-only callback routers can still route the completion. + name: "CHASM with HSM routing hints", completion: &tokenspb.NexusOperationCompletion{ NamespaceId: "ns-id", - ComponentRef: []byte("component-ref"), - }, - wantErr: "both HSM and CHASM", - }, - { - name: "mixed with workflow id", - completion: &tokenspb.NexusOperationCompletion{ WorkflowId: "wf-id", - ComponentRef: []byte("component-ref"), - }, - wantErr: "both HSM and CHASM", - }, - { - name: "mixed with run id", - completion: &tokenspb.NexusOperationCompletion{ RunId: "run-id", ComponentRef: []byte("component-ref"), }, - wantErr: "both HSM and CHASM", }, { - name: "mixed with ref", + name: "CHASM with partial HSM routing hints", completion: &tokenspb.NexusOperationCompletion{ - Ref: &persistencespb.StateMachineRef{}, + NamespaceId: "ns-id", ComponentRef: []byte("component-ref"), }, - wantErr: "both HSM and CHASM", }, { name: "empty", diff --git a/tests/nexus_standalone_test.go b/tests/nexus_standalone_test.go index 4376ce5e753..ff5e5ea28f7 100644 --- a/tests/nexus_standalone_test.go +++ b/tests/nexus_standalone_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -19,6 +20,10 @@ import ( "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporalnexus" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm/lib/nexusoperation" "go.temporal.io/server/common/dynamicconfig" @@ -2123,6 +2128,52 @@ func (s *NexusStandaloneTestSuite) TestStandaloneNexusOperationPoll() { }) } +// Verifies that CHASM-backed Standalone Nexus is compatible with HSM-backed callbacks. +func (s *NexusStandaloneTestSuite) TestHSMCallbackHandlerCompatability() { + env := s.newTestEnv(testcore.WithDynamicConfig(dynamicconfig.EnableCHASMCallbacks, false)) + ctx := env.Context() + + handlerTaskQueue := testcore.RandomizeStr(s.T().Name()) + endpointName := env.createNexusEndpoint(ctx, s.T(), testcore.RandomizedNexusEndpoint(s.T().Name()), handlerTaskQueue).GetSpec().GetName() + + handlerWF := func(workflow.Context, nexus.NoValue) (string, error) { + return "ok", nil + } + + svc := nexus.NewService("test-service") + nexusOp := temporalnexus.NewWorkflowRunOperation("test-operation", handlerWF, + func(_ context.Context, _ nexus.NoValue, _ nexus.StartOperationOptions) (client.StartWorkflowOptions, error) { + return client.StartWorkflowOptions{ + ID: "handler-wf-" + uuid.NewString(), + TaskQueue: handlerTaskQueue, + }, nil + }) + svc.MustRegister(nexusOp) + + w := worker.New(env.SdkClient(), handlerTaskQueue, worker.Options{}) + w.RegisterWorkflow(handlerWF) + w.RegisterNexusService(svc) + s.NoError(w.Start()) + defer w.Stop() + + startResp, err := s.startNexusOperation(env, &workflowservice.StartNexusOperationExecutionRequest{ + OperationId: "test-op", + Endpoint: endpointName, + }) + s.NoError(err) + + s.EventuallyWithT(func(t *assert.CollectT) { + descResp, err := env.FrontendClient().DescribeNexusOperationExecution(ctx, &workflowservice.DescribeNexusOperationExecutionRequest{ + Namespace: env.Namespace().String(), + OperationId: "test-op", + RunId: startResp.RunId, + IncludeOutcome: true, + }) + require.NoError(t, err) + require.Equal(t, enumspb.NEXUS_OPERATION_EXECUTION_STATUS_COMPLETED, descResp.GetInfo().GetStatus()) + }, 30*time.Second, 200*time.Millisecond) +} + func (s *NexusStandaloneTestSuite) TestAsyncCompletionIgnoresTransitionFieldsInCallbackToken() { env := s.newTestEnv() handlerLink := &commonpb.Link_WorkflowEvent{