Skip to content

Commit 45d9375

Browse files
committed
Slack is noticing a lot of "stream lagged behind during consolidation"
errors because the "streamBufferSize" is hard-coded at only 8. This PR adds the vttablet CLI arg --consolidator-stream-buffer-size, which allows increasing the buffer beyond the default (8, kept for parity) if the site prefers to trade some additional memory for a better chance that large streamed results complete.
1 parent 18ff7c0 commit 45d9375

7 files changed

Lines changed: 48 additions & 41 deletions

File tree

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ Flags:
4848
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
4949
--config-type string Config file type (omit to infer config type from file extension).
5050
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
51+
--consolidator-stream-buffer-size int Configure the stream consolidator buffer size for follower streams. A larger buffer reduces the chance of a follower being dropped during consolidation at the cost of increased memory usage. (default 8)
5152
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
5253
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
5354
--consul-auth-static-file string JSON File to read the topos/tokens from.

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ Flags:
8181
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
8282
--config-type string Config file type (omit to infer config type from file extension).
8383
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
84+
--consolidator-stream-buffer-size int Configure the stream consolidator buffer size for follower streams. A larger buffer reduces the chance of a follower being dropped during consolidation at the cost of increased memory usage. (default 8)
8485
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
8586
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
8687
--consul-auth-static-file string JSON File to read the topos/tokens from.

go/vt/vttablet/tabletserver/query_engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine {
238238
qe.consolidator = sync2.NewConsolidator()
239239
if config.ConsolidatorStreamTotalSize > 0 && config.ConsolidatorStreamQuerySize > 0 {
240240
log.Info(fmt.Sprintf("Stream consolidator is enabled with query size set to %d and total size set to %d.", config.ConsolidatorStreamQuerySize, config.ConsolidatorStreamTotalSize))
241-
qe.streamConsolidator = NewStreamConsolidator(config.ConsolidatorStreamTotalSize, config.ConsolidatorStreamQuerySize, returnStreamResult)
241+
qe.streamConsolidator = NewStreamConsolidator(config.ConsolidatorStreamTotalSize, config.ConsolidatorStreamQuerySize, config.ConsolidatorStreamBufferSize, returnStreamResult)
242242
} else {
243243
log.Info("Stream consolidator is not enabled.")
244244
}

go/vt/vttablet/tabletserver/stream_consolidator.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,29 +28,30 @@ import (
2828
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
2929
)
3030

31-
const streamBufferSize = 8
32-
3331
// StreamConsolidator is a data structure capable of merging several identical streaming queries so only
3432
// one query is executed in MySQL and its response is fanned out to all the clients simultaneously.
3533
type StreamConsolidator struct {
3634
mu sync.Mutex
3735
inflight map[string]*streamInFlight
3836
memory int64
3937
maxMemoryTotal, maxMemoryQuery int64
38+
streamBufferSize int
4039
blocking bool
4140
cleanup StreamCallback
4241
}
4342

4443
// NewStreamConsolidator allocates a stream consolidator. The consolidator will use up to maxMemoryTotal
4544
// bytes in order to allow simultaneous queries to "catch up" to each other. Each individual stream will
4645
// only use up to maxMemoryQuery bytes of memory as a history buffer to catch up.
47-
func NewStreamConsolidator(maxMemoryTotal, maxMemoryQuery int64, cleanup StreamCallback) *StreamConsolidator {
46+
// streamBufferSize controls the channel buffer depth for each follower stream.
47+
func NewStreamConsolidator(maxMemoryTotal, maxMemoryQuery int64, streamBufferSize int, cleanup StreamCallback) *StreamConsolidator {
4848
return &StreamConsolidator{
49-
inflight: make(map[string]*streamInFlight),
50-
maxMemoryTotal: maxMemoryTotal,
51-
maxMemoryQuery: maxMemoryQuery,
52-
blocking: false,
53-
cleanup: cleanup,
49+
inflight: make(map[string]*streamInFlight),
50+
maxMemoryTotal: maxMemoryTotal,
51+
maxMemoryQuery: maxMemoryQuery,
52+
streamBufferSize: streamBufferSize,
53+
blocking: false,
54+
cleanup: cleanup,
5455
}
5556
}
5657

@@ -87,7 +88,7 @@ func (sc *StreamConsolidator) Consolidate(waitTimings *servenv.TimingsWrapper, l
8788

8889
// if there's an existing stream for our query, try to follow it
8990
if inflight != nil {
90-
catchup, followChan = inflight.follow()
91+
catchup, followChan = inflight.follow(sc.streamBufferSize)
9192
}
9293

9394
// if there isn't an existing stream; OR if there is an existing stream but
@@ -194,7 +195,7 @@ type streamInFlight struct {
194195
// that will receive all the Results in the future.
195196
// If this stream has been running for too long and we cannot catch up to it, follow
196197
// returns a nil channel.
197-
func (s *streamInFlight) follow() ([]*sqltypes.Result, chan *sqltypes.Result) {
198+
func (s *streamInFlight) follow(streamBufferSize int) ([]*sqltypes.Result, chan *sqltypes.Result) {
198199
s.mu.Lock()
199200
defer s.mu.Unlock()
200201

go/vt/vttablet/tabletserver/stream_consolidator_flaky_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (ct *consolidationTest) run(workers int, generateCallback func(int) (string
148148

149149
func TestConsolidatorSimple(t *testing.T) {
150150
ct := consolidationTest{
151-
cc: NewStreamConsolidator(128*1024, 2*1024, nocleanup),
151+
cc: NewStreamConsolidator(128*1024, 2*1024, 8, nocleanup),
152152
streamItemDelay: 10 * time.Millisecond,
153153
streamItemCount: 10,
154154
}
@@ -175,7 +175,7 @@ func TestConsolidatorSimple(t *testing.T) {
175175
func TestConsolidatorErrorPropagation(t *testing.T) {
176176
t.Run("from mysql", func(t *testing.T) {
177177
ct := consolidationTest{
178-
cc: NewStreamConsolidator(128*1024, 2*1024, nocleanup),
178+
cc: NewStreamConsolidator(128*1024, 2*1024, 8, nocleanup),
179179
leaderCallback: func(callback StreamCallback) error {
180180
time.Sleep(100 * time.Millisecond)
181181
return errors.New("mysqld error")
@@ -195,7 +195,7 @@ func TestConsolidatorErrorPropagation(t *testing.T) {
195195

196196
t.Run("from leader", func(t *testing.T) {
197197
ct := consolidationTest{
198-
cc: NewStreamConsolidator(128*1024, 2*1024, nocleanup),
198+
cc: NewStreamConsolidator(128*1024, 2*1024, 8, nocleanup),
199199
streamItemDelay: 10 * time.Millisecond,
200200
streamItemCount: 10,
201201
}
@@ -229,7 +229,7 @@ func TestConsolidatorErrorPropagation(t *testing.T) {
229229

230230
t.Run("from followers", func(t *testing.T) {
231231
ct := consolidationTest{
232-
cc: NewStreamConsolidator(128*1024, 2*1024, nocleanup),
232+
cc: NewStreamConsolidator(128*1024, 2*1024, 8, nocleanup),
233233
streamItemDelay: 10 * time.Millisecond,
234234
streamItemCount: 10,
235235
}
@@ -263,7 +263,7 @@ func TestConsolidatorErrorPropagation(t *testing.T) {
263263

264264
func TestConsolidatorDelayedListener(t *testing.T) {
265265
ct := consolidationTest{
266-
cc: NewStreamConsolidator(128*1024, 2*1024, nocleanup),
266+
cc: NewStreamConsolidator(128*1024, 2*1024, 8, nocleanup),
267267
streamItemDelay: 1 * time.Millisecond,
268268
streamItemCount: 100,
269269
}
@@ -304,7 +304,7 @@ func TestConsolidatorDelayedListener(t *testing.T) {
304304
func TestConsolidatorMemoryLimits(t *testing.T) {
305305
t.Run("rows too large", func(t *testing.T) {
306306
ct := consolidationTest{
307-
cc: NewStreamConsolidator(128*1024, 32, nocleanup),
307+
cc: NewStreamConsolidator(128*1024, 32, 8, nocleanup),
308308
streamItemDelay: 1 * time.Millisecond,
309309
streamItemCount: 100,
310310
}
@@ -326,7 +326,7 @@ func TestConsolidatorMemoryLimits(t *testing.T) {
326326

327327
t.Run("two-phase consolidation (time)", func(t *testing.T) {
328328
ct := consolidationTest{
329-
cc: NewStreamConsolidator(128*1024, 2*1024, nocleanup),
329+
cc: NewStreamConsolidator(128*1024, 2*1024, 8, nocleanup),
330330
streamItemDelay: 2 * time.Millisecond,
331331
streamItemCount: 10,
332332
}
@@ -354,7 +354,7 @@ func TestConsolidatorMemoryLimits(t *testing.T) {
354354
rsize := results[0].CachedSize(true)
355355

356356
ct := consolidationTest{
357-
cc: NewStreamConsolidator(128*1024, rsize*streamsInFirstBatch+1, nocleanup),
357+
cc: NewStreamConsolidator(128*1024, rsize*streamsInFirstBatch+1, 8, nocleanup),
358358
streamItemDelay: 1 * time.Millisecond,
359359
streamItems: results,
360360
}
@@ -379,7 +379,7 @@ func TestConsolidatorMemoryLimits(t *testing.T) {
379379
rsize := results[0].CachedSize(true)
380380

381381
ct := consolidationTest{
382-
cc: NewStreamConsolidator(128*1024, rsize*2+1, nocleanup),
382+
cc: NewStreamConsolidator(128*1024, rsize*2+1, 8, nocleanup),
383383
streamItemDelay: 10 * time.Millisecond,
384384
streamItems: results,
385385
}

go/vt/vttablet/tabletserver/tabletenv/config.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
204204
utils.SetFlagBoolVar(fs, &enableConsolidatorReplicas, "enable-consolidator-replicas", false, "This option enables the query consolidator only on replicas.")
205205
fs.Int64Var(&currentConfig.ConsolidatorStreamQuerySize, "consolidator-stream-query-size", defaultConfig.ConsolidatorStreamQuerySize, "Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator.")
206206
fs.Int64Var(&currentConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.")
207+
fs.IntVar(&currentConfig.ConsolidatorStreamBufferSize, "consolidator-stream-buffer-size", defaultConfig.ConsolidatorStreamBufferSize, "Configure the stream consolidator buffer size for follower streams. A larger buffer reduces the chance of a follower being dropped during consolidation at the cost of increased memory usage.")
207208

208209
fs.Int64Var(&currentConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
209210
utils.SetFlagDurationVar(fs, &healthCheckInterval, "health-check-interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
@@ -335,24 +336,25 @@ type TabletConfig struct {
335336
ReplicationTracker ReplicationTrackerConfig `json:"replicationTracker"`
336337

337338
// Consolidator can be enable, disable, or notOnPrimary. Default is enable.
338-
Consolidator string `json:"consolidator,omitempty"`
339-
PassthroughDML bool `json:"passthroughDML,omitempty"`
340-
StreamBufferSize int `json:"streamBufferSize,omitempty"`
341-
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
342-
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
343-
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
344-
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
345-
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
346-
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`
347-
SchemaChangeReloadTimeout time.Duration `json:"schemaChangeReloadTimeout,omitempty"`
348-
WatchReplication bool `json:"watchReplication,omitempty"` // Ignored and unused, remove in v25
349-
TrackSchemaVersions bool `json:"trackSchemaVersions,omitempty"`
350-
SchemaVersionMaxAgeSeconds int64 `json:"schemaVersionMaxAgeSeconds,omitempty"`
351-
TerseErrors bool `json:"terseErrors,omitempty"`
352-
TruncateErrorLen int `json:"truncateErrorLen,omitempty"`
353-
AnnotateQueries bool `json:"annotateQueries,omitempty"`
354-
MessagePostponeParallelism int `json:"messagePostponeParallelism,omitempty"`
355-
SignalWhenSchemaChange bool `json:"signalWhenSchemaChange,omitempty"`
339+
Consolidator string `json:"consolidator,omitempty"`
340+
PassthroughDML bool `json:"passthroughDML,omitempty"`
341+
StreamBufferSize int `json:"streamBufferSize,omitempty"`
342+
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
343+
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
344+
ConsolidatorStreamBufferSize int `json:"consolidatorStreamBufferSize,omitempty"`
345+
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
346+
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
347+
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
348+
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`
349+
SchemaChangeReloadTimeout time.Duration `json:"schemaChangeReloadTimeout,omitempty"`
350+
WatchReplication bool `json:"watchReplication,omitempty"` // Ignored and unused, remove in v25
351+
TrackSchemaVersions bool `json:"trackSchemaVersions,omitempty"`
352+
SchemaVersionMaxAgeSeconds int64 `json:"schemaVersionMaxAgeSeconds,omitempty"`
353+
TerseErrors bool `json:"terseErrors,omitempty"`
354+
TruncateErrorLen int `json:"truncateErrorLen,omitempty"`
355+
AnnotateQueries bool `json:"annotateQueries,omitempty"`
356+
MessagePostponeParallelism int `json:"messagePostponeParallelism,omitempty"`
357+
SignalWhenSchemaChange bool `json:"signalWhenSchemaChange,omitempty"`
356358

357359
ExternalConnections map[string]*dbconfigs.DBConfigs `json:"externalConnections,omitempty"`
358360

@@ -1093,9 +1095,10 @@ var defaultConfig = TabletConfig{
10931095
// of them ready in MySQL and profit from a pipelining effect.
10941096
MaxConcurrency: 5,
10951097
},
1096-
Consolidator: Enable,
1097-
ConsolidatorStreamTotalSize: 128 * 1024 * 1024,
1098-
ConsolidatorStreamQuerySize: 2 * 1024 * 1024,
1098+
Consolidator: Enable,
1099+
ConsolidatorStreamTotalSize: 128 * 1024 * 1024,
1100+
ConsolidatorStreamQuerySize: 2 * 1024 * 1024,
1101+
ConsolidatorStreamBufferSize: 8,
10991102
// The value for StreamBufferSize was chosen after trying out a few of
11001103
// them. Too small buffers force too many packets to be sent. Too big
11011104
// buffers force the clients to read them in multiple chunks and make

go/vt/vttablet/tabletserver/tabletenv/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ func TestDefaultConfig(t *testing.T) {
132132
gotBytes, err := yaml2.Marshal(NewDefaultConfig())
133133
require.NoError(t, err)
134134
want := `consolidator: enable
135+
consolidatorStreamBufferSize: 8
135136
consolidatorStreamQuerySize: 2097152
136137
consolidatorStreamTotalSize: 134217728
137138
gracePeriods:

0 commit comments

Comments
 (0)