diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index c722600863..808e8e37f7 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1198,6 +1198,8 @@ var ( ForceLoadedTaskQueuePartitions = NewCounterDef("force_loaded_task_queue_partitions_count") ForceLoadedTaskQueuePartitionUnnecessarilyCounter = NewCounterDef("force_loaded_task_queue_partition_unnecessarily_count") LoadedPhysicalTaskQueueGauge = NewGaugeDef("loaded_physical_task_queue_count") + PendingPolls = NewGaugeDef("pending_polls") + WorkerCountPerTaskQueue = NewGaugeDef("worker_count") TaskQueueStartedCounter = NewCounterDef("task_queue_started") TaskQueueStoppedCounter = NewCounterDef("task_queue_stopped") TaskWriteThrottlePerTaskQueueCounter = NewCounterDef("task_write_throttle_count") diff --git a/service/matching/matcher.go b/service/matching/matcher.go index 7948d0814b..4faf54cb14 100644 --- a/service/matching/matcher.go +++ b/service/matching/matcher.go @@ -422,10 +422,19 @@ func (tm *TaskMatcher) poll( defer func() { if pollMetadata.forwardedFrom == "" { // Only recording for original polls + var pollResult string + if err == nil { + pollResult = "dispatch" + } else if errors.Is(err, errNoTasks) { + pollResult = "timeout" + } else { + pollResult = "failed" + } metrics.PollLatencyPerTaskQueue.With(tm.metricsHandler).Record( time.Since(start), metrics.StringTag("forwarded", strconv.FormatBool(forwardedPoll)), metrics.StringTag(metrics.TaskPriorityTagName, ""), + metrics.StringTag("result", pollResult), ) } diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index f15cc13838..38c77ddecb 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -462,7 +462,11 @@ func (c *physicalTaskQueueManagerImpl) PollTask( c.liveness.markAlive() c.currentPolls.Add(1) - defer c.currentPolls.Add(-1) + metrics.PendingPolls.With(c.metricsHandler).Record(float64(c.currentPolls.Load())) + defer func() { + c.currentPolls.Add(-1) + metrics.PendingPolls.With(c.metricsHandler).Record(float64(c.currentPolls.Load())) + }() namespaceId := namespace.ID(c.queue.NamespaceId()) namespaceEntry, err := c.namespaceRegistry.GetNamespaceByID(namespaceId) @@ -596,11 +600,17 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask( func (c *physicalTaskQueueManagerImpl) UpdatePollerInfo(id pollerIdentity, pollMetadata *pollMetadata) { c.pollerHistory.updatePollerInfo(id, pollMetadata) + if c.queue.Partition().IsRoot() { + metrics.WorkerCountPerTaskQueue.With(c.metricsHandler).Record(float64(c.pollerHistory.size())) + } } func (c *physicalTaskQueueManagerImpl) RemovePoller(id pollerIdentity) { if c.pollerHistory != nil { c.pollerHistory.removePoller(id) + if c.queue.Partition().IsRoot() { + metrics.WorkerCountPerTaskQueue.With(c.metricsHandler).Record(float64(c.pollerHistory.size())) + } } } diff --git a/service/matching/poller_history.go b/service/matching/poller_history.go index 8875fe9a6a..97aae01330 100644 --- a/service/matching/poller_history.go +++ b/service/matching/poller_history.go @@ -70,6 +70,10 @@ func (pollers *pollerHistory) getPollerInfo(earliestAccessTime time.Time) []*tas return result } +func (pollers *pollerHistory) size() int { + return pollers.history.Size() +} + func defaultRPS(wrapper *wrapperspb.DoubleValue) float64 { if wrapper != nil { return wrapper.Value diff --git a/service/matching/pri_matcher.go b/service/matching/pri_matcher.go index 497b0c61b7..2c9cb6ca39 100644 --- a/service/matching/pri_matcher.go +++ b/service/matching/pri_matcher.go @@ -602,6 +602,7 @@ func (tm *priTaskMatcher) poll( start := time.Now() pollWasForwarded := false var priority int32 + pollResult := "failed" defer func() { // TODO(pri): can we consolidate all the metrics code below? @@ -611,6 +612,7 @@ func (tm *priTaskMatcher) poll( time.Since(start), metrics.StringTag("forwarded", strconv.FormatBool(pollWasForwarded)), metrics.MatchingTaskPriorityTag(priority), + metrics.StringTag("result", pollResult), ) } }() @@ -631,11 +633,13 @@ func (tm *priTaskMatcher) poll( } if res == nil { + pollResult = "timeout" return nil, errNoTasks // only possible for MatchPollerImmediately } else if res.ctxErr != nil { if res.ctxErrIdx == 0 { metrics.PollTimeoutPerTaskQueueCounter.With(tm.metricsHandler).Record(1) } + pollResult = "timeout" return nil, errNoTasks } @@ -646,6 +650,7 @@ func (tm *priTaskMatcher) poll( task := res.task pollWasForwarded = task.isStarted() // true if this poll was forwarded _from_ this matcher priority = task.getPriority().GetPriorityKey() + pollResult = "dispatch" if !pollWasForwarded { // Only record these metrics on the parent for forwarded polls