diff --git a/.github/workflows/docker/dynamic-config-custom.yaml b/.github/workflows/docker/dynamic-config-custom.yaml index acaddf3c5..32b099acb 100644 --- a/.github/workflows/docker/dynamic-config-custom.yaml +++ b/.github/workflows/docker/dynamic-config-custom.yaml @@ -54,3 +54,7 @@ frontend.WorkerHeartbeatsEnabled: - value: true frontend.ListWorkersEnabled: - value: true +# Will no longer be necessary when CHASM schedule creation is enabled by default. +frontend.allowedExperiments: + - value: + - "chasm-scheduler" diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index 46525de79..b0d49aa52 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -161,6 +161,7 @@ func (b *builder) integrationTest() error { "--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`, "--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`, "--dynamic-config-value", "frontend.ListWorkersEnabled=true", + "--dynamic-config-value", `frontend.allowedExperiments=["chasm-scheduler"]`, // Will no longer be necessary when CHASM schedule creation is enabled by default }, }) if err != nil { diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index 48efc0263..86b8b3c4b 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -297,6 +297,22 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc } } + var newMemo *commonpb.Memo + if newSchedule.Memo != nil { + dataConverter := WithContext(ctx, scheduleHandle.client.dataConverter) + if dataConverter == nil { + dataConverter = converter.GetDefaultDataConverter() + } + newMemo, err = getWorkflowMemo(*newSchedule.Memo, dataConverter, sdkFlagsAllowed[SDKFlagMemoUserDCEncode]) + if err != nil { + return err + } + if newMemo == nil { + // An empty but non-nil map should clear the memo. + newMemo = &commonpb.Memo{} + } + } + updateRequest := &workflowservice.UpdateScheduleRequest{ Namespace: scheduleHandle.client.namespace, ScheduleId: scheduleHandle.ID, @@ -305,6 +321,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc Identity: scheduleHandle.client.identity, RequestId: uuid.NewString(), SearchAttributes: newSA, + Memo: newMemo, } storeCtx := extstore.WithStorageTarget(ctx, extstore.StorageDriverWorkflowInfo{ diff --git a/internal/internal_schedule_client_test.go b/internal/internal_schedule_client_test.go index e65a34744..cdc55fdd4 100644 --- a/internal/internal_schedule_client_test.go +++ b/internal/internal_schedule_client_test.go @@ -2,16 +2,19 @@ package internal import ( "context" - iconverter "go.temporal.io/sdk/internal/converter" "testing" "github.com/golang/mock/gomock" "github.com/stretchr/testify/suite" + commonpb "go.temporal.io/api/common/v1" schedulepb "go.temporal.io/api/schedule/v1" "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/api/workflowservicemock/v1" "go.temporal.io/sdk/converter" + iconverter "go.temporal.io/sdk/internal/converter" ) const ( @@ -221,6 +224,192 @@ func (s *scheduleClientTestSuite) TestIteratorError() { s.NotNil(err) } +func (s *scheduleClientTestSuite) TestUpdateScheduleWithMemo() { + wf := func(ctx Context) string { + panic("this is just a stub") + } + + // Create the schedule first + createResp := &workflowservice.CreateScheduleResponse{} + s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1) + + handle, err := s.client.ScheduleClient().Create(context.Background(), ScheduleOptions{ + ID: scheduleID, + Spec: ScheduleSpec{ + CronExpressions: []string{"*"}, + }, + Action: &ScheduleWorkflowAction{ + Workflow: wf, + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + }, + }) + s.NoError(err) + + // Mock Describe and Update for the update call + describeResp := &workflowservice.DescribeScheduleResponse{ + Schedule: &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{}, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: s.createWorkflowExecutionInfo(), + }, + }, + Policies: &schedulepb.SchedulePolicies{}, + State: &schedulepb.ScheduleState{}, + }, + Info: &schedulepb.ScheduleInfo{}, + ConflictToken: nil, + } + s.service.EXPECT().DescribeSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1) + + s.service.EXPECT().UpdateSchedule(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *workflowservice.UpdateScheduleRequest, _ ...interface{}) (*workflowservice.UpdateScheduleResponse, error) { + s.NotNil(req.Memo) + s.Len(req.Memo.Fields, 1) + s.Contains(req.Memo.Fields, "key1") + return &workflowservice.UpdateScheduleResponse{}, nil + }).Times(1) + + memo := map[string]interface{}{ + "key1": "value1", + } + err = handle.Update(context.Background(), ScheduleUpdateOptions{ + DoUpdate: func(input ScheduleUpdateInput) (*ScheduleUpdate, error) { + return &ScheduleUpdate{ + Schedule: &input.Description.Schedule, + Memo: &memo, + }, nil + }, + }) + s.NoError(err) +} + +func (s *scheduleClientTestSuite) TestUpdateScheduleWithNilMemoDoesNotSetMemo() { + wf := func(ctx Context) string { + panic("this is just a stub") + } + + createResp := &workflowservice.CreateScheduleResponse{} + s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1) + + handle, err := s.client.ScheduleClient().Create(context.Background(), ScheduleOptions{ + ID: scheduleID, + Spec: ScheduleSpec{ + CronExpressions: []string{"*"}, + }, + Action: &ScheduleWorkflowAction{ + Workflow: wf, + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + }, + }) + s.NoError(err) + + describeResp := &workflowservice.DescribeScheduleResponse{ + Schedule: &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{}, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: s.createWorkflowExecutionInfo(), + }, + }, + Policies: &schedulepb.SchedulePolicies{}, + State: &schedulepb.ScheduleState{}, + }, + Info: &schedulepb.ScheduleInfo{}, + ConflictToken: nil, + } + s.service.EXPECT().DescribeSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1) + + s.service.EXPECT().UpdateSchedule(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *workflowservice.UpdateScheduleRequest, _ ...interface{}) (*workflowservice.UpdateScheduleResponse, error) { + s.Nil(req.Memo) + return &workflowservice.UpdateScheduleResponse{}, nil + }).Times(1) + + err = handle.Update(context.Background(), ScheduleUpdateOptions{ + DoUpdate: func(input ScheduleUpdateInput) (*ScheduleUpdate, error) { + return &ScheduleUpdate{ + Schedule: &input.Description.Schedule, + // Memo is nil, should not be set on the request + }, nil + }, + }) + s.NoError(err) +} + +func (s *scheduleClientTestSuite) TestUpdateScheduleWithEmptyMemoClears() { + wf := func(ctx Context) string { + panic("this is just a stub") + } + + createResp := &workflowservice.CreateScheduleResponse{} + s.service.EXPECT().CreateSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResp, nil).Times(1) + + handle, err := s.client.ScheduleClient().Create(context.Background(), ScheduleOptions{ + ID: scheduleID, + Spec: ScheduleSpec{ + CronExpressions: []string{"*"}, + }, + Action: &ScheduleWorkflowAction{ + Workflow: wf, + ID: workflowID, + TaskQueue: taskqueue, + WorkflowExecutionTimeout: timeoutInSeconds, + WorkflowTaskTimeout: timeoutInSeconds, + }, + }) + s.NoError(err) + + describeResp := &workflowservice.DescribeScheduleResponse{ + Schedule: &schedulepb.Schedule{ + Spec: &schedulepb.ScheduleSpec{}, + Action: &schedulepb.ScheduleAction{ + Action: &schedulepb.ScheduleAction_StartWorkflow{ + StartWorkflow: s.createWorkflowExecutionInfo(), + }, + }, + Policies: &schedulepb.SchedulePolicies{}, + State: &schedulepb.ScheduleState{}, + }, + Info: &schedulepb.ScheduleInfo{}, + ConflictToken: nil, + } + s.service.EXPECT().DescribeSchedule(gomock.Any(), gomock.Any(), gomock.Any()).Return(describeResp, nil).Times(1) + + s.service.EXPECT().UpdateSchedule(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, req *workflowservice.UpdateScheduleRequest, _ ...interface{}) (*workflowservice.UpdateScheduleResponse, error) { + // Empty map should produce an empty Memo (not nil), to signal "clear" + s.NotNil(req.Memo) + s.Empty(req.Memo.Fields) + return &workflowservice.UpdateScheduleResponse{}, nil + }).Times(1) + + emptyMemo := map[string]interface{}{} + err = handle.Update(context.Background(), ScheduleUpdateOptions{ + DoUpdate: func(input ScheduleUpdateInput) (*ScheduleUpdate, error) { + return &ScheduleUpdate{ + Schedule: &input.Description.Schedule, + Memo: &emptyMemo, + }, nil + }, + }) + s.NoError(err) +} + +func (s *scheduleClientTestSuite) createWorkflowExecutionInfo() *workflowpb.NewWorkflowExecutionInfo { + return &workflowpb.NewWorkflowExecutionInfo{ + WorkflowId: workflowID, + WorkflowType: &commonpb.WorkflowType{Name: "test-workflow"}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskqueue}, + } +} + func (s *scheduleClientTestSuite) TestCreateScheduleWorkflowMemoDataConverter() { testFn := func() { dc := iconverter.NewTestDataConverter() diff --git a/internal/schedule_client.go b/internal/schedule_client.go index bb2285e99..65bda62c2 100644 --- a/internal/schedule_client.go +++ b/internal/schedule_client.go @@ -540,6 +540,14 @@ type ( // attributes present: replace any and all pre-existing assigned search attributes with the defined search // attributes, i.e. upsert TypedSearchAttributes *SearchAttributes + + // Memo - Non-indexed user supplied information to replace on the schedule. + // If set, replaces the entire memo. If nil, leaves the existing memo intact. + // An initialized but empty map will clear the memo. + // + // NOTE: Memo updates are only supported on CHASM-backed schedules. + // Attempting to update memo on a workflow-backed schedule will return an error. + Memo *map[string]interface{} } // ScheduleUpdateInput describes the current state of the schedule to be updated. diff --git a/test/integration_test.go b/test/integration_test.go index 67971e1d4..2887182b8 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/sdk/contrib/sysinfo" "go.uber.org/goleak" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" "go.temporal.io/sdk/contrib/opentelemetry" @@ -6827,6 +6828,125 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateWorkflowActionMemo() { } } +func (ts *IntegrationTestSuite) TestScheduleUpdateMemo() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Use the chasm-scheduler experiment header to create a CHASM-backed schedule. + // This will no longer be necessary when CHASM schedule creation is enabled by default. + ctx = metadata.AppendToOutgoingContext(ctx, "temporal-experiment", "chasm-scheduler") + + initialMemo := map[string]interface{}{ + "key1": "value1", + "key2": "value2", + } + handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: "test-schedule-update-memo-schedule", + Spec: client.ScheduleSpec{}, + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-memo-workflow", ts.workflows.SimplestWorkflow), + Paused: true, + Memo: initialMemo, + }) + ts.NoError(err) + defer func() { + ts.NoError(handle.Delete(ctx)) + }() + + // Verify initial memo + description, err := handle.Describe(ctx) + ts.NoError(err) + ts.NotNil(description.Memo) + ts.Len(description.Memo.Fields, 2) + + // Update memo with new values + updatedMemo := map[string]interface{}{ + "key1": "updated_value1", + "key3": "value3", + } + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + Memo: &updatedMemo, + }, nil + }, + }) + ts.NoError(err) + + // Verify updated memo (CHASM updates are synchronous) + description, err = handle.Describe(ctx) + ts.NoError(err) + ts.NotNil(description.Memo) + ts.Len(description.Memo.Fields, 2) + ts.Contains(description.Memo.Fields, "key1") + ts.Contains(description.Memo.Fields, "key3") + ts.NotContains(description.Memo.Fields, "key2") + + // Clear memo with empty map + emptyMemo := map[string]interface{}{} + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + Memo: &emptyMemo, + }, nil + }, + }) + ts.NoError(err) + + description, err = handle.Describe(ctx) + ts.NoError(err) + ts.NotNil(description.Memo) + ts.Empty(description.Memo.Fields) + + // nil Memo should leave existing memo intact + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + }, nil + }, + }) + ts.NoError(err) + + description, err = handle.Describe(ctx) + ts.NoError(err) + ts.NotNil(description.Memo) + ts.Empty(description.Memo.Fields) +} + +// TestScheduleUpdateMemoNotSupportedOnWorkflowBackedSchedule will fail once CHASM schedule +// creation is enabled by default, at which point this test can be removed. +func (ts *IntegrationTestSuite) TestScheduleUpdateMemoNotSupportedOnWorkflowBackedSchedule() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create a non-CHASM (workflow-backed) schedule by not passing the experiment header. + handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: "test-schedule-update-memo-rejected-schedule", + Spec: client.ScheduleSpec{}, + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-memo-rejected-workflow", ts.workflows.SimplestWorkflow), + Paused: true, + }) + ts.NoError(err) + defer func() { + ts.NoError(handle.Delete(ctx)) + }() + + // Attempt to update memo on a workflow-backed schedule + updatedMemo := map[string]interface{}{ + "key1": "value1", + } + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{ + Schedule: &input.Description.Schedule, + Memo: &updatedMemo, + }, nil + }, + }) + ts.Error(err) + ts.Contains(err.Error(), "memo updates are not supported on workflow-backed schedules") +} + func (ts *IntegrationTestSuite) TestNoVersioningBehaviorPanics() { seriesName := "deploy-test-" + uuid.NewString()