Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion core/state/accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,21 @@ func HasContract(r db.KeyValueReader, addr *felt.Felt) (bool, error) {
return r.Has(key)
}

func WriteContract(w db.KeyValueWriter, addr *felt.Felt, contract *stateContract) error {
func WriteContract(
w db.KeyValueWriter,
addr *felt.Felt,
nonce, classHash felt.Felt,
deployHeight uint64,
) error {
contract := stateContract{
Nonce: nonce,
ClassHash: classHash,
DeployedHeight: deployHeight,
}
return writeContract(w, addr, &contract)
}

func writeContract(w db.KeyValueWriter, addr *felt.Felt, contract *stateContract) error {
key := db.ContractKey(addr)
data, err := contract.MarshalBinary()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion core/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (s *State) flush(
return err
}
} else { // updated
if err := WriteContract(s.batch, &addr, obj.contract); err != nil {
if err := writeContract(s.batch, &addr, obj.contract); err != nil {
return err
}
}
Expand Down
49 changes: 49 additions & 0 deletions migration/headstate/committer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package headstate

import (
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/migration/pipeline"
"github.com/NethermindEth/juno/migration/semaphore"
"github.com/NethermindEth/juno/utils/log"
"go.uber.org/zap"
)

type committer struct {
counter counter
logger log.StructuredLogger
batchSemaphore semaphore.ResourceSemaphore[db.Batch]
}

var _ pipeline.State[task, struct{}] = (*committer)(nil)

func newCommitter(
logger log.StructuredLogger,
batchSemaphore semaphore.ResourceSemaphore[db.Batch],
) *committer {
return &committer{
logger: logger,
counter: newCounter(logger, timeLogRate),
batchSemaphore: batchSemaphore,
}
}

func (c *committer) Run(_ int, t task, _ chan<- struct{}) error {
c.logger.Debug(
"writing batch",
zap.Int("completedAddrs", t.completedAddrs),
zap.Int("batchSize", t.batch.Size()),
)

byteSize := uint64(t.batch.Size())
if err := t.batch.Write(); err != nil {
return err
}

c.counter.log(byteSize, t.completedAddrs)
c.batchSemaphore.Put()
return nil
}

func (c *committer) Done(int, chan<- struct{}) error {
return nil
}
47 changes: 47 additions & 0 deletions migration/headstate/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package headstate

import (
"time"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils/log"
"go.uber.org/zap"
)

type counter struct {
logger log.StructuredLogger
timeLogRate time.Duration
start time.Time
size uint64
completedAddrs uint64
}

func newCounter(logger log.StructuredLogger, timeLogRate time.Duration) counter {
return counter{
logger: logger,
timeLogRate: timeLogRate,
start: time.Now(),
}
}

func (c *counter) log(byteSize uint64, completedAddrs int) {
c.size += byteSize
c.completedAddrs += uint64(completedAddrs)

now := time.Now()
elapsed := now.Sub(c.start).Seconds()
if elapsed > c.timeLogRate.Seconds() {
mbs := float64(c.size) / float64(db.Megabyte)
c.logger.Info(
"write speed",
zap.Float64("MB", mbs),
zap.Float64("MB/s", mbs/elapsed),
zap.Uint64("completedContracts", c.completedAddrs),
zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed),
zap.Float64("time", elapsed),
)
c.start = now
c.size = 0
c.completedAddrs = 0
}
}
95 changes: 95 additions & 0 deletions migration/headstate/ingestor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package headstate

import (
"errors"
"fmt"

"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/core/state"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/migration/pipeline"
"github.com/NethermindEth/juno/migration/semaphore"
)

type ingestor struct {
database db.KeyValueReader
batchSemaphore semaphore.ResourceSemaphore[db.Batch]
tasks []task
}

func newIngestor(
database db.KeyValueReader,
batchSemaphore semaphore.ResourceSemaphore[db.Batch],
) *ingestor {
tasks := make([]task, ingestorCount)
for i := range tasks {
tasks[i] = task{batch: batchSemaphore.GetBlocking()}
}
return &ingestor{
database: database,
batchSemaphore: batchSemaphore,
tasks: tasks,
}
}

var _ pipeline.State[felt.Address, task] = (*ingestor)(nil)

func (c *ingestor) Run(index int, addr felt.Address, outputs chan<- task) error {
t := &c.tasks[index]

sizeBefore := t.batch.Size()
if err := c.ingestAddress(t.batch, addr); err != nil {
return err
}
if t.batch.Size() > sizeBefore {
t.completedAddrs++
}

if t.batch.Size() >= targetBatchByteSize {
outputs <- task{batch: t.batch, completedAddrs: t.completedAddrs}
t.completedAddrs = 0
t.batch = c.batchSemaphore.GetBlocking()
}
return nil
}

func (c *ingestor) Done(index int, outputs chan<- task) error {
outputs <- c.tasks[index]
return nil
}

func (c *ingestor) ingestAddress(batch db.Batch, addr felt.Address) error {
addrFelt := felt.Felt(addr)

already, err := state.HasContract(c.database, &addrFelt)
if err != nil {
return fmt.Errorf("HasContract(%s): %w", &addrFelt, err)
}
if already {
return nil
}

classHash, err := core.GetContractClassHash(c.database, &addrFelt)
if err != nil {
return fmt.Errorf("GetContractClassHash(%s): %w", &addrFelt, err)
}

nonce, err := core.GetContractNonce(c.database, &addrFelt)
if err != nil {
if !errors.Is(err, db.ErrKeyNotFound) {
return fmt.Errorf("GetContractNonce(%s): %w", &addrFelt, err)
}
nonce = felt.Zero
}

height, err := core.GetContractDeploymentHeight(c.database, &addrFelt)
if err != nil {
return fmt.Errorf("GetContractDeploymentHeight(%s): %w", &addrFelt, err)
}

if err := state.WriteContract(batch, &addrFelt, nonce, classHash, height); err != nil {
return fmt.Errorf("WriteContract(%s): %w", &addrFelt, err)
}
return nil
}
163 changes: 163 additions & 0 deletions migration/headstate/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package headstate

import (
"context"
"errors"
"fmt"
"iter"
"time"

"github.com/NethermindEth/juno/blockchain/networks"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/db/dbutils"
"github.com/NethermindEth/juno/migration"
"github.com/NethermindEth/juno/migration/pipeline"
"github.com/NethermindEth/juno/migration/semaphore"
"github.com/NethermindEth/juno/utils/log"
)

const (
batchByteSize = 128 * db.Megabyte
targetBatchByteSize = 96 * db.Megabyte
ingestorCount = 4
timeLogRate = 5 * time.Second
)

type task struct {
batch db.Batch
completedAddrs int
}

var (
shouldRerun = []byte{}
shouldNotRerun = []byte(nil)
)

var _ migration.Migration = (*Migrator)(nil)

// Migrator consolidates the deprecated per-field contract layout into a
// single Contract record per address, written via state.WriteContract:
//
// ContractClassHash[addr]
// ContractNonce[addr]
// ContractDeploymentHeight[addr]
// │
// ▼
// Contract[addr] = { ClassHash, Nonce, DeployedHeight }
//
// StorageRoot is left zero — the running node lazily backfills it on the
// contract's first storage write.
//
// Each address discovered in the ContractClassHash bucket is processed by one
// of ingestorCount worker goroutines that read the three old fields into a
// shared db.Batch; a single committer drains batches to disk. Once every
// address has been migrated, the three deprecated buckets are wiped via
// DeleteRange.
//
// Re-run safe: an address whose Contract record already exists is skipped
// (via state.HasContract), and the trailing wipe re-issues DeleteRange over
// the (possibly already empty) ranges.
type Migrator struct{}

func (Migrator) Before([]byte) error {
return nil
}

func (Migrator) Migrate(
ctx context.Context,
database db.KeyValueStore,
_ *networks.Network,
logger log.StructuredLogger,
) ([]byte, error) {
addressesIter, sourceErr := pendingAddresses(database)
res := migrateAddresses(ctx, database, logger, addressesIter)

if err := errors.Join(sourceErr(), res.Err); err != nil {
return shouldRerun, err
}
if !res.IsDone {
if ctxErr := ctx.Err(); ctxErr != nil {
return shouldRerun, ctxErr
}
return shouldRerun, errors.New("headstate migration did not complete")
}

return shouldNotRerun, wipeDeprecatedBuckets(database)
}

func migrateAddresses(
ctx context.Context,
database db.KeyValueStore,
logger log.StructuredLogger,
addresses iter.Seq[felt.Address],
) pipeline.Result {
batchSemaphore := semaphore.New(
ingestorCount+1,
func() db.Batch {
return database.NewBatchWithSize(batchByteSize)
},
)

source := pipeline.Source(addresses)

ingestorPipeline := pipeline.New(
source,
ingestorCount,
newIngestor(database, batchSemaphore),
)

committerPipeline := pipeline.New(
ingestorPipeline,
1,
newCommitter(logger, batchSemaphore),
)

_, wait := committerPipeline.Run(ctx)
return wait()
}

func pendingAddresses(r db.KeyValueReader) (iter.Seq[felt.Address], func() error) {
var iterErr error
seq := func(yield func(felt.Address) bool) {
prefix := db.ContractClassHash.Key()
it, err := r.NewIterator(prefix, true)
if err != nil {
iterErr = err
return
}
defer it.Close()

for valid := it.First(); valid; valid = it.Next() {
key := it.Key()
if len(key) != len(prefix)+felt.Bytes {
iterErr = fmt.Errorf(
"malformed ContractClassHash key: len %d, want %d",
len(key),
len(prefix)+felt.Bytes,
)
return
}
f := felt.FromBytes[felt.Felt](key[len(prefix):])
if !yield(felt.Address(f)) {
return
}
}
}
return seq, func() error { return iterErr }
}

func wipeDeprecatedBuckets(database db.KeyValueStore) error {
for _, bucket := range []db.Bucket{
db.ContractClassHash,
db.ContractNonce,
db.ContractDeploymentHeight,
} {
start := bucket.Key()
end := dbutils.UpperBound(start)
if err := database.DeleteRange(start, end); err != nil {
return err
}
}
return nil
}
Loading
Loading