diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index c722600863..099bf80c20 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1200,9 +1200,13 @@ var ( LoadedPhysicalTaskQueueGauge = NewGaugeDef("loaded_physical_task_queue_count") TaskQueueStartedCounter = NewCounterDef("task_queue_started") TaskQueueStoppedCounter = NewCounterDef("task_queue_stopped") - TaskWriteThrottlePerTaskQueueCounter = NewCounterDef("task_write_throttle_count") - TaskWriteLatencyPerTaskQueue = NewTimerDef("task_write_latency") - TaskRewrites = NewCounterDef( + TasksAddedCounter = NewCounterDef( + "tasks_added", + WithDescription("Number of tasks arriving at a physical task queue, broken down by add result, forwarding, and versioning behavior"), + ) + TaskWriteThrottlePerTaskQueueCounter = NewCounterDef("task_write_throttle_count") + TaskWriteLatencyPerTaskQueue = NewTimerDef("task_write_latency") + TaskRewrites = NewCounterDef( "task_rewrites", WithDescription("Number of times tasks are rewritten to persistence after failing to process"), ) diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 1f29860564..66fe95aa08 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -46,6 +46,7 @@ const ( replicationTaskType = "replicationTaskType" replicationTaskPriority = "replicationTaskPriority" taskExpireStage = "task_expire_stage" + taskAddResult = "task_add_result" versioningBehavior = "versioning_behavior" continueAsNewVersioningBehavior = "continue_as_new_versioning_behavior" suggestContinueAsNewReasonTooManyUpdates = "suggest_continue_as_new_reason_too_many_updates" @@ -315,6 +316,17 @@ func ForwardedTag(forwarded bool) Tag { return Tag{Key: forwardedTag, Value: strconv.FormatBool(forwarded)} } +const ( + TaskAddResultSyncMatch = "sync_match" + TaskAddResultBacklog = "backlog" + TaskAddResultThrottled = "throttled" + TaskAddResultFailure = "failure" +) + +func TaskAddResultTag(result string) Tag { + return Tag{Key: taskAddResult, Value: result} +} + func MatchingTaskPriorityTag(value int32) Tag { priStr := "" if value != 0 { diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index f15cc13838..fd076700c3 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -451,6 +451,15 @@ func (c *physicalTaskQueueManagerImpl) SpoolTask(taskInfo *persistencespb.TaskIn return c.backlogMgr.SpoolTask(taskInfo) } +func (c *physicalTaskQueueManagerImpl) RecordTaskAdd(result string, forwarded bool, behavior enumspb.VersioningBehavior) { + c.metricsHandler.Counter(metrics.TasksAddedCounter.Name()).Record( + 1, + metrics.TaskAddResultTag(result), + metrics.ForwardedTag(forwarded), + metrics.VersioningBehaviorTag(behavior), + ) +} + // PollTask blocks waiting for a task. // Returns error when context deadline is exceeded // maxDispatchPerSecond is the max rate at which tasks are allowed diff --git a/service/matching/physical_task_queue_manager_interface.go b/service/matching/physical_task_queue_manager_interface.go index cab1ef067f..b03dc2726f 100644 --- a/service/matching/physical_task_queue_manager_interface.go +++ b/service/matching/physical_task_queue_manager_interface.go @@ -6,6 +6,7 @@ import ( "context" "time" + enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/server/api/matchingservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" @@ -64,5 +65,8 @@ type ( // GetFairnessWeightOverrides returns current fairness weight overrides for this queue. GetFairnessWeightOverrides() fairnessWeightOverrides UpdateRemotePriorityBacklogs(remotePriorityBacklogSet) + // RecordTaskAdd records the outcome of a task add to this physical queue using + // the queue's tagged metrics handler, so all per-physical-queue labels are included. + RecordTaskAdd(result string, forwarded bool, behavior enumspb.VersioningBehavior) } ) diff --git a/service/matching/physical_task_queue_manager_mock.go b/service/matching/physical_task_queue_manager_mock.go index cfc8c369b0..321177c22a 100644 --- a/service/matching/physical_task_queue_manager_mock.go +++ b/service/matching/physical_task_queue_manager_mock.go @@ -14,6 +14,7 @@ import ( reflect "reflect" time "time" + enums "go.temporal.io/api/enums/v1" taskqueue "go.temporal.io/api/taskqueue/v1" matchingservice "go.temporal.io/server/api/matchingservice/v1" persistence "go.temporal.io/server/api/persistence/v1" @@ -282,6 +283,18 @@ func (mr *MockphysicalTaskQueueManagerMockRecorder) QueueKey() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueKey", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).QueueKey)) } +// RecordTaskAdd mocks base method. +func (m *MockphysicalTaskQueueManager) RecordTaskAdd(result string, forwarded bool, behavior enums.VersioningBehavior) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RecordTaskAdd", result, forwarded, behavior) +} + +// RecordTaskAdd indicates an expected call of RecordTaskAdd. +func (mr *MockphysicalTaskQueueManagerMockRecorder) RecordTaskAdd(result, forwarded, behavior any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecordTaskAdd", reflect.TypeOf((*MockphysicalTaskQueueManager)(nil).RecordTaskAdd), result, forwarded, behavior) +} + // RemovePoller mocks base method. func (m *MockphysicalTaskQueueManager) RemovePoller(arg0 pollerIdentity) { m.ctrl.T.Helper() diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 796582da55..0dee7e0161 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -439,15 +439,31 @@ reredirectTask: return "", false, err } + behavior := directive.GetBehavior() + forwarded := params.forwardInfo != nil + if isActive { syncMatched, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask) if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) { // Only fire hooks for non-forwarded tasks. Forwarded tasks already had hooks fired // on the child partition that originally received the task. - if params.forwardInfo == nil { + if !forwarded { pm.processTaskAddHooks(ctx, targetVersion, syncMatched) } + var syncMatchResult string + if err != nil { + var resourceExhausted *serviceerror.ResourceExhausted + if errors.As(err, &resourceExhausted) { + syncMatchResult = metrics.TaskAddResultThrottled + } else { + syncMatchResult = metrics.TaskAddResultFailure + } + } else { + syncMatchResult = metrics.TaskAddResultSyncMatch + } + syncMatchQueue.RecordTaskAdd(syncMatchResult, forwarded, behavior) + // Build ID is not returned for sync match. The returned build ID is used by History to update // mutable state (and visibility) when the first workflow task is spooled. // For sync-match case, History has already received the build ID in the Record*TaskStarted call. @@ -463,6 +479,7 @@ reredirectTask: if spoolQueue == nil { // This means the task is being forwarded. Child partition will persist the task when sync match fails. + // No metric emitted here: the child partition will emit when it calls SpoolTask. return "", false, errRemoteSyncMatchFailed } @@ -474,7 +491,15 @@ reredirectTask: err = spoolQueue.SpoolTask(params.taskInfo) if err == nil { + spoolQueue.RecordTaskAdd(metrics.TaskAddResultBacklog, forwarded, behavior) pm.processTaskAddHooks(ctx, targetVersion, false) + } else { + var resourceExhausted *serviceerror.ResourceExhausted + if errors.As(err, &resourceExhausted) { + spoolQueue.RecordTaskAdd(metrics.TaskAddResultThrottled, forwarded, behavior) + } else { + spoolQueue.RecordTaskAdd(metrics.TaskAddResultFailure, forwarded, behavior) + } } return assignedBuildId, false, err