Skip to content

Commit 413f6cc

Browse files
tanjinxmattlord
authored andcommitted
VStream: Allow for automatic resume after Reshard across VStreams (vitessio#15393) (#627)
Signed-off-by: Tanjin Xu <tanjin.xu@slack-corp.com> Co-authored-by: Matt Lord <mattalord@gmail.com>
1 parent 6715b78 commit 413f6cc

6 files changed

Lines changed: 330 additions & 30 deletions

File tree

go/test/endtoend/vreplication/vreplication_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1431,7 +1431,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
14311431
action, workflow, output)
14321432
}
14331433
if err != nil {
1434-
t.Fatalf("Reshard %s command failed with %+v\n", action, err)
1434+
t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output)
14351435
}
14361436
}
14371437

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ func insertRow(keyspace, table string, id int) {
226226
vtgateConn.ExecuteFetch("begin", 1000, false)
227227
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
228228
if err != nil {
229-
log.Infof("error inserting row %d: %v", id, err)
229+
log.Errorf("error inserting row %d: %v", id, err)
230230
}
231231
vtgateConn.ExecuteFetch("commit", 1000, false)
232232
}
@@ -390,13 +390,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
390390
defer vc.TearDown()
391391

392392
defaultCell := vc.Cells[vc.CellNames[0]]
393-
vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
393+
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
394+
require.NoError(t, err)
394395

395396
vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
396397
defer vtgateConn.Close()
397398
verifyClusterHealth(t, vc)
398399

399-
vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
400+
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
401+
require.NoError(t, err)
400402

401403
ctx := context.Background()
402404
vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))

go/vt/topo/faketopo/faketopo.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@ import (
2121
"strings"
2222
"sync"
2323

24+
"vitess.io/vitess/go/vt/log"
25+
"vitess.io/vitess/go/vt/topo"
2426
"vitess.io/vitess/go/vt/topo/memorytopo"
2527

26-
"vitess.io/vitess/go/vt/log"
2728
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
28-
29-
"vitess.io/vitess/go/vt/topo"
3029
)
3130

3231
// FakeFactory implements the Factory interface. This is supposed to be used only for testing

go/vt/vtgate/vstream_manager.go

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"sync"
2727
"time"
2828

29+
"golang.org/x/exp/maps"
30+
2931
"vitess.io/vitess/go/stats"
3032
"vitess.io/vitess/go/vt/discovery"
3133
"vitess.io/vitess/go/vt/key"
@@ -546,25 +548,42 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
546548
var err error
547549
cells := vs.getCells()
548550

549-
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
551+
tpo := vs.tabletPickerOptions
552+
resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace)
550553
if err != nil {
551-
log.Errorf(err.Error())
552-
return err
554+
return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace)
555+
}
556+
if resharded {
557+
// The non-serving tablet in the old / non-serving shard will contain all of
558+
// the GTIDs that we need before transitioning to the new shards along with
559+
// the journal event that will then allow us to automatically transition to
560+
// the new shards (provided the stop_on_reshard option is not set).
561+
tpo.IncludeNonServingTablets = true
553562
}
554563

564+
tabletPickerErr := func(err error) error {
565+
tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)",
566+
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
567+
log.Errorf("%v", tperr)
568+
return tperr
569+
}
570+
tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...)
571+
if err != nil {
572+
return tabletPickerErr(err)
573+
}
555574
// Create a child context with a stricter timeout when picking a tablet.
556575
// This will prevent hanging in the case no tablets are found.
557576
tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
558577
defer tpCancel()
559-
560578
tablet, err := tp.PickForStreaming(tpCtx)
561579
if err != nil {
562-
log.Errorf(err.Error())
563-
return err
580+
return tabletPickerErr(err)
564581
}
582+
565583
tabletAliasString := topoproto.TabletAliasString(tablet.Alias)
566-
log.Infof("Picked %s tablet %s for VStream in %s/%s within the %s cell(s)",
567-
vs.tabletType.String(), tabletAliasString, sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
584+
585+
log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)",
586+
vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
568587

569588
target := &querypb.Target{
570589
Keyspace: sgtid.Keyspace,
@@ -828,7 +847,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
828847
if err := vs.getError(); err != nil {
829848
return err
830849
}
831-
// convert all gtids to vgtids. This should be done here while holding the lock.
850+
// Convert all gtids to vgtids. This should be done here while holding the lock.
832851
for j, event := range events {
833852
if event.Type == binlogdatapb.VEventType_GTID {
834853
// Update the VGtid and send that instead.
@@ -1054,16 +1073,7 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
10541073
// Now that we know there MAY have been an applicable reshard, let's make a
10551074
// definitive determination by looking at the shard keyranges.
10561075
// All we care about are the shard info records now.
1057-
1058-
// Added in backport of d916e81 on 1/16/2025
1059-
sis := func(shardMap map[string]*topo.ShardInfo) []*topo.ShardInfo {
1060-
ret := []*topo.ShardInfo{}
1061-
for _, shardInfo := range shardMap {
1062-
ret = append(ret, shardInfo)
1063-
}
1064-
1065-
return ret
1066-
}(shards)
1076+
sis := maps.Values(shards)
10671077

10681078
for i := range sis {
10691079
for j := range sis {

0 commit comments

Comments
 (0)