Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,9 +1200,13 @@ var (
LoadedPhysicalTaskQueueGauge = NewGaugeDef("loaded_physical_task_queue_count")
TaskQueueStartedCounter = NewCounterDef("task_queue_started")
TaskQueueStoppedCounter = NewCounterDef("task_queue_stopped")
TaskWriteThrottlePerTaskQueueCounter = NewCounterDef("task_write_throttle_count")
TaskWriteLatencyPerTaskQueue = NewTimerDef("task_write_latency")
TaskRewrites = NewCounterDef(
TasksDispatchedPerTaskQueueCounter = NewCounterDef(
"tasks_dispatched_per_task_queue",
WithDescription("Number of tasks arriving at a physical task queue, broken down by dispatch result, forwarding, and versioning behavior"),
)
TaskWriteThrottlePerTaskQueueCounter = NewCounterDef("task_write_throttle_count")
TaskWriteLatencyPerTaskQueue = NewTimerDef("task_write_latency")
TaskRewrites = NewCounterDef(
"task_rewrites",
WithDescription("Number of times tasks are rewritten to persistence after failing to process"),
)
Expand Down
12 changes: 12 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
replicationTaskType = "replicationTaskType"
replicationTaskPriority = "replicationTaskPriority"
taskExpireStage = "task_expire_stage"
taskDispatchResult = "dispatch_result"
versioningBehavior = "versioning_behavior"
continueAsNewVersioningBehavior = "continue_as_new_versioning_behavior"
suggestContinueAsNewReasonTooManyUpdates = "suggest_continue_as_new_reason_too_many_updates"
Expand Down Expand Up @@ -315,6 +316,17 @@ func ForwardedTag(forwarded bool) Tag {
return Tag{Key: forwardedTag, Value: strconv.FormatBool(forwarded)}
}

// Values accepted by TaskDispatchResultTag for the "dispatch_result" metric tag.
const (
	// TaskDispatchResultSyncMatch marks a task that was handed off through sync match.
	TaskDispatchResultSyncMatch = "sync_match"
	// TaskDispatchResultBacklog marks a task that was successfully spooled to the backlog.
	TaskDispatchResultBacklog = "backlog"
	// TaskDispatchResultThrottled marks a task whose spool attempt failed with a
	// ResourceExhausted error.
	TaskDispatchResultThrottled = "throttled"
	// TaskDispatchResultFailure marks a task whose spool attempt failed with any other error.
	TaskDispatchResultFailure = "failure"
)

// TaskDispatchResultTag builds the metric tag keyed by taskDispatchResult ("dispatch_result")
// carrying the given result string as its value. The result is passed through unchanged;
// callers are expected to use one of the TaskDispatchResult* constants.
func TaskDispatchResultTag(result string) Tag {
	dispatchTag := Tag{
		Key:   taskDispatchResult,
		Value: result,
	}
	return dispatchTag
}

func MatchingTaskPriorityTag(value int32) Tag {
priStr := ""
if value != 0 {
Expand Down
9 changes: 9 additions & 0 deletions service/matching/physical_task_queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,15 @@ func (c *physicalTaskQueueManagerImpl) SpoolTask(taskInfo *persistencespb.TaskIn
return c.backlogMgr.SpoolTask(taskInfo)
}

// RecordTaskDispatched adds one to the TasksDispatchedPerTaskQueueCounter metric on this
// queue's metrics handler. The sample is tagged with the dispatch result, whether the task
// was forwarded, and the task's versioning behavior.
func (c *physicalTaskQueueManagerImpl) RecordTaskDispatched(result string, forwarded bool, behavior enumspb.VersioningBehavior) {
	// Resolve the counter first so the Record call below only deals with the sample and its tags.
	dispatchedCounter := c.metricsHandler.Counter(metrics.TasksDispatchedPerTaskQueueCounter.Name())

	dispatchedCounter.Record(
		1,
		metrics.TaskDispatchResultTag(result),
		metrics.ForwardedTag(forwarded),
		metrics.VersioningBehaviorTag(behavior),
	)
}

// PollTask blocks waiting for a task.
// Returns error when context deadline is exceeded
// maxDispatchPerSecond is the max rate at which tasks are allowed
Expand Down
4 changes: 4 additions & 0 deletions service/matching/physical_task_queue_manager_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"time"

enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/server/api/matchingservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
Expand Down Expand Up @@ -64,5 +65,8 @@ type (
// GetFairnessWeightOverrides returns current fairness weight overrides for this queue.
GetFairnessWeightOverrides() fairnessWeightOverrides
UpdateRemotePriorityBacklogs(remotePriorityBacklogSet)
// RecordTaskDispatched records the outcome of a task add to this physical queue using
// the queue's tagged metrics handler, so all per-physical-queue labels are included.
RecordTaskDispatched(result string, forwarded bool, behavior enumspb.VersioningBehavior)
}
)
13 changes: 13 additions & 0 deletions service/matching/physical_task_queue_manager_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions service/matching/task_queue_partition_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,9 @@ reredirectTask:
return "", false, err
}

behavior := directive.GetBehavior()
forwarded := params.forwardInfo != nil

if isActive {
syncMatched, err = syncMatchQueue.TrySyncMatch(ctx, syncMatchTask)
if syncMatched && !pm.shouldBacklogSyncMatchTaskOnError(err) {
Expand All @@ -448,6 +451,8 @@ reredirectTask:
pm.processTaskAddHooks(ctx, targetVersion, syncMatched)
}

syncMatchQueue.RecordTaskDispatched(metrics.TaskDispatchResultSyncMatch, forwarded, behavior)

// Build ID is not returned for sync match. The returned build ID is used by History to update
// mutable state (and visibility) when the first workflow task is spooled.
// For sync-match case, History has already received the build ID in the Record*TaskStarted call.
Expand All @@ -463,6 +468,7 @@ reredirectTask:

if spoolQueue == nil {
// This means the task is being forwarded. Child partition will persist the task when sync match fails.
// No metric emitted here: the child partition will emit when it calls SpoolTask.
return "", false, errRemoteSyncMatchFailed
}

Expand All @@ -474,7 +480,15 @@ reredirectTask:

err = spoolQueue.SpoolTask(params.taskInfo)
if err == nil {
spoolQueue.RecordTaskDispatched(metrics.TaskDispatchResultBacklog, forwarded, behavior)
pm.processTaskAddHooks(ctx, targetVersion, false)
} else {
var resourceExhausted *serviceerror.ResourceExhausted
if errors.As(err, &resourceExhausted) {
spoolQueue.RecordTaskDispatched(metrics.TaskDispatchResultThrottled, forwarded, behavior)
} else {
spoolQueue.RecordTaskDispatched(metrics.TaskDispatchResultFailure, forwarded, behavior)
}
}

return assignedBuildId, false, err
Expand Down
Loading