Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func (a *Activity) addCompletionCallbacks(
return serviceerror.NewInvalidArgumentf("unsupported callback variant: %T", variant)
}

// requestID (unique per API call) + idx (position within the request) ensures unique,idempotent callback IDs.
// requestID (unique per API call) + idx (position within the request) ensures unique, idempotent callback IDs.
id := fmt.Sprintf("%s-%d", requestID, idx)
callbackObj := callback.NewCallback(requestID, registrationTime, &callbackspb.CallbackState{}, chasmCB)
callbackObj := callback.NewEmbeddedCallback(ctx, requestID, registrationTime, chasmCB)
a.Callbacks[id] = chasm.NewComponentField(ctx, callbackObj)
}
return nil
Expand Down
155 changes: 145 additions & 10 deletions chasm/lib/callback/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,72 @@ import (
"fmt"
"time"

callbackpb "go.temporal.io/api/callback/v1"
commonpb "go.temporal.io/api/common/v1"
failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/chasm"
callbackspb "go.temporal.io/server/chasm/lib/callback/gen/callbackpb/v1"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/nexus/nexusrpc"
queueserrors "go.temporal.io/server/service/history/queues/errors"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type CompletionSource interface {
GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error)
}

var _ chasm.Component = (*Callback)(nil)
var _ chasm.StateMachine[callbackspb.CallbackStatus] = (*Callback)(nil)
// CompletionSourceFn allows a function value to be used as a CompletionSource instance.
type CompletionSourceFn func(chasm.Context, string) (nexusrpc.CompleteOperationOptions, error)

func (csFunc CompletionSourceFn) GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error) {
return csFunc(ctx, requestID)
}

var (
_ chasm.Component = (*Callback)(nil)
_ chasm.StateMachine[callbackspb.CallbackStatus] = (*Callback)(nil)

// Capabilities only supported/used for standalone callbacks.
_ chasm.RootComponent = (*Callback)(nil)
_ chasm.VisibilityMemoProvider = (*Callback)(nil)
_ chasm.VisibilitySearchAttributesProvider = (*Callback)(nil)
)

var executionStatusSearchAttribute = chasm.NewSearchAttributeKeyword(
"ExecutionStatus",
chasm.SearchAttributeFieldLowCardinalityKeyword01,
)

// Callback represents a callback component in CHASM.
type Callback struct {
chasm.UnimplementedComponent

// Persisted internal state
*callbackspb.CallbackState
// Failure from an external termination (timeout or terminate), stored separately because
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
// Failure from an external termination (timeout or terminate), stored separately because
// Failure from an external termination (timeout or terminate), stored separately because

// of its potential size, and to not overload CallbackState::LastAttemptFailure.
TerminalFailure chasm.Field[*failurepb.Failure]

// For most callbacks, the completion result is obtained from the parent component.
// e.g. the Workflow result to be delivered. However, for "standalone" callbacks, there
// is no parent and the user-supplied SuppliedCompletion will be used instead.
ParentCompletionSource chasm.ParentPtr[CompletionSource]
SuppliedCompletion chasm.Field[*callbackpb.CallbackExecutionCompletion]

// Interface to retrieve Nexus operation completion data
CompletionSource chasm.ParentPtr[CompletionSource]
// Visibility sub-component for search attributes and memo indexing.
Visibility chasm.Field[*chasm.Visibility]
}

func NewCallback(
// NewEmbeddedCallback returns a Callback component, which will deliver the completion from
// its parent CHASM component. The parent must implement CompletionSource.
func NewEmbeddedCallback(
ctx chasm.MutableContext,
requestID string,
registrationTime *timestamppb.Timestamp,
state *callbackspb.CallbackState,
cb *callbackspb.Callback,
) *Callback {
return &Callback{
Expand All @@ -45,14 +79,49 @@ func NewCallback(
Callback: cb,
Status: callbackspb.CALLBACK_STATUS_STANDBY,
},
TerminalFailure: chasm.NewDataField[*failurepb.Failure](ctx, nil),
}
}

type newStandaloneCallbackOpts struct {
RequestID string
RegistrationTime *timestamppb.Timestamp
Callback *callbackspb.Callback

CallbackID string
CompletionScheduleToCloseTimeout *durationpb.Duration
Completion *callbackpb.CallbackExecutionCompletion
SearchAttributes map[string]*commonpb.Payload
}

// newStandaloneCallback returns a new Callback component which will deliver the supplied
// completion result.
func newStandaloneCallback(
ctx chasm.MutableContext,
opts newStandaloneCallbackOpts,
) *Callback {
cb := NewEmbeddedCallback(ctx, opts.RequestID, opts.RegistrationTime, opts.Callback)

// Add standalone-specific fields.
cb.CallbackId = opts.CallbackID
cb.CompletionScheduleToCloseTimeout = opts.CompletionScheduleToCloseTimeout
cb.SuppliedCompletion = chasm.NewDataField(ctx, opts.Completion)

visibility := chasm.NewVisibilityWithData(ctx, opts.SearchAttributes, nil)
cb.Visibility = chasm.NewComponentField(ctx, visibility)

return cb
}

func (c *Callback) LifecycleState(_ chasm.Context) chasm.LifecycleState {
switch c.Status {
case callbackspb.CALLBACK_STATUS_SUCCEEDED:
return chasm.LifecycleStateCompleted
case callbackspb.CALLBACK_STATUS_FAILED:
case callbackspb.CALLBACK_STATUS_FAILED,
callbackspb.CALLBACK_STATUS_TERMINATED:
// TODO: Use chasm.LifecycleStateTerminated when it's available (currently commented out
// in chasm/component.go:70). For now, LifecycleStateFailed is functionally correct
// as IsClosed() returns true for all states >= LifecycleStateCompleted.
return chasm.LifecycleStateFailed
default:
return chasm.LifecycleStateRunning
Expand All @@ -67,6 +136,62 @@ func (c *Callback) SetStateMachineState(status callbackspb.CallbackStatus) {
c.Status = status
}

func (c *Callback) ContextMetadata(_ chasm.Context) map[string]string {
return map[string]string{
"RequestID": c.RequestId,
// Only set for standalone callbacks.
"CallbackID": c.CallbackId,
}
}

// SearchAttributes implements chasm.VisibilitySearchAttributesProvider.
func (c *Callback) SearchAttributes(ctx chasm.Context) []chasm.SearchAttributeKeyValue {
apiStatus := callbackStatusToAPIExecutionStatus(c.Status)
return []chasm.SearchAttributeKeyValue{
executionStatusSearchAttribute.Value(apiStatus.String()),
}
}

// Memo implements chasm.VisibilityMemoProvider. Returns the CallbackExecutionListInfo
// as the memo for visibility queries.
func (c *Callback) Memo(ctx chasm.Context) proto.Message {
return &callbackpb.CallbackExecutionListInfo{
CallbackId: c.CallbackId,
Status: callbackStatusToAPIExecutionStatus(c.Status),
CreateTime: c.RegistrationTime,
CloseTime: c.CloseTime,
}
}

// Terminate forcefully terminates the callback execution.
//
// If already terminated with the same request ID, this is a no-op.
// If already terminated with a different request ID, returns FailedPrecondition.
func (c *Callback) Terminate(
ctx chasm.MutableContext,
req chasm.TerminateComponentRequest,
) (chasm.TerminateComponentResponse, error) {
if c.LifecycleState(ctx).IsClosed() {
if c.TerminateRequestId == "" {
// Completed organically (succeeded/failed/timed out), not via Terminate.
err := serviceerror.NewFailedPreconditionf("callback execution already in terminal state %v", c.Status)
return chasm.TerminateComponentResponse{}, err
}
if c.TerminateRequestId != req.RequestID {
err := serviceerror.NewFailedPreconditionf("already terminated with request ID %s", c.TerminateRequestId)
return chasm.TerminateComponentResponse{}, err
}
return chasm.TerminateComponentResponse{}, nil
}
if err := TransitionTerminated.Apply(c, ctx, EventTerminated{Reason: req.Reason}); err != nil {
return chasm.TerminateComponentResponse{}, fmt.Errorf("failed to terminate callback: %w", err)
}

c.TerminateRequestId = req.RequestID
// c.TerminalFailure is set in the transition handler.
return chasm.TerminateComponentResponse{}, nil
}

func (c *Callback) recordAttempt(ts time.Time) {
c.Attempt++
c.LastAttemptCompleteTime = timestamppb.New(ts)
Expand All @@ -77,9 +202,9 @@ func (c *Callback) loadInvocationArgs(
ctx chasm.Context,
_ chasm.NoValue,
) (invocable, error) {
target := c.CompletionSource.Get(ctx)

completion, err := target.GetNexusCompletion(ctx, c.RequestId)
// Get the completion result to be delivered.
completionSource := c.CompletionSource(ctx)
completion, err := completionSource.GetNexusCompletion(ctx, c.RequestId)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -117,6 +242,16 @@ func (c *Callback) saveResult(
ctx chasm.MutableContext,
input saveResultInput,
) (chasm.NoValue, error) {
// If the callback was terminated while the invocation was in-flight,
// the result is no longer relevant. We'll just drop it silently.
//
// This shouldn't happen outside of tests, since the Nexus machinary
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
// This shouldn't happen outside of tests, since the Nexus machinary
// This shouldn't happen outside of tests, since the Nexus machinery

// would prevent an invalid transition anyways. (e.g. terminating
// an already terminated Callback.)
if c.LifecycleState(ctx).IsClosed() {
return nil, nil
}

switch r := input.result.(type) {
case invocationResultOK:
err := TransitionSucceeded.Apply(c, ctx, EventSucceeded{Time: ctx.Now(c)})
Expand Down
Loading
Loading