Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dc2f928
Revert "fix: mempool locking mechanism in v1 and cat (#1582)"
evan-forbes Mar 24, 2025
b043c0a
Revert "fix: remove go routines for RecheckTx (#1553)"
evan-forbes Mar 24, 2025
7df519a
hack: remove
evan-forbes Mar 24, 2025
491df09
hack: random optimizations
evan-forbes Mar 25, 2025
cdb4268
fix: async retrieve transactions from the mempool
evan-forbes Mar 26, 2025
82a8868
Merge branch 'feature/recovery' into evan/recovery/debug-mamo
evan-forbes Mar 26, 2025
14bacd7
chore: cleanup
evan-forbes Mar 26, 2025
ee6dffd
chore: minor cleanup
evan-forbes Mar 27, 2025
5725c16
fix: only decode once
evan-forbes Mar 27, 2025
7e485d6
fix!: catchup
evan-forbes Mar 28, 2025
56d85cd
chore: clean up
evan-forbes Mar 29, 2025
1a59bc3
chore: modify tracing for recovered parts
evan-forbes Mar 29, 2025
5f6d9eb
fix: non-gap catchup only request once
evan-forbes Mar 30, 2025
44c4679
feat: broadcast haves after recoverying txs from the mempool
evan-forbes Mar 30, 2025
59ae259
fix!: only gossip data from the latest round except during catchup
evan-forbes Mar 31, 2025
8f03228
fix: don't download block parts messages until switching from blocksy…
evan-forbes Mar 31, 2025
5dc0034
chore!: move tracing tables to the schema package
evan-forbes Mar 31, 2025
f3cc92a
chore: linter
evan-forbes Mar 31, 2025
2b51b41
chore: linter
evan-forbes Mar 31, 2025
9fc18b1
fix: weird nil
evan-forbes Mar 31, 2025
c475011
fix: overflow
evan-forbes Mar 31, 2025
5634e6b
chore: fix tests
evan-forbes Mar 31, 2025
0e38447
chore: linter
evan-forbes Mar 31, 2025
c82a179
feat: force pebbleDB for the blockstore only
evan-forbes Mar 31, 2025
f61fe49
fix: use the db dir
evan-forbes Mar 31, 2025
4ea2f23
docs: comment
evan-forbes Apr 2, 2025
c673402
fix: data race
evan-forbes Apr 2, 2025
2d3c9eb
hack: disable mempool broadcast test
evan-forbes Apr 2, 2025
4f57ce2
chore: linter
evan-forbes Apr 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions blockchain/v0/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
requestRetrySeconds = 30
requestRetrySeconds = 45

// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at least that rate, we
Expand All @@ -46,7 +46,7 @@ const (
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
)

var peerTimeout = 15 * time.Second // not const so we can override with tests
var peerTimeout = 120 * time.Second // not const so we can override with tests

/*
Peers self report their heights when we join the block pool.
Expand Down
2 changes: 1 addition & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 5,
Priority: 30,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: bc.MaxMsgSize,
Expand Down
8 changes: 6 additions & 2 deletions consensus/propagation/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
}

// only re-request original parts that are missing, not parity parts.
missing := prop.block.BitArray().Not()
missing := prop.block.MissingOriginal()
if missing.IsEmpty() {
blockProp.Logger.Error("no missing parts yet block is incomplete", "height", height, "round", round)
continue
Expand All @@ -40,6 +40,7 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {

for _, peer := range peers {
mc := missing.Copy()

reqs, has := peer.GetRequests(height, round)
if has {
mc = mc.Sub(reqs)
Expand All @@ -52,7 +53,7 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
e := p2p.Envelope{
ChannelID: WantChannel,
Message: &protoprop.WantParts{
Parts: *missing.ToProto(),
Parts: *mc.ToProto(),
Height: height,
Round: round,
Prove: true,
Expand All @@ -72,9 +73,12 @@ func (blockProp *Reactor) retryWants(currentHeight int64, currentRound int32) {
}

func (blockProp *Reactor) AddCommitment(height int64, round int32, psh *types.PartSetHeader) {
blockProp.Logger.Info("adding commitment", "height", height, "round", round, "psh", psh)
blockProp.pmtx.Lock()
defer blockProp.pmtx.Unlock()

blockProp.Logger.Info("added commitment", "height", height, "round", round)

if blockProp.proposals[height] == nil {
blockProp.proposals[height] = make(map[int32]*proposalData)
}
Expand Down
147 changes: 74 additions & 73 deletions consensus/propagation/catchup_test.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,76 @@
package propagation

// TODO(rachid): fix test
// func TestCatchup(t *testing.T) {
// reactors, _ := testBlockPropReactors(3)
// reactor1 := reactors[0]
// reactor2 := reactors[1]
// reactor3 := reactors[2]

// // setting the proposal for height 8 round 1
// compactBlock := createCompactBlock(8, 1)
// reactor1.AddProposal(compactBlock)
// reactor2.AddProposal(compactBlock)
// reactor3.AddProposal(compactBlock)

// // setting the proposal for height 9 round 0
// compactBlock = createCompactBlock(9, 1)
// reactor1.AddProposal(compactBlock)
// reactor2.AddProposal(compactBlock)
// reactor3.AddProposal(compactBlock)

// // setting the proposal for height 10 round 0
// compactBlock = createCompactBlock(10, 0)
// reactor1.AddProposal(compactBlock)
// reactor2.AddProposal(compactBlock)
// reactor3.AddProposal(compactBlock)

// // setting the proposal for height 10 round 1
// compactBlock = createCompactBlock(10, 1)
// reactor1.AddProposal(compactBlock)
// reactor2.AddProposal(compactBlock)
// reactor3.AddProposal(compactBlock)

// // setting the first reactor current height and round
// reactor1.currentHeight = 8
// reactor1.currentRound = 0

// // handle the compact block
// reactor1.handleCompactBlock(compactBlock, reactor1.self)

// time.Sleep(200 * time.Millisecond)

// // check if reactor 1 sent wants to all the connected peers
// _, has := reactor2.getPeer(reactor1.self).GetWants(9, 1)
// require.True(t, has)

// _, has = reactor2.getPeer(reactor1.self).GetWants(10, 0)
// require.True(t, has)

// _, has = reactor3.getPeer(reactor1.self).GetWants(9, 1)
// require.True(t, has)

// _, has = reactor3.getPeer(reactor1.self).GetWants(10, 0)
// require.True(t, has)
// }

// func createCompactBlock(height int64, round int32) *proptypes.CompactBlock {
// return &proptypes.CompactBlock{
// BpHash: cmtrand.Bytes(32),
// Signature: cmtrand.Bytes(64),
// LastLen: 0,
// Blobs: []proptypes.TxMetaData{
// {Hash: cmtrand.Bytes(32)},
// {Hash: cmtrand.Bytes(32)},
// },
// Proposal: types.Proposal{
// BlockID: types.BlockID{
// Hash: nil,
// PartSetHeader: types.PartSetHeader{Total: 30},
// },
// Height: height,
// Round: round,
// },
// }
// }
import (
"testing"
"time"

"github.com/stretchr/testify/require"
cfg "github.com/tendermint/tendermint/config"
proptypes "github.com/tendermint/tendermint/consensus/propagation/types"
cmtrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)

func TestGapCatchup(t *testing.T) {
p2pCfg := cfg.DefaultP2PConfig()
p2pCfg.SendRate = 100000000
p2pCfg.RecvRate = 100000000
nodes := 2

reactors, _ := createTestReactors(nodes, p2pCfg, false, "/home/evan/data/experiments/celestia/fast-recovery/debug")
cleanup, _, sm := state.SetupTestCase(t)
t.Cleanup(func() {
cleanup(t)
})

prop, ps, _, metaData := createTestProposal(sm, 1, 2, 1000000)

// set the commitment and the data on the first node so that it can respond
// to the catchup request
n1 := reactors[0]
parityBlock, lastLen, err := types.Encode(ps, types.BlockPartSizeBytes)
require.NoError(t, err)

partHashes := extractHashes(ps, parityBlock)
proofs := extractProofs(ps, parityBlock)

cb := &proptypes.CompactBlock{
Proposal: *prop,
LastLen: uint32(lastLen),
Signature: cmtrand.Bytes(64), // todo: sign the proposal with a real signature
BpHash: parityBlock.Hash(),
Blobs: metaData,
PartsHashes: partHashes,
}

cb.SetProofCache(proofs)

added := n1.AddProposal(cb)
require.True(t, added)

_, parts, _, has := n1.getAllState(prop.Height, prop.Round)
require.True(t, has)

parts.SetProposalData(ps, parityBlock)

// add the partset header to the second node and trigger the call to retry
// wants
n2 := reactors[1]

_, _, has = n2.GetProposal(prop.Height, prop.Round)
require.False(t, has)

psh := ps.Header()
n2.AddCommitment(prop.Height, prop.Round, &psh)

// this call simulates getting a commitment for a proposal of a higher
// height
n2.retryWants(2, 0)

time.Sleep(800 * time.Millisecond)

_, caughtUp, has := n2.GetProposal(prop.Height, prop.Round)
require.True(t, has)
require.True(t, caughtUp.IsComplete())
}
2 changes: 1 addition & 1 deletion consensus/propagation/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
// time a proposal is received from a peer or when a proposal is created. If the
// proposal is new, it will be stored and broadcast to the relevant peers.
func (blockProp *Reactor) handleCompactBlock(cb *proptypes.CompactBlock, peer p2p.ID, proposer bool) {
added, _, _ := blockProp.AddProposal(cb)
added := blockProp.AddProposal(cb)
if !added {
return
}
Expand Down Expand Up @@ -217,7 +217,7 @@
schema.WriteMempoolRecoveredParts(blockProp.traceClient, cb.Proposal.Height, cb.Proposal.Round, int(p.Index), true)
}

return

Check failure on line 220 in consensus/propagation/commitment.go

View workflow job for this annotation

GitHub Actions / golangci-lint

S1023: redundant `return` statement (gosimple)
}

// broadcastProposal gossips the provided proposal to all peers. This should
Expand Down
31 changes: 13 additions & 18 deletions consensus/propagation/commitment_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ type proposalData struct {
}

type ProposalCache struct {
store *store.BlockStore
pmtx *sync.Mutex
proposals map[int64]map[int32]*proposalData
currentHeight int64
currentRound int32
store *store.BlockStore
pmtx *sync.Mutex
proposals map[int64]map[int32]*proposalData
currentHeight int64
currentRound int32
consensusHeight int64
}

func NewProposalCache(bs *store.BlockStore) *ProposalCache {
Expand All @@ -37,32 +38,26 @@ func NewProposalCache(bs *store.BlockStore) *ProposalCache {
return pc
}

func (p *ProposalCache) AddProposal(cb *proptypes.CompactBlock) (added bool, gapHeights []int64, gapRounds []int32) {
func (p *ProposalCache) AddProposal(cb *proptypes.CompactBlock) (added bool) {
p.pmtx.Lock()
defer p.pmtx.Unlock()
if cb.Proposal.Height <= p.consensusHeight {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we're no longer storing past proposals? or it means we already have it?

return false
}

if p.proposals[cb.Proposal.Height] == nil {
p.proposals[cb.Proposal.Height] = make(map[int32]*proposalData)
}
if p.proposals[cb.Proposal.Height][cb.Proposal.Round] != nil {
return false, gapHeights, gapRounds
return false
}

// if the propsoal is for a lower height, make sure that we have that height

// if we don't have this proposal, and its height is greater than the current
// height, update the current height and round.
if cb.Proposal.Height > p.currentHeight {
// add the missing heights to the gapHeights
for h := p.currentHeight + 1; h < cb.Proposal.Height; h++ {
gapHeights = append(gapHeights, h)
}
p.currentHeight = cb.Proposal.Height
p.currentRound = cb.Proposal.Round
} else if cb.Proposal.Height == p.currentHeight && cb.Proposal.Round > p.currentRound {
// add the missing rounds to the gapRounds
for r := p.currentRound + 1; r < cb.Proposal.Round; r++ {
gapRounds = append(gapRounds, r)
}
p.currentRound = cb.Proposal.Round
}

Expand All @@ -71,7 +66,7 @@ func (p *ProposalCache) AddProposal(cb *proptypes.CompactBlock) (added bool, gap
block: proptypes.NewCombinedSetFromCompactBlock(cb),
maxRequests: bits.NewBitArray(int(cb.Proposal.BlockID.PartSetHeader.Total)),
}
return true, gapHeights, gapRounds
return true
}

// GetProposal returns the proposal and block for a given height and round if
Expand Down
18 changes: 2 additions & 16 deletions consensus/propagation/commitment_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ func TestProposalCache_AddProposal(t *testing.T) {
name string
inputProposal *proptypes.CompactBlock
wantAdded bool
wantGapHeights []int64
wantGapRounds []int32
wantCurrentHeight int64
wantCurrentRound int32
}
Expand All @@ -66,55 +64,43 @@ func TestProposalCache_AddProposal(t *testing.T) {
name: "Add first proposal - updates current height/round",
inputProposal: makeCompactBlock(10, 1, 5),
wantAdded: true,
wantGapHeights: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, // since store.Height=0 initially
wantGapRounds: nil,
wantCurrentHeight: 10,
wantCurrentRound: 1,
},
{
name: "Add proposal at same height, higher round - updates current round",
inputProposal: makeCompactBlock(10, 3, 5),
wantAdded: true,
wantGapHeights: nil, // same height
wantGapRounds: []int32{2}, // we jumped from round=1 to round=3
wantCurrentHeight: 10,
wantCurrentRound: 3,
},
{
name: "Add proposal with same height and round - returns false",
inputProposal: makeCompactBlock(10, 3, 5),
wantAdded: false, // already have 10/3 from above
wantGapHeights: nil,
wantGapRounds: nil,
wantCurrentHeight: 10,
wantCurrentRound: 3,
},
{
name: "Add proposal at higher height, round 0 - gap in heights",
inputProposal: makeCompactBlock(12, 0, 5),
wantAdded: true,
wantGapHeights: []int64{11},
wantGapRounds: nil,
wantCurrentHeight: 12,
wantCurrentRound: 0,
},
{
name: "Add proposal with older height - no height/round update",
inputProposal: makeCompactBlock(5, 0, 5),
wantAdded: true, // it doesn't exist yet, so it can be added
wantGapHeights: nil, // ignoring the store, it won't fill in from 0..5
wantGapRounds: nil,
wantCurrentHeight: 12, // still remain at 12/0
wantCurrentHeight: 12, // still remain at 12/0
wantCurrentRound: 0,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
added, gapHeights, gapRounds := pc.AddProposal(tc.inputProposal)
added := pc.AddProposal(tc.inputProposal)
require.Equal(t, tc.wantAdded, added, "added mismatch")
require.Equal(t, tc.wantGapHeights, gapHeights, "gapHeights mismatch")
require.Equal(t, tc.wantGapRounds, gapRounds, "gapRounds mismatch")
require.Equal(t, tc.wantCurrentHeight, pc.currentHeight, "currentHeight mismatch")
require.Equal(t, tc.wantCurrentRound, pc.currentRound, "currentRound mismatch")
})
Expand Down
Loading
Loading