Skip to content

Commit 1cd979c

Browse files
evan-forbesrach-id
andauthored
fix: trace haves and wants correctly (#2693)
fixes some traces for the prop reactor --------- Co-authored-by: CHAMI Rachid <chamirachid1@gmail.com>
1 parent b6797ed commit 1cd979c

6 files changed

Lines changed: 64 additions & 13 deletions

File tree

consensus/propagation/commitment.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func (blockProp *Reactor) ProposeBlock(proposal *types.Proposal, block *types.Pa
127127
peer.consensusPeerState.SetHasProposalBlockPart(proposal.Height, proposal.Round, int(part.GetIndex()))
128128
}
129129

130+
schema.WriteBlockPartState(blockProp.traceClient, proposal.Height, proposal.Round, chunks[index].GetTrueIndices(), true, string(peer.peer.ID()), schema.Upload, "have")
130131
}
131132
return nil
132133
}

consensus/propagation/have_wants.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,17 @@ func (blockProp *Reactor) sendWant(ps *PeerState, want *proptypes.WantParts) {
296296
return
297297
}
298298

299+
schema.WriteBlockPartState(
300+
blockProp.traceClient,
301+
want.Height,
302+
want.Round,
303+
want.Parts.GetTrueIndices(),
304+
false,
305+
string(ps.peer.ID()),
306+
schema.Upload,
307+
"want",
308+
)
309+
299310
// keep track of the parts that this node has requested.
300311
ps.AddRequests(want.Height, want.Round, want.Parts)
301312
ps.IncreaseConcurrentReqs(int64(len(want.Parts.GetTrueIndices())))

consensus/propagation/reactor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,12 @@ func (blockProp *Reactor) ReceiveEnvelope(e p2p.Envelope) {
272272
schema.WriteProposal(blockProp.traceClient, msg.Proposal.Height, msg.Proposal.Round, string(e.Src.ID()), schema.Download)
273273
case *proptypes.HaveParts:
274274
blockProp.handleHaves(e.Src.ID(), msg)
275+
// Trace the received HaveParts message
276+
indexes := make([]int, len(msg.Parts))
277+
for i, part := range msg.Parts {
278+
indexes[i] = int(part.Index)
279+
}
280+
schema.WriteBlockPartState(blockProp.traceClient, msg.Height, msg.Round, indexes, true, string(e.Src.ID()), schema.Download, "have")
275281
case *proptypes.RecoveryPart:
276282
schema.WriteBlockPart(blockProp.traceClient, msg.Height, msg.Round, msg.Index, false, string(e.Src.ID()), schema.Download)
277283
blockProp.handleRecoveryPart(e.Src.ID(), msg)
@@ -282,6 +288,8 @@ func (blockProp *Reactor) ReceiveEnvelope(e p2p.Envelope) {
282288
switch msg := msg.(type) {
283289
case *proptypes.WantParts:
284290
blockProp.handleWants(e.Src.ID(), msg)
291+
// Trace the received WantParts message
292+
schema.WriteBlockPartState(blockProp.traceClient, msg.Height, msg.Round, msg.Parts.GetTrueIndices(), false, string(e.Src.ID()), schema.Download, "want")
285293
}
286294
default:
287295
blockProp.Logger.Error(fmt.Sprintf("Unknown chId %X", e.ChannelID))

consensus/state.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ type State struct {
176176

177177
// gossipDataEnabled controls whether the gossipDataRoutine should run
178178
gossipDataEnabled atomic.Bool
179+
180+
// proposalReceivedTime tracks when the proposal was received for the current height/round
181+
// Used to calculate the duration until full block is received
182+
proposalReceivedTime time.Time
179183
}
180184

181185
// StateOption sets an optional parameter on the State.
@@ -876,6 +880,7 @@ func (cs *State) updateToState(state sm.State) {
876880
cs.rs.Proposal = nil
877881
cs.rs.ProposalBlock = nil
878882
cs.rs.ProposalBlockParts = nil
883+
cs.proposalReceivedTime = time.Time{}
879884
cs.rs.LockedRound = -1
880885
cs.rs.LockedBlock = nil
881886
cs.rs.LockedBlockParts = nil
@@ -1244,6 +1249,7 @@ func (cs *State) enterNewRound(height int64, round int32) {
12441249
cs.rs.Proposal = nil
12451250
cs.rs.ProposalBlock = nil
12461251
cs.rs.ProposalBlockParts = nil
1252+
cs.proposalReceivedTime = time.Time{}
12471253
}
12481254

12491255
logger.Debug("entering new round",
@@ -2239,7 +2245,7 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {
22392245
}
22402246

22412247
cs.Logger.Info("received proposal", "proposal", proposal, "proposer", pubKey.Address())
2242-
schema.WriteFullBlockReceivingTime(cs.traceClient, proposal.Height, proposal.Round, time.Now(), false)
2248+
cs.proposalReceivedTime = time.Now()
22432249
return nil
22442250
}
22452251

@@ -2296,7 +2302,10 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
22962302
)
22972303
}
22982304
if added && cs.rs.ProposalBlockParts.IsComplete() {
2299-
schema.WriteFullBlockReceivingTime(cs.traceClient, msg.Height, msg.Round, time.Now(), true)
2305+
if !cs.proposalReceivedTime.IsZero() {
2306+
duration := time.Since(cs.proposalReceivedTime)
2307+
schema.WriteFullBlockReceivingTime(cs.traceClient, msg.Height, msg.Round, duration)
2308+
}
23002309
bz := cs.rs.ProposalBlockParts.GetBytes()
23012310

23022311
pbb := new(cmtproto.Block)

docs/p2p-channels.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# P2P Channel Values
2+
3+
This document lists all the P2P channel identifiers used by the various reactors in celestia-core.
4+
5+
## Channel Overview
6+
7+
| Channel Name | Hex Value | Decimal Value | Reactor | File | Description |
8+
|--------------|-----------|---------------|---------|------|-------------|
9+
| PexChannel | 0x00 | 0 | PEX | `p2p/pex/pex_reactor.go:21` | Peer exchange protocol |
10+
| StateChannel | 0x20 | 32 | Consensus | `consensus/reactor.go:31` | Consensus state messages |
11+
| DataChannel | 0x21 | 33 | Consensus | `consensus/reactor.go:32` | Consensus data messages |
12+
| VoteChannel | 0x22 | 34 | Consensus | `consensus/reactor.go:33` | Vote messages |
13+
| VoteSetBitsChannel | 0x23 | 35 | Consensus | `consensus/reactor.go:34` | Vote set bits messages |
14+
| MempoolChannel | 0x30 | 48 | Mempool | `mempool/mempool.go:13` | Basic mempool channel |
15+
| MempoolDataChannel | 0x31 | 49 | Mempool CAT | `mempool/cat/reactor.go:27` | CAT mempool data (SeenTx and blob messages) |
16+
| MempoolWantsChannel | 0x32 | 50 | Mempool CAT | `mempool/cat/reactor.go:30` | CAT mempool wants messages |
17+
| EvidenceChannel | 0x38 | 56 | Evidence | `evidence/reactor.go:17` | Evidence messages |
18+
| BlocksyncChannel | 0x40 | 64 | Blocksync | `blocksync/reactor.go:22` | Block synchronization |
19+
| DataChannel | 0x50 | 80 | Propagation | `consensus/propagation/reactor.go:33` | Propagation data messages |
20+
| WantChannel | 0x51 | 81 | Propagation | `consensus/propagation/reactor.go:36` | Propagation want messages |
21+
| SnapshotChannel | 0x60 | 96 | State Sync | `statesync/reactor.go:21` | State sync snapshot messages |
22+
| ChunkChannel | 0x61 | 97 | State Sync | `statesync/reactor.go:23` | State sync chunk messages |

libs/trace/schema/consensus.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ type BlockPartState struct {
273273
Have bool `json:"have"`
274274
Peer string `json:"peer"`
275275
TransferType TransferType `json:"transfer_type"`
276+
MessageType string `json:"message_type"`
276277
}
277278

278279
// Table returns the table name for the BlockPart struct.
@@ -290,6 +291,7 @@ func WriteBlockPartState(
290291
have bool,
291292
peer string,
292293
transferType TransferType,
294+
messageType string,
293295
) {
294296
// this check is redundant to what is checked during client.Write, although it
295297
// is an optimization to avoid allocations from the map of fields.
@@ -304,6 +306,7 @@ func WriteBlockPartState(
304306
Have: have,
305307
Peer: peer,
306308
TransferType: transferType,
309+
MessageType: messageType,
307310
})
308311
}
309312

@@ -493,11 +496,10 @@ const (
493496
)
494497

495498
type FullBlockReceivingTime struct {
496-
Height int64 `json:"height"`
497-
Round int32 `json:"round"`
498-
Time time.Time `json:"time"`
499-
// Complete set to false when we receive the proposal. Set to true when the full block is received
500-
Complete bool `json:"complete"`
499+
Height int64 `json:"height"`
500+
Round int32 `json:"round"`
501+
// Duration in milliseconds from when the proposal was received to when the full block was received
502+
DurationMs int64 `json:"duration_ms"`
501503
}
502504

503505
func (b FullBlockReceivingTime) Table() string {
@@ -508,13 +510,11 @@ func WriteFullBlockReceivingTime(
508510
client trace.Tracer,
509511
height int64,
510512
round int32,
511-
t time.Time,
512-
complete bool,
513+
duration time.Duration,
513514
) {
514515
client.Write(FullBlockReceivingTime{
515-
Height: height,
516-
Round: round,
517-
Time: t,
518-
Complete: complete,
516+
Height: height,
517+
Round: round,
518+
DurationMs: duration.Milliseconds(),
519519
})
520520
}

0 commit comments

Comments
 (0)