diff --git a/service/history/handler.go b/service/history/handler.go index 2248f7c6a1..7aee0d04d0 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -61,6 +61,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/replication" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -210,16 +211,9 @@ func (h *Handler) DeepHealthCheck( func (h *Handler) IsWorkflowTaskValid(ctx context.Context, request *historyservice.IsWorkflowTaskValidRequest) (*historyservice.IsWorkflowTaskValidResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } workflowID := request.Execution.WorkflowId - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowID) if err != nil { return nil, h.convertError(err) } @@ -235,16 +229,9 @@ func (h *Handler) IsWorkflowTaskValid(ctx context.Context, request *historyservi func (h *Handler) IsActivityTaskValid(ctx context.Context, request *historyservice.IsActivityTaskValidRequest) (*historyservice.IsActivityTaskValidResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } workflowID := request.Execution.WorkflowId - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowID) if err != nil { return nil, h.convertError(err) } @@ -288,15 +275,7 @@ func (h *Handler) RecordActivityTaskHeartbeat(ctx context.Context, request *hist // Handle worklow activity (mutable state backed implementation). namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, taskToken.GetWorkflowId()) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, taskToken.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -327,15 +306,7 @@ func (h *Handler) RecordActivityTaskStarted(ctx context.Context, request *histor // Handle worklow activity (mutable state backed implementation). namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, request.GetWorkflowExecution().GetWorkflowId()) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetWorkflowExecution().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -416,15 +387,7 @@ func (h *Handler) RespondActivityTaskCompleted(ctx context.Context, request *his // Handle worklow activity (mutable state backed implementation). namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, taskToken.GetWorkflowId()) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, taskToken.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -467,15 +430,7 @@ func (h *Handler) RespondActivityTaskFailed(ctx context.Context, request *histor // Handle worklow activity (mutable state backed implementation). namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, taskToken.GetWorkflowId()) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, taskToken.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -518,15 +473,7 @@ func (h *Handler) RespondActivityTaskCanceled(ctx context.Context, request *hist // Handle worklow activity (mutable state backed implementation). namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, taskToken.GetWorkflowId()) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, taskToken.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -562,13 +509,8 @@ func (h *Handler) RespondWorkflowTaskCompleted(ctx context.Context, request *his if err != nil { return nil, h.convertError(err) } - workflowID := token.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, token.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -604,13 +546,8 @@ func (h *Handler) RespondWorkflowTaskFailed(ctx context.Context, request *histor if err != nil { return nil, h.convertError(err) } - workflowID := token.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, token.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -626,18 +563,9 @@ func (h *Handler) RespondWorkflowTaskFailed(ctx context.Context, request *histor // StartWorkflowExecution - creates a new workflow execution func (h *Handler) StartWorkflowExecution(ctx context.Context, request *historyservice.StartWorkflowExecutionRequest) (*historyservice.StartWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - startRequest := request.StartRequest - workflowID := startRequest.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + shardContext, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, startRequest.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -660,16 +588,8 @@ func (h *Handler) ExecuteMultiOperation( request *historyservice.ExecuteMultiOperationRequest, ) (*historyservice.ExecuteMultiOperationResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, request.WorkflowId) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + shardContext, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.WorkflowId) if err != nil { return nil, h.convertError(err) } @@ -767,17 +687,9 @@ func (h *Handler) GetShard(ctx context.Context, request *historyservice.GetShard // RebuildMutableState attempts to rebuild mutable state according to persisted history events func (h *Handler) RebuildMutableState(ctx context.Context, request *historyservice.RebuildMutableStateRequest) (*historyservice.RebuildMutableStateResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.Execution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -808,11 +720,7 @@ func (h *Handler) ImportWorkflowExecution(ctx context.Context, request *historys if runID == "" { return nil, h.convertError(errRunIDNotValid) } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + shardContext, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowID) if err != nil { return nil, h.convertError(err) } @@ -830,17 +738,9 @@ func (h *Handler) ImportWorkflowExecution(ctx context.Context, request *historys // DescribeMutableState - returns the internal analysis of workflow execution state func (h *Handler) DescribeMutableState(ctx context.Context, request *historyservice.DescribeMutableStateRequest) (*historyservice.DescribeMutableStateResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.Execution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -855,17 +755,9 @@ func (h *Handler) DescribeMutableState(ctx context.Context, request *historyserv // GetMutableState - returns the id of the next event in the execution's history func (h *Handler) GetMutableState(ctx context.Context, request *historyservice.GetMutableStateRequest) (*historyservice.GetMutableStateResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.Execution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -880,17 +772,9 @@ func (h *Handler) GetMutableState(ctx context.Context, request *historyservice.G // PollMutableState - returns the id of the next event in the execution's history func (h *Handler) PollMutableState(ctx context.Context, request *historyservice.PollMutableStateRequest) (*historyservice.PollMutableStateResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.Execution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -905,17 +789,9 @@ func (h *Handler) PollMutableState(ctx context.Context, request *historyservice. // DescribeWorkflowExecution returns information about the specified workflow execution. func (h *Handler) DescribeWorkflowExecution(ctx context.Context, request *historyservice.DescribeWorkflowExecutionRequest) (*historyservice.DescribeWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.Request.Execution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -941,12 +817,7 @@ func (h *Handler) RequestCancelWorkflowExecution(ctx context.Context, request *h tag.WorkflowID(cancelRequest.WorkflowExecution.GetWorkflowId()), tag.WorkflowRunID(cancelRequest.WorkflowExecution.GetRunId())) - workflowID := cancelRequest.WorkflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, cancelRequest.WorkflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -963,17 +834,9 @@ func (h *Handler) RequestCancelWorkflowExecution(ctx context.Context, request *h // WorkflowExecutionSignaled event recorded in the history and a workflow task being created for the execution. func (h *Handler) SignalWorkflowExecution(ctx context.Context, request *historyservice.SignalWorkflowExecutionRequest) (*historyservice.SignalWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.SignalRequest.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -993,17 +856,9 @@ func (h *Handler) SignalWorkflowExecution(ctx context.Context, request *historys // event recorded in history, and a workflow task being created for the execution func (h *Handler) SignalWithStartWorkflowExecution(ctx context.Context, request *historyservice.SignalWithStartWorkflowExecutionRequest) (*historyservice.SignalWithStartWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - signalWithStartRequest := request.SignalWithStartRequest - workflowID := signalWithStartRequest.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, signalWithStartRequest.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1043,17 +898,9 @@ func (h *Handler) SignalWithStartWorkflowExecution(ctx context.Context, request // used to clean execution info when signal workflow task finished. func (h *Handler) RemoveSignalMutableState(ctx context.Context, request *historyservice.RemoveSignalMutableStateRequest) (*historyservice.RemoveSignalMutableStateResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1070,17 +917,9 @@ func (h *Handler) RemoveSignalMutableState(ctx context.Context, request *history // in the history and immediately terminating the execution instance. func (h *Handler) TerminateWorkflowExecution(ctx context.Context, request *historyservice.TerminateWorkflowExecutionRequest) (*historyservice.TerminateWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.TerminateRequest.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1095,17 +934,9 @@ func (h *Handler) TerminateWorkflowExecution(ctx context.Context, request *histo func (h *Handler) DeleteWorkflowExecution(ctx context.Context, request *historyservice.DeleteWorkflowExecutionRequest) (*historyservice.DeleteWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1126,17 +957,9 @@ func (h *Handler) DeleteWorkflowExecution(ctx context.Context, request *historys // in the history and immediately terminating the execution instance. func (h *Handler) ResetWorkflowExecution(ctx context.Context, request *historyservice.ResetWorkflowExecutionRequest) (*historyservice.ResetWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.ResetRequest.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1153,17 +976,9 @@ func (h *Handler) ResetWorkflowExecution(ctx context.Context, request *historyse // Can be used to set and unset versioning behavior override. func (h *Handler) UpdateWorkflowExecutionOptions(ctx context.Context, request *historyservice.UpdateWorkflowExecutionOptionsRequest) (*historyservice.UpdateWorkflowExecutionOptionsResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.UpdateRequest.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1179,16 +994,8 @@ func (h *Handler) UpdateWorkflowExecutionOptions(ctx context.Context, request *h // QueryWorkflow queries a workflow. func (h *Handler) QueryWorkflow(ctx context.Context, request *historyservice.QueryWorkflowRequest) (*historyservice.QueryWorkflowResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowID := request.GetRequest().GetExecution().GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetRequest().GetExecution().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1215,13 +1022,7 @@ func (h *Handler) ScheduleWorkflowTask(ctx context.Context, request *historyserv return nil, h.convertError(errWorkflowExecutionNotSet) } - workflowExecution := request.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.WorkflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1247,13 +1048,7 @@ func (h *Handler) VerifyFirstWorkflowTaskScheduled( return nil, h.convertError(errWorkflowExecutionNotSet) } - workflowExecution := request.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.WorkflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1278,11 +1073,7 @@ func (h *Handler) RecordChildExecutionCompleted(ctx context.Context, request *hi return nil, h.convertError(errWorkflowExecutionNotSet) } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, request.GetParentExecution().WorkflowId) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetParentExecution().WorkflowId) if err != nil { return nil, h.convertError(err) } @@ -1308,11 +1099,7 @@ func (h *Handler) VerifyChildExecutionCompletionRecorded( return nil, h.convertError(errWorkflowExecutionNotSet) } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, request.ParentExecution.GetWorkflowId()) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.ParentExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1331,16 +1118,8 @@ func (h *Handler) VerifyChildExecutionCompletionRecorded( // 2. StickyScheduleToStartTimeout func (h *Handler) ResetStickyTaskQueue(ctx context.Context, request *historyservice.ResetStickyTaskQueueRequest) (*historyservice.ResetStickyTaskQueueResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowID := request.Execution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.Execution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -1360,17 +1139,9 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice } namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowExecution := request.WorkflowExecution - workflowID := workflowExecution.GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, workflowExecution.GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -2192,6 +1963,26 @@ func (h *Handler) CompleteNexusOperationChasm( return &historyservice.CompleteNexusOperationChasmResponse{}, nil } +// getEngineByNamespaceWorkflow validates the namespace ID and returns the shard context +// and history engine for the shard owning the given namespace/workflow pair. +// NOTE: getEngineByNamespaceWorkflow(...) does not wrap errors in h.convertError(err) +// +// since this is the responsibility of the APIs that history client directly interface with. +func (h *Handler) getEngineByNamespaceWorkflow(ctx context.Context, namespaceID namespace.ID, workflowID string) (historyi.ShardContext, historyi.Engine, error) { + if namespaceID == "" { + return nil, nil, errNamespaceNotSet + } + shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) + if err != nil { + return nil, nil, err + } + engine, err := shardContext.GetEngine(ctx) + if err != nil { + return nil, nil, err + } + return shardContext, engine, nil +} + // convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various // HistoryEngine API calls to ShardOwnershipLost error return by HistoryService for client to be redirected to the // correct shard. @@ -2262,16 +2053,8 @@ func (h *Handler) InvokeStateMachineMethod(ctx context.Context, request *history func (h *Handler) SyncWorkflowState(ctx context.Context, request *historyservice.SyncWorkflowStateRequest) (*historyservice.SyncWorkflowStateResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowID := request.Execution.WorkflowId - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.Execution.WorkflowId) if err != nil { return nil, h.convertError(err) } @@ -2287,16 +2070,8 @@ func (h *Handler) UpdateActivityOptions( ctx context.Context, request *historyservice.UpdateActivityOptionsRequest, ) (*historyservice.UpdateActivityOptionsResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - workflowID := request.GetUpdateRequest().GetExecution().GetWorkflowId() - if request.GetNamespaceId() == "" { - return nil, h.convertError(errNamespaceNotSet) - } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetUpdateRequest().GetExecution().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -2312,16 +2087,8 @@ func (h *Handler) PauseActivity( ctx context.Context, request *historyservice.PauseActivityRequest, ) (*historyservice.PauseActivityResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - workflowID := request.GetFrontendRequest().GetExecution().GetWorkflowId() - if request.GetNamespaceId() == "" { - return nil, h.convertError(errNamespaceNotSet) - } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetFrontendRequest().GetExecution().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -2337,16 +2104,8 @@ func (h *Handler) UnpauseActivity( ctx context.Context, request *historyservice.UnpauseActivityRequest, ) (*historyservice.UnpauseActivityResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - workflowID := request.GetFrontendRequest().GetExecution().GetWorkflowId() - if request.GetNamespaceId() == "" { - return nil, h.convertError(errNamespaceNotSet) - } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetFrontendRequest().GetExecution().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -2362,16 +2121,8 @@ func (h *Handler) ResetActivity( ctx context.Context, request *historyservice.ResetActivityRequest, ) (*historyservice.ResetActivityResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - workflowID := request.GetFrontendRequest().GetExecution().GetWorkflowId() - if request.GetNamespaceId() == "" { - return nil, h.convertError(errNamespaceNotSet) - } - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetFrontendRequest().GetExecution().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -2387,16 +2138,8 @@ func (h *Handler) ResetActivity( // WorkflowExecutionPaused event recorded in the history. func (h *Handler) PauseWorkflowExecution(ctx context.Context, request *historyservice.PauseWorkflowExecutionRequest) (*historyservice.PauseWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowID := request.GetPauseRequest().GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetPauseRequest().GetWorkflowId()) if err != nil { return nil, h.convertError(err) } @@ -2411,16 +2154,8 @@ func (h *Handler) PauseWorkflowExecution(ctx context.Context, request *historyse func (h *Handler) UnpauseWorkflowExecution(ctx context.Context, request *historyservice.UnpauseWorkflowExecutionRequest) (*historyservice.UnpauseWorkflowExecutionResponse, error) { namespaceID := namespace.ID(request.GetNamespaceId()) - if namespaceID == "" { - return nil, h.convertError(errNamespaceNotSet) - } - workflowID := request.GetUnpauseRequest().GetWorkflowId() - shardContext, err := h.controller.GetShardByNamespaceWorkflow(namespaceID, workflowID) - if err != nil { - return nil, h.convertError(err) - } - engine, err := shardContext.GetEngine(ctx) + _, engine, err := h.getEngineByNamespaceWorkflow(ctx, namespaceID, request.GetUnpauseRequest().GetWorkflowId()) if err != nil { return nil, h.convertError(err) }