Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4948,18 +4948,29 @@ func (wh *WorkflowHandler) DeleteSchedule(ctx context.Context, request *workflow
return nil, errSchedulesNotAllowed
}

// Prefer CHASM scheduler if enabled.
if wh.chasmSchedulerEnabled(ctx, request.Namespace) {
res, err := wh.deleteScheduleCHASM(ctx, request)
if err == nil {
return res, nil
}
if !isSchedulerErrorLegacyRoutable(err) {
return nil, err
}
// Always attempt deletion in both stacks. A schedule may exist in either or
// both during dual-stack migration (and a V1 sentinel may linger after a
// CHASM-only create). Surface an error only when neither stack succeeded.
chasmEnabled := wh.chasmSchedulerEnabled(ctx, request.Namespace)

var chasmErr error
if chasmEnabled {
_, chasmErr = wh.deleteScheduleCHASM(ctx, request)
}
_, v1Err := wh.deleteScheduleWorkflow(ctx, request)

return wh.deleteScheduleWorkflow(ctx, request)
// At least one side actually deleted → success.
if (chasmEnabled && chasmErr == nil) || v1Err == nil {
return &workflowservice.DeleteScheduleResponse{}, nil
}

// Neither side deleted. Surface a real (non-routable) CHASM failure first;
// otherwise return the V1 error (which is either a real failure or the
// canonical NotFound when neither stack had the schedule).
if chasmEnabled && chasmErr != nil && !isSchedulerErrorLegacyRoutable(chasmErr) {
return nil, chasmErr
}
return nil, v1Err
}

func (wh *WorkflowHandler) deleteScheduleCHASM(ctx context.Context, request *workflowservice.DeleteScheduleRequest) (*workflowservice.DeleteScheduleResponse, error) {
Expand Down
153 changes: 153 additions & 0 deletions tests/schedule_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,3 +1306,156 @@ func TestScheduleMigrationV1ToV2NoDuplicateRecentActions(t *testing.T) {
})
require.NoError(t, err)
}

// TestDeleteScheduleClearsBothStacks covers the dual-stack migration case in
// which the same scheduleId is present in both the CHASM (V2) scheduler and
// the workflow-backed (V1) scheduler. It seeds both stacks directly, issues a
// single frontend DeleteSchedule, and then checks that neither stack (nor the
// frontend) still serves the schedule.
func (s *ScheduleMigrationTestSuite) TestDeleteScheduleClearsBothStacks() {
	// Polling budget shared by the two Eventually checks at the end.
	const (
		pollTimeout  = 10 * time.Second
		pollInterval = 200 * time.Millisecond
	)

	env := testcore.NewEnv(
		s.T(),
		testcore.WithDynamicConfig(dynamicconfig.EnableChasm, true),
		testcore.WithDynamicConfig(dynamicconfig.EnableCHASMSchedulerRouting, true),
	)

	ctx := testcore.NewContext()
	scheduleID := testcore.RandomizeStr("sched-delete-both-stacks")
	targetWorkflowID := testcore.RandomizeStr("sched-delete-both-stacks-wf")
	targetWorkflowType := testcore.RandomizeStr("sched-delete-both-stacks-wt")
	targetTaskQueue := testcore.RandomizeStr("tq")

	namespaceName := env.Namespace().String()
	namespaceID := env.NamespaceID().String()

	// The V1 scheduler workflow ID is derived from the schedule ID.
	v1WorkflowID := scheduler.WorkflowIDPrefix + scheduleID

	// One schedule definition is shared by both stacks: an hourly interval
	// that starts a workflow.
	startWorkflowAction := &schedulepb.ScheduleAction_StartWorkflow{
		StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
			WorkflowId:   targetWorkflowID,
			WorkflowType: &commonpb.WorkflowType{Name: targetWorkflowType},
			TaskQueue:    &taskqueuepb.TaskQueue{Name: targetTaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
		},
	}
	schedule := &schedulepb.Schedule{
		Spec: &schedulepb.ScheduleSpec{
			Interval: []*schedulepb.IntervalSpec{
				{Interval: durationpb.New(1 * time.Hour)},
			},
		},
		Action: &schedulepb.ScheduleAction{Action: startWorkflowAction},
	}

	// describeCHASM asks the CHASM scheduler service directly (bypassing the
	// frontend) for the schedule and reports only the resulting error.
	describeCHASM := func() error {
		_, describeErr := env.GetTestCluster().SchedulerClient().DescribeSchedule(
			ctx,
			&schedulerpb.DescribeScheduleRequest{
				NamespaceId: namespaceID,
				FrontendRequest: &workflowservice.DescribeScheduleRequest{
					Namespace:  namespaceName,
					ScheduleId: scheduleID,
				},
			},
		)
		return describeErr
	}

	// describeV1 asks the history service directly for the V1 scheduler
	// workflow execution.
	describeV1 := func() (*historyservice.DescribeWorkflowExecutionResponse, error) {
		return env.GetTestCluster().HistoryClient().DescribeWorkflowExecution(
			ctx,
			&historyservice.DescribeWorkflowExecutionRequest{
				NamespaceId: namespaceID,
				Request: &workflowservice.DescribeWorkflowExecutionRequest{
					Namespace: namespaceName,
					Execution: &commonpb.WorkflowExecution{WorkflowId: v1WorkflowID},
				},
			},
		)
	}

	// Seed the CHASM stack through the scheduler service.
	chasmCreateReq := &schedulerpb.CreateScheduleRequest{
		NamespaceId: namespaceID,
		FrontendRequest: &workflowservice.CreateScheduleRequest{
			Namespace:  namespaceName,
			ScheduleId: scheduleID,
			Schedule:   schedule,
			Identity:   "test",
			RequestId:  testcore.RandomizeStr("request-id"),
		},
	}
	_, err := env.GetTestCluster().SchedulerClient().CreateSchedule(ctx, chasmCreateReq)
	s.NoError(err)

	// Seed the V1 stack by starting the scheduler workflow through the history
	// service, using the same schedule ID.
	inputPayloads, err := sdk.PreferProtoDataConverter.ToPayloads(&schedulespb.StartScheduleArgs{
		Schedule: schedule,
		State: &schedulespb.InternalState{
			Namespace:     namespaceName,
			NamespaceId:   namespaceID,
			ScheduleId:    scheduleID,
			ConflictToken: scheduler.InitialConflictToken,
		},
	})
	s.NoError(err)
	v1StartReq := &workflowservice.StartWorkflowExecutionRequest{
		Namespace:                namespaceName,
		WorkflowId:               v1WorkflowID,
		WorkflowType:             &commonpb.WorkflowType{Name: scheduler.WorkflowType},
		TaskQueue:                &taskqueuepb.TaskQueue{Name: primitives.PerNSWorkerTaskQueue},
		Input:                    inputPayloads,
		Identity:                 "test",
		RequestId:                testcore.RandomizeStr("request-id"),
		WorkflowIdReusePolicy:    enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
		WorkflowIdConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
	}
	_, err = env.GetTestCluster().HistoryClient().StartWorkflowExecution(
		ctx,
		common.CreateHistoryStartWorkflowRequest(namespaceID, v1StartReq, nil, nil, time.Now().UTC()),
	)
	s.NoError(err)

	// Precondition: the schedule is visible in both stacks before deleting.
	s.NoError(describeCHASM())
	v1Before, err := describeV1()
	s.NoError(err)
	s.Equal(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING, v1Before.GetWorkflowExecutionInfo().GetStatus())

	// Act: one frontend DeleteSchedule call is expected to clear both stacks.
	_, err = env.FrontendClient().DeleteSchedule(ctx, &workflowservice.DeleteScheduleRequest{
		Namespace:  namespaceName,
		ScheduleId: scheduleID,
		Identity:   "test",
	})
	s.NoError(err)

	// CHASM side: the scheduler is marked closed, so a direct describe is
	// rejected with FailedPrecondition (ErrClosed).
	var failedPreconditionErr *serviceerror.FailedPrecondition
	s.ErrorAs(describeCHASM(), &failedPreconditionErr)

	// V1 side: the scheduler workflow ends up terminated. Describe errors are
	// treated as "not yet" and retried until the polling budget runs out.
	s.Eventually(func() bool {
		v1After, describeErr := describeV1()
		if describeErr != nil {
			return false
		}
		return v1After.GetWorkflowExecutionInfo().GetStatus() == enumspb.WORKFLOW_EXECUTION_STATUS_TERMINATED
	}, pollTimeout, pollInterval, "V1 schedule workflow should be terminated")

	// Frontend view: DescribeSchedule eventually reports NotFound.
	s.Eventually(func() bool {
		_, describeErr := env.FrontendClient().DescribeSchedule(ctx, &workflowservice.DescribeScheduleRequest{
			Namespace:  namespaceName,
			ScheduleId: scheduleID,
		})
		var notFoundErr *serviceerror.NotFound
		return errors.As(describeErr, &notFoundErr)
	}, pollTimeout, pollInterval, "frontend DescribeSchedule should return NotFound")
}
Loading