Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/docker/dynamic-config-custom.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,7 @@ frontend.WorkerHeartbeatsEnabled:
- value: true
frontend.ListWorkersEnabled:
- value: true
# Will no longer be necessary when CHASM schedule creation is enabled by default.
frontend.allowedExperiments:
- value:
- "chasm-scheduler"
1 change: 1 addition & 0 deletions internal/cmd/build/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func (b *builder) integrationTest() error {
"--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`,
"--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`,
"--dynamic-config-value", "frontend.ListWorkersEnabled=true",
"--dynamic-config-value", `frontend.allowedExperiments=["chasm-scheduler"]`, // Will no longer be necessary when CHASM schedule creation is enabled by default
},
})
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,22 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
}
}

var newMemo *commonpb.Memo
if newSchedule.Memo != nil {
dataConverter := WithContext(ctx, scheduleHandle.client.dataConverter)
if dataConverter == nil {
dataConverter = converter.GetDefaultDataConverter()
}
newMemo, err = getWorkflowMemo(*newSchedule.Memo, dataConverter, sdkFlagsAllowed[SDKFlagMemoUserDCEncode])
if err != nil {
return err
}
if newMemo == nil {
// An empty but non-nil map should clear the memo.
newMemo = &commonpb.Memo{}
}
}

updateRequest := &workflowservice.UpdateScheduleRequest{
Namespace: scheduleHandle.client.namespace,
ScheduleId: scheduleHandle.ID,
Expand All @@ -305,6 +321,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
Identity: scheduleHandle.client.identity,
RequestId: uuid.NewString(),
SearchAttributes: newSA,
Memo: newMemo,
}

storeCtx := extstore.WithStorageTarget(ctx, extstore.StorageDriverWorkflowInfo{
Expand Down
191 changes: 190 additions & 1 deletion internal/internal_schedule_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@ package internal

import (
"context"
iconverter "go.temporal.io/sdk/internal/converter"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
commonpb "go.temporal.io/api/common/v1"
schedulepb "go.temporal.io/api/schedule/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/api/workflowservicemock/v1"
"go.temporal.io/sdk/converter"
iconverter "go.temporal.io/sdk/internal/converter"
)

const (
Expand Down Expand Up @@ -221,6 +224,192 @@ func (s *scheduleClientTestSuite) TestIteratorError() {
s.NotNil(err)
}

func (s *scheduleClientTestSuite) TestUpdateScheduleWithMemo() {
wf := func(ctx Context) string {
panic("this is just a stub")
}

// Create the schedule first
createResp := &workflowservice.CreateScheduleResponse{}
s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1)

handle, err := s.client.ScheduleClient().Create(context.Background(), ScheduleOptions{
ID: scheduleID,
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
},
})
s.NoError(err)

// Mock Describe and Update for the update call
describeResp := &workflowservice.DescribeScheduleResponse{
Schedule: &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: s.createWorkflowExecutionInfo(),
},
},
Policies: &schedulepb.SchedulePolicies{},
State: &schedulepb.ScheduleState{},
},
Info: &schedulepb.ScheduleInfo{},
ConflictToken: nil,
}
s.service.EXPECT().DescribeSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1)

s.service.EXPECT().UpdateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, req *workflowservice.UpdateScheduleRequest, _ ...interface{}) (*workflowservice.UpdateScheduleResponse, error) {
s.NotNil(req.Memo)
s.Len(req.Memo.Fields, 1)
s.Contains(req.Memo.Fields, "key1")
return &workflowservice.UpdateScheduleResponse{}, nil
}).Times(1)

memo := map[string]interface{}{
"key1": "value1",
}
err = handle.Update(context.Background(), ScheduleUpdateOptions{
DoUpdate: func(input ScheduleUpdateInput) (*ScheduleUpdate, error) {
return &ScheduleUpdate{
Schedule: &input.Description.Schedule,
Memo: &memo,
}, nil
},
})
s.NoError(err)
}

func (s *scheduleClientTestSuite) TestUpdateScheduleWithNilMemoDoesNotSetMemo() {
wf := func(ctx Context) string {
panic("this is just a stub")
}

createResp := &workflowservice.CreateScheduleResponse{}
s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1)

handle, err := s.client.ScheduleClient().Create(context.Background(), ScheduleOptions{
ID: scheduleID,
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
},
})
s.NoError(err)

describeResp := &workflowservice.DescribeScheduleResponse{
Schedule: &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: s.createWorkflowExecutionInfo(),
},
},
Policies: &schedulepb.SchedulePolicies{},
State: &schedulepb.ScheduleState{},
},
Info: &schedulepb.ScheduleInfo{},
ConflictToken: nil,
}
s.service.EXPECT().DescribeSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1)

s.service.EXPECT().UpdateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, req *workflowservice.UpdateScheduleRequest, _ ...interface{}) (*workflowservice.UpdateScheduleResponse, error) {
s.Nil(req.Memo)
return &workflowservice.UpdateScheduleResponse{}, nil
}).Times(1)

err = handle.Update(context.Background(), ScheduleUpdateOptions{
DoUpdate: func(input ScheduleUpdateInput) (*ScheduleUpdate, error) {
return &ScheduleUpdate{
Schedule: &input.Description.Schedule,
// Memo is nil, should not be set on the request
}, nil
},
})
s.NoError(err)
}

func (s *scheduleClientTestSuite) TestUpdateScheduleWithEmptyMemoClears() {
wf := func(ctx Context) string {
panic("this is just a stub")
}

createResp := &workflowservice.CreateScheduleResponse{}
s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1)

handle, err := s.client.ScheduleClient().Create(context.Background(), ScheduleOptions{
ID: scheduleID,
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
},
})
s.NoError(err)

describeResp := &workflowservice.DescribeScheduleResponse{
Schedule: &schedulepb.Schedule{
Spec: &schedulepb.ScheduleSpec{},
Action: &schedulepb.ScheduleAction{
Action: &schedulepb.ScheduleAction_StartWorkflow{
StartWorkflow: s.createWorkflowExecutionInfo(),
},
},
Policies: &schedulepb.SchedulePolicies{},
State: &schedulepb.ScheduleState{},
},
Info: &schedulepb.ScheduleInfo{},
ConflictToken: nil,
}
s.service.EXPECT().DescribeSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1)

s.service.EXPECT().UpdateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(_ context.Context, req *workflowservice.UpdateScheduleRequest, _ ...interface{}) (*workflowservice.UpdateScheduleResponse, error) {
// Empty map should produce an empty Memo (not nil), to signal "clear"
s.NotNil(req.Memo)
s.Empty(req.Memo.Fields)
return &workflowservice.UpdateScheduleResponse{}, nil
}).Times(1)

emptyMemo := map[string]interface{}{}
err = handle.Update(context.Background(), ScheduleUpdateOptions{
DoUpdate: func(input ScheduleUpdateInput) (*ScheduleUpdate, error) {
return &ScheduleUpdate{
Schedule: &input.Description.Schedule,
Memo: &emptyMemo,
}, nil
},
})
s.NoError(err)
}

func (s *scheduleClientTestSuite) createWorkflowExecutionInfo() *workflowpb.NewWorkflowExecutionInfo {
return &workflowpb.NewWorkflowExecutionInfo{
WorkflowId: workflowID,
WorkflowType: &commonpb.WorkflowType{Name: "test-workflow"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskqueue},
}
}

func (s *scheduleClientTestSuite) TestCreateScheduleWorkflowMemoDataConverter() {
testFn := func() {
dc := iconverter.NewTestDataConverter()
Expand Down
8 changes: 8 additions & 0 deletions internal/schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,14 @@ type (
// attributes present: replace any and all pre-existing assigned search attributes with the defined search
// attributes, i.e. upsert
TypedSearchAttributes *SearchAttributes

// Memo - Non-indexed user supplied information to replace on the schedule.
// If set, replaces the entire memo. If nil, leaves the existing memo intact.
// An initialized but empty map will clear the memo.
//
// NOTE: Memo updates are only supported on CHASM-backed schedules.
// Attempting to update memo on a workflow-backed schedule will return an error.
Memo *map[string]interface{}
}

// ScheduleUpdateInput describes the current state of the schedule to be updated.
Expand Down
Loading