Skip to content

Commit ae0d6e9

Browse files
committed
VReplication: Add VTGate VStreamFlag to include journal events in the stream (vitessio#16737)
Signed-off-by: Malcolm Akinje <malcolm.akinje@gmail.com> Signed-off-by: Malcolm Akinje <makinje@slack-corp.com>
1 parent 15e1e38 commit ae0d6e9

6 files changed

Lines changed: 171 additions & 86 deletions

File tree

examples/local/vstream_client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func main() {
7575
flags := &vtgatepb.VStreamFlags{
7676
//MinimizeSkew: false,
7777
//HeartbeatInterval: 60, //seconds
78+
// IncludeReshardJournalEvents: true,
7879
}
7980
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
8081
for {

go/test/endtoend/vreplication/vstream_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -601,7 +601,10 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
601601
Match: "/customer.*",
602602
}},
603603
}
604-
flags := &vtgatepb.VStreamFlags{}
604+
flags := &vtgatepb.VStreamFlags{
605+
IncludeReshardJournalEvents: true,
606+
}
607+
journalEvents := 0
605608

606609
// Stream events but stop once we have a VGTID with positions for the old/original shards.
607610
var newVGTID *binlogdatapb.VGtid
@@ -683,6 +686,9 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
683686
default:
684687
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
685688
}
689+
case binlogdatapb.VEventType_JOURNAL:
690+
require.True(t, ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS)
691+
journalEvents++
686692
}
687693
}
688694
default:
@@ -697,8 +703,10 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
697703
}()
698704

699705
// We should have a mix of events across the old and new shards.
700-
require.NotZero(t, oldShardRowEvents)
701-
require.NotZero(t, newShardRowEvents)
706+
require.Greater(t, oldShardRowEvents, 0)
707+
require.Greater(t, newShardRowEvents, 0)
708+
// We should have seen a reshard journal event.
709+
require.Greater(t, journalEvents, 0)
702710

703711
// The number of row events streamed by the VStream API should match the number of rows inserted.
704712
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")

go/vt/proto/vtgate/vtgate.pb.go

Lines changed: 70 additions & 57 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go/vt/proto/vtgate/vtgate_vtproto.pb.go

Lines changed: 41 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)