diff --git a/migration/state/common/committer.go b/migration/state/common/committer.go new file mode 100644 index 0000000000..bddccc4fda --- /dev/null +++ b/migration/state/common/committer.go @@ -0,0 +1,58 @@ +package common + +import ( + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type Committer struct { + logger log.StructuredLogger + counter Counter + batchSemaphore semaphore.ResourceSemaphore[db.Batch] + phaseName string +} + +var _ pipeline.State[Task, struct{}] = (*Committer)(nil) + +func NewCommitter( + logger log.StructuredLogger, + batchSemaphore semaphore.ResourceSemaphore[db.Batch], + phaseName string, +) *Committer { + return &Committer{ + logger: logger, + counter: NewCounter(logger, TimeLogRate, phaseName), + batchSemaphore: batchSemaphore, + phaseName: phaseName, + } +} + +func (c *Committer) Run(_ int, t Task, _ chan<- struct{}) error { + defer c.batchSemaphore.Put() + + fields := make([]zap.Field, 0, 4) + if c.phaseName != "" { + fields = append(fields, zap.String("phase", c.phaseName)) + } + fields = append(fields, + zap.Int("completedAddrs", t.CompletedAddrs), + zap.Int("entryCount", t.EntryCount), + zap.Int("batchSize", t.Batch.Size()), + ) + c.logger.Debug("writing batch", fields...) + + byteSize := uint64(t.Batch.Size()) + if err := t.Batch.Write(); err != nil { + return err + } + + c.counter.Log(byteSize, t.CompletedAddrs, t.EntryCount) + return nil +} + +func (c *Committer) Done(int, chan<- struct{}) error { + return nil +} diff --git a/migration/state/common/constants.go b/migration/state/common/constants.go new file mode 100644 index 0000000000..0f79d398b8 --- /dev/null +++ b/migration/state/common/constants.go @@ -0,0 +1,14 @@ +package common + +import ( + "time" + + "github.com/NethermindEth/juno/db" +) + +const ( + BatchByteSize = 128 * db.Megabyte + TargetBatchByteSize = 96 * db.Megabyte + IngestorCount = 4 + TimeLogRate = 5 * time.Second +) diff --git a/migration/state/common/counter.go b/migration/state/common/counter.go new file mode 100644 index 0000000000..7c2c36516a --- /dev/null +++ b/migration/state/common/counter.go @@ -0,0 +1,67 @@ +package common + +import ( + "math" + "time" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/utils/log" + "go.uber.org/zap" +) + +type Counter struct { + logger log.StructuredLogger + timeLogRate time.Duration + phaseName string + start time.Time + size uint64 + completedAddrs uint64 + entryCount uint64 +} + +func NewCounter(logger log.StructuredLogger, timeLogRate time.Duration, phaseName string) Counter { + if zl, ok := logger.(*log.ZapLogger); ok { + logger = zl.WithOptions(zap.AddCallerSkip(1)) + } + return Counter{ + logger: logger, + timeLogRate: timeLogRate, + phaseName: phaseName, + start: time.Now(), + } +} + +func (c *Counter) Log(byteSize uint64, completedAddrs, entryCount int) { + c.size += byteSize + c.completedAddrs += uint64(completedAddrs) + c.entryCount += uint64(entryCount) + + const cent = 100 + + now := time.Now() + elapsed := now.Sub(c.start).Seconds() + if elapsed <= c.timeLogRate.Seconds() { + return + } + + mbs := float64(c.size) / float64(db.Megabyte) + fields := make([]zap.Field, 0, 8) + if c.phaseName != "" { + fields = append(fields, zap.String("phase", c.phaseName)) + } + fields = append(fields, + zap.Float64("MB", math.Round(mbs*cent)/cent), + zap.Float64("MB/s", math.Round(mbs/elapsed*cent)/cent), + zap.Uint64("completedContracts", c.completedAddrs), + zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed), + zap.Uint64("entries", c.entryCount), + zap.Float64("entries/s", float64(c.entryCount)/elapsed), + zap.Float64("time", elapsed), + ) + c.logger.Info("write speed", fields...) + + c.start = now + c.size = 0 + c.completedAddrs = 0 + c.entryCount = 0 +} diff --git a/migration/state/headstate/counter_test.go b/migration/state/common/counter_test.go similarity index 71% rename from migration/state/headstate/counter_test.go rename to migration/state/common/counter_test.go index 31e9ff6804..c5b3e24e0a 100644 --- a/migration/state/headstate/counter_test.go +++ b/migration/state/common/counter_test.go @@ -1,4 +1,4 @@ -package headstate +package common import ( "math" @@ -20,8 +20,8 @@ func TestCounter_DoesNotLogBeforeTimeLogRate(t *testing.T) { core, recorded := observer.New(zapcore.InfoLevel) logger := log.NewZapLoggerWithCore(core) - c := newCounter(logger, time.Hour) - c.log(uint64(db.Megabyte), 5) + c := NewCounter(logger, time.Hour, "") + c.Log(uint64(db.Megabyte), 5, 0) assert.Zero(t, recorded.Len()) } @@ -30,7 +30,7 @@ func TestCounter_LogRoundsMBAndAttributesCaller(t *testing.T) { core, recorded := observer.New(zapcore.InfoLevel) logger := log.NewZapLoggerWithCore(core) - c := newCounter(logger, time.Millisecond) + c := NewCounter(logger, time.Millisecond, "") // Force elapsed > timeLogRate without sleeping. c.start = time.Now().Add(-time.Second) @@ -38,7 +38,7 @@ func TestCounter_LogRoundsMBAndAttributesCaller(t *testing.T) { c.size = bytes c.completedAddrs = 4242 - c.log(0, 0) + c.Log(0, 0, 0) require.Equal(t, 1, recorded.Len()) entry := recorded.All()[0] @@ -48,7 +48,7 @@ func TestCounter_LogRoundsMBAndAttributesCaller(t *testing.T) { require.True(t, entry.Caller.Defined, "caller must be captured") assert.Equal(t, "counter_test.go", filepath.Base(entry.Caller.File), - "log must be attributed to caller of counter.log, not counter.go itself") + "log must be attributed to caller of Counter.Log, not counter.go itself") mb := fields["MB"].(float64) mbPerS := fields["MB/s"].(float64) @@ -61,6 +61,22 @@ func TestCounter_LogRoundsMBAndAttributesCaller(t *testing.T) { assertTwoDecimalsOrFewer(t, "MB/s", mbPerS) assert.Equal(t, uint64(4242), fields["completedContracts"]) + + _, hasPhase := fields["phase"] + assert.False(t, hasPhase, "empty phaseName must be omitted from the log entry") +} + +func TestCounter_LogIncludesPhaseWhenSet(t *testing.T) { + core, recorded := observer.New(zapcore.InfoLevel) + logger := log.NewZapLoggerWithCore(core) + + c := NewCounter(logger, time.Millisecond, "class-hash") + c.start = time.Now().Add(-time.Second) + c.Log(0, 0, 0) + + require.Equal(t, 1, recorded.Len()) + fields := recorded.All()[0].ContextMap() + assert.Equal(t, "class-hash", fields["phase"]) } func assertTwoDecimalsOrFewer(t *testing.T, name string, v float64) { diff --git a/migration/state/common/ingestor.go b/migration/state/common/ingestor.go new file mode 100644 index 0000000000..7eae2289dc --- /dev/null +++ b/migration/state/common/ingestor.go @@ -0,0 +1,63 @@ +package common + +import ( + "context" + + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration/semaphore" +) + +type BaseIngestor struct { + Database db.KeyValueReader + Tasks []Task + ctx context.Context + batchSemaphore semaphore.ResourceSemaphore[db.Batch] +} + +// NewBaseIngestor pre-allocates one batch per ingestor slot. The semaphore is +// created with capacity IngestorCount+1 immediately before this call, so the +// acquires cannot block — using GetBlocking keeps the constructor signature +// error-free. +func NewBaseIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) BaseIngestor { + tasks := make([]Task, IngestorCount) + for i := range tasks { + tasks[i] = Task{Batch: sem.GetBlocking()} + } + return BaseIngestor{ + Database: database, + Tasks: tasks, + ctx: ctx, + batchSemaphore: sem, + } +} + +// Flush emits the current task downstream when its batch hits target size and +// acquires a fresh batch. The ctx-aware select on the channel send is the +// snappy cancellation point. The semaphore acquire uses GetBlocking — it is +// guaranteed to unblock within one committer iteration because the committer's +// deferred Put always runs. +func (b *BaseIngestor) Flush(t *Task, outputs chan<- Task) error { + if t.Batch.Size() < TargetBatchByteSize { + return nil + } + select { + case <-b.ctx.Done(): + return b.ctx.Err() + case outputs <- *t: + } + *t = Task{Batch: b.batchSemaphore.GetBlocking()} + return nil +} + +func (b *BaseIngestor) Done(index int, outputs chan<- Task) error { + select { + case <-b.ctx.Done(): + return b.ctx.Err() + case outputs <- b.Tasks[index]: + } + return nil +} diff --git a/migration/state/common/task.go b/migration/state/common/task.go new file mode 100644 index 0000000000..6641feb8da --- /dev/null +++ b/migration/state/common/task.go @@ -0,0 +1,9 @@ +package common + +import "github.com/NethermindEth/juno/db" + +type Task struct { + Batch db.Batch + CompletedAddrs int + EntryCount int +} diff --git a/migration/state/headstate/committer.go b/migration/state/headstate/committer.go deleted file mode 100644 index 333e07d6c3..0000000000 --- a/migration/state/headstate/committer.go +++ /dev/null @@ -1,49 +0,0 @@ -package headstate - -import ( - "github.com/NethermindEth/juno/db" - "github.com/NethermindEth/juno/migration/pipeline" - "github.com/NethermindEth/juno/migration/semaphore" - "github.com/NethermindEth/juno/utils/log" - "go.uber.org/zap" -) - -type committer struct { - counter counter - logger log.StructuredLogger - batchSemaphore semaphore.ResourceSemaphore[db.Batch] -} - -var _ pipeline.State[task, struct{}] = (*committer)(nil) - -func newCommitter( - logger log.StructuredLogger, - batchSemaphore semaphore.ResourceSemaphore[db.Batch], -) *committer { - return &committer{ - logger: logger, - counter: newCounter(logger, timeLogRate), - batchSemaphore: batchSemaphore, - } -} - -func (c *committer) Run(_ int, task task, _ chan<- struct{}) error { - c.logger.Debug( - "writing batch", - zap.Int("completed addresses", task.completedAddrs), - zap.Int("batch size", task.batch.Size()), - ) - - byteSize := uint64(task.batch.Size()) - if err := task.batch.Write(); err != nil { - return err - } - - c.counter.log(byteSize, task.completedAddrs) - c.batchSemaphore.Put() - return nil -} - -func (c *committer) Done(int, chan<- struct{}) error { - return nil -} diff --git a/migration/state/headstate/counter.go b/migration/state/headstate/counter.go deleted file mode 100644 index a696df618d..0000000000 --- a/migration/state/headstate/counter.go +++ /dev/null @@ -1,54 +0,0 @@ -package headstate - -import ( - "math" - "time" - - "github.com/NethermindEth/juno/db" - "github.com/NethermindEth/juno/utils/log" - "go.uber.org/zap" -) - -type counter struct { - logger log.StructuredLogger - timeLogRate time.Duration - start time.Time - size uint64 - completedAddrs uint64 -} - -func newCounter(logger log.StructuredLogger, timeLogRate time.Duration) counter { - if zl, ok := logger.(*log.ZapLogger); ok { - logger = zl.WithOptions(zap.AddCallerSkip(1)) - } - return counter{ - logger: logger, - timeLogRate: timeLogRate, - start: time.Now(), - } -} - -func (c *counter) log(byteSize uint64, completedAddrs int) { - c.size += byteSize - c.completedAddrs += uint64(completedAddrs) - - // to keep the floats with only 2 digits - const cent = 100 - - now := time.Now() - elapsed := now.Sub(c.start).Seconds() - if elapsed > c.timeLogRate.Seconds() { - mbs := float64(c.size) / float64(db.Megabyte) - c.logger.Info( - "write speed", - zap.Float64("MB", math.Round(mbs*cent)/cent), - zap.Float64("MB/s", math.Round(mbs/elapsed*cent)/cent), - zap.Uint64("completedContracts", c.completedAddrs), - zap.Float64("completedContracts/s", float64(c.completedAddrs)/elapsed), - zap.Float64("time", elapsed), - ) - c.start = now - c.size = 0 - c.completedAddrs = 0 - } -} diff --git a/migration/state/headstate/ingestor.go b/migration/state/headstate/ingestor.go index 5b1db9103e..ca8ece7da9 100644 --- a/migration/state/headstate/ingestor.go +++ b/migration/state/headstate/ingestor.go @@ -1,6 +1,7 @@ package headstate import ( + "context" "errors" "fmt" @@ -10,64 +11,41 @@ import ( "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/migration/pipeline" "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/migration/state/common" ) -type task struct { - batch db.Batch - completedAddrs int -} - type ingestor struct { - database db.KeyValueReader - batchSemaphore semaphore.ResourceSemaphore[db.Batch] - tasks []task + common.BaseIngestor } +var _ pipeline.State[felt.Address, common.Task] = (*ingestor)(nil) + func newIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], database db.KeyValueReader, - batchSemaphore semaphore.ResourceSemaphore[db.Batch], ) *ingestor { - tasks := make([]task, ingestorCount) - for i := range tasks { - tasks[i] = task{batch: batchSemaphore.GetBlocking()} - } - return &ingestor{ - database: database, - batchSemaphore: batchSemaphore, - tasks: tasks, - } + return &ingestor{BaseIngestor: common.NewBaseIngestor(ctx, sem, database)} } -var _ pipeline.State[felt.Address, task] = (*ingestor)(nil) - -func (c *ingestor) Run(index int, addr felt.Address, outputs chan<- task) error { - curTask := &c.tasks[index] +func (c *ingestor) Run(index int, addr felt.Address, outputs chan<- common.Task) error { + curTask := &c.Tasks[index] - sizeBefore := curTask.batch.Size() - if err := c.ingestAddress(curTask.batch, &addr); err != nil { + sizeBefore := curTask.Batch.Size() + if err := c.ingestAddress(curTask.Batch, &addr); err != nil { return err } - if curTask.batch.Size() > sizeBefore { - curTask.completedAddrs++ + if curTask.Batch.Size() > sizeBefore { + curTask.CompletedAddrs++ } - if curTask.batch.Size() >= targetBatchByteSize { - outputs <- task{batch: curTask.batch, completedAddrs: curTask.completedAddrs} - curTask.completedAddrs = 0 - curTask.batch = c.batchSemaphore.GetBlocking() - } - return nil -} - -func (c *ingestor) Done(index int, outputs chan<- task) error { - outputs <- c.tasks[index] - return nil + return c.Flush(curTask, outputs) } func (c *ingestor) ingestAddress(batch db.Batch, addr *felt.Address) error { addrFelt := (*felt.Felt)(addr) - already, err := state.HasContract(c.database, addrFelt) + already, err := state.HasContract(c.Database, addrFelt) if err != nil { return fmt.Errorf("HasContract(%s): %w", addr, err) } @@ -75,12 +53,12 @@ func (c *ingestor) ingestAddress(batch db.Batch, addr *felt.Address) error { return nil } - classHash, err := core.GetContractClassHash(c.database, addrFelt) + classHash, err := core.GetContractClassHash(c.Database, addrFelt) if err != nil { return fmt.Errorf("GetContractClassHash(%s): %w", addr, err) } - nonce, err := core.GetContractNonce(c.database, addrFelt) + nonce, err := core.GetContractNonce(c.Database, addrFelt) if err != nil { if !errors.Is(err, db.ErrKeyNotFound) { return fmt.Errorf("GetContractNonce(%s): %w", addr, err) @@ -88,7 +66,7 @@ func (c *ingestor) ingestAddress(batch db.Batch, addr *felt.Address) error { nonce = felt.Zero } - height, err := core.GetContractDeploymentHeight(c.database, addrFelt) + height, err := core.GetContractDeploymentHeight(c.Database, addrFelt) if err != nil { return fmt.Errorf("GetContractDeploymentHeight(%s): %w", addr, err) } diff --git a/migration/state/headstate/migrator.go b/migration/state/headstate/migrator.go index 2a2e200f30..c73df0a7c8 100644 --- a/migration/state/headstate/migrator.go +++ b/migration/state/headstate/migrator.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "iter" - "time" "github.com/NethermindEth/juno/blockchain/networks" "github.com/NethermindEth/juno/core/felt" @@ -14,16 +13,10 @@ import ( "github.com/NethermindEth/juno/migration" "github.com/NethermindEth/juno/migration/pipeline" "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/migration/state/common" "github.com/NethermindEth/juno/utils/log" ) -const ( - batchByteSize = 128 * db.Megabyte - targetBatchByteSize = 96 * db.Megabyte - ingestorCount = 4 - timeLogRate = 5 * time.Second -) - var ( shouldRerun = []byte{} shouldNotRerun = []byte(nil) @@ -45,9 +38,9 @@ var _ migration.Migration = (*Migrator)(nil) // contract's first storage write. // // Each address discovered in the ContractClassHash bucket is processed by one -// of ingestorCount worker goroutines that read the three old fields into a -// shared db.Batch; a single committer drains batches to disk. Once every -// address has been migrated, the three deprecated buckets are wiped via +// of common.IngestorCount worker goroutines that read the three old fields +// into a shared db.Batch; a single committer drains batches to disk. Once +// every address has been migrated, the three deprecated buckets are wiped via // DeleteRange. // // Re-run safe: an address whose Contract record already exists is skipped @@ -88,9 +81,9 @@ func migrateAddresses( addresses iter.Seq[felt.Address], ) pipeline.Result { batchSemaphore := semaphore.New( - ingestorCount+1, + common.IngestorCount+1, func() db.Batch { - return database.NewBatchWithSize(batchByteSize) + return database.NewBatchWithSize(common.BatchByteSize) }, ) @@ -98,14 +91,14 @@ func migrateAddresses( ingestorPipeline := pipeline.New( source, - ingestorCount, - newIngestor(database, batchSemaphore), + common.IngestorCount, + newIngestor(ctx, batchSemaphore, database), ) committerPipeline := pipeline.New( ingestorPipeline, 1, - newCommitter(logger, batchSemaphore), + common.NewCommitter(logger, batchSemaphore, ""), ) _, wait := committerPipeline.Run(ctx) diff --git a/migration/state/history/class_hash_ingestor.go b/migration/state/history/class_hash_ingestor.go new file mode 100644 index 0000000000..c502574631 --- /dev/null +++ b/migration/state/history/class_hash_ingestor.go @@ -0,0 +1,163 @@ +package history + +import ( + "context" + "fmt" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/migration/state/common" +) + +type classHashIngestor struct { + common.BaseIngestor +} + +var _ pipeline.State[*felt.Felt, common.Task] = (*classHashIngestor)(nil) + +func newClassHashIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) *classHashIngestor { + return &classHashIngestor{BaseIngestor: common.NewBaseIngestor(ctx, sem, database)} +} + +// Run migrates the class-hash history of a single contract. +// +// Legend: Bₙ = block at which the n-th class-hash *replacement* happened. +// Vₙ = the class hash active *after* Bₙ; V₀ is the deploy-time hash. The +// deprecated layout writes nothing at deploy: each entry is written only +// on a *Replace*, and the value stored is the hash that was active before +// that replace. So deprecated[B₁] = V₀ even though no replace happened at +// deploy_h itself. The new layout adds an explicit deploy entry and shifts +// everything else by one slot: +// +// block │ deprecated │ new +// ─────────┼────────────────┼────── +// deploy_h │ — │ V₀ ← inserted from first deprecated entry +// B₁ │ V₀ │ V₁ +// B₂ │ V₁ │ V₂ +// B₃ │ V₂ │ V₃ +// ─────────┼────────────────┼────── +// > B₃ │ contract │ V₃ (last entry — self-contained) +// .ClassHash ← deprecated must reach into the Contract +// record for any block past the last replace +// +// If the deprecated history is empty (no replaces ever), the single deploy +// entry is written with contract.ClassHash directly. Deprecated rows are +// deleted at the end of the run. Resume-safe: empty-deprecated + existing +// deploy entry → no-op. +func (i *classHashIngestor) Run(index int, addr *felt.Felt, outputs chan<- common.Task) error { + t := &i.Tasks[index] + + deprecatedPrefix := db.DeprecatedContractClassHashHistoryKey(addr) + contract, err := state.GetContract(i.Database, addr) + if err != nil { + return fmt.Errorf("class-hash: GetContract(%s): %w", addr, err) + } + + depIt, err := i.Database.NewIterator(deprecatedPrefix, true) + if err != nil { + return fmt.Errorf("class-hash: open deprecated iter(%s): %w", addr, err) + } + defer depIt.Close() + + if !depIt.First() { + return i.writeDeployOnly(t, outputs, addr, contract.DeployedHeight, &contract.ClassHash) + } + return i.writeShiftedHistory( + t, outputs, depIt, deprecatedPrefix, addr, + contract.DeployedHeight, &contract.ClassHash, + ) +} + +// writeDeployOnly handles the "no deprecated history" branch: write the +// deploy-time entry from contract.ClassHash, unless a previous run already +// wrote it. +func (i *classHashIngestor) writeDeployOnly( + t *common.Task, + outputs chan<- common.Task, + addr *felt.Felt, + deployHeight uint64, + classHash *felt.Felt, +) error { + deployKey := db.ContractClassHashHistoryAtBlockKey(addr, deployHeight) + deployEntryExists, err := i.Database.Has(deployKey) + if err != nil { + return fmt.Errorf("class-hash: Has(deploy entry): %w", err) + } + if deployEntryExists { + return nil + } + if err := state.WriteClassHashHistory(t.Batch, addr, deployHeight, classHash); err != nil { + return err + } + t.CompletedAddrs++ + t.EntryCount++ + return i.Flush(t, outputs) +} + +// writeShiftedHistory handles the "non-empty deprecated history" branch: +// writes the deploy entry from the first deprecated value, shifts each +// deprecated entry into the new layout using the next entry's pre-value +// (or contract.ClassHash for the last), and deletes the deprecated rows. +// depIt must be positioned at the first deprecated entry. +func (i *classHashIngestor) writeShiftedHistory( + t *common.Task, + outputs chan<- common.Task, + depIt db.Iterator, + prefix []byte, + addr *felt.Felt, + deployHeight uint64, + headClassHash *felt.Felt, +) error { + rawValue, err := depIt.Value() + if err != nil { + return fmt.Errorf("class-hash: read first value(%s): %w", addr, err) + } + deployClassHash := felt.FromBytes[felt.Felt](rawValue) + if err := state.WriteClassHashHistory(t.Batch, addr, deployHeight, &deployClassHash); err != nil { + return err + } + t.EntryCount++ + if err := i.Flush(t, outputs); err != nil { + return err + } + + for { + block, err := parseBlockKey(depIt.Key(), prefix) + if err != nil { + return fmt.Errorf("class-hash(%s): %w", addr, err) + } + hasNext := depIt.Next() + historyValue := *headClassHash + if hasNext { + rawValue, err := depIt.Value() + if err != nil { + return fmt.Errorf("class-hash(%s): %w", addr, err) + } + historyValue = felt.FromBytes[felt.Felt](rawValue) + } + if err := state.WriteClassHashHistory(t.Batch, addr, block, &historyValue); err != nil { + return err + } + t.EntryCount++ + if err := i.Flush(t, outputs); err != nil { + return err + } + if !hasNext { + break + } + } + + if err := t.Batch.DeleteRange(prefix, dbutils.UpperBound(prefix)); err != nil { + return fmt.Errorf("class-hash: DeleteRange deprecated(%s): %w", addr, err) + } + t.CompletedAddrs++ + return nil +} diff --git a/migration/state/history/migrator.go b/migration/state/history/migrator.go new file mode 100644 index 0000000000..aec86b9ddc --- /dev/null +++ b/migration/state/history/migrator.go @@ -0,0 +1,170 @@ +package history + +import ( + "context" + "errors" + "fmt" + "iter" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/migration" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/migration/state/common" + "github.com/NethermindEth/juno/utils/log" +) + +var ( + shouldRerun = []byte{} + shouldNotRerun = []byte(nil) +) + +var _ migration.Migration = (*Migrator)(nil) + +// Migrator rewrites the contract history layout so each entry stores the +// post-update value at its block, instead of the pre-update value. +// +// Example — a contract whose class hash was 0xAA at deploy (block 100), +// changed to 0xBB at block 200, then to 0xCC at block 500: +// +// block │ old layout (pre-value) │ new layout (post-value) +// ──────┼────────────────────────┼───────────────────────── +// 100 │ (no entry) │ 0xAA ← explicit deploy +// 200 │ 0xAA │ 0xBB +// 500 │ 0xBB │ 0xCC +// head │ 0xCC (contract record) │ (read from history) +// +// The same shape change applies to nonces and per-slot storage. The +// migrator runs three phases (class-hash, nonce, storage); each phase +// iterates the Contract bucket and rewrites one contract's deprecated +// entries at a time, deleting them in the same batch. +// +// Crash / cancellation safety: pebble batches commit atomically, so the +// writes inside any single committed batch are durable as a unit. A +// contract whose history is large may span more than one batch — but each +// new entry's value is a pure function of the deprecated source data, so +// re-running over an already-partially-rewritten contract overwrites with +// identical values and then deletes the (still-present) deprecated rows. +// Contracts whose deprecated entries are already gone short-circuit on an +// empty iterator. The three phases run sequentially: a later phase only +// starts after the earlier phase completes. +type Migrator struct{} + +func (Migrator) Before([]byte) error { return nil } + +func (Migrator) Migrate( + ctx context.Context, + database db.KeyValueStore, + _ *networks.Network, + logger log.StructuredLogger, +) ([]byte, error) { + if err := runClassHashPhase(ctx, database, logger); err != nil { + return shouldRerun, err + } + if err := runNoncePhase(ctx, database, logger); err != nil { + return shouldRerun, err + } + if err := runStoragePhase(ctx, database, logger); err != nil { + return shouldRerun, err + } + + return shouldNotRerun, nil +} + +func runClassHashPhase( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) error { + sem, src, sourceErr := setupBeforePhase(database) + ing := newClassHashIngestor(ctx, sem, database) + return runPipeline(ctx, "class-hash", src, ing, logger, sem, sourceErr) +} + +func runNoncePhase( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) error { + sem, src, sourceErr := setupBeforePhase(database) + ing := newNonceIngestor(ctx, sem, database) + return runPipeline(ctx, "nonce", src, ing, logger, sem, sourceErr) +} + +func runStoragePhase( + ctx context.Context, + database db.KeyValueStore, + logger log.StructuredLogger, +) error { + sem, src, sourceErr := setupBeforePhase(database) + ing := newStorageIngestor(ctx, sem, database) + return runPipeline(ctx, "storage", src, ing, logger, sem, sourceErr) +} + +func setupBeforePhase( + database db.KeyValueStore, +) (semaphore.ResourceSemaphore[db.Batch], pipeline.Pipeline[*felt.Felt], func() error) { + sem := semaphore.New(common.IngestorCount+1, func() db.Batch { + return database.NewBatchWithSize(common.BatchByteSize) + }) + seq, sourceErr := addressSeq(database) + return sem, pipeline.Source(seq), sourceErr +} + +func runPipeline( + ctx context.Context, + name string, + src pipeline.Pipeline[*felt.Felt], + ing pipeline.State[*felt.Felt, common.Task], + logger log.StructuredLogger, + sem semaphore.ResourceSemaphore[db.Batch], + sourceErr func() error, +) error { + ingestors := pipeline.New(src, common.IngestorCount, ing) + committers := pipeline.New(ingestors, 1, common.NewCommitter(logger, sem, name)) + + _, wait := committers.Run(ctx) + res := wait() + + if err := errors.Join(sourceErr(), res.Err); err != nil { + return fmt.Errorf("%s: %w", name, err) + } + if !res.IsDone { + if ctxErr := ctx.Err(); ctxErr != nil { + return fmt.Errorf("%s: %w", name, ctxErr) + } + return fmt.Errorf("%s phase did not complete", name) + } + return nil +} + +func addressSeq(r db.KeyValueReader) (iter.Seq[*felt.Felt], func() error) { + var iterErr error + seq := func(yield func(*felt.Felt) bool) { + prefix := db.Contract.Key() + it, err := r.NewIterator(prefix, true) + if err != nil { + iterErr = err + return + } + defer it.Close() + for valid := it.First(); valid; valid = it.Next() { + key := it.Key() + if len(key) != len(prefix)+felt.Bytes { + iterErr = fmt.Errorf( + "malformed Contract key: len %d, want %d", + len(key), + len(prefix)+felt.Bytes, + ) + return + } + f := felt.FromBytes[felt.Felt](key[len(prefix):]) + if !yield(&f) { + return + } + } + } + return seq, func() error { return iterErr } +} diff --git a/migration/state/history/migrator_test.go b/migration/state/history/migrator_test.go new file mode 100644 index 0000000000..f125230a5d --- /dev/null +++ b/migration/state/history/migrator_test.go @@ -0,0 +1,762 @@ +package history_test + +import ( + "context" + "testing" + + "github.com/NethermindEth/juno/blockchain/networks" + "github.com/NethermindEth/juno/core" + "github.com/NethermindEth/juno/core/deprecatedstate" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/core/trie2/triedb" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/memory" + "github.com/NethermindEth/juno/migration/state/history" + "github.com/NethermindEth/juno/utils/log" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func seedContract( + t *testing.T, + memDB db.KeyValueStore, + addr felt.Felt, + nonce, classHash felt.Felt, +) { + t.Helper() + require.NoError(t, state.WriteContract(memDB, &addr, nonce, classHash, 100)) +} + +func seedDeprecatedClassHashHistory( + t *testing.T, + w db.KeyValueWriter, + addr felt.Felt, + block uint64, + oldValue felt.Felt, +) { + t.Helper() + require.NoError(t, core.WriteDeprecatedContractClassHashHistory(w, &addr, &oldValue, block)) +} + +func seedDeprecatedNonceHistory( + t *testing.T, + w db.KeyValueWriter, + addr felt.Felt, + block uint64, + oldValue felt.Felt, +) { + t.Helper() + require.NoError(t, core.WriteDeprecatedContractNonceHistory(w, &addr, &oldValue, block)) +} + +func seedDeprecatedStorageHistory( + t *testing.T, + w db.KeyValueWriter, + addr, slot felt.Felt, + block uint64, + oldValue felt.Felt, +) { + t.Helper() + require.NoError(t, core.WriteDeprecatedContractStorageHistory(w, &addr, &slot, &oldValue, block)) +} + +func seedDeprecatedStorageTrie( + t *testing.T, + memDB db.KeyValueStore, + addr felt.Felt, + leaves map[felt.Felt]felt.Felt, +) { + t.Helper() + //nolint:staticcheck // Necessary for old state + txn := memDB.NewIndexedBatch() + tr, err := trie.NewTriePedersen( + txn, + db.ContractStorage.Key(addr.Marshal()), + deprecatedstate.ContractStorageTrieHeight, + ) + require.NoError(t, err) + for k, v := range leaves { + _, err := tr.Put(&k, &v) + require.NoError(t, err) + } + require.NoError(t, tr.Commit()) + require.NoError(t, txn.Write()) +} + +func bucketKeyCount(t *testing.T, r db.KeyValueReader, bucket db.Bucket) int { + t.Helper() + it, err := r.NewIterator(bucket.Key(), true) + require.NoError(t, err) + defer it.Close() + count := 0 + for valid := it.First(); valid; valid = it.Next() { + count++ + } + return count +} + +func TestMigrate_EmptyDB(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) +} + +func TestMigrate_ClassHash_DeployOnly(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + classHash := felt.FromUint64[felt.Felt](170) + seedContract(t, memDB, addr, felt.Zero, classHash) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + got, err := reader.ContractClassHashAt(&addr, 100) + require.NoError(t, err) + assert.Equal(t, classHash, got) + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_ClassHash_Reclassed(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + deployClass := felt.FromUint64[felt.Felt](170) + replacedClass := felt.FromUint64[felt.Felt](187) + deployHeight := uint64(100) + replaceBlock := uint64(300) + + seedContract(t, memDB, addr, felt.Zero, replacedClass) + seedDeprecatedClassHashHistory(t, memDB, addr, replaceBlock, deployClass) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, deployHeight) + require.NoError(t, err) + assert.Equal(t, deployClass, got, "deploy entry preserved") + + got, err = reader.ContractClassHashAt(&addr, replaceBlock) + require.NoError(t, err) + assert.Equal(t, replacedClass, got, "replace block has post-update value (head)") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_Nonce_Updated(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + classHash := felt.FromUint64[felt.Felt](170) + headNonce := felt.FromUint64[felt.Felt](66) + + seedContract(t, memDB, addr, headNonce, classHash) + seedDeprecatedNonceHistory(t, memDB, addr, 200, felt.Zero) + seedDeprecatedNonceHistory(t, memDB, addr, 300, felt.FromUint64[felt.Felt](1)) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractNonceAt(&addr, 200) + require.NoError(t, err) + assert.Equal( + t, + felt.FromUint64[felt.Felt](1), + got, + "value installed at 200 = next entry's old value", + ) + + got, err = reader.ContractNonceAt(&addr, 300) + require.NoError(t, err) + assert.Equal(t, headNonce, got, "value installed at 300 = head") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractNonceHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_Nonce_DeployOnly(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](170)) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.ContractNonceHistory), + "no nonce entry expected when never updated", + ) +} + +func TestMigrate_Storage_MultiWrite(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot := felt.FromUint64[felt.Felt](170) + firstVal := felt.FromUint64[felt.Felt](5) + secondVal := felt.FromUint64[felt.Felt](12) + headVal := felt.FromUint64[felt.Felt](7) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 200, firstVal) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 300, secondVal) + + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{slot: headVal}) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoError(t, err) + assert.Equal(t, firstVal, got, "value installed at 100") + + got, err = reader.ContractStorageAt(&addr, &slot, 200) + require.NoError(t, err) + assert.Equal(t, secondVal, got, "value installed at 200") + + got, err = reader.ContractStorageAt(&addr, &slot, 300) + require.NoError(t, err) + assert.Equal(t, headVal, got, "value installed at 300 = head from trie") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated must be empty", + ) +} + +func TestMigrate_Storage_SingleWrite(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot := felt.FromUint64[felt.Felt](170) + v := felt.FromUint64[felt.Felt](9) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{slot: v}) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoError(t, err) + assert.Equal(t, v, got) + + assert.Equal(t, 0, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} + +func TestMigrate_Idempotent(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + deployClass := felt.FromUint64[felt.Felt](170) + replacedClass := felt.FromUint64[felt.Felt](187) + + seedContract(t, memDB, addr, felt.FromUint64[felt.Felt](66), replacedClass) + seedDeprecatedClassHashHistory(t, memDB, addr, 300, deployClass) + seedDeprecatedNonceHistory(t, memDB, addr, 200, felt.Zero) + + for range 3 { + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + } + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, 100) + require.NoError(t, err) + assert.Equal(t, deployClass, got) + + got, err = reader.ContractClassHashAt(&addr, 300) + require.NoError(t, err) + assert.Equal(t, replacedClass, got) + + got, err = reader.ContractNonceAt(&addr, 200) + require.NoError(t, err) + assert.Equal(t, felt.FromUint64[felt.Felt](66), got) +} + +func TestMigrate_ClassHash_ResumeFromPartial(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + deployClass := felt.FromUint64[felt.Felt](170) + replacedClass := felt.FromUint64[felt.Felt](187) + deployHeight := uint64(100) + replaceBlock := uint64(300) + + seedContract(t, memDB, addr, felt.Zero, replacedClass) + seedDeprecatedClassHashHistory(t, memDB, addr, replaceBlock, deployClass) + + require.NoError(t, state.WriteClassHashHistory(memDB, &addr, deployHeight, &deployClass)) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, deployHeight) + require.NoError(t, err) + assert.Equal(t, deployClass, got, "deploy entry must survive partial-resume re-run") + + got, err = reader.ContractClassHashAt(&addr, replaceBlock) + require.NoError(t, err) + assert.Equal(t, replacedClass, got, "shifted entry at replace block") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory), + "deprecated must be empty after migration completes", + ) +} + +func TestMigrate_Storage_ZeroedSlotHasNoLeaf(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + zeroedSlot := felt.FromUint64[felt.Felt](170) + keptSlot := felt.FromUint64[felt.Felt](187) + keptHead := felt.FromUint64[felt.Felt](9) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](204)) + + seedDeprecatedStorageHistory(t, memDB, addr, zeroedSlot, 100, felt.Zero) + seedDeprecatedStorageHistory(t, memDB, addr, zeroedSlot, 200, felt.FromUint64[felt.Felt](5)) + seedDeprecatedStorageHistory(t, memDB, addr, keptSlot, 100, felt.Zero) + + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{keptSlot: keptHead}) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &keptSlot, 100) + require.NoError(t, err) + assert.Equal(t, keptHead, got, "kept slot last entry = head") + + got, err = reader.ContractStorageAt(&addr, &zeroedSlot, 200) + require.NoError(t, err) + assert.Equal(t, felt.Zero, got, "zeroed slot last entry = Zero (no leaf in trie)") + + got, err = reader.ContractStorageAt(&addr, &zeroedSlot, 100) + require.NoError(t, err) + assert.Equal(t, felt.FromUint64[felt.Felt](5), got, "shift-up at block 100 = next-entry's value") + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated fully drained", + ) +} + +func TestMigrate_Storage_ManyEntries(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + headValues := map[felt.Felt]felt.Felt{} + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + + const ( + numSlots = 50 + numEntriesPerSlot = 20 + startBlock = uint64(100) + ) + + for s := uint64(1); s <= numSlots; s++ { + slot := felt.NewFromUint64[felt.Felt](s) + headVal := felt.NewFromUint64[felt.Felt](1000000 + s) + headValues[*slot] = *headVal + + for b := range uint64(numEntriesPerSlot) { + block := startBlock + b + oldVal := felt.Zero + if b > 0 { + oldVal = *felt.NewFromUint64[felt.Felt](s*10000 + b) + } + seedDeprecatedStorageHistory(t, memDB, addr, *slot, block, oldVal) + } + } + seedDeprecatedStorageTrie(t, memDB, addr, headValues) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + assert.Equal( + t, + 0, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated storage history must be fully drained", + ) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + for slot, head := range headValues { + lastBlock := startBlock + numEntriesPerSlot - 1 + got, err := reader.ContractStorageAt(&addr, &slot, lastBlock) + require.NoErrorf(t, err, "read storage failed for slot %v", &slot) + assert.Equalf(t, head, got, "last entry must equal head for slot %v", &slot) + } +} + +func TestMigrate_Storage_MultiAddress(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addrs := []felt.Felt{ + felt.FromUint64[felt.Felt](1), + felt.FromUint64[felt.Felt](2), + felt.FromUint64[felt.Felt](3), + } + slots := []felt.Felt{ + felt.FromUint64[felt.Felt](100), + felt.FromUint64[felt.Felt](200), + } + + for i := range addrs { + seedContract(t, memDB, addrs[i], felt.Zero, felt.FromUint64[felt.Felt](uint64(170+i))) + for _, slot := range slots { + seedDeprecatedStorageHistory(t, memDB, addrs[i], slot, 100, felt.Zero) + seedDeprecatedStorageHistory( + t, memDB, addrs[i], slot, 200, + felt.FromUint64[felt.Felt](uint64(10+i)), + ) + } + seedDeprecatedStorageTrie(t, memDB, addrs[i], map[felt.Felt]felt.Felt{ + slots[0]: felt.FromUint64[felt.Felt](uint64(1000 + i*10)), + slots[1]: felt.FromUint64[felt.Felt](uint64(1000 + i*10 + 1)), + }) + } + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + for i := range addrs { + for j, slot := range slots { + got, err := reader.ContractStorageAt(&addrs[i], &slot, 100) + require.NoErrorf(t, err, "addr %d slot %d block 100", i, j) + assert.Equalf( + t, felt.FromUint64[felt.Felt](uint64(10+i)), got, + "addr %d slot %d block 100 = next-entry's value", i, j, + ) + got, err = reader.ContractStorageAt(&addrs[i], &slot, 200) + require.NoErrorf(t, err, "addr %d slot %d block 200", i, j) + assert.Equalf( + t, felt.FromUint64[felt.Felt](uint64(1000+i*10+j)), got, + "addr %d slot %d block 200 = head from trie", i, j, + ) + } + } + + assert.Zero( + t, + bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory), + "deprecated must be empty across all addresses", + ) +} + +func TestMigrate_CancelledContext_ResumesCleanly(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](170)) + seedDeprecatedClassHashHistory(t, memDB, addr, 200, felt.FromUint64[felt.Felt](42)) + seedDeprecatedNonceHistory(t, memDB, addr, 200, felt.Zero) + slot := felt.FromUint64[felt.Felt](5) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 200, felt.Zero) + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{ + slot: felt.FromUint64[felt.Felt](9), + }) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + res, err := history.Migrator{}.Migrate(ctx, memDB, &networks.Sepolia, log.NewNopZapLogger()) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) + require.NotNil(t, res, "shouldRerun sentinel must not be nil") + require.Empty(t, res, "shouldRerun is a non-nil empty slice") + + res, err = history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory)) + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractNonceHistory)) + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} + +func TestMigrate_Storage_ResumeFromPartial(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot := felt.FromUint64[felt.Felt](170) + firstVal := felt.FromUint64[felt.Felt](5) + secondVal := felt.FromUint64[felt.Felt](12) + headVal := felt.FromUint64[felt.Felt](7) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](187)) + + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 200, firstVal) + seedDeprecatedStorageHistory(t, memDB, addr, slot, 300, secondVal) + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{slot: headVal}) + + require.NoError(t, state.WriteStorageHistory(memDB, &addr, &slot, 100, &firstVal)) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoError(t, err) + assert.Equal(t, firstVal, got, "preserved deploy entry after partial-resume") + + got, err = reader.ContractStorageAt(&addr, &slot, 200) + require.NoError(t, err) + assert.Equal(t, secondVal, got) + + got, err = reader.ContractStorageAt(&addr, &slot, 300) + require.NoError(t, err) + assert.Equal(t, headVal, got) + + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} + +func TestMigrate_AddressWithEmptyHistoryForOnePhase(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + classHash := felt.FromUint64[felt.Felt](170) + deployClass := felt.FromUint64[felt.Felt](42) + + seedContract(t, memDB, addr, felt.Zero, classHash) + seedDeprecatedClassHashHistory(t, memDB, addr, 300, deployClass) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + got, err := reader.ContractClassHashAt(&addr, 100) + require.NoError(t, err) + assert.Equal(t, deployClass, got) + + got, err = reader.ContractClassHashAt(&addr, 300) + require.NoError(t, err) + assert.Equal(t, classHash, got) + + assert.Zero( + t, bucketKeyCount(t, memDB, db.ContractNonceHistory), + "nonce history empty when phase no-ops", + ) + assert.Zero( + t, bucketKeyCount(t, memDB, db.ContractStorageHistory), + "storage history empty when phase no-ops", + ) + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractClassHashHistory)) +} + +func TestMigrate_Storage_InterleavedZeroedSlots(t *testing.T) { + memDB := memory.New() + t.Cleanup(func() { memDB.Close() }) + + addr := felt.FromUint64[felt.Felt](1) + slot1 := felt.FromUint64[felt.Felt](100) // zeroed + slot2 := felt.FromUint64[felt.Felt](200) // kept + slot3 := felt.FromUint64[felt.Felt](300) // zeroed + slot4 := felt.FromUint64[felt.Felt](400) // kept + head2 := felt.FromUint64[felt.Felt](22) + head4 := felt.FromUint64[felt.Felt](44) + + seedContract(t, memDB, addr, felt.Zero, felt.FromUint64[felt.Felt](204)) + + for _, slot := range []felt.Felt{slot1, slot2, slot3, slot4} { + seedDeprecatedStorageHistory(t, memDB, addr, slot, 100, felt.Zero) + } + seedDeprecatedStorageTrie(t, memDB, addr, map[felt.Felt]felt.Felt{ + slot2: head2, + slot4: head4, + }) + + res, err := history.Migrator{}.Migrate( + context.Background(), + memDB, + &networks.Sepolia, + log.NewNopZapLogger(), + ) + require.NoError(t, err) + require.Nil(t, res) + + reader, err := state.NewStateReader(&felt.Zero, state.NewStateDB(memDB, triedb.New(memDB, nil))) + require.NoError(t, err) + + for _, slot := range []felt.Felt{slot1, slot3} { + got, err := reader.ContractStorageAt(&addr, &slot, 100) + require.NoErrorf(t, err, "zeroed slot %v", &slot) + assert.Equalf(t, felt.Zero, got, "zeroed slot %v has no trie leaf", &slot) + } + got, err := reader.ContractStorageAt(&addr, &slot2, 100) + require.NoError(t, err) + assert.Equal(t, head2, got, "slot2 kept its head value") + got, err = reader.ContractStorageAt(&addr, &slot4, 100) + require.NoError(t, err) + assert.Equal(t, head4, got, "slot4 kept its head value") + + assert.Zero(t, bucketKeyCount(t, memDB, db.DeprecatedContractStorageHistory)) +} diff --git a/migration/state/history/nonce_ingestor.go b/migration/state/history/nonce_ingestor.go new file mode 100644 index 0000000000..3ca8adf610 --- /dev/null +++ b/migration/state/history/nonce_ingestor.go @@ -0,0 +1,100 @@ +package history + +import ( + "context" + "fmt" + + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/migration/state/common" +) + +type nonceIngestor struct { + common.BaseIngestor +} + +var _ pipeline.State[*felt.Felt, common.Task] = (*nonceIngestor)(nil) + +func newNonceIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) *nonceIngestor { + return &nonceIngestor{BaseIngestor: common.NewBaseIngestor(ctx, sem, database)} +} + +// Run migrates the nonce history of a single contract. +// +// Legend: Bₙ = block at which the n-th nonce change happened. Nₙ = the +// nonce active *after* Bₙ; the deploy nonce is always 0 and is *not* +// written to the deprecated history — its presence is implicit in the +// pre-value of the first change entry. The new layout stores the same +// number of entries, just shifted to post-values: +// +// block │ deprecated │ new +// ───────┼────────────────┼────── +// B₁ │ 0 │ N₁ +// B₂ │ N₁ │ N₂ +// B₃ │ N₂ │ N₃ +// ───────┼────────────────┼────── +// > B₃ │ contract │ N₃ (last entry — self-contained) +// .Nonce ← deprecated must reach into the Contract +// record for any block past the last change +// +// Contracts with no deprecated nonce history are skipped. Deprecated rows +// are deleted at the end of the run. +func (i *nonceIngestor) Run(index int, addr *felt.Felt, outputs chan<- common.Task) error { + t := &i.Tasks[index] + deprecatedPrefix := db.DeprecatedContractNonceHistoryKey(addr) + + depIt, err := i.Database.NewIterator(deprecatedPrefix, true) + if err != nil { + return fmt.Errorf("nonce: open deprecated iter(%s): %w", addr, err) + } + defer depIt.Close() + if !depIt.First() { + return nil + } + + contract, err := state.GetContract(i.Database, addr) + if err != nil { + return fmt.Errorf("nonce: GetContract(%s): %w", addr, err) + } + + for { + block, err := parseBlockKey(depIt.Key(), deprecatedPrefix) + if err != nil { + return fmt.Errorf("nonce(%s): %w", addr, err) + } + hasNext := depIt.Next() + historyValue := contract.Nonce + if hasNext { + rawValue, err := depIt.Value() + if err != nil { + return fmt.Errorf("nonce(%s): %w", addr, err) + } + historyValue = felt.FromBytes[felt.Felt](rawValue) + } + err = state.WriteNonceHistory(t.Batch, addr, block, &historyValue) + if err != nil { + return err + } + t.EntryCount++ + if err := i.Flush(t, outputs); err != nil { + return err + } + if !hasNext { + break + } + } + + if err := t.Batch.DeleteRange(deprecatedPrefix, dbutils.UpperBound(deprecatedPrefix)); err != nil { + return fmt.Errorf("nonce: DeleteRange deprecated(%s): %w", addr, err) + } + t.CompletedAddrs++ + return nil +} diff --git a/migration/state/history/parse.go b/migration/state/history/parse.go new file mode 100644 index 0000000000..59bfbe0ad7 --- /dev/null +++ b/migration/state/history/parse.go @@ -0,0 +1,28 @@ +package history + +import ( + "encoding/binary" + "fmt" + + "github.com/NethermindEth/juno/core/felt" +) + +func parseBlockKey(key, prefix []byte) (uint64, error) { + if len(key) != len(prefix)+8 { + return 0, fmt.Errorf("malformed block-keyed entry: key len %d, want %d", len(key), len(prefix)+8) + } + return binary.BigEndian.Uint64(key[len(prefix):]), nil +} + +func parseStorageKey(key, prefix []byte) (felt.Felt, uint64, error) { + if len(key) != len(prefix)+felt.Bytes+8 { + return felt.Felt{}, 0, fmt.Errorf( + "malformed storage-history entry: key len %d, want %d", + len(key), + len(prefix)+felt.Bytes+8, + ) + } + slot := felt.FromBytes[felt.Felt](key[len(prefix) : len(prefix)+felt.Bytes]) + block := binary.BigEndian.Uint64(key[len(prefix)+felt.Bytes:]) + return slot, block, nil +} diff --git a/migration/state/history/storage_ingestor.go b/migration/state/history/storage_ingestor.go new file mode 100644 index 0000000000..7222050cb9 --- /dev/null +++ b/migration/state/history/storage_ingestor.go @@ -0,0 +1,220 @@ +package history + +import ( + "bytes" + "context" + "fmt" + + "github.com/NethermindEth/juno/core/deprecatedstate" + "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/core/state" + "github.com/NethermindEth/juno/core/trie" + "github.com/NethermindEth/juno/db" + "github.com/NethermindEth/juno/db/dbutils" + "github.com/NethermindEth/juno/migration/pipeline" + "github.com/NethermindEth/juno/migration/semaphore" + "github.com/NethermindEth/juno/migration/state/common" +) + +type storageIngestor struct { + common.BaseIngestor +} + +var _ pipeline.State[*felt.Felt, common.Task] = (*storageIngestor)(nil) + +func newStorageIngestor( + ctx context.Context, + sem semaphore.ResourceSemaphore[db.Batch], + database db.KeyValueReader, +) *storageIngestor { + return &storageIngestor{BaseIngestor: common.NewBaseIngestor(ctx, sem, database)} +} + +// Run migrates the per-slot storage history of a single contract. +// +// Legend: Bₙ = block at which the n-th change to a slot happened. preXₙ +// is the value of slot X before Bₙ (= what the deprecated layout stores +// at [X, Bₙ]); headX is the slot's current value, read from the head +// storage trie. The deprecated layout writes nothing at deploy — the +// pre-deploy value (0) is implicit in the first change entry. The new +// layout stores the same number of entries per slot, just shifted to +// post-values. For one slot: +// +// block │ deprecated[slotA] │ new[slotA] +// ───────┼───────────────────┼─────────── +// B₁ │ 0 │ preA₁ +// B₂ │ preA₁ │ preA₂ +// B₃ │ preA₂ │ headA +// ───────┼───────────────────┼─────────── +// > B₃ │ head trie leaf │ headA (last entry — self-contained) +// for slotA ← deprecated must reach into the head +// storage trie for any block past the +// last change +// +// For each deprecated entry the post-value comes from one of: +// +// - the *next* deprecated entry, when it's on the same slot — its stored +// pre-value is exactly this block's post-value; +// - the head storage trie leaf for that slot, when there is no next +// deprecated entry on the same slot; +// - felt.Zero, when there is no head leaf for the slot (the slot was +// eventually zeroed out and dropped from the trie). +// +// Both the deprecated history and the head trie are sorted by raw slot +// bytes, so the ingestor walks them in lockstep — the head-trie iterator +// advances only when its current leaf matches the slot just resolved: +// +// deprecated history head trie new history +// ───────────────────── ───────────── ───────────────────────── +// [slotA, B₁..B₃] ──→ [slotA] = headA [slotA, B₁..B₃] last uses headA +// [slotB, B₁..B₂] ──→ (no leaf) [slotB, B₁..B₂] last uses 0 +// ← slotB was set (slotB was zeroed +// and later zeroed at B₂) +// [slotC, B₁] ──→ [slotC] = headC [slotC, B₁] = headC +// +// Contracts with no deprecated storage history are skipped; deprecated +// rows are deleted at the end of the run via DeleteRange. +func (i *storageIngestor) Run(index int, addr *felt.Felt, outputs chan<- common.Task) error { + t := &i.Tasks[index] + + addrBytes := addr.Marshal() + deprecatedPrefix := db.DeprecatedContractStorageHistory.Key(addrBytes) + + deprecatedHistoryIt, err := i.Database.NewIterator(deprecatedPrefix, true) + if err != nil { + return fmt.Errorf("storage: open deprecated iter(%s): %w", addr, err) + } + defer deprecatedHistoryIt.Close() + if !deprecatedHistoryIt.First() { + return nil + } + + leafPrefix := db.ContractStorage.Key(addrBytes) + leafPrefix = append(leafPrefix, deprecatedstate.ContractStorageTrieHeight) + + headStorageTrieIt, err := i.Database.NewIterator(leafPrefix, true) + if err != nil { + return fmt.Errorf("storage: open leaf iter(%s): %w", addr, err) + } + defer headStorageTrieIt.Close() + leafValid := headStorageTrieIt.First() + + for { + slot, block, err := parseStorageKey(deprecatedHistoryIt.Key(), deprecatedPrefix) + if err != nil { + return fmt.Errorf("storage: parse key(%s): %w", addr, err) + } + + successorSlot, successorValue, hasNext, err := peekSuccessor( + deprecatedHistoryIt, + deprecatedPrefix, + addr, + ) + if err != nil { + return err + } + + historyValue, advanced, err := resolveHistoryValue( + headStorageTrieIt, + leafPrefix, + addr, + &slot, + hasNext, + successorSlot, + successorValue, + leafValid, + ) + if err != nil { + return err + } + if advanced { + leafValid = headStorageTrieIt.Next() + } + + err = state.WriteStorageHistory( + t.Batch, + addr, + &slot, + block, + &historyValue, + ) + if err != nil { + return err + } + t.EntryCount++ + if err := i.Flush(t, outputs); err != nil { + return err + } + + if !hasNext { + break + } + } + + if err := t.Batch.DeleteRange(deprecatedPrefix, dbutils.UpperBound(deprecatedPrefix)); err != nil { + return fmt.Errorf("storage: DeleteRange deprecated(%s): %w", addr, err) + } + t.CompletedAddrs++ + return nil +} + +// peekSuccessor advances the deprecated-history iterator. If a next entry +// exists, returns its (slot, value, true); otherwise returns (_, _, false). +func peekSuccessor( + it db.Iterator, + prefix []byte, + addr *felt.Felt, +) (felt.Felt, felt.Felt, bool, error) { + if !it.Next() { + return felt.Felt{}, felt.Felt{}, false, nil + } + slot, _, err := parseStorageKey(it.Key(), prefix) + if err != nil { + return felt.Felt{}, felt.Felt{}, false, fmt.Errorf( + "storage: parse successor key(%s): %w", addr, err, + ) + } + rawValue, err := it.Value() + if err != nil { + return felt.Felt{}, felt.Felt{}, false, fmt.Errorf( + "storage: read successor value(%s, slot=%s): %w", addr, &slot, err, + ) + } + return slot, felt.FromBytes[felt.Felt](rawValue), true, nil +} + +// resolveHistoryValue decides the value to install at the current entry: the +// successor's value when both are on the same slot, otherwise the head-trie +// leaf (when the iterator is positioned on this slot), otherwise zero. Returns +// advanced=true when the head-trie iterator should be advanced by the caller. +func resolveHistoryValue( + headIt db.Iterator, + leafPrefix []byte, + addr, slot *felt.Felt, + hasSuccessor bool, + successorSlot, successorValue felt.Felt, + leafValid bool, +) (value felt.Felt, advanced bool, err error) { + if hasSuccessor && successorSlot == *slot { + return successorValue, false, nil + } + if !leafValid { + return felt.Felt{}, false, nil + } + if !bytes.Equal(headIt.Key()[len(leafPrefix):], slot.Marshal()) { + return felt.Felt{}, false, nil + } + raw, err := headIt.Value() + if err != nil { + return felt.Felt{}, false, fmt.Errorf( + "storage: leaf(%s, slot=%s): %w", addr, slot, err, + ) + } + var node trie.Node + if err := node.UnmarshalBinary(raw); err != nil { + return felt.Felt{}, false, fmt.Errorf( + "storage: decode leaf(%s, slot=%s): %w", addr, slot, err, + ) + } + return *node.Value, true, nil +} diff --git a/node/migration.go b/node/migration.go index 9d9a1fa319..b0a7d69513 100644 --- a/node/migration.go +++ b/node/migration.go @@ -13,6 +13,7 @@ import ( "github.com/NethermindEth/juno/migration/deprecated" //nolint:staticcheck,nolintlint,lll // ignore statick check package will be removed in future, nolinlint because main config does not check "github.com/NethermindEth/juno/migration/historyprunner" "github.com/NethermindEth/juno/migration/state/headstate" + "github.com/NethermindEth/juno/migration/state/history" "github.com/NethermindEth/juno/utils/log" ) @@ -27,7 +28,8 @@ func registerMigrations(cfg *Config) *migration.Registry { cfg.Prune, PruneModeFlag, ). - WithOptional(&headstate.Migrator{}, cfg.NewState, "new-state") + WithOptional(&headstate.Migrator{}, cfg.NewState, "new-state"). + WithOptional(&history.Migrator{}, cfg.NewState, "new-state") return registry }