Skip to content

Commit f152d3d

Browse files
bgwinesClaude
andcommitted
Fix spurious per-result waiter count and consolidate counter management
When the global ConsolidatorQueryWaiterCap was hit by a caller of a different query, the current query's per-result waiter count was left spuriously positive, causing unnecessary proto3 row caching. Fix by decrementing the per-result count alongside the global count for all non-original callers. Also refactors counter management: AddWaiterCounter now increments both the per-result and global counters (removing AddPerResultWaiterCounter), and TotalWaiterCount is a read method on the Consolidator interface instead of being read through AddWaiterCounter's return value. Note: the two counter increments in AddWaiterCounter are not jointly atomic. This is benign — the leader checks HasWaiters() before Broadcast(), so it always sees the pre-decrement state. The only effect is momentary soft-limit imprecision on ConsolidatorQueryWaiterCap, which is a soft cap anyway. AI disclosure: Claude Code assisted with development. Every line of code was either written by or carefully reviewed by me :) Co-Authored-By: Claude <svc-devxp-claude@slack-corp.com> Signed-off-by: Brett Wines <bwines@slack-corp.com>
1 parent 0a491d7 commit f152d3d

5 files changed

Lines changed: 43 additions & 28 deletions

File tree

go/sync2/consolidator.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Consolidator interface {
3030
Create(string) (PendingResult, bool)
3131
Items() []ConsolidatorCacheItem
3232
Record(query string)
33+
TotalWaiterCount() int64
3334
}
3435

3536
// PendingResult is a wrapper for result of a query.
@@ -41,7 +42,7 @@ type PendingResult interface {
4142
Result() *sqltypes.Result
4243
Wait()
4344
HasWaiters() bool
44-
AddWaiterCounter(int64) *int64
45+
AddWaiterCounter(int64)
4546
}
4647

4748
type consolidator struct {
@@ -64,12 +65,12 @@ type pendingResult struct {
6465
// executing is used to block additional requests.
6566
// The original request holds a write lock while additional ones are blocked
6667
// on acquiring a read lock (see Wait() below.)
67-
executing sync.RWMutex
68-
consolidator *consolidator
69-
query string
70-
result *sqltypes.Result
71-
err error
72-
perResultWaiterCount atomic.Int64
68+
executing sync.RWMutex
69+
consolidator *consolidator
70+
query string
71+
result *sqltypes.Result
72+
err error
73+
waiterCount atomic.Int64
7374
}
7475

7576
// Create adds a query to currently executing queries and acquires a
@@ -80,7 +81,6 @@ func (co *consolidator) Create(query string) (PendingResult, bool) {
8081
defer co.mu.Unlock()
8182
var r *pendingResult
8283
if r, ok := co.queries[query]; ok {
83-
r.perResultWaiterCount.Add(1)
8484
r.AddWaiterCounter(1)
8585
return r, false
8686
}
@@ -121,7 +121,7 @@ func (rs *pendingResult) SetResult(res *sqltypes.Result) {
121121
}
122122

123123
func (rs *pendingResult) HasWaiters() bool {
124-
return rs.perResultWaiterCount.Load() > 0
124+
return rs.waiterCount.Load() > 0
125125
}
126126

127127
// Wait waits for the original query to complete execution. Wait should
@@ -131,9 +131,11 @@ func (rs *pendingResult) Wait() {
131131
rs.executing.RLock()
132132
}
133133

134-
func (rs *pendingResult) AddWaiterCounter(c int64) *int64 {
134+
func (rs *pendingResult) AddWaiterCounter(c int64) {
135+
// Non-atomic pair is benign: ConsolidatorQueryWaiterCap is a soft limit and
136+
// the per-waiter count is only checked before Broadcast().
137+
rs.waiterCount.Add(c)
135138
atomic.AddInt64(rs.consolidator.totalWaiterCount, c)
136-
return rs.consolidator.totalWaiterCount
137139
}
138140

139141
// ConsolidatorCache is a thread-safe object used for counting how often recent
@@ -145,6 +147,10 @@ type ConsolidatorCache struct {
145147
totalWaiterCount *int64
146148
}
147149

150+
func (cc *ConsolidatorCache) TotalWaiterCount() int64 {
151+
return atomic.LoadInt64(cc.totalWaiterCount)
152+
}
153+
148154
// NewConsolidatorCache creates a new cache with the given capacity.
149155
func NewConsolidatorCache(capacity int64) *ConsolidatorCache {
150156
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity), new(int64)}

go/sync2/consolidator_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ func TestAddWaiterCount(t *testing.T) {
4949
wgAdd.Wait()
5050
wgSub.Wait()
5151

52-
if *pr.AddWaiterCounter(0) != 0 {
53-
t.Fatalf("Expect 0 totalWaiterCount but got: %v", *pr.AddWaiterCounter(0))
52+
if con.TotalWaiterCount() != 0 {
53+
t.Fatalf("Expect 0 totalWaiterCount but got: %d", con.TotalWaiterCount())
5454
}
5555
}
5656

go/sync2/fake_consolidator.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ type FakeConsolidator struct {
3030
// CreateReturnCreated pre-configures the return value of Create calls.
3131
CreateReturn *FakeConsolidatorCreateReturn
3232
// RecordCalls can be usd to inspect Record calls.
33-
RecordCalls []string
33+
RecordCalls []string
34+
totalWaiterCount int64
3435
}
3536

3637
// FakeConsolidatorCreateReturn wraps the two return values of a call to
@@ -53,10 +54,10 @@ type FakePendingResult struct {
5354
WaitCalls int
5455
// AddWaiterCounterCalls can be used to inspect AddWaiterCounter calls.
5556
AddWaiterCounterCalls []int64
56-
WaiterCount int64
57-
PerResultWaiterCount int64
58-
err error
59-
result *sqltypes.Result
57+
WaiterCount int64
58+
Consolidator *FakeConsolidator
59+
err error
60+
result *sqltypes.Result
6061
}
6162

6263
var (
@@ -87,6 +88,14 @@ func (fc *FakeConsolidator) Items() []ConsolidatorCacheItem {
8788
return nil
8889
}
8990

91+
func (fc *FakeConsolidator) TotalWaiterCount() int64 {
92+
return fc.totalWaiterCount
93+
}
94+
95+
func (fc *FakeConsolidator) SetTotalWaiterCount(count int64) {
96+
fc.totalWaiterCount = count
97+
}
98+
9099
// Broadcast records the Broadcast call for later verification.
91100
func (fr *FakePendingResult) Broadcast() {
92101
fr.BroadcastCalls++
@@ -118,12 +127,12 @@ func (fr *FakePendingResult) Wait() {
118127
}
119128

120129
func (fr *FakePendingResult) HasWaiters() bool {
121-
return fr.PerResultWaiterCount > 0
130+
return fr.WaiterCount > 0
122131
}
123132

124133
// AddWaiterCounter records the call and simulates waiter count changes.
125-
func (fr *FakePendingResult) AddWaiterCounter(delta int64) *int64 {
134+
func (fr *FakePendingResult) AddWaiterCounter(delta int64) {
126135
fr.AddWaiterCounterCalls = append(fr.AddWaiterCounterCalls, delta)
127136
fr.WaiterCount += delta
128-
return &fr.WaiterCount
137+
fr.Consolidator.totalWaiterCount += delta
129138
}

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
761761
}
762762
} else {
763763
waiterCap := qre.tsv.config.ConsolidatorQueryWaiterCap
764-
if waiterCap == 0 || *q.AddWaiterCounter(0) <= waiterCap {
764+
if waiterCap == 0 || qre.tsv.qe.consolidator.TotalWaiterCount() <= waiterCap {
765765
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
766766
startTime := time.Now()
767767
q.Wait()

go/vt/vttablet/tabletserver/query_executor_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,7 @@ func TestQueryExecutorShouldConsolidate(t *testing.T) {
14231423

14241424
// Set up consolidator pre-conditions.
14251425

1426-
fakePendingResult := &sync2.FakePendingResult{}
1426+
fakePendingResult := &sync2.FakePendingResult{Consolidator: fakeConsolidator}
14271427
fakePendingResult.SetResult(result)
14281428
fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
14291429
Created: !tcase.consolidatorHasIdenticalQuery,
@@ -1494,10 +1494,11 @@ func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) {
14941494
}
14951495

14961496
// Set up consolidator to simulate an identical query already running (Created=false)
1497-
fakePendingResult := &sync2.FakePendingResult{}
1497+
fakePendingResult := &sync2.FakePendingResult{Consolidator: fakeConsolidator}
14981498
fakePendingResult.SetResult(result)
14991499
// Start with waiter count above the cap (2 > 1), so the condition fails
15001500
fakePendingResult.WaiterCount = 2
1501+
fakeConsolidator.SetTotalWaiterCount(2)
15011502

15021503
fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
15031504
Created: false, // Simulate identical query already running
@@ -1528,10 +1529,9 @@ func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) {
15281529
// Verify we did NOT broadcast (because we're not the original)
15291530
require.Equal(t, 0, fakePendingResult.BroadcastCalls)
15301531

1531-
// Verify AddWaiterCounter was called: once with 0 (to check count), once with -1 (cleanup)
1532-
require.Len(t, fakePendingResult.AddWaiterCounterCalls, 2)
1533-
require.Equal(t, int64(0), fakePendingResult.AddWaiterCounterCalls[0]) // Check current count
1534-
require.Equal(t, int64(-1), fakePendingResult.AddWaiterCounterCalls[1]) // Decrement
1532+
// Verify AddWaiterCounter was called once with -1 (cleanup)
1533+
require.Len(t, fakePendingResult.AddWaiterCounterCalls, 1)
1534+
require.Equal(t, int64(-1), fakePendingResult.AddWaiterCounterCalls[0])
15351535

15361536
// Verify fallback executed the query independently
15371537
require.Equal(t, 1, db.GetQueryCalledNum(input))

0 commit comments

Comments
 (0)