Skip to content

Commit 03da8ab

Browse files
rootulpclaude
authored andcommitted
fix: release P2P channel receive buffer after large message EOF (#2815)
## Summary - Replace `ch.recving = ch.recving[:0]` with `ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)` in `Channel.recvPacketMsg` so that large backing arrays are released to GC after message completion instead of being retained for the connection lifetime. - Add unit tests verifying buffer capacity resets to baseline after single-packet and multi-packet large messages. ## Rationale A remote peer can pin large heap allocations (~128 MiB per connection on the BLOCKSYNC channel) by sending one large message and keeping the connection open. With 40 default inbound peers, this enables ~5 GiB of pinned heap leading to OOM. The fix ensures the receive buffer shrinks back to its baseline capacity (default 4 KiB) after each message completes. ## Test plan - [x] `TestRecvPacketMsgReleasesBufferAfterLargeMessage` — verifies buffer cap returns to baseline after a single large EOF message - [x] `TestRecvPacketMsgMultiPacketMessage` — verifies buffer cap returns to baseline after multi-chunk reassembly - [x] All existing `p2p/conn` tests pass 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> (cherry picked from commit dc43229)
1 parent 252b1bd commit 03da8ab

2 files changed

Lines changed: 79 additions & 5 deletions

File tree

p2p/conn/connection.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -899,11 +899,11 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
899899
ch.recving = append(ch.recving, packet.Data...)
900900
if packet.EOF {
901901
msgBytes := ch.recving
902-
// clear the slice without re-allocating.
903-
// http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
904-
// suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
905-
// at which point the recving slice stops being used and should be garbage collected
906-
ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
902+
// Reset the receive buffer to the baseline capacity so that large
903+
// message allocations are not retained for the lifetime of the
904+
// connection. Without this, a peer can pin memory by sending a
905+
// single large message and keeping the connection open.
906+
ch.recving = make([]byte, 0, ch.desc.RecvBufferCapacity)
907907
return msgBytes, nil
908908
}
909909
return nil, nil

p2p/conn/connection_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -618,6 +618,80 @@ func TestMConnectionChannelOverflow(t *testing.T) {
618618

619619
}
620620

621+
func TestRecvPacketMsgReleasesBufferAfterLargeMessage(t *testing.T) {
622+
const (
623+
recvBufCap = 4 * 1024 // 4 KiB baseline
624+
msgSize = 8 * 1024 * 1024 // 8 MiB message
625+
)
626+
627+
ch := &Channel{
628+
desc: ChannelDescriptor{
629+
ID: 0x01,
630+
Priority: 1,
631+
RecvBufferCapacity: recvBufCap,
632+
RecvMessageCapacity: msgSize + 1024,
633+
},
634+
recving: make([]byte, 0, recvBufCap),
635+
}
636+
637+
require.Equal(t, recvBufCap, cap(ch.recving))
638+
639+
data := make([]byte, msgSize)
640+
msgBytes, err := ch.recvPacketMsg(tmp2p.PacketMsg{Data: data, EOF: true})
641+
require.NoError(t, err)
642+
require.Len(t, msgBytes, msgSize)
643+
644+
// After EOF, the receive buffer must be reset to the baseline capacity
645+
// rather than retaining the large backing array.
646+
require.Equal(t, 0, len(ch.recving))
647+
require.Equal(t, recvBufCap, cap(ch.recving),
648+
"receive buffer capacity should shrink back to RecvBufferCapacity after EOF")
649+
}
650+
651+
func TestRecvPacketMsgMultiPacketMessage(t *testing.T) {
652+
const (
653+
recvBufCap = 4 * 1024 // 4 KiB baseline
654+
chunkSize = 1024 // 1 KiB per packet
655+
numChunks = 100 // 100 KiB total message
656+
)
657+
658+
ch := &Channel{
659+
desc: ChannelDescriptor{
660+
ID: 0x01,
661+
Priority: 1,
662+
RecvBufferCapacity: recvBufCap,
663+
RecvMessageCapacity: chunkSize*numChunks + 1024,
664+
},
665+
recving: make([]byte, 0, recvBufCap),
666+
}
667+
668+
// Send non-EOF chunks that accumulate in the buffer.
669+
for i := 0; i < numChunks-1; i++ {
670+
msgBytes, err := ch.recvPacketMsg(tmp2p.PacketMsg{
671+
Data: make([]byte, chunkSize),
672+
EOF: false,
673+
})
674+
require.NoError(t, err)
675+
require.Nil(t, msgBytes, "non-EOF packet should not return message bytes")
676+
}
677+
678+
// Buffer should have grown to hold the accumulated data.
679+
require.Equal(t, chunkSize*(numChunks-1), len(ch.recving))
680+
681+
// Final EOF chunk completes the message.
682+
msgBytes, err := ch.recvPacketMsg(tmp2p.PacketMsg{
683+
Data: make([]byte, chunkSize),
684+
EOF: true,
685+
})
686+
require.NoError(t, err)
687+
require.Len(t, msgBytes, chunkSize*numChunks)
688+
689+
// Buffer must be reset to baseline after EOF.
690+
require.Equal(t, 0, len(ch.recving))
691+
require.Equal(t, recvBufCap, cap(ch.recving),
692+
"receive buffer capacity should shrink back to RecvBufferCapacity after EOF")
693+
}
694+
621695
type stopper interface {
622696
Stop() error
623697
}

0 commit comments

Comments
 (0)