diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 004e3295..e1637a3d 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -67,7 +67,8 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error router.GET("/eth/v2/beacon/blocks/:block_id", h.wrappedHandler(h.handleEthV2BeaconBlocks)) - router.GET("/eth/v2/debug/beacon/states/:state_id", h.wrappedHandler(h.handleEthV2DebugBeaconStates)) + // Use streaming handler for states - they can be 100s of MB and we don't want to buffer them + router.GET("/eth/v2/debug/beacon/states/:state_id", h.wrappedStreamingHandler(h.handleEthV2DebugBeaconStatesStreaming)) router.GET("/checkpointz/v1/status", h.wrappedHandler(h.handleCheckpointzStatus)) router.GET("/checkpointz/v1/beacon/slots", h.wrappedHandler(h.handleCheckpointzBeaconSlots)) @@ -139,6 +140,43 @@ func (h *Handler) wrappedHandler(handler func(ctx context.Context, r *http.Reque } } +// StreamingHandler is a handler that writes directly to the ResponseWriter for large responses. +// This avoids buffering the entire response in memory, which is important for beacon states (~100s MB). +type StreamingHandler func(ctx context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params, contentType ContentType) error + +func (h *Handler) wrappedStreamingHandler(handler StreamingHandler) httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + start := time.Now() + + contentType := NewContentTypeFromRequest(r) + ctx := r.Context() + registeredPath := deriveRegisteredPath(r, p) + + h.log.WithFields(logrus.Fields{ + "method": r.Method, + "path": r.URL.Path, + "content_type": contentType, + "accept": r.Header.Get("Accept"), + }).Trace("Handling streaming request") + + h.metrics.ObserveRequest(r.Method, registeredPath) + + statusCode := http.StatusOK + + defer func() { + h.metrics.ObserveResponse(r.Method, registeredPath, fmt.Sprintf("%v", statusCode), contentType.String(), time.Since(start)) + }() + + if err := handler(ctx, w, r, p, contentType); err != nil { + statusCode = http.StatusInternalServerError + + if writeErr := WriteErrorResponse(w, err.Error(), statusCode); writeErr != nil { + h.log.WithError(writeErr).Error("Failed to write error response") + } + } + } +} + func (h *Handler) handleEthV1BeaconGenesis(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil { return NewUnsupportedMediaTypeResponse(nil), err @@ -199,45 +237,60 @@ func (h *Handler) handleEthV2BeaconBlocks(ctx context.Context, r *http.Request, return rsp, nil } -func (h *Handler) handleEthV2DebugBeaconStates(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { +// handleEthV2DebugBeaconStatesStreaming streams beacon states directly to the ResponseWriter. +// This avoids buffering the entire state (~100s of MB) in memory before writing. +func (h *Handler) handleEthV2DebugBeaconStatesStreaming(ctx context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params, contentType ContentType) error { if err := ValidateContentType(contentType, []ContentType{ContentTypeSSZ}); err != nil { - return NewUnsupportedMediaTypeResponse(nil), err + w.WriteHeader(http.StatusNotAcceptable) + + return err } id, err := eth.NewStateIdentifier(p.ByName("state_id")) if err != nil { - return NewBadRequestResponse(nil), err + w.WriteHeader(http.StatusBadRequest) + + return err } state, err := h.eth.BeaconState(ctx, id) if err != nil { - return NewInternalServerErrorResponse(nil), err + w.WriteHeader(http.StatusInternalServerError) + + return err } if state == nil { - return NewInternalServerErrorResponse(nil), errors.New("state not found") + w.WriteHeader(http.StatusInternalServerError) + + return errors.New("state not found") } - rsp := NewSuccessResponse(ContentTypeResolvers{ - ContentTypeSSZ: func() ([]byte, error) { - return h.sszEncoder.EncodeStateSSZ(state) - }, - }) + // Set headers before streaming + w.Header().Set("Content-Type", ContentTypeSSZ.String()) + w.Header().Set("Eth-Consensus-Version", state.Version.String()) switch id.Type() { case eth.StateIDSlot: - rsp.SetCacheControl("public, s-max-age=6000") + w.Header().Set("Cache-Control", "public, s-max-age=6000") case eth.StateIDFinalized: - rsp.SetCacheControl("public, s-max-age=180") + w.Header().Set("Cache-Control", "public, s-max-age=180") case eth.StateIDRoot: - rsp.SetCacheControl("public, s-max-age=6000") + w.Header().Set("Cache-Control", "public, s-max-age=6000") case eth.StateIDHead: - rsp.SetCacheControl("public, s-max-age=30") + w.Header().Set("Cache-Control", "public, s-max-age=30") } - rsp.SetEthConsensusVersion(state.Version.String()) + // Get size for Content-Length header (enables efficient HTTP handling) + size, err := h.sszEncoder.StateSizeSSZ(state) + if err == nil { + w.Header().Set("Content-Length", strconv.Itoa(size)) + } - return rsp, nil + // Stream the SSZ-encoded state directly to the ResponseWriter + _, err = h.sszEncoder.WriteStateSSZ(ctx, w, state) + + return err } func (h *Handler) handleEthV1ConfigSpec(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { diff --git a/pkg/beacon/config.go b/pkg/beacon/config.go index c1252432..95edf896 100644 --- a/pkg/beacon/config.go +++ b/pkg/beacon/config.go @@ -19,6 +19,11 @@ type Config struct { // HistoricalEpochCount determines how many historical epochs the provider will cache. HistoricalEpochCount int `yaml:"historical_epoch_count" default:"20"` + // SSZEncodingMemoryBudgetBytes limits the total memory used for concurrent SSZ encoding + // operations (e.g., serving beacon states). Set to 0 to disable limiting (default). + // Example: 17179869184 (16GB) allows ~53 concurrent mainnet state encodings. + SSZEncodingMemoryBudgetBytes int64 `yaml:"ssz_encoding_memory_budget_bytes" default:"0"` + // Cache holds configuration for the caches. Frontend FrontendConfig `yaml:"frontend"` } diff --git a/pkg/beacon/default.go b/pkg/beacon/default.go index 82aa5c54..b4461625 100644 --- a/pkg/beacon/default.go +++ b/pkg/beacon/default.go @@ -81,7 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C historicalSlotFailures: make(map[phase0.Slot]int), broker: emission.NewEmitter(), - sszEncoder: ssz.NewEncoder(config.CustomPreset), + sszEncoder: ssz.NewEncoder(config.CustomPreset, config.SSZEncodingMemoryBudgetBytes), blocks: store.NewBlock(log, config.Caches.Blocks, namespace), states: store.NewBeaconState(log, config.Caches.States, namespace), depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace), @@ -750,6 +750,7 @@ func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(sp.SlotsPerEpoch)) + //nolint:gosec // G115: HistoricalEpochCount is a small positive config value, safe to convert for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(sp.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(sp.SlotsPerEpoch) { slots = append(slots, phase0.Slot(i)) } diff --git a/pkg/beacon/download.go b/pkg/beacon/download.go index 2c97dd69..04798ad5 100644 --- a/pkg/beacon/download.go +++ b/pkg/beacon/download.go @@ -173,10 +173,12 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1 // We'll derive the current finalized slot and then work back in intervals of SLOTS_PER_EPOCH. currentSlot := uint64(checkpoint.Finalized.Epoch) * uint64(sp.SlotsPerEpoch) for i := 1; i < d.config.HistoricalEpochCount; i++ { + //nolint:gosec // G115: i is a positive loop counter bounded by HistoricalEpochCount if uint64(i)*uint64(sp.SlotsPerEpoch) > currentSlot { break } + //nolint:gosec // G115: i is a positive loop counter bounded by HistoricalEpochCount slot := phase0.Slot(currentSlot - uint64(i)*uint64(sp.SlotsPerEpoch)) slotsInScope[slot] = struct{}{} diff --git a/pkg/beacon/expire_test.go b/pkg/beacon/expire_test.go index 48dc1e40..dc3111aa 100644 --- a/pkg/beacon/expire_test.go +++ b/pkg/beacon/expire_test.go @@ -13,10 +13,12 @@ var ( ) func CalculateSlotExpiration(slot phase0.Slot, slotsOfHistory int) phase0.Slot { + //nolint:gosec // G115: slotsOfHistory is a small positive test value return slot + phase0.Slot(slotsOfHistory) } func GetSlotTime(slot phase0.Slot, secondsPerSlot time.Duration, genesis time.Time) time.Time { + //nolint:gosec // G115: slot values are bounded by beacon chain limits, safe for duration return genesis.Add(time.Duration(slot) * secondsPerSlot) } diff --git a/pkg/beacon/ssz/encoder.go b/pkg/beacon/ssz/encoder.go index d0c21088..814ab425 100644 --- a/pkg/beacon/ssz/encoder.go +++ b/pkg/beacon/ssz/encoder.go @@ -1,28 +1,54 @@ package ssz import ( + "context" "encoding/json" "errors" + "io" "sync" "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/beacon/pkg/beacon/state" + "golang.org/x/sync/semaphore" dynssz "github.com/pk910/dynamic-ssz" "github.com/pk910/dynamic-ssz/sszutils" ) +// sszBufferPool provides reusable buffers for SSZ encoding to reduce allocations. +// Beacon states can be 100s of MB, so reusing buffers significantly reduces GC pressure. +var sszBufferPool = sync.Pool{ + New: func() any { + // Start with 1MB buffer, will grow as needed + b := make([]byte, 0, 1024*1024) + return &b + }, +} + type Encoder struct { customPreset bool dynssz *dynssz.DynSsz spec map[string]any specMtx sync.Mutex + + // memorySem limits total memory used for concurrent SSZ encoding. + // If nil, no limit is applied. + memorySem *semaphore.Weighted } -func NewEncoder(customPreset bool) *Encoder { +// NewEncoder creates a new SSZ encoder. +// If memoryBudget is > 0, limits concurrent SSZ encoding to that many bytes. +// If memoryBudget is <= 0, no limit is applied. +func NewEncoder(customPreset bool, memoryBudget int64) *Encoder { + var sem *semaphore.Weighted + if memoryBudget > 0 { + sem = semaphore.NewWeighted(memoryBudget) + } + return &Encoder{ customPreset: customPreset, + memorySem: sem, } } @@ -213,3 +239,134 @@ func (e *Encoder) EncodeStateSSZ(beaconState *spec.VersionedBeaconState) (ssz [] return ssz, nil } + +// WriteStateSSZ encodes the beacon state as SSZ and writes directly to w. +// This uses a pooled buffer to reduce allocations for large states (~100s of MB). +// If the encoder was created with a memory budget, this method will block until +// sufficient memory is available and respects context cancellation. +// The memory budget is released immediately after encoding completes, allowing +// slow client connections to stream without holding the budget hostage. +// Returns the number of bytes written. +func (e *Encoder) WriteStateSSZ(ctx context.Context, w io.Writer, beaconState *spec.VersionedBeaconState) (int64, error) { + var stateObj sszutils.FastsszMarshaler + + switch beaconState.Version { + case spec.DataVersionPhase0: + stateObj = beaconState.Phase0 + case spec.DataVersionAltair: + stateObj = beaconState.Altair + case spec.DataVersionBellatrix: + stateObj = beaconState.Bellatrix + case spec.DataVersionCapella: + stateObj = beaconState.Capella + case spec.DataVersionDeneb: + stateObj = beaconState.Deneb + case spec.DataVersionElectra: + stateObj = beaconState.Electra + case spec.DataVersionFulu: + stateObj = beaconState.Fulu + default: + return 0, errors.New("unknown state version") + } + + // Get the size upfront - needed for both memory budgeting and buffer allocation + size := stateObj.SizeSSZ() + + // If memory budget is configured, acquire memory from the semaphore. + // Note: We release the semaphore after encoding, not after streaming. + // This allows slow clients to receive data without holding the budget. + if e.memorySem != nil { + if err := e.memorySem.Acquire(ctx, int64(size)); err != nil { + return 0, err + } + } + + // For custom presets, fall back to regular encoding (dynamic-ssz doesn't support MarshalSSZTo) + if e.customPreset { + data, err := e.getDynamicSSZ().MarshalSSZ(stateObj) + + // Release memory budget immediately after encoding (before streaming to client) + if e.memorySem != nil { + e.memorySem.Release(int64(size)) + } + + if err != nil { + return 0, err + } + + n, err := w.Write(data) + + return int64(n), err + } + + // Acquire a pooled buffer + bufPtr, ok := sszBufferPool.Get().(*[]byte) + if !ok || bufPtr == nil { + // Pool returned unexpected type, allocate fresh buffer + b := make([]byte, 0, size) + bufPtr = &b + } + + buf := *bufPtr + + // Ensure buffer has enough capacity + if cap(buf) < size { + buf = make([]byte, 0, size) + } else { + buf = buf[:0] + } + + // Marshal into the buffer + data, err := stateObj.MarshalSSZTo(buf) + + // Release memory budget immediately after encoding (before streaming to client) + // This allows slow clients to receive data without holding the budget hostage. + if e.memorySem != nil { + e.memorySem.Release(int64(size)) + } + + if err != nil { + // Return buffer to pool even on error + *bufPtr = buf + sszBufferPool.Put(bufPtr) + + return 0, err + } + + // Write to the output (this can take a long time for slow clients, but we've + // already released the memory budget so other requests can proceed) + n, err := w.Write(data) + + // Return buffer to pool + *bufPtr = buf + sszBufferPool.Put(bufPtr) + + return int64(n), err +} + +// StateSizeSSZ returns the SSZ encoded size of the beacon state without encoding it. +// Useful for setting Content-Length headers before streaming. +func (e *Encoder) StateSizeSSZ(beaconState *spec.VersionedBeaconState) (int, error) { + var stateObj sszutils.FastsszMarshaler + + switch beaconState.Version { + case spec.DataVersionPhase0: + stateObj = beaconState.Phase0 + case spec.DataVersionAltair: + stateObj = beaconState.Altair + case spec.DataVersionBellatrix: + stateObj = beaconState.Bellatrix + case spec.DataVersionCapella: + stateObj = beaconState.Capella + case spec.DataVersionDeneb: + stateObj = beaconState.Deneb + case spec.DataVersionElectra: + stateObj = beaconState.Electra + case spec.DataVersionFulu: + stateObj = beaconState.Fulu + default: + return 0, errors.New("unknown state version") + } + + return stateObj.SizeSSZ(), nil +} diff --git a/pkg/beacon/ssz/encoder_test.go b/pkg/beacon/ssz/encoder_test.go new file mode 100644 index 00000000..5a4ee67f --- /dev/null +++ b/pkg/beacon/ssz/encoder_test.go @@ -0,0 +1,313 @@ +package ssz + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" +) + +// createTestState creates a minimal beacon state for testing. +// In real usage, states are ~100-300MB. We use smaller states for benchmarks +// to keep CI fast, but the relative performance difference still applies. +func createTestState(validatorCount int) *spec.VersionedBeaconState { + validators := make([]*phase0.Validator, validatorCount) + balances := make([]phase0.Gwei, validatorCount) + prevParticipation := make([]altair.ParticipationFlags, validatorCount) + currParticipation := make([]altair.ParticipationFlags, validatorCount) + inactivityScores := make([]uint64, validatorCount) + + for i := 0; i < validatorCount; i++ { + validators[i] = &phase0.Validator{ + PublicKey: phase0.BLSPubKey{}, + WithdrawalCredentials: make([]byte, 32), + EffectiveBalance: 32000000000, + Slashed: false, + ActivationEpoch: 0, + ExitEpoch: ^phase0.Epoch(0), + WithdrawableEpoch: ^phase0.Epoch(0), + } + balances[i] = 32000000000 + prevParticipation[i] = 0 + currParticipation[i] = 0 + inactivityScores[i] = 0 + } + + // Create sync committee with proper pubkeys + syncCommitteePubkeys := make([]phase0.BLSPubKey, 512) + for i := range syncCommitteePubkeys { + syncCommitteePubkeys[i] = phase0.BLSPubKey{} + } + + syncCommittee := &altair.SyncCommittee{ + Pubkeys: syncCommitteePubkeys, + AggregatePubkey: phase0.BLSPubKey{}, + } + + state := &deneb.BeaconState{ + GenesisTime: 1606824023, + GenesisValidatorsRoot: phase0.Root{}, + Slot: 1000000, + Fork: &phase0.Fork{ + PreviousVersion: phase0.Version{0x04, 0x00, 0x00, 0x00}, + CurrentVersion: phase0.Version{0x04, 0x00, 0x00, 0x00}, + Epoch: 0, + }, + LatestBlockHeader: &phase0.BeaconBlockHeader{ + Slot: 999999, + ProposerIndex: 0, + ParentRoot: phase0.Root{}, + StateRoot: phase0.Root{}, + BodyRoot: phase0.Root{}, + }, + BlockRoots: make([]phase0.Root, 8192), + StateRoots: make([]phase0.Root, 8192), + HistoricalRoots: []phase0.Root{}, + ETH1Data: &phase0.ETH1Data{ + DepositRoot: phase0.Root{}, + DepositCount: 0, + BlockHash: make([]byte, 32), + }, + ETH1DataVotes: []*phase0.ETH1Data{}, + ETH1DepositIndex: 0, + Validators: validators, + Balances: balances, + RANDAOMixes: make([]phase0.Root, 65536), + Slashings: make([]phase0.Gwei, 8192), + PreviousEpochParticipation: prevParticipation, + CurrentEpochParticipation: currParticipation, + JustificationBits: []byte{0}, + PreviousJustifiedCheckpoint: &phase0.Checkpoint{}, + CurrentJustifiedCheckpoint: &phase0.Checkpoint{}, + FinalizedCheckpoint: &phase0.Checkpoint{}, + InactivityScores: inactivityScores, + CurrentSyncCommittee: syncCommittee, + NextSyncCommittee: syncCommittee, + LatestExecutionPayloadHeader: &deneb.ExecutionPayloadHeader{ + ParentHash: phase0.Hash32{}, + FeeRecipient: [20]byte{}, + StateRoot: phase0.Root{}, + ReceiptsRoot: phase0.Root{}, + LogsBloom: [256]byte{}, + PrevRandao: [32]byte{}, + BlockNumber: 0, + GasLimit: 0, + GasUsed: 0, + Timestamp: 0, + ExtraData: []byte{}, + BaseFeePerGas: uint256.NewInt(0), + BlockHash: phase0.Hash32{}, + TransactionsRoot: phase0.Root{}, + WithdrawalsRoot: phase0.Root{}, + BlobGasUsed: 0, + ExcessBlobGas: 0, + }, + NextWithdrawalIndex: 0, + NextWithdrawalValidatorIndex: 0, + HistoricalSummaries: []*capella.HistoricalSummary{}, + } + + return &spec.VersionedBeaconState{ + Version: spec.DataVersionDeneb, + Deneb: state, + } +} + +func TestEncodeStateSSZ(t *testing.T) { + encoder := NewEncoder(false, 0) + state := createTestState(100) + + data, err := encoder.EncodeStateSSZ(state) + require.NoError(t, err) + require.NotEmpty(t, data) +} + +func TestWriteStateSSZ(t *testing.T) { + encoder := NewEncoder(false, 0) + state := createTestState(100) + + var buf bytes.Buffer + + n, err := encoder.WriteStateSSZ(context.Background(), &buf, state) + require.NoError(t, err) + require.Greater(t, n, int64(0)) + + // Verify output matches EncodeStateSSZ + expected, err := encoder.EncodeStateSSZ(state) + require.NoError(t, err) + require.Equal(t, expected, buf.Bytes()) +} + +func TestStateSizeSSZ(t *testing.T) { + encoder := NewEncoder(false, 0) + state := createTestState(100) + + size, err := encoder.StateSizeSSZ(state) + require.NoError(t, err) + require.Greater(t, size, 0) + + // Verify size matches actual encoded size + data, err := encoder.EncodeStateSSZ(state) + require.NoError(t, err) + require.Equal(t, size, len(data)) +} + +func TestWriteStateSSZ_MemoryBounded(t *testing.T) { + state := createTestState(100) + + // Get the size of this state + unboundedEncoder := NewEncoder(false, 0) + + size, err := unboundedEncoder.StateSizeSSZ(state) + require.NoError(t, err) + + // Create encoder with memory budget that allows exactly 2 concurrent encodings + memoryBudget := int64(size * 2) + encoder := NewEncoder(false, memoryBudget) + + // Test basic encoding works + var buf bytes.Buffer + + n, err := encoder.WriteStateSSZ(context.Background(), &buf, state) + require.NoError(t, err) + require.Equal(t, int64(size), n) +} + +func TestWriteStateSSZ_MemoryBounded_ContextCancellation(t *testing.T) { + state := createTestState(100) + + // Create encoder with very small memory budget (smaller than state size) + // This will cause the semaphore to block + encoder := NewEncoder(false, 1) // 1 byte budget + + // Create an already-cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Should fail immediately due to cancelled context + _, err := encoder.WriteStateSSZ(ctx, io.Discard, state) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) +} + +// BenchmarkEncodeStateSSZ benchmarks the original non-streaming encoder. +// This allocates a new buffer for each call. +func BenchmarkEncodeStateSSZ(b *testing.B) { + benchmarks := []struct { + name string + validatorCount int + }{ + {"100_validators", 100}, + {"1000_validators", 1000}, + {"10000_validators", 10000}, + } + + encoder := NewEncoder(false, 0) + + for _, bm := range benchmarks { + state := createTestState(bm.validatorCount) + + b.Run(bm.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + data, err := encoder.EncodeStateSSZ(state) + if err != nil { + b.Fatal(err) + } + + // Prevent compiler optimization + if len(data) == 0 { + b.Fatal("empty data") + } + } + }) + } +} + +// BenchmarkWriteStateSSZ benchmarks the streaming encoder with pooled buffers. +// This reuses buffers across calls, reducing allocations. +func BenchmarkWriteStateSSZ(b *testing.B) { + benchmarks := []struct { + name string + validatorCount int + }{ + {"100_validators", 100}, + {"1000_validators", 1000}, + {"10000_validators", 10000}, + } + + encoder := NewEncoder(false, 0) + + for _, bm := range benchmarks { + state := createTestState(bm.validatorCount) + + b.Run(bm.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + n, err := encoder.WriteStateSSZ(context.Background(), io.Discard, state) + if err != nil { + b.Fatal(err) + } + + // Prevent compiler optimization + if n == 0 { + b.Fatal("no bytes written") + } + } + }) + } +} + +// BenchmarkWriteStateSSZ_Parallel tests buffer pool contention under parallel load. +func BenchmarkWriteStateSSZ_Parallel(b *testing.B) { + encoder := NewEncoder(false, 0) + state := createTestState(1000) + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n, err := encoder.WriteStateSSZ(context.Background(), io.Discard, state) + if err != nil { + b.Fatal(err) + } + + if n == 0 { + b.Fatal("no bytes written") + } + } + }) +} + +// BenchmarkStateSizeSSZ benchmarks size calculation (no encoding). +func BenchmarkStateSizeSSZ(b *testing.B) { + encoder := NewEncoder(false, 0) + state := createTestState(10000) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + size, err := encoder.StateSizeSSZ(state) + if err != nil { + b.Fatal(err) + } + + if size == 0 { + b.Fatal("zero size") + } + } +} diff --git a/pkg/beacon/store/block.go b/pkg/beacon/store/block.go index e3222fde..bb450074 100644 --- a/pkg/beacon/store/block.go +++ b/pkg/beacon/store/block.go @@ -89,8 +89,8 @@ func (c *Block) cleanupBlock(block *spec.VersionedSignedBeaconBlock) error { return err } - c.slotToBlockRoot.Delete(eth.SlotAsString(slot)) - c.stateRootToBlockRoot.Delete(eth.RootAsString(stateRoot)) + c.slotToBlockRoot.Delete(slot) + c.stateRootToBlockRoot.Delete(stateRoot) return nil } diff --git a/pkg/beacon/store/block_test.go b/pkg/beacon/store/block_test.go new file mode 100644 index 00000000..613f534a --- /dev/null +++ b/pkg/beacon/store/block_test.go @@ -0,0 +1,311 @@ +package store + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/bellatrix" + "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/holiman/uint256" + "github.com/prysmaticlabs/go-bitfield" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// benchNamespaceCounter ensures unique namespaces for benchmarks to avoid Prometheus metric conflicts. +var benchNamespaceCounter atomic.Uint64 + +func uniqueNamespace(prefix string) string { + return fmt.Sprintf("%s_%d", prefix, benchNamespaceCounter.Add(1)) +} + +func createTestBlock(slot phase0.Slot) *spec.VersionedSignedBeaconBlock { + return &spec.VersionedSignedBeaconBlock{ + Version: spec.DataVersionDeneb, + Deneb: &deneb.SignedBeaconBlock{ + Message: &deneb.BeaconBlock{ + Slot: slot, + ProposerIndex: phase0.ValidatorIndex(slot % 1000), + ParentRoot: phase0.Root{byte(slot)}, + StateRoot: phase0.Root{byte(slot), byte(slot >> 8)}, + Body: &deneb.BeaconBlockBody{ + RANDAOReveal: phase0.BLSSignature{}, + ETH1Data: &phase0.ETH1Data{}, + Graffiti: [32]byte{}, + ProposerSlashings: []*phase0.ProposerSlashing{}, + AttesterSlashings: []*phase0.AttesterSlashing{}, + Attestations: []*phase0.Attestation{}, + Deposits: []*phase0.Deposit{}, + VoluntaryExits: []*phase0.SignedVoluntaryExit{}, + SyncAggregate: &altair.SyncAggregate{ + SyncCommitteeBits: bitfield.NewBitvector512(), + SyncCommitteeSignature: phase0.BLSSignature{}, + }, + ExecutionPayload: &deneb.ExecutionPayload{ + ParentHash: phase0.Hash32{}, + FeeRecipient: [20]byte{}, + StateRoot: phase0.Root{}, + ReceiptsRoot: phase0.Root{}, + LogsBloom: [256]byte{}, + PrevRandao: [32]byte{}, + BlockNumber: uint64(slot), + GasLimit: 30000000, + GasUsed: 15000000, + Timestamp: uint64(slot) * 12, + ExtraData: []byte{}, + BaseFeePerGas: uint256.NewInt(0), + BlockHash: phase0.Hash32{byte(slot)}, + Transactions: []bellatrix.Transaction{}, + Withdrawals: []*capella.Withdrawal{}, + BlobGasUsed: 0, + ExcessBlobGas: 0, + }, + BLSToExecutionChanges: []*capella.SignedBLSToExecutionChange{}, + BlobKZGCommitments: []deneb.KZGCommitment{}, + }, + }, + Signature: phase0.BLSSignature{}, + }, + } +} + +func TestBlockAddAndGet(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} + namespace := "test_block_a" + blockStore := NewBlock(logger, config, namespace) + + slot := phase0.Slot(100) + block := createTestBlock(slot) + expiresAt := time.Now().Add(10 * time.Minute) + + // Get the root for this block + root := phase0.Root{0x01, 0x02, 0x03} + + err := blockStore.Add(root, block, expiresAt) + require.NoError(t, err) + + // Get by root + retrievedBlock, err := blockStore.GetByRoot(root) + require.NoError(t, err) + require.NotNil(t, retrievedBlock) + + retrievedSlot, err := retrievedBlock.Slot() + require.NoError(t, err) + assert.Equal(t, slot, retrievedSlot) + + // Get by slot + retrievedBlock2, err := blockStore.GetBySlot(slot) + require.NoError(t, err) + require.NotNil(t, retrievedBlock2) +} + +func TestBlockGetBySlotNotFound(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} + namespace := "test_block_b" + blockStore := NewBlock(logger, config, namespace) + + slot := phase0.Slot(200) + + retrievedBlock, err := blockStore.GetBySlot(slot) + assert.Error(t, err) + assert.Nil(t, retrievedBlock) +} + +func TestBlockCleanup(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 3} + namespace := "test_block_cleanup" + blockStore := NewBlock(logger, config, namespace) + + // Add 4 blocks (max is 3, so first should be evicted) + for i := 0; i < 4; i++ { + slot := phase0.Slot(100 + i) //nolint:gosec // test code with bounded values + block := createTestBlock(slot) + root := phase0.Root{byte(i)} + expiresAt := time.Now().Add(time.Duration(i+1) * time.Minute) + + err := blockStore.Add(root, block, expiresAt) + require.NoError(t, err) + } + + // Give the eviction callback time to run + time.Sleep(100 * time.Millisecond) + + // The first block (slot 100) should have been evicted (closest to expiry) + _, err := blockStore.GetBySlot(phase0.Slot(100)) + assert.Error(t, err, "slot 100 should have been evicted") + + // Slots 101, 102, 103 should still exist + for i := 1; i < 4; i++ { + _, err := blockStore.GetBySlot(phase0.Slot(100 + i)) //nolint:gosec // test code with bounded values + assert.NoError(t, err, "slot %d should still exist", 100+i) + } +} + +// BenchmarkBlockAdd benchmarks adding blocks to the store. +func BenchmarkBlockAdd(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_add")) + + blocks := make([]*spec.VersionedSignedBeaconBlock, b.N) + roots := make([]phase0.Root, b.N) + + for i := 0; i < b.N; i++ { + blocks[i] = createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + roots[i] = phase0.Root{byte(i), byte(i >> 8), byte(i >> 16)} + } + + expiresAt := time.Now().Add(1 * time.Hour) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := blockStore.Add(roots[i], blocks[i], expiresAt) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockGetBySlot benchmarks slot lookups. +func BenchmarkBlockGetBySlot(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_get_slot")) + + // Pre-populate with blocks + numBlocks := 100 + + for i := 0; i < numBlocks; i++ { + block := createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + root := phase0.Root{byte(i), byte(i >> 8)} + expiresAt := time.Now().Add(1 * time.Hour) + + err := blockStore.Add(root, block, expiresAt) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + slot := phase0.Slot(i % numBlocks) //nolint:gosec // test code with bounded values + + _, err := blockStore.GetBySlot(slot) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockGetByRoot benchmarks root lookups. +func BenchmarkBlockGetByRoot(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_get_root")) + + // Pre-populate with blocks + numBlocks := 100 + roots := make([]phase0.Root, numBlocks) + + for i := 0; i < numBlocks; i++ { + block := createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + roots[i] = phase0.Root{byte(i), byte(i >> 8)} + expiresAt := time.Now().Add(1 * time.Hour) + + err := blockStore.Add(roots[i], block, expiresAt) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + root := roots[i%numBlocks] + + _, err := blockStore.GetByRoot(root) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockEviction benchmarks the eviction behavior when cache is full. +func BenchmarkBlockEviction(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} // Small cache to force evictions + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_evict")) + + blocks := make([]*spec.VersionedSignedBeaconBlock, b.N) + roots := make([]phase0.Root, b.N) + + for i := 0; i < b.N; i++ { + blocks[i] = createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + roots[i] = phase0.Root{byte(i), byte(i >> 8), byte(i >> 16)} + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Each add after the 10th will trigger eviction + expiresAt := time.Now().Add(time.Duration(i) * time.Millisecond) + + err := blockStore.Add(roots[i], blocks[i], expiresAt) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockGetBySlot_Parallel benchmarks concurrent slot lookups. +func BenchmarkBlockGetBySlot_Parallel(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_parallel")) + + // Pre-populate with blocks + numBlocks := 100 + + for i := 0; i < numBlocks; i++ { + block := createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + root := phase0.Root{byte(i), byte(i >> 8)} + expiresAt := time.Now().Add(1 * time.Hour) + + err := blockStore.Add(root, block, expiresAt) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + slot := phase0.Slot(i % numBlocks) //nolint:gosec // test code with bounded values + i++ + + _, err := blockStore.GetBySlot(slot) + if err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/pkg/eth/slot.go b/pkg/eth/slot.go index bd15f41e..2c004c46 100644 --- a/pkg/eth/slot.go +++ b/pkg/eth/slot.go @@ -14,6 +14,7 @@ type SlotTime struct { } func CalculateSlotTime(slot phase0.Slot, genesisTime time.Time, durationPerSlot time.Duration) SlotTime { + //nolint:gosec // G115: slot values are bounded by beacon chain limits, safe for duration slotStartTime := genesisTime.Add(time.Duration(slot) * durationPerSlot).UTC() return SlotTime{ diff --git a/pkg/service/eth/block_id.go b/pkg/service/eth/block_id.go index ebf05a5a..1f313905 100644 --- a/pkg/service/eth/block_id.go +++ b/pkg/service/eth/block_id.go @@ -88,6 +88,10 @@ func NewSlotFromString(id string) (phase0.Slot, error) { return 0, err } + if slot < 0 { + return 0, errors.New("slot cannot be negative") + } + return phase0.Slot(slot), nil } diff --git a/pkg/service/eth/eth.go b/pkg/service/eth/eth.go index 12939a59..a1e80fa3 100644 --- a/pkg/service/eth/eth.go +++ b/pkg/service/eth/eth.go @@ -572,6 +572,7 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier, ind // Find the sidecar with the given index for i, sidecar := range sidecars { + //nolint:gosec // G115: blob indices are small values (max 6), safe to convert if index == int(sidecar.Index) { filtered = append(filtered, sidecars[i])