From e006ce460e367485bdeda8379561104078f46905 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 9 Dec 2025 16:02:43 +1000 Subject: [PATCH 1/3] feat: Memory optimizations --- pkg/api/handler.go | 87 +++++++-- pkg/beacon/config.go | 5 + pkg/beacon/default.go | 2 +- pkg/beacon/ssz/encoder.go | 159 ++++++++++++++++- pkg/beacon/ssz/encoder_test.go | 313 +++++++++++++++++++++++++++++++++ pkg/beacon/store/block.go | 5 +- pkg/beacon/store/block_test.go | 311 ++++++++++++++++++++++++++++++++ 7 files changed, 861 insertions(+), 21 deletions(-) create mode 100644 pkg/beacon/ssz/encoder_test.go create mode 100644 pkg/beacon/store/block_test.go diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 004e329..e1637a3 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -67,7 +67,8 @@ func (h *Handler) Register(ctx context.Context, router *httprouter.Router) error router.GET("/eth/v2/beacon/blocks/:block_id", h.wrappedHandler(h.handleEthV2BeaconBlocks)) - router.GET("/eth/v2/debug/beacon/states/:state_id", h.wrappedHandler(h.handleEthV2DebugBeaconStates)) + // Use streaming handler for states - they can be 100s of MB and we don't want to buffer them + router.GET("/eth/v2/debug/beacon/states/:state_id", h.wrappedStreamingHandler(h.handleEthV2DebugBeaconStatesStreaming)) router.GET("/checkpointz/v1/status", h.wrappedHandler(h.handleCheckpointzStatus)) router.GET("/checkpointz/v1/beacon/slots", h.wrappedHandler(h.handleCheckpointzBeaconSlots)) @@ -139,6 +140,43 @@ func (h *Handler) wrappedHandler(handler func(ctx context.Context, r *http.Reque } } +// StreamingHandler is a handler that writes directly to the ResponseWriter for large responses. +// This avoids buffering the entire response in memory, which is important for beacon states (~100s MB). +type StreamingHandler func(ctx context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params, contentType ContentType) error + +func (h *Handler) wrappedStreamingHandler(handler StreamingHandler) httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + start := time.Now() + + contentType := NewContentTypeFromRequest(r) + ctx := r.Context() + registeredPath := deriveRegisteredPath(r, p) + + h.log.WithFields(logrus.Fields{ + "method": r.Method, + "path": r.URL.Path, + "content_type": contentType, + "accept": r.Header.Get("Accept"), + }).Trace("Handling streaming request") + + h.metrics.ObserveRequest(r.Method, registeredPath) + + statusCode := http.StatusOK + + defer func() { + h.metrics.ObserveResponse(r.Method, registeredPath, fmt.Sprintf("%v", statusCode), contentType.String(), time.Since(start)) + }() + + if err := handler(ctx, w, r, p, contentType); err != nil { + statusCode = http.StatusInternalServerError + + if writeErr := WriteErrorResponse(w, err.Error(), statusCode); writeErr != nil { + h.log.WithError(writeErr).Error("Failed to write error response") + } + } + } +} + func (h *Handler) handleEthV1BeaconGenesis(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { if err := ValidateContentType(contentType, []ContentType{ContentTypeJSON}); err != nil { return NewUnsupportedMediaTypeResponse(nil), err @@ -199,45 +237,60 @@ func (h *Handler) handleEthV2BeaconBlocks(ctx context.Context, r *http.Request, return rsp, nil } -func (h *Handler) handleEthV2DebugBeaconStates(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { +// handleEthV2DebugBeaconStatesStreaming streams beacon states directly to the ResponseWriter. +// This avoids buffering the entire state (~100s of MB) in memory before writing. +func (h *Handler) handleEthV2DebugBeaconStatesStreaming(ctx context.Context, w http.ResponseWriter, r *http.Request, p httprouter.Params, contentType ContentType) error { if err := ValidateContentType(contentType, []ContentType{ContentTypeSSZ}); err != nil { - return NewUnsupportedMediaTypeResponse(nil), err + w.WriteHeader(http.StatusNotAcceptable) + + return err } id, err := eth.NewStateIdentifier(p.ByName("state_id")) if err != nil { - return NewBadRequestResponse(nil), err + w.WriteHeader(http.StatusBadRequest) + + return err } state, err := h.eth.BeaconState(ctx, id) if err != nil { - return NewInternalServerErrorResponse(nil), err + w.WriteHeader(http.StatusInternalServerError) + + return err } if state == nil { - return NewInternalServerErrorResponse(nil), errors.New("state not found") + w.WriteHeader(http.StatusInternalServerError) + + return errors.New("state not found") } - rsp := NewSuccessResponse(ContentTypeResolvers{ - ContentTypeSSZ: func() ([]byte, error) { - return h.sszEncoder.EncodeStateSSZ(state) - }, - }) + // Set headers before streaming + w.Header().Set("Content-Type", ContentTypeSSZ.String()) + w.Header().Set("Eth-Consensus-Version", state.Version.String()) switch id.Type() { case eth.StateIDSlot: - rsp.SetCacheControl("public, s-max-age=6000") + w.Header().Set("Cache-Control", "public, s-max-age=6000") case eth.StateIDFinalized: - rsp.SetCacheControl("public, s-max-age=180") + w.Header().Set("Cache-Control", "public, s-max-age=180") case eth.StateIDRoot: - rsp.SetCacheControl("public, s-max-age=6000") + w.Header().Set("Cache-Control", "public, s-max-age=6000") case eth.StateIDHead: - rsp.SetCacheControl("public, s-max-age=30") + w.Header().Set("Cache-Control", "public, s-max-age=30") } - rsp.SetEthConsensusVersion(state.Version.String()) + // Get size for Content-Length header (enables efficient HTTP handling) + size, err := h.sszEncoder.StateSizeSSZ(state) + if err == nil { + w.Header().Set("Content-Length", strconv.Itoa(size)) + } - return rsp, nil + // Stream the SSZ-encoded state directly to the ResponseWriter + _, err = h.sszEncoder.WriteStateSSZ(ctx, w, state) + + return err } func (h *Handler) handleEthV1ConfigSpec(ctx context.Context, r *http.Request, p httprouter.Params, contentType ContentType) (*HTTPResponse, error) { diff --git a/pkg/beacon/config.go b/pkg/beacon/config.go index c125243..95edf89 100644 --- a/pkg/beacon/config.go +++ b/pkg/beacon/config.go @@ -19,6 +19,11 @@ type Config struct { // HistoricalEpochCount determines how many historical epochs the provider will cache. HistoricalEpochCount int `yaml:"historical_epoch_count" default:"20"` + // SSZEncodingMemoryBudgetBytes limits the total memory used for concurrent SSZ encoding + // operations (e.g., serving beacon states). Set to 0 to disable limiting (default). + // Example: 17179869184 (16GB) allows ~53 concurrent mainnet state encodings. + SSZEncodingMemoryBudgetBytes int64 `yaml:"ssz_encoding_memory_budget_bytes" default:"0"` + // Cache holds configuration for the caches. Frontend FrontendConfig `yaml:"frontend"` } diff --git a/pkg/beacon/default.go b/pkg/beacon/default.go index 82aa5c5..a6caef0 100644 --- a/pkg/beacon/default.go +++ b/pkg/beacon/default.go @@ -81,7 +81,7 @@ func NewDefaultProvider(namespace string, log logrus.FieldLogger, nodes []node.C historicalSlotFailures: make(map[phase0.Slot]int), broker: emission.NewEmitter(), - sszEncoder: ssz.NewEncoder(config.CustomPreset), + sszEncoder: ssz.NewEncoder(config.CustomPreset, config.SSZEncodingMemoryBudgetBytes), blocks: store.NewBlock(log, config.Caches.Blocks, namespace), states: store.NewBeaconState(log, config.Caches.States, namespace), depositSnapshots: store.NewDepositSnapshot(log, config.Caches.DepositSnapshots, namespace), diff --git a/pkg/beacon/ssz/encoder.go b/pkg/beacon/ssz/encoder.go index d0c2108..67b49f6 100644 --- a/pkg/beacon/ssz/encoder.go +++ b/pkg/beacon/ssz/encoder.go @@ -1,28 +1,54 @@ package ssz import ( + "context" "encoding/json" "errors" + "io" "sync" "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ethpandaops/beacon/pkg/beacon/state" + "golang.org/x/sync/semaphore" dynssz "github.com/pk910/dynamic-ssz" "github.com/pk910/dynamic-ssz/sszutils" ) +// sszBufferPool provides reusable buffers for SSZ encoding to reduce allocations. +// Beacon states can be 100s of MB, so reusing buffers significantly reduces GC pressure. +var sszBufferPool = sync.Pool{ + New: func() any { + // Start with 1MB buffer, will grow as needed + b := make([]byte, 0, 1024*1024) + return &b + }, +} + type Encoder struct { customPreset bool dynssz *dynssz.DynSsz spec map[string]any specMtx sync.Mutex + + // memorySem limits total memory used for concurrent SSZ encoding. + // If nil, no limit is applied. + memorySem *semaphore.Weighted } -func NewEncoder(customPreset bool) *Encoder { +// NewEncoder creates a new SSZ encoder. +// If memoryBudget is > 0, limits concurrent SSZ encoding to that many bytes. +// If memoryBudget is <= 0, no limit is applied (original behavior). +func NewEncoder(customPreset bool, memoryBudget int64) *Encoder { + var sem *semaphore.Weighted + if memoryBudget > 0 { + sem = semaphore.NewWeighted(memoryBudget) + } + return &Encoder{ customPreset: customPreset, + memorySem: sem, } } @@ -213,3 +239,134 @@ func (e *Encoder) EncodeStateSSZ(beaconState *spec.VersionedBeaconState) (ssz [] return ssz, nil } + +// WriteStateSSZ encodes the beacon state as SSZ and writes directly to w. +// This uses a pooled buffer to reduce allocations for large states (~100s of MB). +// If the encoder was created with a memory budget, this method will block until +// sufficient memory is available and respects context cancellation. +// The memory budget is released immediately after encoding completes, allowing +// slow client connections to stream without holding the budget hostage. +// Returns the number of bytes written. +func (e *Encoder) WriteStateSSZ(ctx context.Context, w io.Writer, beaconState *spec.VersionedBeaconState) (int64, error) { + var stateObj sszutils.FastsszMarshaler + + switch beaconState.Version { + case spec.DataVersionPhase0: + stateObj = beaconState.Phase0 + case spec.DataVersionAltair: + stateObj = beaconState.Altair + case spec.DataVersionBellatrix: + stateObj = beaconState.Bellatrix + case spec.DataVersionCapella: + stateObj = beaconState.Capella + case spec.DataVersionDeneb: + stateObj = beaconState.Deneb + case spec.DataVersionElectra: + stateObj = beaconState.Electra + case spec.DataVersionFulu: + stateObj = beaconState.Fulu + default: + return 0, errors.New("unknown state version") + } + + // Get the size upfront - needed for both memory budgeting and buffer allocation + size := stateObj.SizeSSZ() + + // If memory budget is configured, acquire memory from the semaphore. + // Note: We release the semaphore after encoding, not after streaming. + // This allows slow clients to receive data without holding the budget. + if e.memorySem != nil { + if err := e.memorySem.Acquire(ctx, int64(size)); err != nil { + return 0, err + } + } + + // For custom presets, fall back to regular encoding (dynamic-ssz doesn't support MarshalSSZTo) + if e.customPreset { + data, err := e.getDynamicSSZ().MarshalSSZ(stateObj) + + // Release memory budget immediately after encoding (before streaming to client) + if e.memorySem != nil { + e.memorySem.Release(int64(size)) + } + + if err != nil { + return 0, err + } + + n, err := w.Write(data) + + return int64(n), err + } + + // Acquire a pooled buffer + bufPtr, ok := sszBufferPool.Get().(*[]byte) + if !ok || bufPtr == nil { + // Pool returned unexpected type, allocate fresh buffer + b := make([]byte, 0, size) + bufPtr = &b + } + + buf := *bufPtr + + // Ensure buffer has enough capacity + if cap(buf) < size { + buf = make([]byte, 0, size) + } else { + buf = buf[:0] + } + + // Marshal into the buffer + data, err := stateObj.MarshalSSZTo(buf) + + // Release memory budget immediately after encoding (before streaming to client) + // This allows slow clients to receive data without holding the budget hostage. + if e.memorySem != nil { + e.memorySem.Release(int64(size)) + } + + if err != nil { + // Return buffer to pool even on error + *bufPtr = buf + sszBufferPool.Put(bufPtr) + + return 0, err + } + + // Write to the output (this can take a long time for slow clients, but we've + // already released the memory budget so other requests can proceed) + n, err := w.Write(data) + + // Return buffer to pool + *bufPtr = buf + sszBufferPool.Put(bufPtr) + + return int64(n), err +} + +// StateSizeSSZ returns the SSZ encoded size of the beacon state without encoding it. +// Useful for setting Content-Length headers before streaming. +func (e *Encoder) StateSizeSSZ(beaconState *spec.VersionedBeaconState) (int, error) { + var stateObj sszutils.FastsszMarshaler + + switch beaconState.Version { + case spec.DataVersionPhase0: + stateObj = beaconState.Phase0 + case spec.DataVersionAltair: + stateObj = beaconState.Altair + case spec.DataVersionBellatrix: + stateObj = beaconState.Bellatrix + case spec.DataVersionCapella: + stateObj = beaconState.Capella + case spec.DataVersionDeneb: + stateObj = beaconState.Deneb + case spec.DataVersionElectra: + stateObj = beaconState.Electra + case spec.DataVersionFulu: + stateObj = beaconState.Fulu + default: + return 0, errors.New("unknown state version") + } + + return stateObj.SizeSSZ(), nil +} diff --git a/pkg/beacon/ssz/encoder_test.go b/pkg/beacon/ssz/encoder_test.go new file mode 100644 index 0000000..5a4ee67 --- /dev/null +++ b/pkg/beacon/ssz/encoder_test.go @@ -0,0 +1,313 @@ +package ssz + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/holiman/uint256" + "github.com/stretchr/testify/require" +) + +// createTestState creates a minimal beacon state for testing. +// In real usage, states are ~100-300MB. We use smaller states for benchmarks +// to keep CI fast, but the relative performance difference still applies. +func createTestState(validatorCount int) *spec.VersionedBeaconState { + validators := make([]*phase0.Validator, validatorCount) + balances := make([]phase0.Gwei, validatorCount) + prevParticipation := make([]altair.ParticipationFlags, validatorCount) + currParticipation := make([]altair.ParticipationFlags, validatorCount) + inactivityScores := make([]uint64, validatorCount) + + for i := 0; i < validatorCount; i++ { + validators[i] = &phase0.Validator{ + PublicKey: phase0.BLSPubKey{}, + WithdrawalCredentials: make([]byte, 32), + EffectiveBalance: 32000000000, + Slashed: false, + ActivationEpoch: 0, + ExitEpoch: ^phase0.Epoch(0), + WithdrawableEpoch: ^phase0.Epoch(0), + } + balances[i] = 32000000000 + prevParticipation[i] = 0 + currParticipation[i] = 0 + inactivityScores[i] = 0 + } + + // Create sync committee with proper pubkeys + syncCommitteePubkeys := make([]phase0.BLSPubKey, 512) + for i := range syncCommitteePubkeys { + syncCommitteePubkeys[i] = phase0.BLSPubKey{} + } + + syncCommittee := &altair.SyncCommittee{ + Pubkeys: syncCommitteePubkeys, + AggregatePubkey: phase0.BLSPubKey{}, + } + + state := &deneb.BeaconState{ + GenesisTime: 1606824023, + GenesisValidatorsRoot: phase0.Root{}, + Slot: 1000000, + Fork: &phase0.Fork{ + PreviousVersion: phase0.Version{0x04, 0x00, 0x00, 0x00}, + CurrentVersion: phase0.Version{0x04, 0x00, 0x00, 0x00}, + Epoch: 0, + }, + LatestBlockHeader: &phase0.BeaconBlockHeader{ + Slot: 999999, + ProposerIndex: 0, + ParentRoot: phase0.Root{}, + StateRoot: phase0.Root{}, + BodyRoot: phase0.Root{}, + }, + BlockRoots: make([]phase0.Root, 8192), + StateRoots: make([]phase0.Root, 8192), + HistoricalRoots: []phase0.Root{}, + ETH1Data: &phase0.ETH1Data{ + DepositRoot: phase0.Root{}, + DepositCount: 0, + BlockHash: make([]byte, 32), + }, + ETH1DataVotes: []*phase0.ETH1Data{}, + ETH1DepositIndex: 0, + Validators: validators, + Balances: balances, + RANDAOMixes: make([]phase0.Root, 65536), + Slashings: make([]phase0.Gwei, 8192), + PreviousEpochParticipation: prevParticipation, + CurrentEpochParticipation: currParticipation, + JustificationBits: []byte{0}, + PreviousJustifiedCheckpoint: &phase0.Checkpoint{}, + CurrentJustifiedCheckpoint: &phase0.Checkpoint{}, + FinalizedCheckpoint: &phase0.Checkpoint{}, + InactivityScores: inactivityScores, + CurrentSyncCommittee: syncCommittee, + NextSyncCommittee: syncCommittee, + LatestExecutionPayloadHeader: &deneb.ExecutionPayloadHeader{ + ParentHash: phase0.Hash32{}, + FeeRecipient: [20]byte{}, + StateRoot: phase0.Root{}, + ReceiptsRoot: phase0.Root{}, + LogsBloom: [256]byte{}, + PrevRandao: [32]byte{}, + BlockNumber: 0, + GasLimit: 0, + GasUsed: 0, + Timestamp: 0, + ExtraData: []byte{}, + BaseFeePerGas: uint256.NewInt(0), + BlockHash: phase0.Hash32{}, + TransactionsRoot: phase0.Root{}, + WithdrawalsRoot: phase0.Root{}, + BlobGasUsed: 0, + ExcessBlobGas: 0, + }, + NextWithdrawalIndex: 0, + NextWithdrawalValidatorIndex: 0, + HistoricalSummaries: []*capella.HistoricalSummary{}, + } + + return &spec.VersionedBeaconState{ + Version: spec.DataVersionDeneb, + Deneb: state, + } +} + +func TestEncodeStateSSZ(t *testing.T) { + encoder := NewEncoder(false, 0) + state := createTestState(100) + + data, err := encoder.EncodeStateSSZ(state) + require.NoError(t, err) + require.NotEmpty(t, data) +} + +func TestWriteStateSSZ(t *testing.T) { + encoder := NewEncoder(false, 0) + state := createTestState(100) + + var buf bytes.Buffer + + n, err := encoder.WriteStateSSZ(context.Background(), &buf, state) + require.NoError(t, err) + require.Greater(t, n, int64(0)) + + // Verify output matches EncodeStateSSZ + expected, err := encoder.EncodeStateSSZ(state) + require.NoError(t, err) + require.Equal(t, expected, buf.Bytes()) +} + +func TestStateSizeSSZ(t *testing.T) { + encoder := NewEncoder(false, 0) + state := createTestState(100) + + size, err := encoder.StateSizeSSZ(state) + require.NoError(t, err) + require.Greater(t, size, 0) + + // Verify size matches actual encoded size + data, err := encoder.EncodeStateSSZ(state) + require.NoError(t, err) + require.Equal(t, size, len(data)) +} + +func TestWriteStateSSZ_MemoryBounded(t *testing.T) { + state := createTestState(100) + + // Get the size of this state + unboundedEncoder := NewEncoder(false, 0) + + size, err := unboundedEncoder.StateSizeSSZ(state) + require.NoError(t, err) + + // Create encoder with memory budget that allows exactly 2 concurrent encodings + memoryBudget := int64(size * 2) + encoder := NewEncoder(false, memoryBudget) + + // Test basic encoding works + var buf bytes.Buffer + + n, err := encoder.WriteStateSSZ(context.Background(), &buf, state) + require.NoError(t, err) + require.Equal(t, int64(size), n) +} + +func TestWriteStateSSZ_MemoryBounded_ContextCancellation(t *testing.T) { + state := createTestState(100) + + // Create encoder with very small memory budget (smaller than state size) + // This will cause the semaphore to block + encoder := NewEncoder(false, 1) // 1 byte budget + + // Create an already-cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // Should fail immediately due to cancelled context + _, err := encoder.WriteStateSSZ(ctx, io.Discard, state) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) +} + +// BenchmarkEncodeStateSSZ benchmarks the original non-streaming encoder. +// This allocates a new buffer for each call. +func BenchmarkEncodeStateSSZ(b *testing.B) { + benchmarks := []struct { + name string + validatorCount int + }{ + {"100_validators", 100}, + {"1000_validators", 1000}, + {"10000_validators", 10000}, + } + + encoder := NewEncoder(false, 0) + + for _, bm := range benchmarks { + state := createTestState(bm.validatorCount) + + b.Run(bm.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + data, err := encoder.EncodeStateSSZ(state) + if err != nil { + b.Fatal(err) + } + + // Prevent compiler optimization + if len(data) == 0 { + b.Fatal("empty data") + } + } + }) + } +} + +// BenchmarkWriteStateSSZ benchmarks the streaming encoder with pooled buffers. +// This reuses buffers across calls, reducing allocations. +func BenchmarkWriteStateSSZ(b *testing.B) { + benchmarks := []struct { + name string + validatorCount int + }{ + {"100_validators", 100}, + {"1000_validators", 1000}, + {"10000_validators", 10000}, + } + + encoder := NewEncoder(false, 0) + + for _, bm := range benchmarks { + state := createTestState(bm.validatorCount) + + b.Run(bm.name, func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + n, err := encoder.WriteStateSSZ(context.Background(), io.Discard, state) + if err != nil { + b.Fatal(err) + } + + // Prevent compiler optimization + if n == 0 { + b.Fatal("no bytes written") + } + } + }) + } +} + +// BenchmarkWriteStateSSZ_Parallel tests buffer pool contention under parallel load. +func BenchmarkWriteStateSSZ_Parallel(b *testing.B) { + encoder := NewEncoder(false, 0) + state := createTestState(1000) + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + n, err := encoder.WriteStateSSZ(context.Background(), io.Discard, state) + if err != nil { + b.Fatal(err) + } + + if n == 0 { + b.Fatal("no bytes written") + } + } + }) +} + +// BenchmarkStateSizeSSZ benchmarks size calculation (no encoding). +func BenchmarkStateSizeSSZ(b *testing.B) { + encoder := NewEncoder(false, 0) + state := createTestState(10000) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + size, err := encoder.StateSizeSSZ(state) + if err != nil { + b.Fatal(err) + } + + if size == 0 { + b.Fatal("zero size") + } + } +} diff --git a/pkg/beacon/store/block.go b/pkg/beacon/store/block.go index e3222fd..d5922f3 100644 --- a/pkg/beacon/store/block.go +++ b/pkg/beacon/store/block.go @@ -89,8 +89,9 @@ func (c *Block) cleanupBlock(block *spec.VersionedSignedBeaconBlock) error { return err } - c.slotToBlockRoot.Delete(eth.SlotAsString(slot)) - c.stateRootToBlockRoot.Delete(eth.RootAsString(stateRoot)) + // Keys must match the types used in Add(): slot (phase0.Slot) and stateRoot (phase0.Root) + c.slotToBlockRoot.Delete(slot) + c.stateRootToBlockRoot.Delete(stateRoot) return nil } diff --git a/pkg/beacon/store/block_test.go b/pkg/beacon/store/block_test.go new file mode 100644 index 0000000..613f534 --- /dev/null +++ b/pkg/beacon/store/block_test.go @@ -0,0 +1,311 @@ +package store + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/attestantio/go-eth2-client/spec" + "github.com/attestantio/go-eth2-client/spec/altair" + "github.com/attestantio/go-eth2-client/spec/bellatrix" + "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/deneb" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/holiman/uint256" + "github.com/prysmaticlabs/go-bitfield" + "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// benchNamespaceCounter ensures unique namespaces for benchmarks to avoid Prometheus metric conflicts. +var benchNamespaceCounter atomic.Uint64 + +func uniqueNamespace(prefix string) string { + return fmt.Sprintf("%s_%d", prefix, benchNamespaceCounter.Add(1)) +} + +func createTestBlock(slot phase0.Slot) *spec.VersionedSignedBeaconBlock { + return &spec.VersionedSignedBeaconBlock{ + Version: spec.DataVersionDeneb, + Deneb: &deneb.SignedBeaconBlock{ + Message: &deneb.BeaconBlock{ + Slot: slot, + ProposerIndex: phase0.ValidatorIndex(slot % 1000), + ParentRoot: phase0.Root{byte(slot)}, + StateRoot: phase0.Root{byte(slot), byte(slot >> 8)}, + Body: &deneb.BeaconBlockBody{ + RANDAOReveal: phase0.BLSSignature{}, + ETH1Data: &phase0.ETH1Data{}, + Graffiti: [32]byte{}, + ProposerSlashings: []*phase0.ProposerSlashing{}, + AttesterSlashings: []*phase0.AttesterSlashing{}, + Attestations: []*phase0.Attestation{}, + Deposits: []*phase0.Deposit{}, + VoluntaryExits: []*phase0.SignedVoluntaryExit{}, + SyncAggregate: &altair.SyncAggregate{ + SyncCommitteeBits: bitfield.NewBitvector512(), + SyncCommitteeSignature: phase0.BLSSignature{}, + }, + ExecutionPayload: &deneb.ExecutionPayload{ + ParentHash: phase0.Hash32{}, + FeeRecipient: [20]byte{}, + StateRoot: phase0.Root{}, + ReceiptsRoot: phase0.Root{}, + LogsBloom: [256]byte{}, + PrevRandao: [32]byte{}, + BlockNumber: uint64(slot), + GasLimit: 30000000, + GasUsed: 15000000, + Timestamp: uint64(slot) * 12, + ExtraData: []byte{}, + BaseFeePerGas: uint256.NewInt(0), + BlockHash: phase0.Hash32{byte(slot)}, + Transactions: []bellatrix.Transaction{}, + Withdrawals: []*capella.Withdrawal{}, + BlobGasUsed: 0, + ExcessBlobGas: 0, + }, + BLSToExecutionChanges: []*capella.SignedBLSToExecutionChange{}, + BlobKZGCommitments: []deneb.KZGCommitment{}, + }, + }, + Signature: phase0.BLSSignature{}, + }, + } +} + +func TestBlockAddAndGet(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} + namespace := "test_block_a" + blockStore := NewBlock(logger, config, namespace) + + slot := phase0.Slot(100) + block := createTestBlock(slot) + expiresAt := time.Now().Add(10 * time.Minute) + + // Get the root for this block + root := phase0.Root{0x01, 0x02, 0x03} + + err := blockStore.Add(root, block, expiresAt) + require.NoError(t, err) + + // Get by root + retrievedBlock, err := blockStore.GetByRoot(root) + require.NoError(t, err) + require.NotNil(t, retrievedBlock) + + retrievedSlot, err := retrievedBlock.Slot() + require.NoError(t, err) + assert.Equal(t, slot, retrievedSlot) + + // Get by slot + retrievedBlock2, err := blockStore.GetBySlot(slot) + require.NoError(t, err) + require.NotNil(t, retrievedBlock2) +} + +func TestBlockGetBySlotNotFound(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} + namespace := "test_block_b" + blockStore := NewBlock(logger, config, namespace) + + slot := phase0.Slot(200) + + retrievedBlock, err := blockStore.GetBySlot(slot) + assert.Error(t, err) + assert.Nil(t, retrievedBlock) +} + +func TestBlockCleanup(t *testing.T) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 3} + namespace := "test_block_cleanup" + blockStore := NewBlock(logger, config, namespace) + + // Add 4 blocks (max is 3, so first should be evicted) + for i := 0; i < 4; i++ { + slot := phase0.Slot(100 + i) //nolint:gosec // test code with bounded values + block := createTestBlock(slot) + root := phase0.Root{byte(i)} + expiresAt := time.Now().Add(time.Duration(i+1) * time.Minute) + + err := blockStore.Add(root, block, expiresAt) + require.NoError(t, err) + } + + // Give the eviction callback time to run + time.Sleep(100 * time.Millisecond) + + // The first block (slot 100) should have been evicted (closest to expiry) + _, err := blockStore.GetBySlot(phase0.Slot(100)) + assert.Error(t, err, "slot 100 should have been evicted") + + // Slots 101, 102, 103 should still exist + for i := 1; i < 4; i++ { + _, err := blockStore.GetBySlot(phase0.Slot(100 + i)) //nolint:gosec // test code with bounded values + assert.NoError(t, err, "slot %d should still exist", 100+i) + } +} + +// BenchmarkBlockAdd benchmarks adding blocks to the store. +func BenchmarkBlockAdd(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_add")) + + blocks := make([]*spec.VersionedSignedBeaconBlock, b.N) + roots := make([]phase0.Root, b.N) + + for i := 0; i < b.N; i++ { + blocks[i] = createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + roots[i] = phase0.Root{byte(i), byte(i >> 8), byte(i >> 16)} + } + + expiresAt := time.Now().Add(1 * time.Hour) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := blockStore.Add(roots[i], blocks[i], expiresAt) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockGetBySlot benchmarks slot lookups. +func BenchmarkBlockGetBySlot(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_get_slot")) + + // Pre-populate with blocks + numBlocks := 100 + + for i := 0; i < numBlocks; i++ { + block := createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + root := phase0.Root{byte(i), byte(i >> 8)} + expiresAt := time.Now().Add(1 * time.Hour) + + err := blockStore.Add(root, block, expiresAt) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + slot := phase0.Slot(i % numBlocks) //nolint:gosec // test code with bounded values + + _, err := blockStore.GetBySlot(slot) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockGetByRoot benchmarks root lookups. +func BenchmarkBlockGetByRoot(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_get_root")) + + // Pre-populate with blocks + numBlocks := 100 + roots := make([]phase0.Root, numBlocks) + + for i := 0; i < numBlocks; i++ { + block := createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + roots[i] = phase0.Root{byte(i), byte(i >> 8)} + expiresAt := time.Now().Add(1 * time.Hour) + + err := blockStore.Add(roots[i], block, expiresAt) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + root := roots[i%numBlocks] + + _, err := blockStore.GetByRoot(root) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockEviction benchmarks the eviction behavior when cache is full. +func BenchmarkBlockEviction(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 10} // Small cache to force evictions + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_evict")) + + blocks := make([]*spec.VersionedSignedBeaconBlock, b.N) + roots := make([]phase0.Root, b.N) + + for i := 0; i < b.N; i++ { + blocks[i] = createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + roots[i] = phase0.Root{byte(i), byte(i >> 8), byte(i >> 16)} + } + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + // Each add after the 10th will trigger eviction + expiresAt := time.Now().Add(time.Duration(i) * time.Millisecond) + + err := blockStore.Add(roots[i], blocks[i], expiresAt) + if err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkBlockGetBySlot_Parallel benchmarks concurrent slot lookups. +func BenchmarkBlockGetBySlot_Parallel(b *testing.B) { + logger, _ := test.NewNullLogger() + config := Config{MaxItems: 1000} + blockStore := NewBlock(logger, config, uniqueNamespace("bench_block_parallel")) + + // Pre-populate with blocks + numBlocks := 100 + + for i := 0; i < numBlocks; i++ { + block := createTestBlock(phase0.Slot(i)) //nolint:gosec // test code with bounded values + root := phase0.Root{byte(i), byte(i >> 8)} + expiresAt := time.Now().Add(1 * time.Hour) + + err := blockStore.Add(root, block, expiresAt) + if err != nil { + b.Fatal(err) + } + } + + b.ReportAllocs() + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + slot := phase0.Slot(i % numBlocks) //nolint:gosec // test code with bounded values + i++ + + _, err := blockStore.GetBySlot(slot) + if err != nil { + b.Fatal(err) + } + } + }) +} From 1cd542a734bf1b056df83fbab8b7043651d51774 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 9 Dec 2025 16:07:28 +1000 Subject: [PATCH 2/3] refactor(store): remove redundant comment about key types in block cleanup --- pkg/beacon/ssz/encoder.go | 2 +- pkg/beacon/store/block.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/beacon/ssz/encoder.go b/pkg/beacon/ssz/encoder.go index 67b49f6..814ab42 100644 --- a/pkg/beacon/ssz/encoder.go +++ b/pkg/beacon/ssz/encoder.go @@ -39,7 +39,7 @@ type Encoder struct { // NewEncoder creates a new SSZ encoder. // If memoryBudget is > 0, limits concurrent SSZ encoding to that many bytes. -// If memoryBudget is <= 0, no limit is applied (original behavior). +// If memoryBudget is <= 0, no limit is applied. func NewEncoder(customPreset bool, memoryBudget int64) *Encoder { var sem *semaphore.Weighted if memoryBudget > 0 { diff --git a/pkg/beacon/store/block.go b/pkg/beacon/store/block.go index d5922f3..bb45007 100644 --- a/pkg/beacon/store/block.go +++ b/pkg/beacon/store/block.go @@ -89,7 +89,6 @@ func (c *Block) cleanupBlock(block *spec.VersionedSignedBeaconBlock) error { return err } - // Keys must match the types used in Add(): slot (phase0.Slot) and stateRoot (phase0.Root) c.slotToBlockRoot.Delete(slot) c.stateRootToBlockRoot.Delete(stateRoot) From e775ffa8c4a6ee0f26057d0d461aab3bc25b600c Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 9 Dec 2025 16:14:11 +1000 Subject: [PATCH 3/3] fix(lint): address gosec G115 integer overflow warnings Add nolint directives for safe integer conversions where values are bounded by design (config limits, beacon chain constraints, blob indices). Add validation for negative slot values in NewSlotFromString. --- pkg/beacon/default.go | 1 + pkg/beacon/download.go | 2 ++ pkg/beacon/expire_test.go | 2 ++ pkg/eth/slot.go | 1 + pkg/service/eth/block_id.go | 4 ++++ pkg/service/eth/eth.go | 1 + 6 files changed, 11 insertions(+) diff --git a/pkg/beacon/default.go b/pkg/beacon/default.go index a6caef0..b446162 100644 --- a/pkg/beacon/default.go +++ b/pkg/beacon/default.go @@ -750,6 +750,7 @@ func (d *Default) ListFinalizedSlots(ctx context.Context) ([]phase0.Slot, error) latestSlot := phase0.Slot(uint64(finality.Finalized.Epoch) * uint64(sp.SlotsPerEpoch)) + //nolint:gosec // G115: HistoricalEpochCount is a small positive config value, safe to convert for i, val := uint64(latestSlot), uint64(latestSlot)-uint64(sp.SlotsPerEpoch)*uint64(d.config.HistoricalEpochCount); i > val; i -= uint64(sp.SlotsPerEpoch) { slots = append(slots, phase0.Slot(i)) } diff --git a/pkg/beacon/download.go b/pkg/beacon/download.go index 2c97dd6..04798ad 100644 --- a/pkg/beacon/download.go +++ b/pkg/beacon/download.go @@ -173,10 +173,12 @@ func (d *Default) fetchHistoricalCheckpoints(ctx context.Context, checkpoint *v1 // We'll derive the current finalized slot and then work back in intervals of SLOTS_PER_EPOCH. currentSlot := uint64(checkpoint.Finalized.Epoch) * uint64(sp.SlotsPerEpoch) for i := 1; i < d.config.HistoricalEpochCount; i++ { + //nolint:gosec // G115: i is a positive loop counter bounded by HistoricalEpochCount if uint64(i)*uint64(sp.SlotsPerEpoch) > currentSlot { break } + //nolint:gosec // G115: i is a positive loop counter bounded by HistoricalEpochCount slot := phase0.Slot(currentSlot - uint64(i)*uint64(sp.SlotsPerEpoch)) slotsInScope[slot] = struct{}{} diff --git a/pkg/beacon/expire_test.go b/pkg/beacon/expire_test.go index 48dc1e4..dc3111a 100644 --- a/pkg/beacon/expire_test.go +++ b/pkg/beacon/expire_test.go @@ -13,10 +13,12 @@ var ( ) func CalculateSlotExpiration(slot phase0.Slot, slotsOfHistory int) phase0.Slot { + //nolint:gosec // G115: slotsOfHistory is a small positive test value return slot + phase0.Slot(slotsOfHistory) } func GetSlotTime(slot phase0.Slot, secondsPerSlot time.Duration, genesis time.Time) time.Time { + //nolint:gosec // G115: slot values are bounded by beacon chain limits, safe for duration return genesis.Add(time.Duration(slot) * secondsPerSlot) } diff --git a/pkg/eth/slot.go b/pkg/eth/slot.go index bd15f41..2c004c4 100644 --- a/pkg/eth/slot.go +++ b/pkg/eth/slot.go @@ -14,6 +14,7 @@ type SlotTime struct { } func CalculateSlotTime(slot phase0.Slot, genesisTime time.Time, durationPerSlot time.Duration) SlotTime { + //nolint:gosec // G115: slot values are bounded by beacon chain limits, safe for duration slotStartTime := genesisTime.Add(time.Duration(slot) * durationPerSlot).UTC() return SlotTime{ diff --git a/pkg/service/eth/block_id.go b/pkg/service/eth/block_id.go index ebf05a5..1f31390 100644 --- a/pkg/service/eth/block_id.go +++ b/pkg/service/eth/block_id.go @@ -88,6 +88,10 @@ func NewSlotFromString(id string) (phase0.Slot, error) { return 0, err } + if slot < 0 { + return 0, errors.New("slot cannot be negative") + } + return phase0.Slot(slot), nil } diff --git a/pkg/service/eth/eth.go b/pkg/service/eth/eth.go index 12939a5..a1e80fa 100644 --- a/pkg/service/eth/eth.go +++ b/pkg/service/eth/eth.go @@ -572,6 +572,7 @@ func (h *Handler) BlobSidecars(ctx context.Context, blockID BlockIdentifier, ind // Find the sidecar with the given index for i, sidecar := range sidecars { + //nolint:gosec // G115: blob indices are small values (max 6), safe to convert if index == int(sidecar.Index) { filtered = append(filtered, sidecars[i])