Skip to content

Commit 22c9862

Browse files
ejortegau and Claude committed
smartconnpool: add per-pool waiter cap to limit queries waiting for a connection (#827)
This is backport of vitessio#19811 Adds a configurable limit on the number of queries that can wait for a connection from each pool. When the cap is reached, new requests are rejected immediately with RESOURCE_EXHAUSTED instead of queueing unboundedly. New flags (Default 0 = unlimited): --queryserver-config-query-pool-waiter-cap --queryserver-config-stream-pool-waiter-cap --queryserver-config-txpool-waiter-cap How it works Cap enforcement lives inside waitlist.waitForConn using a double-checked locking pattern: a lockless atomic pre-check rejects most over-cap requests early, and a strict check under the mutex guarantees correctness. WaitCount is now incremented via a callback (onWait) when the wait actually begins (before mutex acquisition), not after completion. A new WaiterCapRejected counter (exposed as {name}WaiterCapRejected) tracks how many requests were rejected due to the cap. New metrics Metric Type Description {pool}WaiterCapRejected Counter Requests rejected because the waiter cap was reached --------- Signed-off-by: Eduardo Ortega <5791035+ejortegau@users.noreply.github.com> Co-authored-by: Claude <svc-devxp-claude@slack-corp.com>
1 parent dd41012 commit 22c9862

8 files changed

Lines changed: 127 additions & 9 deletions

File tree

go/flags/endtoend/vtcombo.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,18 +298,21 @@ Flags:
298298
--queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16)
299299
--queryserver-config-query-cache-memory int query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
300300
--queryserver-config-query-pool-timeout duration query server query pool timeout, it is how long vttablet waits for a connection from the query pool. If set to 0 (default) then the overall query timeout is used instead.
301+
--queryserver-config-query-pool-waiter-cap uint query server query pool waiter cap is the maximum number of queries allowed to wait for a connection from the pool. If set to 0 (default) then there is no limit.
301302
--queryserver-config-query-timeout duration query server query timeout, this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s)
302303
--queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true)
303304
--queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s)
304305
--queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768)
305306
--queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200)
306307
--queryserver-config-stream-pool-timeout duration query server stream pool timeout, it is how long vttablet waits for a connection from the stream pool. If set to 0 (default) then there is no timeout.
308+
--queryserver-config-stream-pool-waiter-cap uint query server stream pool waiter cap is the maximum number of streaming queries allowed to wait for a connection from the pool. If set to 0 (default) then there is no limit.
307309
--queryserver-config-strict-table-acl only allow queries that pass table acl checks
308310
--queryserver-config-terse-errors prevent bind vars from escaping in client error messages
309311
--queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20)
310312
--queryserver-config-transaction-timeout duration query server transaction timeout, a transaction will be killed if it takes longer than this value (default 30s)
311313
--queryserver-config-truncate-error-len int truncate errors sent to client if they are longer than this value (0 means do not truncate)
312314
--queryserver-config-txpool-timeout duration query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1s)
315+
--queryserver-config-txpool-waiter-cap uint query server transaction pool waiter cap is the maximum number of transactions allowed to wait for a connection from the pool. If set to 0 (default) then there is no limit.
313316
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
314317
--queryserver-enable-settings-pool Enable pooling of connections with modified system settings (default true)
315318
--queryserver-enable-views Enable views support in vttablet.

go/flags/endtoend/vttablet.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,18 +288,21 @@ Flags:
288288
--queryserver-config-pool-size int query server read pool size, connection pool is used by regular queries (non streaming, not in a transaction) (default 16)
289289
--queryserver-config-query-cache-memory int query server query cache size in bytes, maximum amount of memory to be used for caching. vttablet analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
290290
--queryserver-config-query-pool-timeout duration query server query pool timeout, it is how long vttablet waits for a connection from the query pool. If set to 0 (default) then the overall query timeout is used instead.
291+
--queryserver-config-query-pool-waiter-cap uint query server query pool waiter cap is the maximum number of queries allowed to wait for a connection from the pool. If set to 0 (default) then there is no limit.
291292
--queryserver-config-query-timeout duration query server query timeout, this is the query timeout in vttablet side. If a query takes more than this timeout, it will be killed. (default 30s)
292293
--queryserver-config-schema-change-signal query server schema signal, will signal connected vtgates that schema has changed whenever this is detected. VTGates will need to have -schema_change_signal enabled for this to work (default true)
293294
--queryserver-config-schema-reload-time duration query server schema reload time, how often vttablet reloads schemas from underlying MySQL instance. vttablet keeps table schemas in its own memory and periodically refreshes it from MySQL. This config controls the reload time. (default 30m0s)
294295
--queryserver-config-stream-buffer-size int query server stream buffer size, the maximum number of bytes sent from vttablet for each stream call. It's recommended to keep this value in sync with vtgate's stream_buffer_size. (default 32768)
295296
--queryserver-config-stream-pool-size int query server stream connection pool size, stream pool is used by stream queries: queries that return results to client in a streaming fashion (default 200)
296297
--queryserver-config-stream-pool-timeout duration query server stream pool timeout, it is how long vttablet waits for a connection from the stream pool. If set to 0 (default) then there is no timeout.
298+
--queryserver-config-stream-pool-waiter-cap uint query server stream pool waiter cap is the maximum number of streaming queries allowed to wait for a connection from the pool. If set to 0 (default) then there is no limit.
297299
--queryserver-config-strict-table-acl only allow queries that pass table acl checks
298300
--queryserver-config-terse-errors prevent bind vars from escaping in client error messages
299301
--queryserver-config-transaction-cap int query server transaction cap is the maximum number of transactions allowed to happen at any given point of a time for a single vttablet. E.g. by setting transaction cap to 100, there are at most 100 transactions will be processed by a vttablet and the 101th transaction will be blocked (and fail if it cannot get connection within specified timeout) (default 20)
300302
--queryserver-config-transaction-timeout duration query server transaction timeout, a transaction will be killed if it takes longer than this value (default 30s)
301303
--queryserver-config-truncate-error-len int truncate errors sent to client if they are longer than this value (0 means do not truncate)
302304
--queryserver-config-txpool-timeout duration query server transaction pool timeout, it is how long vttablet waits if tx pool is full (default 1s)
305+
--queryserver-config-txpool-waiter-cap uint query server transaction pool waiter cap is the maximum number of transactions allowed to wait for a connection from the pool. If set to 0 (default) then there is no limit.
303306
--queryserver-config-warn-result-size int query server result size warning threshold, warn if number of rows returned from vttablet for non-streaming queries exceeds this
304307
--queryserver-enable-settings-pool Enable pooling of connections with modified system settings (default true)
305308
--queryserver-enable-views Enable views support in vttablet.

go/pools/smartconnpool/pool.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package smartconnpool
1818

1919
import (
2020
"context"
21+
"errors"
2122
"math/rand/v2"
2223
"sync"
2324
"sync/atomic"
@@ -39,6 +40,9 @@ var (
3940
// ErrConnPoolClosed is returned when trying to get a connection from a closed conn pool
4041
ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "connection pool is closed")
4142

43+
// ErrPoolWaiterCapReached is returned when the waiter cap has been reached
44+
ErrPoolWaiterCapReached = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "connection pool waiter cap reached")
45+
4246
// PoolCloseTimeout is how long to wait for all connections to be returned to the pool during close
4347
PoolCloseTimeout = 10 * time.Second
4448
)
@@ -52,6 +56,7 @@ type Metrics struct {
5256
idleClosed atomic.Int64
5357
diffSetting atomic.Int64
5458
resetSetting atomic.Int64
59+
waiterCapRejected atomic.Int64
5560
}
5661

5762
func (m *Metrics) MaxLifetimeClosed() int64 {
@@ -86,6 +91,10 @@ func (m *Metrics) ResetSettingCount() int64 {
8691
return m.resetSetting.Load()
8792
}
8893

94+
func (m *Metrics) WaiterCapRejected() int64 {
95+
return m.waiterCapRejected.Load()
96+
}
97+
8998
type Connector[C Connection] func(ctx context.Context) (C, error)
9099
type RefreshCheck func() (bool, error)
91100

@@ -94,6 +103,7 @@ type Config[C Connection] struct {
94103
IdleTimeout time.Duration
95104
MaxLifetime time.Duration
96105
RefreshInterval time.Duration
106+
MaxWaiters uint
97107
LogWait func(time.Time)
98108
}
99109

@@ -144,6 +154,9 @@ type ConnPool[C Connection] struct {
144154
refreshInterval atomic.Int64
145155
// logWait is called every time a client must block waiting for a connection
146156
logWait func(time.Time)
157+
// maxWaiters is the maximum number of clients that can be waiting for a connection;
158+
// 0 means unlimited
159+
maxWaiters uint
147160
}
148161

149162
Metrics Metrics
@@ -159,7 +172,14 @@ func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
159172
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
160173
pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds())
161174
pool.config.logWait = config.LogWait
175+
pool.config.maxWaiters = config.MaxWaiters
162176
pool.wait.init()
177+
pool.wait.onWait = func() {
178+
pool.Metrics.waitCount.Add(1)
179+
}
180+
pool.wait.onWaiterCapReached = func() {
181+
pool.Metrics.waiterCapRejected.Add(1)
182+
}
163183

164184
return pool
165185
}
@@ -353,8 +373,7 @@ func (pool *ConnPool[D]) RefreshInterval() time.Duration {
353373
return time.Duration(pool.config.refreshInterval.Load())
354374
}
355375

356-
func (pool *ConnPool[C]) recordWait(start time.Time) {
357-
pool.Metrics.waitCount.Add(1)
376+
func (pool *ConnPool[C]) recordWaitDuration(start time.Time) {
358377
pool.Metrics.waitTime.Add(time.Since(start).Nanoseconds())
359378
if pool.config.logWait != nil {
360379
pool.config.logWait(start)
@@ -571,11 +590,14 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
571590
return nil, ErrConnPoolClosed
572591
}
573592

574-
conn, err = pool.wait.waitForConn(ctx, nil, *closeChan)
593+
conn, err = pool.wait.waitForConn(ctx, nil, *closeChan, pool.config.maxWaiters)
575594
if err != nil {
595+
if errors.Is(err, ErrPoolWaiterCapReached) {
596+
return nil, err
597+
}
576598
return nil, ErrTimeout
577599
}
578-
pool.recordWait(start)
600+
pool.recordWaitDuration(start)
579601
}
580602
// no connections available and no connections to wait for (pool is closed)
581603
if conn == nil {
@@ -634,11 +656,14 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) (
634656
return nil, ErrConnPoolClosed
635657
}
636658

637-
conn, err = pool.wait.waitForConn(ctx, setting, *closeChan)
659+
conn, err = pool.wait.waitForConn(ctx, setting, *closeChan, pool.config.maxWaiters)
638660
if err != nil {
661+
if errors.Is(err, ErrPoolWaiterCapReached) {
662+
return nil, err
663+
}
639664
return nil, ErrTimeout
640665
}
641-
pool.recordWait(start)
666+
pool.recordWaitDuration(start)
642667
}
643668
// no connections available and no connections to wait for (pool is closed)
644669
if conn == nil {
@@ -873,6 +898,9 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) {
873898
stats.NewCounterFunc(name+"GetSetting", "Tablet server conn pool get with setting count", func() int64 {
874899
return pool.Metrics.GetSettingCount()
875900
})
901+
stats.NewCounterFunc(name+"WaiterCapRejected", "Tablet server conn pool waiter cap rejected", func() int64 {
902+
return pool.Metrics.WaiterCapRejected()
903+
})
876904
stats.NewCounterFunc(name+"DiffSetting", "Number of times pool applied different setting", func() int64 {
877905
return pool.Metrics.DiffSettingCount()
878906
})

go/pools/smartconnpool/waitlist.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,60 @@ type waitlist[C Connection] struct {
3939
nodes sync.Pool
4040
mu sync.Mutex
4141
list list.List[waiter[C]]
42+
// onWait is called when a client gets to the point in which it is waiting for a connection - or the mutex that it needs to grab to wait for a connection.
43+
onWait func()
44+
// onWaiterCapReached is called when the waitlist has reached its maximum capacity.
45+
onWaiterCapReached func()
4246
}
4347

4448
// waitForConn blocks until a connection with the given Setting is returned by another client,
4549
// or until the given context expires.
50+
// If maxWaiters is > 0 and the waitlist already has that many waiters, it returns
51+
// ErrPoolWaiterCapReached immediately without blocking.
4652
// The returned connection may _not_ have the requested Setting. This function can
4753
// also return a `nil` connection even if our context has expired, if the pool has
4854
// forced an expiration of all waiters in the waitlist.
49-
func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting, closeChan <-chan struct{}) (*Pooled[C], error) {
55+
func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting, closeChan <-chan struct{}, maxWaiters uint) (*Pooled[C], error) {
5056
elem := wl.nodes.Get().(*list.Element[waiter[C]])
5157
defer wl.nodes.Put(elem)
5258

5359
elem.Value = waiter[C]{conn: elem.Value.conn, setting: setting}
5460

61+
// Fast path: reject early using an atomic read of the list length to avoid
62+
// contending on the mutex under high query rates. This is racy — the count
63+
// can change between this check and the lock acquisition — so we re-check
64+
// under the lock below for correctness. Still, we expect to reject most
65+
// requests early here when under a heavy load.
66+
//
67+
// We do this here rather than further upstream (e.g. in ConnPool.Get) because
68+
// callers only reach waitForConn after exhausting all other options (idle
69+
// connections, new connections, settings stacks). There is no point in checking
70+
// there when those requests can still get a connection without waiting. The cap
71+
// is just for waiting.
72+
if wl.aboveWaiterCap(maxWaiters) {
73+
if wl.onWaiterCapReached != nil {
74+
wl.onWaiterCapReached()
75+
}
76+
return nil, ErrPoolWaiterCapReached
77+
}
78+
79+
// If we reach this point, we are waiting, at the very least on the mutex, likely
80+
// on the connection. So call onWait which takes care of recording the wait.
81+
if wl.onWait != nil {
82+
wl.onWait()
83+
}
84+
5585
wl.mu.Lock()
56-
// add ourselves as a waiter at the end of the waitlist
86+
// Strict check: the list length may have changed since the lockless check
87+
// above, so we verify again while holding the lock to guarantee the cap is
88+
// never exceeded.
89+
if wl.aboveWaiterCap(maxWaiters) {
90+
wl.mu.Unlock()
91+
if wl.onWaiterCapReached != nil {
92+
wl.onWaiterCapReached()
93+
}
94+
return nil, ErrPoolWaiterCapReached
95+
}
5796
wl.list.PushBackValue(elem)
5897
wl.mu.Unlock()
5998

@@ -110,6 +149,10 @@ func (wl *waitlist[C]) waitForConn(ctx context.Context, setting *Setting, closeC
110149
}
111150
}
112151

152+
func (wl *waitlist[C]) aboveWaiterCap(maxWaiters uint) bool {
153+
return maxWaiters > 0 && wl.list.Len() >= int(maxWaiters)
154+
}
155+
113156
func (wl *waitlist[C]) maybeStarvingCount() (maybeStarving int) {
114157
if wl.list.Len() == 0 {
115158
return

go/pools/smartconnpool/waitlist_test.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestWaitlistPoolCloseWithMultipleWaiters(t *testing.T) {
4040

4141
for i := 0; i < waiterCount; i++ {
4242
go func() {
43-
_, err := wait.waitForConn(ctx, nil, poolClose)
43+
_, err := wait.waitForConn(ctx, nil, poolClose, 0)
4444

4545
if err != nil {
4646
expireCount.Add(1)
@@ -68,3 +68,34 @@ func TestWaitlistPoolCloseWithMultipleWaiters(t *testing.T) {
6868

6969
assert.Equal(t, int32(waiterCount), expireCount.Load())
7070
}
71+
72+
func TestWaitlistWaiterCap(t *testing.T) {
73+
wl := waitlist[*TestConn]{}
74+
wl.init()
75+
76+
poolClose := make(chan struct{})
77+
78+
const maxWaiters = 3
79+
80+
errs := make(chan error, maxWaiters)
81+
for i := 1; i <= maxWaiters; i++ {
82+
go func() {
83+
_, err := wl.waitForConn(context.Background(), nil, poolClose, maxWaiters)
84+
errs <- err
85+
}()
86+
87+
assert.Eventually(t, func() bool {
88+
return wl.waiting() == i
89+
}, time.Second, 5*time.Millisecond)
90+
}
91+
92+
_, err := wl.waitForConn(context.Background(), nil, poolClose, maxWaiters)
93+
assert.ErrorIs(t, err, ErrPoolWaiterCapReached)
94+
assert.Equal(t, maxWaiters, wl.waiting())
95+
96+
close(poolClose)
97+
98+
for i := 0; i < maxWaiters; i++ {
99+
assert.NotErrorIs(t, <-errs, ErrPoolWaiterCapReached)
100+
}
101+
}

go/vt/vttablet/tabletserver/connpool/pool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func NewPool(env tabletenv.Env, name string, cfg tabletenv.ConnPoolConfig) *Pool
7171
IdleTimeout: cfg.IdleTimeout,
7272
MaxLifetime: cfg.MaxLifetime,
7373
RefreshInterval: mysqlctl.PoolDynamicHostnameResolution,
74+
MaxWaiters: cfg.MaxWaiters,
7475
}
7576

7677
if name != "" {

0 commit comments

Comments (0)