Skip to content

Commit 744676e

Browse files
rootulpclaude
authored andcommitted
fix: disconnect peers that send invalid chunks during state sync (#2814)
## Summary - When the ABCI application rejects a chunk sender via `RejectSenders` during state sync, the peer was only blacklisted in the in-memory snapshot pool (`snapshots.RejectPeer`) but remained connected at the P2P layer. A malicious peer could keep sending poisoned `ChunkResponse` messages, creating an infinite retry loop. - Adds an `onPeerRejected` callback to the syncer that the reactor wires up to call `StopPeerForError`, disconnecting the rejected peer at the P2P layer. - Adds `TestSyncer_applyChunks_onPeerRejected` to verify the callback is invoked with the correct peer ID. ## Test plan - [x] `go test -v -run TestSyncer_applyChunks_onPeerRejected ./statesync/` - [x] `go test -v ./statesync/...` (all 37 tests pass) - [x] `go test -race ./statesync/...` 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> (cherry picked from commit c4ce459)
1 parent 252b1bd commit 744676e

3 files changed

Lines changed: 83 additions & 4 deletions

File tree

statesync/reactor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,12 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
275275
}
276276
r.metrics.Syncing.Set(1)
277277
r.syncer = newSyncer(r.cfg, r.Logger, r.conn, r.connQuery, stateProvider, r.tempDir)
278+
r.syncer.setOnPeerRejected(func(peerID p2p.ID) {
279+
peer := r.Switch.Peers().Get(peerID)
280+
if peer != nil {
281+
r.Switch.StopPeerForError(peer, "chunk sender rejected by ABCI application", r.String())
282+
}
283+
})
278284
r.mtx.Unlock()
279285

280286
hook := func() {

statesync/syncer.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,9 @@ type syncer struct {
6060
chunkFetchers int32
6161
retryTimeout time.Duration
6262

63-
mtx cmtsync.RWMutex
64-
chunks *chunkQueue
63+
mtx cmtsync.RWMutex
64+
chunks *chunkQueue
65+
onPeerRejected func(p2p.ID)
6566
}
6667

6768
// newSyncer creates a new syncer.
@@ -86,6 +87,13 @@ func newSyncer(
8687
}
8788
}
8889

90+
// setOnPeerRejected sets a callback that is invoked when a peer is rejected by the ABCI
91+
// application during chunk application. This allows the caller to disconnect the peer at the
92+
// P2P layer, preventing them from continuing to send invalid chunks.
93+
func (s *syncer) setOnPeerRejected(fn func(p2p.ID)) {
94+
s.onPeerRejected = fn
95+
}
96+
8997
// AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
9098
// been added to the queue, or an error if there's no sync in progress.
9199
func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
@@ -403,8 +411,12 @@ func (s *syncer) applyChunks(chunks *chunkQueue) error {
403411
// Reject any senders as requested by the app
404412
for _, sender := range resp.RejectSenders {
405413
if sender != "" {
406-
s.snapshots.RejectPeer(p2p.ID(sender))
407-
err := chunks.DiscardSender(p2p.ID(sender))
414+
peerID := p2p.ID(sender)
415+
s.snapshots.RejectPeer(peerID)
416+
if s.onPeerRejected != nil {
417+
s.onPeerRejected(peerID)
418+
}
419+
err := chunks.DiscardSender(peerID)
408420
if err != nil {
409421
return fmt.Errorf("failed to reject sender: %w", err)
410422
}

statesync/syncer_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,67 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
636636
}
637637
}
638638

639+
func TestSyncer_applyChunks_onPeerRejected(t *testing.T) {
640+
connQuery := &proxymocks.AppConnQuery{}
641+
connSnapshot := &proxymocks.AppConnSnapshot{}
642+
stateProvider := &mocks.StateProvider{}
643+
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
644+
645+
cfg := config.DefaultStateSyncConfig()
646+
syncer := newSyncer(*cfg, log.NewNopLogger(), connSnapshot, connQuery, stateProvider, "")
647+
648+
// Track which peer IDs were reported via the callback.
649+
var rejectedPeers []p2p.ID
650+
syncer.setOnPeerRejected(func(peerID p2p.ID) {
651+
rejectedPeers = append(rejectedPeers, peerID)
652+
})
653+
654+
peerA := simplePeer("a")
655+
peerB := simplePeer("b")
656+
peerC := simplePeer("c")
657+
658+
s1 := &snapshot{Height: 1, Format: 1, Chunks: 3}
659+
_, err := syncer.AddSnapshot(peerA, s1)
660+
require.NoError(t, err)
661+
_, err = syncer.AddSnapshot(peerB, s1)
662+
require.NoError(t, err)
663+
_, err = syncer.AddSnapshot(peerC, s1)
664+
require.NoError(t, err)
665+
666+
chunks, err := newChunkQueue(s1, "")
667+
require.NoError(t, err)
668+
added, err := chunks.Add(&chunk{Height: 1, Format: 1, Index: 0, Chunk: []byte{0}, Sender: peerA.ID()})
669+
require.True(t, added)
670+
require.NoError(t, err)
671+
added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 1, Chunk: []byte{1}, Sender: peerB.ID()})
672+
require.True(t, added)
673+
require.NoError(t, err)
674+
added, err = chunks.Add(&chunk{Height: 1, Format: 1, Index: 2, Chunk: []byte{2}, Sender: peerC.ID()})
675+
require.True(t, added)
676+
require.NoError(t, err)
677+
678+
// First two chunks are accepted, the third triggers rejection of sender "b".
679+
connSnapshot.On("ApplySnapshotChunk", mock.Anything, &abci.RequestApplySnapshotChunk{
680+
Index: 0, Chunk: []byte{0}, Sender: "a",
681+
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
682+
connSnapshot.On("ApplySnapshotChunk", mock.Anything, &abci.RequestApplySnapshotChunk{
683+
Index: 1, Chunk: []byte{1}, Sender: "b",
684+
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
685+
connSnapshot.On("ApplySnapshotChunk", mock.Anything, &abci.RequestApplySnapshotChunk{
686+
Index: 2, Chunk: []byte{2}, Sender: "c",
687+
}).Once().Return(&abci.ResponseApplySnapshotChunk{
688+
Result: abci.ResponseApplySnapshotChunk_ACCEPT,
689+
RejectSenders: []string{string(peerB.ID())},
690+
}, nil)
691+
692+
err = syncer.applyChunks(chunks)
693+
require.NoError(t, err)
694+
695+
require.Len(t, rejectedPeers, 1)
696+
assert.Equal(t, p2p.ID("b"), rejectedPeers[0])
697+
connSnapshot.AssertExpectations(t)
698+
}
699+
639700
func TestSyncer_verifyApp(t *testing.T) {
640701
boom := errors.New("boom")
641702
const appVersion = 9

0 commit comments

Comments
 (0)