Skip to content

Commit 98db3ce

Browse files
authored
fix!: retrieve transactions from the mempool async, ignore irrelevant rounds, and don't start processing messages until switching from blocksync to consensus (#1695)
## Description this PR started as a simple fix, but then its scope grew significantly as we found more bugs and made the testnet work: - makes retrieving the transactions from the mempool async during propagation. - adjusts priorities for gossip - stops automatically broadcasting transactions with CAT, instead only sends seentx (this stops peers from downloading transactions more than once) - minor optimizations around not verifying proofs that the node just generated. - NOTE: this PR reverts #1553 and #1582 because those break CAT and we need a working mempool that doesn't re-gossip everything - ignores irrelevant rounds - only processes compact blocks until the consensus reactor starts - a few other minor bug fixes
1 parent ba4c592 commit 98db3ce

34 files changed

Lines changed: 999 additions & 751 deletions

blockchain/v0/pool.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ const (
3232
maxTotalRequesters = 600
3333
maxPendingRequests = maxTotalRequesters
3434
maxPendingRequestsPerPeer = 20
35-
requestRetrySeconds = 30
35+
requestRetrySeconds = 45
3636

3737
// Minimum recv rate to ensure we're receiving blocks from a peer fast
3838
// enough. If a peer is not sending us data at least that rate, we
@@ -46,7 +46,7 @@ const (
4646
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
4747
)
4848

49-
var peerTimeout = 15 * time.Second // not const so we can override with tests
49+
var peerTimeout = 120 * time.Second // not const so we can override with tests
5050

5151
/*
5252
Peers self report their heights when we join the block pool.

blockchain/v0/reactor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
145145
return []*p2p.ChannelDescriptor{
146146
{
147147
ID: BlockchainChannel,
148-
Priority: 5,
148+
Priority: 30,
149149
SendQueueCapacity: 1000,
150150
RecvBufferCapacity: 50 * 4096,
151151
RecvMessageCapacity: bc.MaxMsgSize,

consensus/propagation/catchup.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
proptypes "github.com/tendermint/tendermint/consensus/propagation/types"
77
"github.com/tendermint/tendermint/libs/bits"
88
"github.com/tendermint/tendermint/p2p"
9+
"github.com/tendermint/tendermint/pkg/trace/schema"
910
protoprop "github.com/tendermint/tendermint/proto/tendermint/propagation"
1011
"github.com/tendermint/tendermint/types"
1112
)
@@ -14,13 +15,17 @@ import (
1415
//
1516
// todo: add a request limit for each part to avoid downloading the block too
1617
// many times. atm, this code will request the same part from every peer.
17-
func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
18-
data := blockProp.dumpAll()
18+
func (blockProp *Reactor) retryWants(currentHeight int64) {
19+
if !blockProp.started.Load() {
20+
return
21+
}
22+
data := blockProp.unfinishedHeights()
1923
peers := blockProp.getPeers()
2024
for _, prop := range data {
2125
height, round := prop.compactBlock.Proposal.Height, prop.compactBlock.Proposal.Round
2226

23-
if height == currentHeight && round == currentRound {
27+
// don't re-request parts for any round on the current height
28+
if height == currentHeight {
2429
continue
2530
}
2631

@@ -29,17 +34,20 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
2934
}
3035

3136
// only re-request original parts that are missing, not parity parts.
32-
missing := prop.block.BitArray().Not()
37+
missing := prop.block.MissingOriginal()
3338
if missing.IsEmpty() {
3439
blockProp.Logger.Error("no missing parts yet block is incomplete", "height", height, "round", round)
3540
continue
3641
}
3742

43+
schema.WriteRetries(blockProp.traceClient, height, round, missing.String())
44+
3845
// make requests from different peers
3946
peers = shuffle(peers)
4047

4148
for _, peer := range peers {
4249
mc := missing.Copy()
50+
4351
reqs, has := peer.GetRequests(height, round)
4452
if has {
4553
mc = mc.Sub(reqs)
@@ -52,7 +60,7 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
5260
e := p2p.Envelope{
5361
ChannelID: WantChannel,
5462
Message: &protoprop.WantParts{
55-
Parts: *missing.ToProto(),
63+
Parts: *mc.ToProto(),
5664
Height: height,
5765
Round: round,
5866
Prove: true,
@@ -64,17 +72,23 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
6472
continue
6573
}
6674

75+
schema.WriteCatchupRequest(blockProp.traceClient, height, round, mc.String(), string(peer.peer.ID()))
76+
6777
// keep track of which requests we've made this attempt.
68-
missing.Sub(mc)
78+
missing = missing.Sub(mc)
6979
peer.AddRequests(height, round, missing)
7080
}
7181
}
7282
}
7383

7484
func (blockProp *Reactor) AddCommitment(height int64, round int32, psh *types.PartSetHeader) {
85+
blockProp.Logger.Info("adding commitment", "height", height, "round", round, "psh", psh)
7586
blockProp.pmtx.Lock()
7687
defer blockProp.pmtx.Unlock()
7788

89+
blockProp.Logger.Info("added commitment", "height", height, "round", round)
90+
schema.WriteGap(blockProp.traceClient, height, round)
91+
7892
if blockProp.proposals[height] == nil {
7993
blockProp.proposals[height] = make(map[int32]*proposalData)
8094
}
Lines changed: 74 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,76 @@
11
package propagation
22

3-
// TODO(rachid): fix test
4-
// func TestCatchup(t *testing.T) {
5-
// reactors, _ := testBlockPropReactors(3)
6-
// reactor1 := reactors[0]
7-
// reactor2 := reactors[1]
8-
// reactor3 := reactors[2]
9-
10-
// // setting the proposal for height 8 round 1
11-
// compactBlock := createCompactBlock(8, 1)
12-
// reactor1.AddProposal(compactBlock)
13-
// reactor2.AddProposal(compactBlock)
14-
// reactor3.AddProposal(compactBlock)
15-
16-
// // setting the proposal for height 9 round 0
17-
// compactBlock = createCompactBlock(9, 1)
18-
// reactor1.AddProposal(compactBlock)
19-
// reactor2.AddProposal(compactBlock)
20-
// reactor3.AddProposal(compactBlock)
21-
22-
// // setting the proposal for height 10 round 0
23-
// compactBlock = createCompactBlock(10, 0)
24-
// reactor1.AddProposal(compactBlock)
25-
// reactor2.AddProposal(compactBlock)
26-
// reactor3.AddProposal(compactBlock)
27-
28-
// // setting the proposal for height 10 round 1
29-
// compactBlock = createCompactBlock(10, 1)
30-
// reactor1.AddProposal(compactBlock)
31-
// reactor2.AddProposal(compactBlock)
32-
// reactor3.AddProposal(compactBlock)
33-
34-
// // setting the first reactor current height and round
35-
// reactor1.currentHeight = 8
36-
// reactor1.currentRound = 0
37-
38-
// // handle the compact block
39-
// reactor1.handleCompactBlock(compactBlock, reactor1.self)
40-
41-
// time.Sleep(200 * time.Millisecond)
42-
43-
// // check if reactor 1 sent wants to all the connected peers
44-
// _, has := reactor2.getPeer(reactor1.self).GetWants(9, 1)
45-
// require.True(t, has)
46-
47-
// _, has = reactor2.getPeer(reactor1.self).GetWants(10, 0)
48-
// require.True(t, has)
49-
50-
// _, has = reactor3.getPeer(reactor1.self).GetWants(9, 1)
51-
// require.True(t, has)
52-
53-
// _, has = reactor3.getPeer(reactor1.self).GetWants(10, 0)
54-
// require.True(t, has)
55-
// }
56-
57-
// func createCompactBlock(height int64, round int32) *proptypes.CompactBlock {
58-
// return &proptypes.CompactBlock{
59-
// BpHash: cmtrand.Bytes(32),
60-
// Signature: cmtrand.Bytes(64),
61-
// LastLen: 0,
62-
// Blobs: []proptypes.TxMetaData{
63-
// {Hash: cmtrand.Bytes(32)},
64-
// {Hash: cmtrand.Bytes(32)},
65-
// },
66-
// Proposal: types.Proposal{
67-
// BlockID: types.BlockID{
68-
// Hash: nil,
69-
// PartSetHeader: types.PartSetHeader{Total: 30},
70-
// },
71-
// Height: height,
72-
// Round: round,
73-
// },
74-
// }
75-
// }
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
cfg "github.com/tendermint/tendermint/config"
9+
proptypes "github.com/tendermint/tendermint/consensus/propagation/types"
10+
cmtrand "github.com/tendermint/tendermint/libs/rand"
11+
"github.com/tendermint/tendermint/state"
12+
"github.com/tendermint/tendermint/types"
13+
)
14+
15+
func TestGapCatchup(t *testing.T) {
16+
p2pCfg := cfg.DefaultP2PConfig()
17+
p2pCfg.SendRate = 100000000
18+
p2pCfg.RecvRate = 100000000
19+
nodes := 2
20+
21+
reactors, _ := createTestReactors(nodes, p2pCfg, false, "/home/evan/data/experiments/celestia/fast-recovery/debug")
22+
cleanup, _, sm := state.SetupTestCase(t)
23+
t.Cleanup(func() {
24+
cleanup(t)
25+
})
26+
27+
prop, ps, _, metaData := createTestProposal(sm, 1, 2, 1000000)
28+
29+
// set the commitment and the data on the first node so that it can respond
30+
// to the catchup request
31+
n1 := reactors[0]
32+
parityBlock, lastLen, err := types.Encode(ps, types.BlockPartSizeBytes)
33+
require.NoError(t, err)
34+
35+
partHashes := extractHashes(ps, parityBlock)
36+
proofs := extractProofs(ps, parityBlock)
37+
38+
cb := &proptypes.CompactBlock{
39+
Proposal: *prop,
40+
LastLen: uint32(lastLen),
41+
Signature: cmtrand.Bytes(64), // todo: sign the proposal with a real signature
42+
BpHash: parityBlock.Hash(),
43+
Blobs: metaData,
44+
PartsHashes: partHashes,
45+
}
46+
47+
cb.SetProofCache(proofs)
48+
49+
added := n1.AddProposal(cb)
50+
require.True(t, added)
51+
52+
_, parts, _, has := n1.getAllState(prop.Height, prop.Round, true)
53+
require.True(t, has)
54+
55+
parts.SetProposalData(ps, parityBlock)
56+
57+
// add the partset header to the second node and trigger the call to retry
58+
// wants
59+
n2 := reactors[1]
60+
61+
_, _, has = n2.GetProposal(prop.Height, prop.Round)
62+
require.False(t, has)
63+
64+
psh := ps.Header()
65+
n2.AddCommitment(prop.Height, prop.Round, &psh)
66+
67+
// this call simulates getting a commitment for a proposal of a higher
68+
// height
69+
n2.retryWants(2)
70+
71+
time.Sleep(800 * time.Millisecond)
72+
73+
_, caughtUp, has := n2.GetProposal(prop.Height, prop.Round)
74+
require.True(t, has)
75+
require.True(t, caughtUp.IsComplete())
76+
}

0 commit comments

Comments (0)