Skip to content

Commit a20d887

Browse files
bgwinesClaude
andauthored
Backport vitessio#19872: cache proto3 row encoding to reduce query consolidation memory (#846)
* vttablet: cache proto3 row encoding to reduce query consolidation memory utilization (vitessio#19872) Signed-off-by: Brett Wines <bwines@slack-corp.com> Co-authored-by: Claude <svc-devxp-claude@slack-corp.com> * fix: resolve CI failures from backport cherry-pick Two issues from the cherry-pick: 1. sizegen formatting: remove blank lines between generated functions in cached_size.go to match sizegen's output format 2. test panic: set FakePendingResult.Consolidator in the waiter cap reject test to prevent nil pointer dereference in AddWaiterCounter Co-Authored-By: Claude <svc-devxp-claude@slack-corp.com> AI disclosure: Claude Code assisted with development. Every line of code was either written by or carefully reviewed by me :) * Fix TestQueryExecutorConsolidatorWaiterCapReject: set consolidator total waiter count The test was setting per-query WaiterCount but not the consolidator's TotalWaiterCount, which is what the actual code checks to decide whether the cap is exceeded. Also corrected assertions to match the actual code path (one AddWaiterCounter(-1) call, not two). Co-Authored-By: Claude <svc-devxp-claude@slack-corp.com> AI disclosure: Claude Code assisted with development. Every line of code was either written by or carefully reviewed by me :) --------- Signed-off-by: Brett Wines <bwines@slack-corp.com> Co-authored-by: Claude <svc-devxp-claude@slack-corp.com>
1 parent 5da5871 commit a20d887

13 files changed

Lines changed: 360 additions & 27 deletions

File tree

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ Flags:
4444
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
4545
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
4646
--config-type string Config file type (omit to infer config type from file extension).
47+
--consolidator-cache-proto3-rows If true, the consolidation leader pre-caches proto3-encoded rows so that waiters avoid redundant encoding work.
4748
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
4849
--consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough")
4950
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ Flags:
7878
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
7979
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
8080
--config-type string Config file type (omit to infer config type from file extension).
81+
--consolidator-cache-proto3-rows If true, the consolidation leader pre-caches proto3-encoded rows so that waiters avoid redundant encoding work.
8182
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
8283
--consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough")
8384
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)

go/sqltypes/cached_size.go

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/sqltypes/proto3.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,17 +94,25 @@ func proto3ToRows(fields []*querypb.Field, rows []*querypb.Row) [][]Value {
9494
return result
9595
}
9696

97-
// ResultToProto3 converts Result to proto3.
9897
func ResultToProto3(qr *Result) *querypb.QueryResult {
9998
if qr == nil {
10099
return nil
101100
}
101+
// This read is susceptible to TOCTOU if proto3Rows is populated
102+
// concurrently, but the worst case is a redundant RowsToProto3 call.
103+
// In the consolidation path this can't happen today (the leader sets
104+
// the cached value before releasing the RWMutex), but the fallback is
105+
// harmless.
106+
rows := qr.proto3Rows
107+
if rows == nil {
108+
rows = RowsToProto3(qr.Rows)
109+
}
102110
return &querypb.QueryResult{
103111
Fields: qr.Fields,
104112
RowsAffected: qr.RowsAffected,
105113
InsertId: qr.InsertID,
106114
InsertIdChanged: qr.InsertIDChanged,
107-
Rows: RowsToProto3(qr.Rows),
115+
Rows: rows,
108116
Info: qr.Info,
109117
SessionStateChanges: qr.SessionStateChanges,
110118
}

go/sqltypes/proto3_test.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
package sqltypes
1818

1919
import (
20+
"fmt"
21+
"runtime"
22+
"strconv"
23+
"sync"
2024
"testing"
2125

2226
"github.com/stretchr/testify/require"
@@ -330,3 +334,226 @@ func TestProto3ValuesEqual(t *testing.T) {
330334
require.Equal(t, tc.expected, Proto3ValuesEqual(tc.v1, tc.v2))
331335
}
332336
}
337+
338+
func TestResultToProto3_CachedRows(t *testing.T) {
339+
fields := []*querypb.Field{{
340+
Name: "col1",
341+
Type: VarChar,
342+
}, {
343+
Name: "col2",
344+
Type: Int64,
345+
}}
346+
result := &Result{
347+
Fields: fields,
348+
RowsAffected: 2,
349+
Rows: [][]Value{{
350+
TestValue(VarChar, "aa"),
351+
TestValue(Int64, "1"),
352+
}, {
353+
TestValue(VarChar, "bb"),
354+
TestValue(Int64, "2"),
355+
}},
356+
}
357+
358+
uncached := ResultToProto3(result)
359+
require.Len(t, uncached.Rows, 2)
360+
361+
result.CacheProto3Rows()
362+
363+
cached := ResultToProto3(result)
364+
require.True(t, proto.Equal(uncached, cached), "cached and uncached proto3 results differ")
365+
366+
require.Same(t, cached.Rows[0], result.proto3Rows[0])
367+
require.Same(t, cached.Rows[1], result.proto3Rows[1])
368+
}
369+
370+
func TestResultToProto3_NilAndEmptyCache(t *testing.T) {
371+
require.Nil(t, ResultToProto3(nil))
372+
var nilResult *Result
373+
nilResult.CacheProto3Rows() // does not panic
374+
375+
result := &Result{
376+
Fields: []*querypb.Field{{Name: "col1", Type: VarChar}},
377+
}
378+
result.CacheProto3Rows()
379+
require.Nil(t, result.proto3Rows)
380+
381+
p3 := ResultToProto3(result)
382+
require.NotNil(t, p3)
383+
require.Empty(t, p3.Rows)
384+
}
385+
386+
func TestCopy_DoesNotPropagateProto3RowCache(t *testing.T) {
387+
result := &Result{
388+
Fields: []*querypb.Field{{Name: "col1", Type: Int64}},
389+
Rows: [][]Value{{
390+
TestValue(Int64, "42"),
391+
}},
392+
}
393+
result.CacheProto3Rows()
394+
require.NotNil(t, result.proto3Rows)
395+
396+
require.Nil(t, result.ShallowCopy().proto3Rows)
397+
require.Nil(t, result.Copy().proto3Rows)
398+
}
399+
400+
func TestMutations_InvalidateCachedProto3Rows(t *testing.T) {
401+
resultWithProto3FieldPopulated := func(t *testing.T) *Result {
402+
t.Helper()
403+
result := &Result{
404+
Fields: []*querypb.Field{{Name: "col1", Type: VarChar}},
405+
Rows: [][]Value{{
406+
TestValue(VarChar, "hello"),
407+
}},
408+
}
409+
result.CacheProto3Rows()
410+
require.NotNil(t, result.proto3Rows)
411+
return result
412+
}
413+
414+
t.Run("AppendResult", func(t *testing.T) {
415+
result := resultWithProto3FieldPopulated(t)
416+
result.AppendResult(&Result{
417+
Rows: [][]Value{{
418+
TestValue(VarChar, "world"),
419+
}},
420+
})
421+
require.Nil(t, result.proto3Rows, "AppendResult must invalidate proto3Rows cache")
422+
423+
p3 := ResultToProto3(result)
424+
require.Len(t, p3.Rows, 2)
425+
})
426+
427+
t.Run("Repair", func(t *testing.T) {
428+
result := resultWithProto3FieldPopulated(t)
429+
result.Repair([]*querypb.Field{{Name: "col1", Type: VarBinary}})
430+
require.Nil(t, result.proto3Rows, "Repair must invalidate proto3Rows cache")
431+
432+
p3 := ResultToProto3(result)
433+
require.Len(t, p3.Rows, 1)
434+
})
435+
}
436+
437+
// makeTestResult builds a Result with numRows rows, each containing 5 columns
438+
// of mixed types. Row values are deterministic so benchmarks are reproducible.
439+
func makeTestResult(numRows int) *Result {
440+
fields := MakeTestFields(
441+
"col1|col2|col3|col4|col5",
442+
"int64|varchar|varchar|float64|int64",
443+
)
444+
rows := make([][]Value, numRows)
445+
for i := range rows {
446+
rows[i] = []Value{
447+
TestValue(Int64, strconv.Itoa(i)),
448+
TestValue(VarChar, fmt.Sprintf("val-%06d", i)),
449+
TestValue(VarChar, fmt.Sprintf("val-%06d-longervalue", i)),
450+
TestValue(Float64, fmt.Sprintf("%d.%02d", i/100, i%100)),
451+
TestValue(Int64, strconv.Itoa(i%2)),
452+
}
453+
}
454+
return &Result{
455+
Fields: fields,
456+
RowsAffected: uint64(numRows),
457+
Rows: rows,
458+
}
459+
}
460+
461+
// BenchmarkResultToProto3 measures per-call allocation cost of ResultToProto3
462+
// with and without CacheProto3Rows, across different result sizes.
463+
func BenchmarkResultToProto3(b *testing.B) {
464+
for _, numRows := range []int{100, 1000, 10000} {
465+
result := makeTestResult(numRows)
466+
467+
b.Run(fmt.Sprintf("rows=%d/uncached", numRows), func(b *testing.B) {
468+
b.ReportAllocs()
469+
for b.Loop() {
470+
ResultToProto3(result)
471+
}
472+
})
473+
474+
b.Run(fmt.Sprintf("rows=%d/cached", numRows), func(b *testing.B) {
475+
result.CacheProto3Rows()
476+
b.ReportAllocs()
477+
for b.Loop() {
478+
ResultToProto3(result)
479+
}
480+
})
481+
}
482+
}
483+
484+
// BenchmarkConsolidationFanOut simulates the consolidator scenario: one shared
485+
// Result is read by N concurrent goroutines calling ResultToProto3. Reports
486+
// total heap bytes allocated across all waiters.
487+
func BenchmarkConsolidationFanOut(b *testing.B) {
488+
const numRows = 10000
489+
490+
for _, waiters := range []int{1, 10, 50} {
491+
result := makeTestResult(numRows)
492+
493+
b.Run(fmt.Sprintf("waiters=%d/uncached", waiters), func(b *testing.B) {
494+
b.ReportAllocs()
495+
for b.Loop() {
496+
var wg sync.WaitGroup
497+
wg.Add(waiters)
498+
for range waiters {
499+
go func() {
500+
defer wg.Done()
501+
ResultToProto3(result)
502+
}()
503+
}
504+
wg.Wait()
505+
}
506+
})
507+
508+
b.Run(fmt.Sprintf("waiters=%d/cached", waiters), func(b *testing.B) {
509+
result.CacheProto3Rows()
510+
b.ReportAllocs()
511+
for b.Loop() {
512+
var wg sync.WaitGroup
513+
wg.Add(waiters)
514+
for range waiters {
515+
go func() {
516+
defer wg.Done()
517+
ResultToProto3(result)
518+
}()
519+
}
520+
wg.Wait()
521+
}
522+
})
523+
524+
// Report aggregate heap delta for a single iteration so the savings
525+
// are visible in absolute terms, not just per-op.
526+
b.Run(fmt.Sprintf("waiters=%d/heap-delta", waiters), func(b *testing.B) {
527+
for _, cached := range []bool{false, true} {
528+
label := "uncached"
529+
r := makeTestResult(numRows)
530+
if cached {
531+
label = "cached"
532+
r.CacheProto3Rows()
533+
}
534+
b.Run(label, func(b *testing.B) {
535+
b.ReportAllocs()
536+
for b.Loop() {
537+
runtime.GC()
538+
var before runtime.MemStats
539+
runtime.ReadMemStats(&before)
540+
541+
var wg sync.WaitGroup
542+
wg.Add(waiters)
543+
for range waiters {
544+
go func() {
545+
defer wg.Done()
546+
ResultToProto3(r)
547+
}()
548+
}
549+
wg.Wait()
550+
551+
var after runtime.MemStats
552+
runtime.ReadMemStats(&after)
553+
b.ReportMetric(float64(after.TotalAlloc-before.TotalAlloc), "heap-bytes/op")
554+
}
555+
})
556+
}
557+
})
558+
}
559+
}

go/sqltypes/result.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ type Result struct {
3636
SessionStateChanges string `json:"session_state_changes"`
3737
StatusFlags uint16 `json:"status_flags"`
3838
Info string `json:"info"`
39+
40+
// proto3Rows caches the proto3-encoded representation of Rows, avoiding
41+
// redundant encoding when multiple consumers share the same Result (i.e.
42+
// query consolidation). Not populated for the non-consolidation flow;
43+
// don't depend on it being present.
44+
proto3Rows []*querypb.Row
3945
}
4046

4147
//goland:noinspection GoUnusedConst
@@ -76,6 +82,7 @@ type MultiResultStream interface {
7682
// Repair fixes the type info in the rows
7783
// to conform to the supplied field types.
7884
func (result *Result) Repair(fields []*querypb.Field) {
85+
result.proto3Rows = nil
7986
// Usage of j is intentional.
8087
for j, f := range fields {
8188
for _, r := range result.Rows {
@@ -119,6 +126,7 @@ func (result *Result) Copy() *Result {
119126
out.Rows = append(out.Rows, CopyRow(r))
120127
}
121128
}
129+
// proto3Rows is intentionally not propagated: callers may modify Rows
122130
return out
123131
}
124132

@@ -132,6 +140,15 @@ func (result *Result) ShallowCopy() *Result {
132140
Info: result.Info,
133141
SessionStateChanges: result.SessionStateChanges,
134142
Rows: result.Rows,
143+
// proto3Rows is intentionally not propagated: callers may modify Rows
144+
}
145+
}
146+
147+
func (result *Result) CacheProto3Rows() {
148+
// result can be nil when the query errors out (e.g. execDBConn returns
149+
// a nil Result alongside an error).
150+
if result != nil && len(result.Rows) > 0 {
151+
result.proto3Rows = RowsToProto3(result.Rows)
135152
}
136153
}
137154

@@ -339,6 +356,7 @@ func (result *Result) StripMetadata(incl querypb.ExecuteOptions_IncludedFields)
339356
// to another result.Note currently it doesn't handle cases like
340357
// if two results have different fields.We will enhance this function.
341358
func (result *Result) AppendResult(src *Result) {
359+
result.proto3Rows = nil
342360
result.RowsAffected += src.RowsAffected
343361
if src.InsertIDUpdated() {
344362
result.InsertID = src.InsertID

0 commit comments

Comments
 (0)