Skip to content

Commit 0f83e76

Browse files
committed
Backport proto3 row caching for query consolidator to slack-19.0
Backport of vitessio#19872, which was already cherry-picked to slack-22.0 in #846. When multiple queries are consolidated, the original query now pre-computes and caches the proto3 row encoding so that N waiters share the encoded rows instead of each redundantly calling RowsToProto3. Gated behind --consolidator-cache-proto3-rows (default false). Key v19-specific adaptations: - No InsertIDChanged field (v22-only) - AppendResult uses src.InsertID != 0 (not InsertIDUpdated()) - Benchmarks use for/b.N loops (Go 1.22, no b.Loop()) - Sizegen format: no blank lines between functions Co-Authored-By: Claude <svc-devxp-claude@slack-corp.com> AI disclosure: Claude Code assisted with development. Every line of code was either written by or carefully reviewed by me :)
1 parent dd41012 commit 0f83e76

13 files changed

Lines changed: 360 additions & 28 deletions

File tree

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Flags:
5151
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
5252
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
5353
--config-type string Config file type (omit to infer config type from file extension).
54+
--consolidator-cache-proto3-rows Cache proto3-encoded rows in consolidated results to avoid redundant encoding by waiting queries.
5455
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
5556
--consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough")
5657
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ Flags:
8585
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
8686
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
8787
--config-type string Config file type (omit to infer config type from file extension).
88+
--consolidator-cache-proto3-rows Cache proto3-encoded rows in consolidated results to avoid redundant encoding by waiting queries.
8889
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
8990
--consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough")
9091
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)

go/sqltypes/cached_size.go

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/sqltypes/proto3.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,16 +94,24 @@ func proto3ToRows(fields []*querypb.Field, rows []*querypb.Row) [][]Value {
9494
return result
9595
}
9696

97-
// ResultToProto3 converts Result to proto3.
9897
func ResultToProto3(qr *Result) *querypb.QueryResult {
9998
if qr == nil {
10099
return nil
101100
}
101+
// This read is susceptible to TOCTOU if proto3Rows is populated
102+
// concurrently, but the worst case is a redundant RowsToProto3 call.
103+
// In the consolidation path this can't happen today (the leader sets
104+
// the cached value before releasing the RWMutex), but the fallback is
105+
// harmless.
106+
rows := qr.proto3Rows
107+
if rows == nil {
108+
rows = RowsToProto3(qr.Rows)
109+
}
102110
return &querypb.QueryResult{
103111
Fields: qr.Fields,
104112
RowsAffected: qr.RowsAffected,
105113
InsertId: qr.InsertID,
106-
Rows: RowsToProto3(qr.Rows),
114+
Rows: rows,
107115
Info: qr.Info,
108116
SessionStateChanges: qr.SessionStateChanges,
109117
}

go/sqltypes/proto3_test.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
package sqltypes
1818

1919
import (
20+
"fmt"
21+
"runtime"
22+
"strconv"
23+
"sync"
2024
"testing"
2125

2226
"github.com/stretchr/testify/require"
@@ -320,3 +324,226 @@ func TestProto3ValuesEqual(t *testing.T) {
320324
require.Equal(t, tc.expected, Proto3ValuesEqual(tc.v1, tc.v2))
321325
}
322326
}
327+
328+
func TestResultToProto3_CachedRows(t *testing.T) {
329+
fields := []*querypb.Field{{
330+
Name: "col1",
331+
Type: VarChar,
332+
}, {
333+
Name: "col2",
334+
Type: Int64,
335+
}}
336+
result := &Result{
337+
Fields: fields,
338+
RowsAffected: 2,
339+
Rows: [][]Value{{
340+
TestValue(VarChar, "aa"),
341+
TestValue(Int64, "1"),
342+
}, {
343+
TestValue(VarChar, "bb"),
344+
TestValue(Int64, "2"),
345+
}},
346+
}
347+
348+
uncached := ResultToProto3(result)
349+
require.Len(t, uncached.Rows, 2)
350+
351+
result.CacheProto3Rows()
352+
353+
cached := ResultToProto3(result)
354+
require.True(t, proto.Equal(uncached, cached), "cached and uncached proto3 results differ")
355+
356+
require.Same(t, cached.Rows[0], result.proto3Rows[0])
357+
require.Same(t, cached.Rows[1], result.proto3Rows[1])
358+
}
359+
360+
func TestResultToProto3_NilAndEmptyCache(t *testing.T) {
361+
require.Nil(t, ResultToProto3(nil))
362+
var nilResult *Result
363+
nilResult.CacheProto3Rows() // does not panic
364+
365+
result := &Result{
366+
Fields: []*querypb.Field{{Name: "col1", Type: VarChar}},
367+
}
368+
result.CacheProto3Rows()
369+
require.Nil(t, result.proto3Rows)
370+
371+
p3 := ResultToProto3(result)
372+
require.NotNil(t, p3)
373+
require.Empty(t, p3.Rows)
374+
}
375+
376+
func TestCopy_DoesNotPropagateProto3RowCache(t *testing.T) {
377+
result := &Result{
378+
Fields: []*querypb.Field{{Name: "col1", Type: Int64}},
379+
Rows: [][]Value{{
380+
TestValue(Int64, "42"),
381+
}},
382+
}
383+
result.CacheProto3Rows()
384+
require.NotNil(t, result.proto3Rows)
385+
386+
require.Nil(t, result.ShallowCopy().proto3Rows)
387+
require.Nil(t, result.Copy().proto3Rows)
388+
}
389+
390+
func TestMutations_InvalidateCachedProto3Rows(t *testing.T) {
391+
resultWithProto3FieldPopulated := func(t *testing.T) *Result {
392+
t.Helper()
393+
result := &Result{
394+
Fields: []*querypb.Field{{Name: "col1", Type: VarChar}},
395+
Rows: [][]Value{{
396+
TestValue(VarChar, "hello"),
397+
}},
398+
}
399+
result.CacheProto3Rows()
400+
require.NotNil(t, result.proto3Rows)
401+
return result
402+
}
403+
404+
t.Run("AppendResult", func(t *testing.T) {
405+
result := resultWithProto3FieldPopulated(t)
406+
result.AppendResult(&Result{
407+
Rows: [][]Value{{
408+
TestValue(VarChar, "world"),
409+
}},
410+
})
411+
require.Nil(t, result.proto3Rows, "AppendResult must invalidate proto3Rows cache")
412+
413+
p3 := ResultToProto3(result)
414+
require.Len(t, p3.Rows, 2)
415+
})
416+
417+
t.Run("Repair", func(t *testing.T) {
418+
result := resultWithProto3FieldPopulated(t)
419+
result.Repair([]*querypb.Field{{Name: "col1", Type: VarBinary}})
420+
require.Nil(t, result.proto3Rows, "Repair must invalidate proto3Rows cache")
421+
422+
p3 := ResultToProto3(result)
423+
require.Len(t, p3.Rows, 1)
424+
})
425+
}
426+
427+
// makeTestResult builds a Result with numRows rows, each containing 5 columns
428+
// of mixed types. Row values are deterministic so benchmarks are reproducible.
429+
func makeTestResult(numRows int) *Result {
430+
fields := MakeTestFields(
431+
"col1|col2|col3|col4|col5",
432+
"int64|varchar|varchar|float64|int64",
433+
)
434+
rows := make([][]Value, numRows)
435+
for i := range rows {
436+
rows[i] = []Value{
437+
TestValue(Int64, strconv.Itoa(i)),
438+
TestValue(VarChar, fmt.Sprintf("val-%06d", i)),
439+
TestValue(VarChar, fmt.Sprintf("val-%06d-longervalue", i)),
440+
TestValue(Float64, fmt.Sprintf("%d.%02d", i/100, i%100)),
441+
TestValue(Int64, strconv.Itoa(i%2)),
442+
}
443+
}
444+
return &Result{
445+
Fields: fields,
446+
RowsAffected: uint64(numRows),
447+
Rows: rows,
448+
}
449+
}
450+
451+
// BenchmarkResultToProto3 measures per-call allocation cost of ResultToProto3
452+
// with and without CacheProto3Rows, across different result sizes.
453+
func BenchmarkResultToProto3(b *testing.B) {
454+
for _, numRows := range []int{100, 1000, 10000} {
455+
result := makeTestResult(numRows)
456+
457+
b.Run(fmt.Sprintf("rows=%d/uncached", numRows), func(b *testing.B) {
458+
b.ReportAllocs()
459+
for i := 0; i < b.N; i++ {
460+
ResultToProto3(result)
461+
}
462+
})
463+
464+
b.Run(fmt.Sprintf("rows=%d/cached", numRows), func(b *testing.B) {
465+
result.CacheProto3Rows()
466+
b.ReportAllocs()
467+
for i := 0; i < b.N; i++ {
468+
ResultToProto3(result)
469+
}
470+
})
471+
}
472+
}
473+
474+
// BenchmarkConsolidationFanOut simulates the consolidator scenario: one shared
475+
// Result is read by N concurrent goroutines calling ResultToProto3. Reports
476+
// total heap bytes allocated across all waiters.
477+
func BenchmarkConsolidationFanOut(b *testing.B) {
478+
const numRows = 10000
479+
480+
for _, waiters := range []int{1, 10, 50} {
481+
result := makeTestResult(numRows)
482+
483+
b.Run(fmt.Sprintf("waiters=%d/uncached", waiters), func(b *testing.B) {
484+
b.ReportAllocs()
485+
for i := 0; i < b.N; i++ {
486+
var wg sync.WaitGroup
487+
wg.Add(waiters)
488+
for j := 0; j < waiters; j++ {
489+
go func() {
490+
defer wg.Done()
491+
ResultToProto3(result)
492+
}()
493+
}
494+
wg.Wait()
495+
}
496+
})
497+
498+
b.Run(fmt.Sprintf("waiters=%d/cached", waiters), func(b *testing.B) {
499+
result.CacheProto3Rows()
500+
b.ReportAllocs()
501+
for i := 0; i < b.N; i++ {
502+
var wg sync.WaitGroup
503+
wg.Add(waiters)
504+
for j := 0; j < waiters; j++ {
505+
go func() {
506+
defer wg.Done()
507+
ResultToProto3(result)
508+
}()
509+
}
510+
wg.Wait()
511+
}
512+
})
513+
514+
// Report aggregate heap delta for a single iteration so the savings
515+
// are visible in absolute terms, not just per-op.
516+
b.Run(fmt.Sprintf("waiters=%d/heap-delta", waiters), func(b *testing.B) {
517+
for _, cached := range []bool{false, true} {
518+
label := "uncached"
519+
r := makeTestResult(numRows)
520+
if cached {
521+
label = "cached"
522+
r.CacheProto3Rows()
523+
}
524+
b.Run(label, func(b *testing.B) {
525+
b.ReportAllocs()
526+
for i := 0; i < b.N; i++ {
527+
runtime.GC()
528+
var before runtime.MemStats
529+
runtime.ReadMemStats(&before)
530+
531+
var wg sync.WaitGroup
532+
wg.Add(waiters)
533+
for j := 0; j < waiters; j++ {
534+
go func() {
535+
defer wg.Done()
536+
ResultToProto3(r)
537+
}()
538+
}
539+
wg.Wait()
540+
541+
var after runtime.MemStats
542+
runtime.ReadMemStats(&after)
543+
b.ReportMetric(float64(after.TotalAlloc-before.TotalAlloc), "heap-bytes/op")
544+
}
545+
})
546+
}
547+
})
548+
}
549+
}

go/sqltypes/result.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ type Result struct {
3535
SessionStateChanges string `json:"session_state_changes"`
3636
StatusFlags uint16 `json:"status_flags"`
3737
Info string `json:"info"`
38+
39+
// proto3Rows caches the proto3-encoded representation of Rows, avoiding
40+
// redundant encoding when multiple consumers share the same Result (i.e.
41+
// query consolidation). Not populated for the non-consolidation flow;
42+
// don't depend on it being present.
43+
proto3Rows []*querypb.Row
3844
}
3945

4046
//goland:noinspection GoUnusedConst
@@ -66,6 +72,7 @@ type ResultStream interface {
6672
// Repair fixes the type info in the rows
6773
// to conform to the supplied field types.
6874
func (result *Result) Repair(fields []*querypb.Field) {
75+
result.proto3Rows = nil
6976
// Usage of j is intentional.
7077
for j, f := range fields {
7178
for _, r := range result.Rows {
@@ -108,6 +115,7 @@ func (result *Result) Copy() *Result {
108115
out.Rows = append(out.Rows, CopyRow(r))
109116
}
110117
}
118+
// proto3Rows is intentionally not propagated: callers may modify Rows
111119
return out
112120
}
113121

@@ -120,6 +128,15 @@ func (result *Result) ShallowCopy() *Result {
120128
Info: result.Info,
121129
SessionStateChanges: result.SessionStateChanges,
122130
Rows: result.Rows,
131+
// proto3Rows is intentionally not propagated: callers may modify Rows
132+
}
133+
}
134+
135+
func (result *Result) CacheProto3Rows() {
136+
// result can be nil when the query errors out (e.g. execDBConn returns
137+
// a nil Result alongside an error).
138+
if result != nil && len(result.Rows) > 0 {
139+
result.proto3Rows = RowsToProto3(result.Rows)
123140
}
124141
}
125142

@@ -324,6 +341,7 @@ func (result *Result) StripMetadata(incl querypb.ExecuteOptions_IncludedFields)
324341
// to another result.Note currently it doesn't handle cases like
325342
// if two results have different fields.We will enhance this function.
326343
func (result *Result) AppendResult(src *Result) {
344+
result.proto3Rows = nil
327345
if src.RowsAffected == 0 && len(src.Rows) == 0 && len(src.Fields) == 0 {
328346
return
329347
}

0 commit comments

Comments
 (0)