-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[Draft] Add support for standalone callbacks #10192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
chrsmith
wants to merge
5
commits into
main
Choose a base branch
from
chrsmith/standalone-callbacks_ii
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
40f2f7e
Implement standalone callbacks
chrsmith b223f4b
Test updates
chrsmith b7a5dfa
Refactor standalone_callbacks_test.go to use testcore.NewEnv
chrsmith 4ab001f
Expanding statemachine_test.go coverage
chrsmith eb31e60
Address race when completing an unstarted Nexus operation
chrsmith File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -4,38 +4,72 @@ import ( | |||||
| "fmt" | ||||||
| "time" | ||||||
|
|
||||||
| callbackpb "go.temporal.io/api/callback/v1" | ||||||
| commonpb "go.temporal.io/api/common/v1" | ||||||
| failurepb "go.temporal.io/api/failure/v1" | ||||||
| "go.temporal.io/api/serviceerror" | ||||||
| "go.temporal.io/server/chasm" | ||||||
| callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1" | ||||||
| "go.temporal.io/server/common/backoff" | ||||||
| "go.temporal.io/server/common/nexus/nexusrpc" | ||||||
| queueserrors "go.temporal.io/server/service/history/queues/errors" | ||||||
| "google.golang.org/protobuf/proto" | ||||||
| "google.golang.org/protobuf/types/known/durationpb" | ||||||
| "google.golang.org/protobuf/types/known/timestamppb" | ||||||
| ) | ||||||
|
|
||||||
| type CompletionSource interface { | ||||||
| GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error) | ||||||
| } | ||||||
|
|
||||||
| var _ chasm.Component = (*Callback)(nil) | ||||||
| var _ chasm.StateMachine[callbackspb.CallbackStatus] = (*Callback)(nil) | ||||||
| // CompletionSourceFn allows a function value to be used as a CompletionSource instance. | ||||||
| type CompletionSourceFn func(chasm.Context, string) (nexusrpc.CompleteOperationOptions, error) | ||||||
|
|
||||||
| func (csFunc CompletionSourceFn) GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error) { | ||||||
| return csFunc(ctx, requestID) | ||||||
| } | ||||||
|
|
||||||
| var ( | ||||||
| _ chasm.Component = (*Callback)(nil) | ||||||
| _ chasm.StateMachine[callbackspb.CallbackStatus] = (*Callback)(nil) | ||||||
|
|
||||||
| // Capabilities only supported/used for standalone callbacks. | ||||||
| _ chasm.RootComponent = (*Callback)(nil) | ||||||
| _ chasm.VisibilityMemoProvider = (*Callback)(nil) | ||||||
| _ chasm.VisibilitySearchAttributesProvider = (*Callback)(nil) | ||||||
| ) | ||||||
|
|
||||||
| var executionStatusSearchAttribute = chasm.NewSearchAttributeKeyword( | ||||||
| "ExecutionStatus", | ||||||
| chasm.SearchAttributeFieldLowCardinalityKeyword01, | ||||||
| ) | ||||||
|
|
||||||
| // Callback represents a callback component in CHASM. | ||||||
| type Callback struct { | ||||||
| chasm.UnimplementedComponent | ||||||
|
|
||||||
| // Persisted internal state | ||||||
| *callbackspb.CallbackState | ||||||
| // Failure from an external termination (timeout or terminate), stored separately because | ||||||
| // of its potential size, and to not overload CallbackState::LastAttemptFailure. | ||||||
| TerminalFailure chasm.Field[*failurepb.Failure] | ||||||
|
|
||||||
| // For most callbacks, the completion result is obtained from the parent component. | ||||||
| // e.g. the Workflow result to be delivered. However, for "standalone" callbacks, there | ||||||
| // is no parent and the user-supplied SuppliedCompletion will be used instead. | ||||||
| ParentCompletionSource chasm.ParentPtr[CompletionSource] | ||||||
| SuppliedCompletion chasm.Field[*callbackpb.CallbackExecutionCompletion] | ||||||
|
|
||||||
| // Interface to retrieve Nexus operation completion data | ||||||
| CompletionSource chasm.ParentPtr[CompletionSource] | ||||||
| // Visibility sub-component for search attributes and memo indexing. | ||||||
| Visibility chasm.Field[*chasm.Visibility] | ||||||
| } | ||||||
|
|
||||||
| func NewCallback( | ||||||
| // NewEmbeddedCallback returns a Callback component, which will deliver the completion from | ||||||
| // its parent CHASM component. The parent must implement CompletionSource. | ||||||
| func NewEmbeddedCallback( | ||||||
| ctx chasm.MutableContext, | ||||||
| requestID string, | ||||||
| registrationTime *timestamppb.Timestamp, | ||||||
| state *callbackspb.CallbackState, | ||||||
| cb *callbackspb.Callback, | ||||||
| ) *Callback { | ||||||
| return &Callback{ | ||||||
|
|
@@ -45,14 +79,49 @@ func NewCallback( | |||||
| Callback: cb, | ||||||
| Status: callbackspb.CALLBACK_STATUS_STANDBY, | ||||||
| }, | ||||||
| TerminalFailure: chasm.NewDataField[*failurepb.Failure](ctx, nil), | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| type newStandaloneCallbackOpts struct { | ||||||
| RequestID string | ||||||
| RegistrationTime *timestamppb.Timestamp | ||||||
| Callback *callbackspb.Callback | ||||||
|
|
||||||
| CallbackID string | ||||||
| CompletionScheduleToCloseTimeout *durationpb.Duration | ||||||
| Completion *callbackpb.CallbackExecutionCompletion | ||||||
| SearchAttributes map[string]*commonpb.Payload | ||||||
| } | ||||||
|
|
||||||
| // newStandaloneCallback returns a new Callback component which will deliver the supplied | ||||||
| // completion result. | ||||||
| func newStandaloneCallback( | ||||||
| ctx chasm.MutableContext, | ||||||
| opts newStandaloneCallbackOpts, | ||||||
| ) *Callback { | ||||||
| cb := NewEmbeddedCallback(ctx, opts.RequestID, opts.RegistrationTime, opts.Callback) | ||||||
|
|
||||||
| // Add standalone-specific fields. | ||||||
| cb.CallbackId = opts.CallbackID | ||||||
| cb.CompletionScheduleToCloseTimeout = opts.CompletionScheduleToCloseTimeout | ||||||
| cb.SuppliedCompletion = chasm.NewDataField(ctx, opts.Completion) | ||||||
|
|
||||||
| visibility := chasm.NewVisibilityWithData(ctx, opts.SearchAttributes, nil) | ||||||
| cb.Visibility = chasm.NewComponentField(ctx, visibility) | ||||||
|
|
||||||
| return cb | ||||||
| } | ||||||
|
|
||||||
| func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState { | ||||||
| switch c.Status { | ||||||
| case callbackspb.CALLBACK_STATUS_SUCCEEDED: | ||||||
| return chasm.LifecycleStateCompleted | ||||||
| case callbackspb.CALLBACK_STATUS_FAILED: | ||||||
| case callbackspb.CALLBACK_STATUS_FAILED, | ||||||
| callbackspb.CALLBACK_STATUS_TERMINATED: | ||||||
| // TODO: Use chasm.LifecycleStateTerminated when it's available (currently commented out | ||||||
| // in chasm/component.go:70). For now, LifecycleStateFailed is functionally correct | ||||||
| // as IsClosed() returns true for all states >= LifecycleStateCompleted. | ||||||
| return chasm.LifecycleStateFailed | ||||||
| default: | ||||||
| return chasm.LifecycleStateRunning | ||||||
|
|
@@ -67,6 +136,62 @@ func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus) { | |||||
| c.Status = status | ||||||
| } | ||||||
|
|
||||||
| func (c *Callback) ContextMetadata(_ chasm.Context) map[string]string { | ||||||
| return map[string]string{ | ||||||
| "RequestID": c.RequestId, | ||||||
| // Only set for standalone callbacks. | ||||||
| "CallbackID": c.CallbackId, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // SearchAttributes implements chasm.VisibilitySearchAttributesProvider. | ||||||
| func (c *Callback) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue { | ||||||
| apiStatus := callbackStatusToAPIExecutionStatus(c.Status) | ||||||
| return []chasm.SearchAttributeKeyValue{ | ||||||
| executionStatusSearchAttribute.Value(apiStatus.String()), | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // Memo implements chasm.VisibilityMemoProvider. Returns the CallbackExecutionListInfo | ||||||
| // as the memo for visibility queries. | ||||||
| func (c *Callback) Memo(ctx chasm.Context) proto.Message { | ||||||
| return &callbackpb.CallbackExecutionListInfo{ | ||||||
| CallbackId: c.CallbackId, | ||||||
| Status: callbackStatusToAPIExecutionStatus(c.Status), | ||||||
| CreateTime: c.RegistrationTime, | ||||||
| CloseTime: c.CloseTime, | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| // Terminate forcefully terminates the callback execution. | ||||||
| // | ||||||
| // If already terminated with the same request ID, this is a no-op. | ||||||
| // If already terminated with a different request ID, returns FailedPrecondition. | ||||||
| func (c *Callback) Terminate( | ||||||
| ctx chasm.MutableContext, | ||||||
| req chasm.TerminateComponentRequest, | ||||||
| ) (chasm.TerminateComponentResponse, error) { | ||||||
| if c.LifecycleState(ctx).IsClosed() { | ||||||
| if c.TerminateRequestId == "" { | ||||||
| // Completed organically (succeeded/failed/timed out), not via Terminate. | ||||||
| err := serviceerror.NewFailedPreconditionf("callback execution already in terminal state %v", c.Status) | ||||||
| return chasm.TerminateComponentResponse{}, err | ||||||
| } | ||||||
| if c.TerminateRequestId != req.RequestID { | ||||||
| err := serviceerror.NewFailedPreconditionf("already terminated with request ID %s", c.TerminateRequestId) | ||||||
| return chasm.TerminateComponentResponse{}, err | ||||||
| } | ||||||
| return chasm.TerminateComponentResponse{}, nil | ||||||
| } | ||||||
| if err := TransitionTerminated.Apply(c, ctx, EventTerminated{Reason: req.Reason}); err != nil { | ||||||
| return chasm.TerminateComponentResponse{}, fmt.Errorf("failed to terminate callback: %w", err) | ||||||
| } | ||||||
|
|
||||||
| c.TerminateRequestId = req.RequestID | ||||||
| // c.TerminalFailure is set in the transition handler. | ||||||
| return chasm.TerminateComponentResponse{}, nil | ||||||
| } | ||||||
|
|
||||||
| func (c *Callback) recordAttempt(ts time.Time) { | ||||||
| c.Attempt++ | ||||||
| c.LastAttemptCompleteTime = timestamppb.New(ts) | ||||||
|
|
@@ -77,9 +202,9 @@ func (c *Callback) loadInvocationArgs( | |||||
| ctx chasm.Context, | ||||||
| _ chasm.NoValue, | ||||||
| ) (invocable, error) { | ||||||
| target := c.CompletionSource.Get(ctx) | ||||||
|
|
||||||
| completion, err := target.GetNexusCompletion(ctx, c.RequestId) | ||||||
| // Get the completion result to be delivered. | ||||||
| completionSource := c.CompletionSource(ctx) | ||||||
| completion, err := completionSource.GetNexusCompletion(ctx, c.RequestId) | ||||||
| if err != nil { | ||||||
| return nil, err | ||||||
| } | ||||||
|
|
@@ -117,6 +242,16 @@ func (c *Callback) saveResult( | |||||
| ctx chasm.MutableContext, | ||||||
| input saveResultInput, | ||||||
| ) (chasm.NoValue, error) { | ||||||
| // If the callback was terminated while the invocation was in-flight, | ||||||
| // the result is no longer relevant. We'll just drop it silently. | ||||||
| // | ||||||
| // This shouldn't happen outside of tests, since the Nexus machinary | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit
Suggested change
|
||||||
| // would prevent an invalid transition anyways. (e.g. terminating | ||||||
| // an already terminated Callback.) | ||||||
| if c.LifecycleState(ctx).IsClosed() { | ||||||
| return nil, nil | ||||||
| } | ||||||
|
|
||||||
| switch r := input.result.(type) { | ||||||
| case invocationResultOK: | ||||||
| err := TransitionSucceeded.Apply(c, ctx, EventSucceeded{Time: ctx.Now(c)}) | ||||||
|
|
||||||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit