41 changes: 26 additions & 15 deletions service/matching/matcher_data.go
@@ -415,20 +415,23 @@ func (d *matcherData) ReprocessTasks(pred func(*internalTask) bool) []*internalT
 	return reprocess
 }

-// findMatch should return the highest priority task+poller match even if the per-task rate
-// limit doesn't allow the task to be matched yet.
+// findMatch returns the highest-priority ready task+poller pair, skipping tasks that are
+// rate-limited. If no task is ready, returns (nil, nil, minDelay) where minDelay is the
+// shortest wait across all rate-limited tasks that have a matching poller available.
 // call with lock held
 // nolint:revive // will improve later
-func (d *matcherData) findMatch(allowForwarding bool) (*internalTask, *waitingPoller) {
+func (d *matcherData) findMatch(allowForwarding bool, now int64) (*internalTask, *waitingPoller, time.Duration) {
 	// TODO(pri): optimize so it's not O(d*n) worst case
 	// TODO(pri): this iterates over heap as slice, which isn't quite correct, but okay for now
+	var minDelay time.Duration
 	for _, task := range d.tasks.heap {
 		// disallow normal poll forwarding when allowForwarding is false, but allow the
 		// "priority backlog poll forwarders".
 		if !allowForwarding && task.pollForwarderType == parentPollForwarder {
 			continue
 		}

+		var matched *waitingPoller
 		for _, poller := range d.pollers.heap {
 			// can't match cases:
 			if poller.queryOnly && !task.isQuery() && !task.isPollForwarder() {
@@ -449,11 +452,23 @@ func (d *matcherData) findMatch(allowForwarding bool) (*internalTask, *waitingPo
 				// their priority above "1". that's inaccurate but it's just a temporary situation.
 				continue
 			}
+			matched = poller
+			break
+		}
+		if matched == nil {
+			continue
+		}

-			return task, poller
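+		// A poller can take this task; now consult the task's rate limit.
+		// If the task isn't ready yet, remember the soonest ready time and
+		// keep scanning lower-priority tasks.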
+		delay := d.rateLimitManager.readyTimeForTask(task).delay(now)
+		if delay > 0 {
+			if minDelay == 0 || delay < minDelay {
+				minDelay = delay
+			}
+			continue
+		}
+		return task, matched, 0
 	}
-	return nil, nil
+	return nil, nil, minDelay
 }

 // call with lock held
@@ -498,20 +513,16 @@ func (d *matcherData) findAndWakeMatches() {

 	for {
 		// search for highest priority match
-		task, poller := d.findMatch(allowForwarding)
+		task, poller, minDelay := d.findMatch(allowForwarding, now)
 		if task == nil || poller == nil {
-			// no more current matches, stop rate limit timer if was running
-			d.rateLimitTimer.unset()
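+			// No task is dispatchable right now. If a rate-limited task had a
+			// poller available, rearm the timer to rematch when it becomes ready.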
+			if minDelay > 0 {
+				d.rateLimitTimer.set(d.timeSource, d.rematchAfterTimer, minDelay)
+			} else {
+				d.rateLimitTimer.unset()
+			}
 			return
 		}

-		// check ready time
-		delay := d.rateLimitManager.readyTimeForTask(task).delay(now)
-		d.rateLimitTimer.set(d.timeSource, d.rematchAfterTimer, delay)
-		if delay > 0 {
-			return // not ready yet, timer will call match later
-		}
-
 		// ready to signal match
 		d.tasks.Remove(task)
 		d.pollers.Remove(poller)
140 changes: 139 additions & 1 deletion service/matching/matcher_data_test.go
@@ -411,6 +411,144 @@ func (s *MatcherDataSuite) TestPerKeyRateLimit() {
 	s.Less(elapsed, 20*time.Second)
 }

+func (s *MatcherDataSuite) TestPerKeyRateLimitDoesNotBlockOtherKeys() {
+	// Set per-key rate limit: 1 RPS, no burst. After consuming one token for a key,
+	// the next token for that key is unavailable for 1 second.
+	s.md.rateLimitManager.SetFairnessKeyRateLimitDefaultForTesting(1.0, enumspb.RATE_LIMIT_SOURCE_API)
+	s.md.rateLimitManager.UpdatePerKeySimpleRateLimitWithBurstForTesting(0)
+
+	// Dispatch one task from "key1" to exhaust its rate limit token.
+	key1Task1 := s.newBacklogTaskWithPriority(1, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(key1Task1))
+	firstPoll := s.pollFakeTime(time.Second)
+	s.Require().NoError(firstPoll.ctxErr)
+	s.Equal(key1Task1, firstPoll.task)
+
+	// key1 is now rate-limited for 1 second of fake time.
+	// key2 has no consumption yet and is immediately ready.
+	//
+	// Enqueue a new key1 task at priority 1 (higher urgency, comes first in the heap)
+	// and a key2 task at priority 2 (lower urgency, but not rate-limited).
+	key1Task2 := s.newBacklogTaskWithPriority(2, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	key2Task := s.newBacklogTaskWithPriority(3, 0, nil, &commonpb.Priority{PriorityKey: 2, FairnessKey: "key2"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(key1Task2))
+	s.Require().NoError(s.md.EnqueueTaskNoWait(key2Task))
+
+	// The rate limit should block only key1, not the whole queue.
+	// key2Task should be dispatchable immediately even though the higher-priority
+	// key1Task2 is rate-limited and sits at the front of the task heap.
+	immediateMatch := s.pollImmediately(nil)
+	s.Require().NotNil(immediateMatch, "key2Task should be dispatched immediately; rate limit on key1 should not block the whole queue")
+	s.Equal(key2Task, immediateMatch.task)
+}

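+// pollFakeTimeAsync starts pollFakeTime in a goroutine and returns a channel
+// that receives its result, so a test can advance fake time while the poll
+// is in flight.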
+func (s *MatcherDataSuite) pollFakeTimeAsync(timeout time.Duration) <-chan *matchResult {
+	ch := make(chan *matchResult, 1)
+	go func() { ch <- s.pollFakeTime(timeout) }()
+	return ch
+}
+
+// advanceFakeTimeUntil advances fake time in 10ms steps (with goroutine yields) until
+// the provided channel has a result ready, then returns that result.
+// Fails the test if the channel does not receive within 10 seconds of fake time.
+func (s *MatcherDataSuite) advanceFakeTimeUntil(ch <-chan *matchResult) *matchResult {
+	const maxSteps = 1000 // 10s of fake time at 10ms/step
+	for range maxSteps {
+		gosched(3)
+		select {
+		case r := <-ch:
+			return r
+		default:
+			s.ts.Advance(10 * time.Millisecond)
+		}
+	}
+	s.FailNow("advanceFakeTimeUntil: timed out after 10s of fake time")
+	return nil
+}

+// TestPerKeyRateLimitTimerFiresForSkippedKey verifies the end-to-end flow:
+// a rate-limited high-priority task is skipped so a lower-priority ready task
+// dispatches immediately, and then the rateLimitTimer fires and dispatches the
+// originally-skipped task once its rate-limit interval elapses.
+func (s *MatcherDataSuite) TestPerKeyRateLimitTimerFiresForSkippedKey() {
+	s.md.rateLimitManager.SetFairnessKeyRateLimitDefaultForTesting(1.0, enumspb.RATE_LIMIT_SOURCE_API)
+	s.md.rateLimitManager.UpdatePerKeySimpleRateLimitWithBurstForTesting(0)
+
+	// Exhaust key1's token so it is rate-limited for 1s of fake time.
+	key1Seed := s.newBacklogTaskWithPriority(1, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(key1Seed))
+	s.Require().NoError(s.pollFakeTime(time.Second).ctxErr)
+
+	key1Blocked := s.newBacklogTaskWithPriority(2, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	key2Ready := s.newBacklogTaskWithPriority(3, 0, nil, &commonpb.Priority{PriorityKey: 2, FairnessKey: "key2"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(key1Blocked))
+	s.Require().NoError(s.md.EnqueueTaskNoWait(key2Ready))
+
+	// key2 dispatches immediately despite rate-limited key1 sitting first in the heap.
+	got := s.pollImmediately(nil)
+	s.Require().NotNil(got)
+	s.Equal(key2Ready, got.task)
+
+	// key1 must not dispatch before its interval elapses.
+	s.Nil(s.pollImmediately(nil), "key1 should still be rate-limited")
+
+	// Start a poll, then advance fake time until the rateLimitTimer fires and dispatches key1.
+	result := s.advanceFakeTimeUntil(s.pollFakeTimeAsync(2 * time.Second))
+	s.Require().NoError(result.ctxErr)
+	s.Equal(key1Blocked, result.task)
+}

+// TestWholeQueueRateLimitBlocksAllTasks is a regression test: a whole-queue rate
+// limit (not per-key) must block ALL tasks, including lower-priority ones from
+// different fairness keys. findMatch must not allow a lower-priority task to
+// bypass a limit that applies to every task in the queue.
+func (s *MatcherDataSuite) TestWholeQueueRateLimitBlocksAllTasks() {
+	s.md.rateLimitManager.SetEffectiveRPSAndSourceForTesting(1.0, enumspb.RATE_LIMIT_SOURCE_API)
+	s.md.rateLimitManager.UpdateSimpleRateLimitWithBurstForTesting(0)
+
+	// Consume the whole-queue token.
+	seed := s.newBacklogTaskWithPriority(1, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(seed))
+	s.Require().NoError(s.pollFakeTime(time.Second).ctxErr)
+
+	t1 := s.newBacklogTaskWithPriority(2, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	t2 := s.newBacklogTaskWithPriority(3, 0, nil, &commonpb.Priority{PriorityKey: 2, FairnessKey: "key2"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(t1))
+	s.Require().NoError(s.md.EnqueueTaskNoWait(t2))
+
+	// Both tasks share the whole-queue token; neither should dispatch immediately.
+	s.Nil(s.pollImmediately(nil), "whole-queue limit must block all tasks, not just the high-priority key")
+
+	// After the whole-queue token refills, t1 (priority 1) dispatches first.
+	r1 := s.advanceFakeTimeUntil(s.pollFakeTimeAsync(2 * time.Second))
+	s.Require().NoError(r1.ctxErr)
+	s.Equal(t1, r1.task)
+
+	r2 := s.advanceFakeTimeUntil(s.pollFakeTimeAsync(2 * time.Second))
+	s.Require().NoError(r2.ctxErr)
+	s.Equal(t2, r2.task)
+}

+// TestPerKeyRateLimitSameKeyTasksShareToken verifies that all tasks for the same
+// fairness key share one token bucket. After one key1 task dispatches, a second
+// key1 task must wait for the full interval even though no other key is involved.
+func (s *MatcherDataSuite) TestPerKeyRateLimitSameKeyTasksShareToken() {
+	s.md.rateLimitManager.SetFairnessKeyRateLimitDefaultForTesting(1.0, enumspb.RATE_LIMIT_SOURCE_API)
+	s.md.rateLimitManager.UpdatePerKeySimpleRateLimitWithBurstForTesting(0)
+
+	t1 := s.newBacklogTaskWithPriority(1, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(t1))
+	s.Require().NoError(s.pollFakeTime(time.Second).ctxErr)
+
+	t2 := s.newBacklogTaskWithPriority(2, 0, nil, &commonpb.Priority{PriorityKey: 1, FairnessKey: "key1"})
+	s.Require().NoError(s.md.EnqueueTaskNoWait(t2))
+	s.Nil(s.pollImmediately(nil), "second same-key task must wait; key1 token is exhausted")
+
+	r := s.advanceFakeTimeUntil(s.pollFakeTimeAsync(2 * time.Second))
+	s.Require().NoError(r.ctxErr)
+	s.Equal(t2, r.task)
+}

 func (s *MatcherDataSuite) TestOrder() {
 	t1 := s.newBacklogTaskWithPriority(1, 0, nil, &commonpb.Priority{PriorityKey: 1})
 	t2 := s.newBacklogTaskWithPriority(2, 0, nil, &commonpb.Priority{PriorityKey: 2})
@@ -844,7 +982,7 @@ func (s *MatcherDataSuite) TestFindMatch() {

 	// Call findMatch
 	s.md.lock.Lock()
-	foundTask, foundPoller := s.md.findMatch(tc.allowForwarding)
+	foundTask, foundPoller, _ := s.md.findMatch(tc.allowForwarding, s.ts.Now().UnixNano())
 	s.md.lock.Unlock()

 	if tc.shouldMatch {