core, triedb/pathdb: integrate state snapshot into pathdb

Gary Rong 2024-10-23 14:49:22 +08:00
parent 82e963e5c9
commit f981e181f8
26 changed files with 2584 additions and 213 deletions

View File

@ -161,7 +161,12 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
TrieCleanSize: c.TrieCleanLimit * 1024 * 1024,
StateCleanSize: c.SnapshotLimit * 1024 * 1024,
// TODO(rjl493456442): The write buffer represents the memory limit used
// for flushing both trie data and state data to disk. The config name
// should be updated to eliminate the confusion.
WriteBufferSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
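// Illustrative sketch, not part of this change: how the renamed knobs above fit
// together for a path-scheme database. The sizes are arbitrary example values and
// examplePathDBConfig is a hypothetical helper, not an upstream API. The old
// CleanCacheSize budget is split into a trie-node cache and a flat-state cache,
// while WriteBufferSize remains a single dirty buffer shared by both kinds of data.
func examplePathDBConfig() *triedb.Config {
	const MiB = 1024 * 1024
	return &triedb.Config{
		PathDB: &pathdb.Config{
			StateHistory:    90000,      // number of recent blocks to keep state history for
			TrieCleanSize:   256 * MiB,  // clean cache for trie nodes (formerly CleanCacheSize)
			StateCleanSize:  256 * MiB,  // clean cache for flat state data (new)
			WriteBufferSize: 1024 * MiB, // dirty buffer flushed for both trie and state data
		},
	}
}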
@ -349,11 +354,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
// Head state is missing, before the state recovery, find out the disk
// layer point of snapshot(if it's enabled). Make sure the rewound point
// is lower than disk layer.
//
// Note it's unnecessary in path mode which always keeps trie data and
// state data consistent.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
if bc.cacheConfig.SnapshotLimit > 0 && bc.cacheConfig.StateScheme == rawdb.HashScheme {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
@ -426,31 +434,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool
head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
recover = true
}
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
// Re-initialize the state database with snapshot
bc.statedb = state.NewDatabase(bc.triedb, bc.snaps)
}
bc.setupSnapshot()
// Rewind the chain in case of an incompatible config upgrade.
if compat, ok := genesisErr.(*params.ConfigCompatError); ok {
@ -470,6 +454,37 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
return bc, nil
}
func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
if bc.cacheConfig.StateScheme == rawdb.PathScheme {
return
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
// a recovery block number to be persisted to disk), check if we're still
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool
head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
recover = true
}
snapconfig := snapshot.Config{
CacheSize: bc.cacheConfig.SnapshotLimit,
Recovery: recover,
NoBuild: bc.cacheConfig.SnapshotNoBuild,
AsyncBuild: !bc.cacheConfig.SnapshotWait,
}
bc.snaps, _ = snapshot.New(snapconfig, bc.db, bc.triedb, head.Root)
// Re-initialize the state database with snapshot
bc.statedb = state.NewDatabase(bc.triedb, bc.snaps)
}
}
// empty returns an indicator whether the blockchain is empty.
// Note, it's a special case that we connect a non-empty ancient
// database with an empty node, so that we can plugin the ancient

View File

@ -1791,7 +1791,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
)
defer engine.Close()
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
config.SnapshotLimit = 256
config.SnapshotWait = true
}
@ -1820,7 +1820,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil {
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
@ -1952,9 +1952,11 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
// Insert block B3 and commit the state into disk
if _, err := chain.InsertChain(blocks[2:3]); err != nil {
@ -1997,16 +1999,22 @@ func testIssue23496(t *testing.T, scheme string) {
}
expHead := uint64(1)
if scheme == rawdb.PathScheme {
expHead = uint64(2)
expHead = uint64(3)
}
if head := chain.CurrentBlock(); head.Number.Uint64() != expHead {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, expHead)
}
if scheme == rawdb.PathScheme {
// Reinsert B3-B4
if _, err := chain.InsertChain(blocks[2:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
} else {
// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
}
if head := chain.CurrentHeader(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, 4)
}
@ -2016,7 +2024,9 @@ func testIssue23496(t *testing.T, scheme string) {
if head := chain.CurrentBlock(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, uint64(4))
}
if scheme == rawdb.HashScheme {
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
}
}
}

View File

@ -2023,7 +2023,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
}
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}

View File

@ -105,7 +105,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme {
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
@ -149,15 +149,19 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
} else if basic.scheme == rawdb.HashScheme {
if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
}
// Check the snapshot, ensure it's integrated
if basic.scheme == rawdb.HashScheme {
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
}
//nolint:unused
func (basic *snapshotTestBasic) dump() string {
@ -570,7 +574,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)
if scheme == rawdb.PathScheme {
expHead = uint64(4)
expHead = uint64(6)
}
test := &crashSnapshotTest{
snapshotTestBasic{

View File

@ -186,8 +186,9 @@ func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
readers = append(readers, newFlatReader(snap))
}
} else {
// If standalone state snapshot is not available, try to construct
// the state reader with database.
// If standalone state snapshot is not available (path scheme
// or the state snapshot is explicitly disabled in hash mode),
// try to construct the state reader with database.
reader, err := db.triedb.StateReader(stateRoot)
if err == nil {
readers = append(readers, newFlatReader(reader)) // state reader is optional

View File

@ -166,7 +166,9 @@ func newHelper(scheme string) *testHelper {
diskdb := rawdb.NewMemoryDatabase()
config := &triedb.Config{}
if scheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{} // disable caching
config.PathDB = &pathdb.Config{
SnapshotNoBuild: true,
} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
}

View File

@ -979,7 +979,8 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
)
if scheme == rawdb.PathScheme {
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
TrieCleanSize: 0,
StateCleanSize: 0,
WriteBufferSize: 0,
}}) // disable caching
} else {

View File

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -175,7 +176,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
}
// If snap sync is requested but snapshots are disabled, fail loudly
if h.snapSync.Load() && config.Chain.Snapshots() == nil {
if h.snapSync.Load() && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
return nil, errors.New("snap sync not supported with snapshots disabled")
}
// Construct the downloader (long sync)

View File

@ -23,6 +23,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
@ -31,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb/database"
)
const (
@ -279,7 +282,16 @@ func ServiceGetAccountRangeQuery(chain *core.BlockChain, req *GetAccountRangePac
if err != nil {
return nil, nil
}
it, err := chain.Snapshots().AccountIterator(req.Root, req.Origin)
// Temporary solution: using the snapshot interface for both cases.
// This can be removed once the hash scheme is deprecated.
var it snapshot.AccountIterator
if chain.TrieDB().Scheme() == rawdb.HashScheme {
// The snapshot is assumed to be available in hash mode if
// the SNAP protocol is enabled.
it, err = chain.Snapshots().AccountIterator(req.Root, req.Origin)
} else {
it, err = chain.TrieDB().AccountIterator(req.Root, req.Origin)
}
if err != nil {
return nil, nil
}
@ -359,7 +371,19 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
limit, req.Limit = common.BytesToHash(req.Limit), nil
}
// Retrieve the requested state and bail out if non existent
it, err := chain.Snapshots().StorageIterator(req.Root, account, origin)
var (
err error
it snapshot.StorageIterator
)
// Temporary solution: using the snapshot interface for both cases.
// This can be removed once the hash scheme is deprecated.
if chain.TrieDB().Scheme() == rawdb.HashScheme {
// The snapshot is assumed to be available in hash mode if
// the SNAP protocol is enabled.
it, err = chain.Snapshots().StorageIterator(req.Root, account, origin)
} else {
it, err = chain.TrieDB().StorageIterator(req.Root, account, origin)
}
if err != nil {
return nil, nil
}
@ -479,8 +503,15 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
// We don't have the requested state available, bail out
return nil, nil
}
// The 'snap' might be nil, in which case we cannot serve storage slots.
snap := chain.Snapshots().Snapshot(req.Root)
// The 'reader' might be nil, in which case we cannot serve storage slots
// via snapshot.
var reader database.StateReader
if chain.Snapshots() != nil {
reader = chain.Snapshots().Snapshot(req.Root)
}
if reader == nil {
reader, _ = triedb.StateReader(req.Root)
}
// Retrieve trie nodes until the packet size limit is reached
var (
nodes [][]byte
@ -505,8 +536,9 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
default:
var stRoot common.Hash
// Storage slots requested, open the storage trie and retrieve from there
if snap == nil {
if reader == nil {
// We don't have the requested state snapshotted yet (or it is stale),
// but can look up the account via the trie instead.
account, err := accTrie.GetAccountByHash(common.BytesToHash(pathset[0]))
@ -516,7 +548,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
}
stRoot = account.Root
} else {
account, err := snap.Account(common.BytesToHash(pathset[0]))
account, err := reader.Account(common.BytesToHash(pathset[0]))
loads++ // always account database reads, even for failures
if err != nil || account == nil {
break

View File

@ -1962,5 +1962,5 @@ func newDbConfig(scheme string) *triedb.Config {
if scheme == rawdb.HashScheme {
return &triedb.Config{}
}
return &triedb.Config{PathDB: pathdb.Defaults}
return &triedb.Config{PathDB: &pathdb.Config{SnapshotNoBuild: true}}
}

View File

@ -187,10 +187,12 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t
}
// Cross-check the snapshot-to-hash against the trie hash
if snapshotter {
if chain.Snapshots() != nil {
if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil {
return err
}
}
}
return t.validateImportedHeaders(chain, validBlocks)
}

View File

@ -25,7 +25,7 @@ import (
"github.com/ethereum/go-ethereum/triedb/database"
)
// testReader implements database.Reader interface, providing function to
// testReader implements database.NodeReader interface, providing function to
// access trie nodes.
type testReader struct {
db ethdb.Database
@ -33,7 +33,7 @@ type testReader struct {
nodes []*trienode.MergedNodeSet // sorted from new to old
}
// Node implements database.Reader interface, retrieving trie node with
// Node implements database.NodeReader interface, retrieving trie node with
// all available cached layers.
func (r *testReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
// Check the node presence with the cached layer, from latest to oldest.
@ -54,7 +54,7 @@ func (r *testReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]b
return rawdb.ReadTrieNode(r.db, owner, path, hash, r.scheme), nil
}
// testDb implements database.Database interface, using for testing purpose.
// testDb implements database.NodeDatabase interface, using for testing purpose.
type testDb struct {
disk ethdb.Database
root common.Hash

View File

@ -322,6 +322,26 @@ func (db *Database) Journal(root common.Hash) error {
return pdb.Journal(root)
}
// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (pathdb.AccountIterator, error) {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return nil, errors.New("not supported")
}
return pdb.AccountIterator(root, seek)
}
// StorageIterator creates a new storage iterator for the specified root hash and
// account. The iterator will be moved to the specified start position.
func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (pathdb.StorageIterator, error) {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return nil, errors.New("not supported")
}
return pdb.StorageIterator(root, account, seek)
}
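// Illustrative sketch, not part of this change: a minimal use of the wrappers
// above from within the triedb package. exampleDumpAccounts is hypothetical; it
// assumes the pathdb iterator exposes the same Next/Hash/Account/Error/Release
// methods as the snapshot iterators it stands in for. On a hash-scheme backend
// (or while snapshot generation is still running) the constructor errors out.
func exampleDumpAccounts(db *Database, root common.Hash) (int, error) {
	it, err := db.AccountIterator(root, common.Hash{})
	if err != nil {
		return 0, err // "not supported" on hash scheme, or snapshot not constructed yet
	}
	defer it.Release()

	var count int
	for it.Next() {
		// it.Hash() is the account hash, it.Account() the slim-RLP encoded account.
		_, _ = it.Hash(), it.Account()
		count++
	}
	return count, it.Error()
}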
// IsVerkle returns the indicator if the database is holding a verkle tree.
func (db *Database) IsVerkle() bool {
return db.config.IsVerkle

View File

@ -124,7 +124,7 @@ func (b *buffer) size() uint64 {
// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) error {
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error {
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
@ -133,7 +133,7 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer, ensuring that all written
// data is transferred to disk before updating the key-value store.
@ -143,7 +143,9 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
}
}
nodes := b.nodes.write(batch, nodesCache)
accounts, slots := b.states.write(batch, progress, statesCache)
rawdb.WritePersistentStateID(batch, id)
rawdb.WriteSnapshotRoot(batch, root)
// Flush all mutations in a single batch
size := batch.ValueSize()
@ -152,8 +154,10 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitAccountsMeter.Mark(int64(accounts))
commitStoragesMeter.Mark(int64(slots))
commitTimeTimer.UpdateSince(start)
b.reset()
log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

triedb/pathdb/context.go (new file, 246 lines)
View File

@ -0,0 +1,246 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"bytes"
"encoding/binary"
"errors"
"math"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
)
const (
snapAccount = "account" // Identifier of account snapshot generation
snapStorage = "storage" // Identifier of storage snapshot generation
)
// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes. This data structure is used throughout the entire
// lifecycle of the snapshot generation process and is shared across multiple
// generation cycles.
type generatorStats struct {
origin uint64 // Origin prefix where generation started
start time.Time // Timestamp when generation started
accounts uint64 // Number of accounts indexed(generated or recovered)
slots uint64 // Number of storage slots indexed(generated or recovered)
dangling uint64 // Number of dangling storage slots
storage common.StorageSize // Total account and storage slot size(generation or recovery)
}
// log creates a contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) log(msg string, root common.Hash, marker []byte) {
var ctx []interface{}
if root != (common.Hash{}) {
ctx = append(ctx, []interface{}{"root", root}...)
}
// Figure out whether we're after or within an account
switch len(marker) {
case common.HashLength:
ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
case 2 * common.HashLength:
ctx = append(ctx, []interface{}{
"in", common.BytesToHash(marker[:common.HashLength]),
"at", common.BytesToHash(marker[common.HashLength:]),
}...)
}
// Add the usual measurements
ctx = append(ctx, []interface{}{
"accounts", gs.accounts,
"slots", gs.slots,
"storage", gs.storage,
"dangling", gs.dangling,
"elapsed", common.PrettyDuration(time.Since(gs.start)),
}...)
// Calculate the estimated indexing time based on current stats
if len(marker) > 0 {
if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])
speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
ctx = append(ctx, []interface{}{
"eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
}...)
}
}
log.Info(msg, ctx...)
}
// generatorContext holds several global fields that are used throughout the
// current generation cycle. It must be recreated if the generation cycle is
// restarted.
type generatorContext struct {
root common.Hash // State root of the generation target
account *holdableIterator // Iterator of account snapshot data
storage *holdableIterator // Iterator of storage snapshot data
db ethdb.KeyValueStore // Key-value store containing the snapshot data
batch ethdb.Batch // Database batch for writing data atomically
logged time.Time // The timestamp when last generation progress was displayed
}
// newGeneratorContext initializes the context for generation.
func newGeneratorContext(root common.Hash, marker []byte, db ethdb.KeyValueStore) *generatorContext {
ctx := &generatorContext{
root: root,
db: db,
batch: db.NewBatch(),
logged: time.Now(),
}
accMarker, storageMarker := splitMarker(marker)
ctx.openIterator(snapAccount, accMarker)
ctx.openIterator(snapStorage, storageMarker)
return ctx
}
// openIterator constructs global account and storage snapshot iterators
// at the interrupted position. These iterators should be reopened from time
// to time to avoid blocking leveldb compaction for a long time.
func (ctx *generatorContext) openIterator(kind string, start []byte) {
if kind == snapAccount {
iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start)
ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength))
return
}
iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start)
ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength))
}
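// Illustrative sketch, not part of this change: the raw key layouts that the two
// length filters above correspond to, assuming the standard rawdb snapshot schema
// of a one-byte prefix followed by the hashes. exampleSnapshotKeys is hypothetical.
func exampleSnapshotKeys(account, slot common.Hash) (accountKey, storageKey []byte) {
	// Account entry: prefix (1 byte) + account hash (32 bytes) => 1+common.HashLength.
	accountKey = append(accountKey, rawdb.SnapshotAccountPrefix...)
	accountKey = append(accountKey, account.Bytes()...)

	// Storage entry: prefix (1 byte) + account hash + storage hash => 1+2*common.HashLength.
	storageKey = append(storageKey, rawdb.SnapshotStoragePrefix...)
	storageKey = append(storageKey, account.Bytes()...)
	storageKey = append(storageKey, slot.Bytes()...)
	return accountKey, storageKey
}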
// reopenIterator releases the specified snapshot iterator and re-opens it
// at the next position. It's aimed at not blocking leveldb compaction.
func (ctx *generatorContext) reopenIterator(kind string) {
// Shift iterator one more step, so that we can reopen
// the iterator at the right position.
var iter = ctx.account
if kind == snapStorage {
iter = ctx.storage
}
hasNext := iter.Next()
if !hasNext {
// Iterator exhausted, release forever and create an already exhausted virtual iterator
iter.Release()
if kind == snapAccount {
ctx.account = newHoldableIterator(memorydb.New().NewIterator(nil, nil))
return
}
ctx.storage = newHoldableIterator(memorydb.New().NewIterator(nil, nil))
return
}
next := iter.Key()
iter.Release()
ctx.openIterator(kind, next[1:])
}
// close releases all the held resources.
func (ctx *generatorContext) close() {
ctx.account.Release()
ctx.storage.Release()
}
// iterator returns the corresponding iterator specified by the kind.
func (ctx *generatorContext) iterator(kind string) *holdableIterator {
if kind == snapAccount {
return ctx.account
}
return ctx.storage
}
// removeStorageBefore deletes all storage entries which are located before
// the specified account. When the iterator touches the storage entry which
// is located in or outside the given account, it stops and holds the current
// iterated element locally.
func (ctx *generatorContext) removeStorageBefore(account common.Hash) uint64 {
var (
count uint64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
key := iter.Key()
if bytes.Compare(key[1:1+common.HashLength], account.Bytes()) >= 0 {
iter.Hold()
break
}
count++
ctx.batch.Delete(key)
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
storageCleanCounter.Inc(time.Since(start).Nanoseconds())
return count
}
// removeStorageAt deletes all storage entries which are located in the specified
// account. When the iterator touches the storage entry which is outside the given
// account, it stops and holds the current iterated element locally. An error will
// be returned if the initial position of iterator is not in the given account.
func (ctx *generatorContext) removeStorageAt(account common.Hash) error {
var (
count int64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
key := iter.Key()
cmp := bytes.Compare(key[1:1+common.HashLength], account.Bytes())
if cmp < 0 {
return errors.New("invalid iterator position")
}
if cmp > 0 {
iter.Hold()
break
}
count++
ctx.batch.Delete(key)
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
wipedStorageMeter.Mark(count)
storageCleanCounter.Inc(time.Since(start).Nanoseconds())
return nil
}
// removeStorageLeft deletes all storage entries which are located after
// the current iterator position.
func (ctx *generatorContext) removeStorageLeft() uint64 {
var (
count uint64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
count++
ctx.batch.Delete(iter.Key())
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
danglingStorageMeter.Mark(int64(count))
storageCleanCounter.Inc(time.Since(start).Nanoseconds())
return count
}

View File

@ -17,6 +17,7 @@
package pathdb
import (
"encoding/binary"
"errors"
"fmt"
"io"
@ -35,8 +36,11 @@ import (
)
const (
// defaultCleanSize is the default memory allowance of clean cache.
defaultCleanSize = 16 * 1024 * 1024
// defaultTrieCleanSize is the default memory allowance of clean trie cache.
defaultTrieCleanSize = 16 * 1024 * 1024
// defaultStateCleanSize is the default memory allowance of clean state cache.
defaultStateCleanSize = 16 * 1024 * 1024
// maxBufferSize is the maximum memory allowance of node buffer.
// Too large buffer will cause the system to pause for a long
@ -111,9 +115,11 @@ type layer interface {
// Config contains the settings for database.
type Config struct {
StateHistory uint64 // Number of recent blocks to maintain state history for
CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes
TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
ReadOnly bool // Flag whether the database is opened in read only mode.
ReadOnly bool // Flag whether the database is opened in read only mode
SnapshotNoBuild bool // Flag whether background state snapshot generation is disallowed
}
// sanitize checks the provided user configurations and changes anything that's
@ -133,7 +139,11 @@ func (c *Config) fields() []interface{} {
if c.ReadOnly {
list = append(list, "readonly", true)
}
list = append(list, "cache", common.StorageSize(c.CleanCacheSize))
if c.SnapshotNoBuild {
list = append(list, "snapshot", false)
}
list = append(list, "triecache", common.StorageSize(c.TrieCleanSize))
list = append(list, "statecache", common.StorageSize(c.StateCleanSize))
list = append(list, "buffer", common.StorageSize(c.WriteBufferSize))
list = append(list, "history", c.StateHistory)
return list
@ -142,7 +152,8 @@ func (c *Config) fields() []interface{} {
// Defaults contains default settings for Ethereum mainnet.
var Defaults = &Config{
StateHistory: params.FullImmutabilityThreshold,
CleanCacheSize: defaultCleanSize,
TrieCleanSize: defaultTrieCleanSize,
StateCleanSize: defaultStateCleanSize,
WriteBufferSize: defaultBufferSize,
}
@ -240,6 +251,12 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
log.Crit("Failed to disable database", "err", err) // impossible to happen
}
}
// Resolving the state snapshot generation progress from the database is
// mandatory. This ensures that uncovered flat states are not accessed,
// even if background generation is not allowed. If permitted, the generation
// might be scheduled.
db.setStateGenerator()
fields := config.fields()
if db.isVerkle {
fields = append(fields, "verkle", true)
@ -297,6 +314,52 @@ func (db *Database) repairHistory() error {
return nil
}
// setStateGenerator loads the state generation progress marker and potentially
// resumes the state generation if it's permitted.
func (db *Database) setStateGenerator() {
// Load the state snapshot generation progress marker to prevent access
// to uncovered states.
generator, root := loadGenerator(db.diskdb)
if generator == nil {
// Initialize an empty generator to rebuild the state snapshot from scratch
generator = &journalGenerator{
Marker: []byte{},
}
}
// Short circuit if the whole state snapshot has already been fully generated.
// The generator will be left as nil in disk layer for representing the whole
// state snapshot is available for accessing.
if generator.Done {
return
}
var origin uint64
if len(generator.Marker) >= 8 {
origin = binary.BigEndian.Uint64(generator.Marker)
}
stats := &generatorStats{
origin: origin,
start: time.Now(),
accounts: generator.Accounts,
slots: generator.Slots,
storage: common.StorageSize(generator.Storage),
}
dl := db.tree.bottom()
// Construct the generator and link it to the disk layer, ensuring that the
// generation progress is resolved to prevent accessing uncovered states
// regardless of whether background state snapshot generation is allowed.
noBuild := db.readOnly || db.config.SnapshotNoBuild
dl.setGenerator(newGenerator(db.diskdb, noBuild, generator.Marker, stats))
// Short circuit if the background generation is not permitted. Notably,
// snapshot generation is not functional in the verkle design.
if noBuild || db.isVerkle || db.waitSync {
return
}
stats.log("Starting snapshot generation", root, generator.Marker)
dl.generator.run(root)
}
// Update adds a new layer into the tree, if that can be linked to an existing
// old parent. It is disallowed to insert a disk layer (the origin of all). Apart
// from that this function will flatten the extra diff layers at bottom into disk
@ -359,8 +422,13 @@ func (db *Database) Disable() error {
}
db.waitSync = true
// Mark the disk layer as stale to prevent access to persistent state.
db.tree.bottom().markStale()
// Terminate the state generator if it's active and mark the disk layer
// as stale to prevent access to persistent state.
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
}
disk.markStale()
// Write the initial sync flag to persist it across restarts.
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning)
@ -390,6 +458,7 @@ func (db *Database) Enable(root common.Hash) error {
// reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
rawdb.DeleteTrieJournal(batch)
rawdb.DeleteSnapshotRoot(batch)
rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil {
return err
@ -403,13 +472,13 @@ func (db *Database) Enable(root common.Hash) error {
return err
}
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)))
// Re-enable the database as the final step.
db.waitSync = false
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished)
// Re-construct a new disk layer backed by persistent state
// and schedule the state snapshot generation if it's permitted.
db.tree.reset(generateSnapshot(db, root))
log.Info("Rebuilt trie database", "root", root)
return nil
}
@ -505,8 +574,12 @@ func (db *Database) Close() error {
// following mutations.
db.readOnly = true
// Release the memory held by clean cache.
db.tree.bottom().resetCache()
// Terminate the background generation if it's active
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
}
disk.resetCache() // release the memory held by clean cache
// Close the attached state history freezer.
if db.freezer == nil {
@ -586,14 +659,30 @@ func (db *Database) HistoryRange() (uint64, uint64, error) {
return historyRange(db.freezer)
}
// waitGeneration waits until the background generation is finished. It assumes
// that the generation is permitted; otherwise, it will block indefinitely.
func (db *Database) waitGeneration() {
gen := db.tree.bottom().generator
if gen == nil || gen.completed() {
return
}
<-gen.done
}
// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
return nil, errNotConstructed
}
return newFastAccountIterator(db, root, seek)
}
// StorageIterator creates a new storage iterator for the specified root hash and
// account. The iterator will be moved to the specific start position.
func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) {
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
return nil, errNotConstructed
}
return newFastStorageIterator(db, root, account, seek)
}
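// Illustrative sketch, not part of this change: the iterators above refuse to run
// while the snapshot is still being generated, so a caller inside the package can
// block on waitGeneration first. exampleIterateWhenReady is hypothetical and assumes
// the iterator exposes Next/Hash/Account/Error/Release like its snapshot counterpart.
func exampleIterateWhenReady(db *Database, root common.Hash) (uint64, error) {
	// Block until background generation (if any) has finished; without this,
	// AccountIterator would return errNotConstructed.
	db.waitGeneration()

	it, err := db.AccountIterator(root, common.Hash{})
	if err != nil {
		return 0, err
	}
	defer it.Release()

	var count uint64
	for it.Next() {
		count++ // it.Hash()/it.Account() hold the flat account data
	}
	return count, it.Error()
}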

View File

@ -108,7 +108,8 @@ func newTester(t *testing.T, historyLimit uint64) *tester {
disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
db = New(disk, &Config{
StateHistory: historyLimit,
CleanCacheSize: 16 * 1024,
TrieCleanSize: 16 * 1024,
StateCleanSize: 16 * 1024,
WriteBufferSize: 16 * 1024,
}, false)
obj = &tester{

View File

@ -17,7 +17,7 @@
package pathdb
import (
"errors"
"bytes"
"fmt"
"sync"
@ -34,24 +34,32 @@ type diskLayer struct {
id uint64 // Immutable, corresponding state id
db *Database // Path-based trie database
nodes *fastcache.Cache // GC friendly memory cache of clean nodes
states *fastcache.Cache // GC friendly memory cache of clean states
buffer *buffer // Dirty buffer to aggregate writes of nodes and states
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag
lock sync.RWMutex // Lock used to protect stale flag and genMarker
// The generator is set if the state snapshot was not fully completed
generator *generator
}
// newDiskLayer creates a new disk layer based on the passing arguments.
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer) *diskLayer {
// Initialize a clean cache if the memory allowance is not zero
// or reuse the provided cache if it is not nil (inherited from
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer {
// Initialize the clean caches if the memory allowance is not zero
// or reuse the provided caches if they are not nil (inherited from
// the original disk layer).
if nodes == nil && db.config.CleanCacheSize != 0 {
nodes = fastcache.New(db.config.CleanCacheSize)
if nodes == nil && db.config.TrieCleanSize != 0 {
nodes = fastcache.New(db.config.TrieCleanSize)
}
if states == nil && db.config.StateCleanSize != 0 {
states = fastcache.New(db.config.StateCleanSize)
}
return &diskLayer{
root: root,
id: id,
db: db,
nodes: nodes,
states: states,
buffer: buffer,
}
}
@ -72,6 +80,13 @@ func (dl *diskLayer) parentLayer() layer {
return nil
}
// setGenerator links the given generator to the disk layer, indicating that
// the associated state snapshot is not yet fully generated and that generation
// may still be running in the background.
func (dl *diskLayer) setGenerator(generator *generator) {
dl.generator = generator
}
// isStale returns whether this layer has become stale (was flattened across) or if
// it's still live.
func (dl *diskLayer) isStale() bool {
@ -171,8 +186,41 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
}
dirtyStateMissMeter.Mark(1)
// TODO(rjl493456442) support persistent state retrieval
return nil, errors.New("not supported")
// If the layer is being generated, ensure the requested account has
// already been covered by the generator.
marker := dl.genMarker()
if marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the account from the memory cache
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, hash[:]); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
if len(blob) == 0 {
stateAccountInexMeter.Mark(1)
} else {
stateAccountExistMeter.Mark(1)
}
return blob, nil
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the account from the disk.
blob = rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
if dl.states != nil {
dl.states.Set(hash[:], blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
}
if len(blob) == 0 {
stateAccountInexMeter.Mark(1)
stateAccountInexDiskMeter.Mark(1)
} else {
stateAccountExistMeter.Mark(1)
stateAccountExistDiskMeter.Mark(1)
}
return blob, nil
}
// storage directly retrieves the storage data associated with a particular hash,
@ -206,8 +254,42 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
dirtyStateMissMeter.Mark(1)
// TODO(rjl493456442) support persistent state retrieval
return nil, errors.New("not supported")
// If the layer is being generated, ensure the requested storage slot
// has already been covered by the generator.
key := append(accountHash[:], storageHash[:]...)
marker := dl.genMarker()
if marker != nil && bytes.Compare(key, marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the storage slot from the memory cache
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, key); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
if len(blob) == 0 {
stateStorageInexMeter.Mark(1)
} else {
stateStorageExistMeter.Mark(1)
}
return blob, nil
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the storage slot from the disk
blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash)
if dl.states != nil {
dl.states.Set(key, blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
}
if len(blob) == 0 {
stateStorageInexMeter.Mark(1)
stateStorageInexDiskMeter.Mark(1)
} else {
stateStorageExistMeter.Mark(1)
stateStorageExistDiskMeter.Mark(1)
}
return blob, nil
}
// update implements the layer interface, returning a new diff layer on top
@ -268,13 +350,39 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// Merge the trie nodes and flat states of the bottom-most diff layer into the
// buffer as the combined layer.
combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet)
// Terminate the background state snapshot generation before mutating the
// persistent state.
if combined.full() || force {
if err := combined.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID()); err != nil {
// Terminate the background state snapshot generator before flushing
// to prevent data race.
var progress []byte
if dl.generator != nil {
dl.generator.stop()
progress = dl.generator.progressMarker()
log.Info("Terminated state snapshot generation")
// If the snapshot has been fully generated, unset the generator
if progress == nil {
dl.setGenerator(nil)
}
}
// Flush the content in combined buffer. Any state data after the progress
// marker will be ignored, as the generator will pick it up later.
if err := combined.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID()); err != nil {
return nil, err
}
// Resume the background generation if it's not completed yet
if progress != nil {
dl.generator.run(bottom.root)
log.Info("Resumed state snapshot generation", "root", bottom.root)
}
}
// Link the generator if snapshot is not yet completed
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined)
if dl.generator != nil {
ndl.setGenerator(dl.generator)
}
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined)
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
@ -336,15 +444,39 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
if err != nil {
return nil, err
}
} else {
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
// Link the generator if it exists
if dl.generator != nil {
ndl.setGenerator(dl.generator)
}
return ndl, nil
}
// Terminate the generation before writing any data into database
var progress []byte
if dl.generator != nil {
dl.generator.stop()
progress = dl.generator.progressMarker()
}
batch := dl.db.diskdb.NewBatch()
writeNodes(batch, nodes, dl.nodes)
// Provide the original values of modified accounts and storages for revert
writeStates(batch, progress, accounts, storages, dl.states)
rawdb.WritePersistentStateID(batch, dl.id-1)
rawdb.WriteSnapshotRoot(batch, h.meta.parent)
if err := batch.Write(); err != nil {
log.Crit("Failed to write states", "err", err)
}
// Link the generator and resume generation if the snapshot is not yet
// fully completed.
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
if dl.generator != nil && !dl.generator.completed() {
ndl.generator = dl.generator
ndl.generator.run(h.meta.parent)
log.Info("Resumed state snapshot generation", "root", h.meta.parent)
}
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer), nil
return ndl, nil
}
// size returns the approximate size of cached nodes in the disk layer.
@ -370,6 +502,18 @@ func (dl *diskLayer) resetCache() {
if dl.nodes != nil {
dl.nodes.Reset()
}
if dl.states != nil {
dl.states.Reset()
}
}
// genMarker returns the current state snapshot generation progress marker. If
// the state snapshot has already been fully generated, nil is returned.
func (dl *diskLayer) genMarker() []byte {
if dl.generator == nil {
return nil
}
return dl.generator.progressMarker()
}
// hasher is used to compute the sha256 hash of the provided data.

View File

@ -39,4 +39,13 @@ var (
// errStateUnrecoverable is returned if state is required to be reverted to
// a destination without associated state history available.
errStateUnrecoverable = errors.New("state is unrecoverable")
// errNotCoveredYet is returned from data accessors if the underlying snapshot
// is being generated currently and the requested data item is not yet in the
// range of accounts covered.
errNotCoveredYet = errors.New("not covered yet")
// errNotConstructed is returned if the callers want to iterate the snapshot
// while the generation is not finished yet.
errNotConstructed = errors.New("state snapshot is not constructed")
)

View File

@ -17,6 +17,8 @@
package pathdb
import (
"bytes"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -63,3 +65,67 @@ func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.No
}
return total
}
// writeStates flushes state mutations into the provided database batch as a whole.
//
// This function assumes the background generator is already terminated and that
// states before the supplied marker have been correctly generated.
//
// TODO(rjl493456442) do we really need this generation marker? The state updates
// after the marker can also be written and will be fixed by generator later if
// it's outdated.
func writeStates(batch ethdb.Batch, genMarker []byte, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) {
var (
accounts int
slots int
)
for addrHash, blob := range accountData {
// Skip any account not covered yet by the snapshot. The account
// at generation position (addrHash == genMarker[:common.HashLength])
// should be updated.
if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 {
continue
}
accounts += 1
if len(blob) == 0 {
rawdb.DeleteAccountSnapshot(batch, addrHash)
if clean != nil {
clean.Set(addrHash[:], nil)
}
} else {
rawdb.WriteAccountSnapshot(batch, addrHash, blob)
if clean != nil {
clean.Set(addrHash[:], blob)
}
}
}
for addrHash, storages := range storageData {
// Skip any account not covered yet by the snapshot
if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 {
continue
}
midAccount := genMarker != nil && bytes.Equal(addrHash[:], genMarker[:common.HashLength])
for storageHash, blob := range storages {
// Skip any slot not covered yet by the snapshot. The storage slot
// at generation position (addrHash == genMarker[:common.HashLength]
// and storageHash == genMarker[common.HashLength:]) should be updated.
if midAccount && bytes.Compare(storageHash[:], genMarker[common.HashLength:]) > 0 {
continue
}
slots += 1
if len(blob) == 0 {
rawdb.DeleteStorageSnapshot(batch, addrHash, storageHash)
if clean != nil {
clean.Set(append(addrHash[:], storageHash[:]...), nil)
}
} else {
rawdb.WriteStorageSnapshot(batch, addrHash, storageHash, blob)
if clean != nil {
clean.Set(append(addrHash[:], storageHash[:]...), blob)
}
}
}
}
return accounts, slots
}
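// Illustrative sketch, not part of this change: a condensed restatement of the
// marker gate used by writeStates above. coveredByMarker is hypothetical; it
// returns whether a given account (or storage slot, if storageHash is non-nil)
// falls inside the already-generated range and should therefore be written now
// rather than left for the generator to index later.
func coveredByMarker(genMarker []byte, addrHash common.Hash, storageHash *common.Hash) bool {
	if genMarker == nil {
		return true // generation finished, everything is covered
	}
	if bytes.Compare(addrHash[:], genMarker) > 0 {
		return false // account strictly beyond the marker, not covered yet
	}
	if storageHash != nil && bytes.Equal(addrHash[:], genMarker[:common.HashLength]) {
		// Inside the account the marker is parked in, compare the slot position too.
		return bytes.Compare(storageHash[:], genMarker[common.HashLength:]) <= 0
	}
	return true
}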

triedb/pathdb/generate.go (new file, 853 lines)
View File

@ -0,0 +1,853 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"bytes"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/triedb/database"
)
var (
// accountCheckRange is the upper limit of the number of accounts involved in
// each range check. This is a value estimated based on experience. If this
// range is too large, the failure rate of range proof will increase. Otherwise,
// if the range is too small, the efficiency of the state recovery will decrease.
accountCheckRange = 128
// storageCheckRange is the upper limit of the number of storage slots involved
// in each range check. This is a value estimated based on experience. If this
// range is too large, the failure rate of range proof will increase. Otherwise,
// if the range is too small, the efficiency of the state recovery will decrease.
storageCheckRange = 1024
// errMissingTrie is returned if the target trie is missing while the generation
// is running. In this case the generation is aborted and waits for a new signal.
)
// diskReader is a wrapper around the key-value store, implementing database.NodeReader
// and providing access to persistent trie nodes on disk
type diskReader struct{ db ethdb.KeyValueStore }
// Node retrieves the trie node blob with the provided trie identifier,
// node path and the corresponding node hash. No error will be returned
// if the node is not found.
func (r *diskReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
if owner == (common.Hash{}) {
return rawdb.ReadAccountTrieNode(r.db, path), nil
}
return rawdb.ReadStorageTrieNode(r.db, owner, path), nil
}
// diskStore is a wrapper of key-value store and implements database.NodeDatabase.
// It's meant to be used for generating state snapshot from the trie data.
type diskStore struct {
db ethdb.KeyValueStore
}
// NodeReader returns a node reader associated with the specific state.
// An error will be returned if the specified state is not available.
func (s *diskStore) NodeReader(stateRoot common.Hash) (database.NodeReader, error) {
root := types.EmptyRootHash
if blob := rawdb.ReadAccountTrieNode(s.db, nil); len(blob) > 0 {
root = crypto.Keccak256Hash(blob)
}
if root != stateRoot {
return nil, fmt.Errorf("state %x is not available", stateRoot)
}
return &diskReader{s.db}, nil
}
// Generator is the struct for initial state snapshot generation. It is not thread-safe;
// the caller must manage concurrency issues themselves.
type generator struct {
noBuild bool // Flag indicating whether snapshot generation is permitted
running bool // Flag indicating whether the background generation is running
db ethdb.KeyValueStore // Key-value store containing the snapshot data
stats *generatorStats // Generation statistics used throughout the entire life cycle
abort chan chan struct{} // Notification channel to abort generating the snapshot in this layer
done chan struct{} // Notification channel when generation is done
progress []byte // Progress marker of the state generation, nil means it's completed
lock sync.RWMutex // Lock which protects the progress, only generator can mutate the progress
}
// newGenerator constructs the state snapshot generator.
//
// noBuild will be true if the background snapshot generation is not allowed,
// usually used in read-only mode.
//
// progress indicates the starting position for resuming snapshot generation.
// It must be provided even if generation is not allowed; otherwise, uncovered
// states may be exposed for serving.
func newGenerator(db ethdb.KeyValueStore, noBuild bool, progress []byte, stats *generatorStats) *generator {
if stats == nil {
stats = &generatorStats{start: time.Now()}
}
return &generator{
noBuild: noBuild,
progress: progress,
db: db,
stats: stats,
abort: make(chan chan struct{}),
done: make(chan struct{}),
}
}
// run starts the state snapshot generation in the background.
func (g *generator) run(root common.Hash) {
if g.noBuild {
log.Warn("Snapshot generation is not permitted")
return
}
if g.running {
g.stop()
log.Warn("Terminated the leftover generation cycle")
}
g.running = true
go g.generate(newGeneratorContext(root, g.progress, g.db))
}
// stop terminates the background generation if it's actively running. Any recent
// generation progress will be saved before returning.
func (g *generator) stop() {
if !g.running {
log.Debug("Snapshot generation is not running")
return
}
ch := make(chan struct{})
g.abort <- ch
<-ch
g.running = false
}
// completed returns the flag indicating if the whole generation is done.
func (g *generator) completed() bool {
progress := g.progressMarker()
return progress == nil
}
// progressMarker returns the current generation progress marker. It may slightly
// lag behind the actual generation position, as the progress field is only updated
// when checkAndFlush is called. The only effect is that some generated states
// may be refused for serving.
func (g *generator) progressMarker() []byte {
g.lock.RLock()
defer g.lock.RUnlock()
return g.progress
}
// splitMarker is an internal helper which splits the generation progress marker
// into two parts.
func splitMarker(marker []byte) ([]byte, []byte) {
var accMarker []byte
if len(marker) > 0 { // []byte{} is the start, use nil for that
accMarker = marker[:common.HashLength]
}
return accMarker, marker
}
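// Illustrative sketch, not part of this change: the three marker shapes splitMarker
// handles. exampleMarkers is hypothetical and only exists to spell out the return
// values; the second return value is always the full marker, used as the seek
// position of the storage iterator.
func exampleMarkers() {
	acct := common.HexToHash("0xaa").Bytes()
	slot := common.HexToHash("0xbb").Bytes()

	accM, storM := splitMarker([]byte{}) // fresh start: accM == nil, storM == []byte{}
	accM, storM = splitMarker(acct)      // parked between accounts: accM == acct, storM == acct
	accM, storM = splitMarker(append(acct, slot...))
	// parked inside an account's storage: accM == acct, storM == acct||slot
	_, _ = accM, storM
}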
// generateSnapshot regenerates a brand-new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
func generateSnapshot(triedb *Database, root common.Hash) *diskLayer {
// Create a new disk layer with an initialized state marker at zero
var (
stats = &generatorStats{start: time.Now()}
genMarker = []byte{} // Initialized but empty!
)
dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0))
dl.setGenerator(newGenerator(triedb.diskdb, false, genMarker, stats))
dl.generator.run(root)
log.Info("Started snapshot generation", "root", root)
return dl
}
// journalProgress persists the generator stats into the database to resume later.
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
// Write out the generator marker. Note it's a standalone disk layer generator
// which is not mixed with journal. It's ok if the generator is persisted while
// journal is not.
entry := journalGenerator{
Done: marker == nil,
Marker: marker,
}
if stats != nil {
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(err) // Cannot happen, here to catch dev errors
}
var logstr string
switch {
case marker == nil:
logstr = "done"
case bytes.Equal(marker, []byte{}):
logstr = "empty"
case len(marker) == common.HashLength:
logstr = fmt.Sprintf("%#x", marker)
default:
logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
}
log.Debug("Journalled generator progress", "progress", logstr)
rawdb.WriteSnapshotGenerator(db, blob)
}
// proofResult contains the output of range proving which can be used
// for further processing regardless of whether it is successful or not.
type proofResult struct {
keys [][]byte // The key set of all elements being iterated, even proving is failed
vals [][]byte // The val set of all elements being iterated, even proving is failed
diskMore bool // Set when the database has extra snapshot states since last iteration
trieMore bool // Set when the trie has extra snapshot states(only meaningful for successful proving)
proofErr error // Indicator whether the given state range is valid or not
tr *trie.Trie // The trie, in case the trie was resolved by the prover (may be nil)
}
// valid returns the indicator that range proof is successful or not.
func (result *proofResult) valid() bool {
return result.proofErr == nil
}
// last returns the last verified element key regardless of whether the range proof is
// successful or not. Nil is returned if nothing was involved in the proving.
func (result *proofResult) last() []byte {
var last []byte
if len(result.keys) > 0 {
last = result.keys[len(result.keys)-1]
}
return last
}
// forEach iterates all the visited elements and applies the given callback on them.
// The iteration is aborted if the callback returns non-nil error.
func (result *proofResult) forEach(callback func(key []byte, val []byte) error) error {
for i := 0; i < len(result.keys); i++ {
key, val := result.keys[i], result.vals[i]
if err := callback(key, val); err != nil {
return err
}
}
return nil
}
// proveRange proves that the snapshot segment with a particular prefix is "valid".
// The iteration start point will be assigned if the iterator is restored from the
// last interruption. Max limits the maximum amount of data involved in each
// iteration.
//
// The proof result will be returned if the range proving is finished, otherwise
// the error will be returned to abort the entire procedure.
func (g *generator) proveRange(ctx *generatorContext, trieId *trie.ID, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) {
var (
keys [][]byte
vals [][]byte
proof = rawdb.NewMemoryDatabase()
diskMore = false
iter = ctx.iterator(kind)
start = time.Now()
min = append(prefix, origin...)
)
for iter.Next() {
// Ensure the iterated item is always equal to or larger than the given origin.
key := iter.Key()
if bytes.Compare(key, min) < 0 {
return nil, errors.New("invalid iteration position")
}
// Ensure the iterated item still falls within the specified prefix. If
// not, all the items in the specified range have been visited. Move the
// iterator a step back since we iterated one extra element out.
if !bytes.Equal(key[:len(prefix)], prefix) {
iter.Hold()
break
}
// Break if we've reached the max size, and signal that we're not
// done yet. Move the iterator a step back since we iterated one
// extra element out.
if len(keys) == max {
iter.Hold()
diskMore = true
break
}
keys = append(keys, common.CopyBytes(key[len(prefix):]))
if valueConvertFn == nil {
vals = append(vals, common.CopyBytes(iter.Value()))
} else {
val, err := valueConvertFn(iter.Value())
if err != nil {
// Special case: the state data is corrupted (invalid slim-format account).
// Don't abort the entire procedure directly; instead, let the fallback
// generation heal the invalid data.
//
// Append the original value here to ensure that the numbers of keys and
// values stay aligned.
vals = append(vals, common.CopyBytes(iter.Value()))
log.Error("Failed to convert account state data", "err", err)
} else {
vals = append(vals, val)
}
}
}
// Update metrics for database iteration and merkle proving
if kind == snapStorage {
storageSnapReadCounter.Inc(time.Since(start).Nanoseconds())
} else {
accountSnapReadCounter.Inc(time.Since(start).Nanoseconds())
}
defer func(start time.Time) {
if kind == snapStorage {
storageProveCounter.Inc(time.Since(start).Nanoseconds())
} else {
accountProveCounter.Inc(time.Since(start).Nanoseconds())
}
}(time.Now())
// The snap state is exhausted, pass the entire key/val set for verification
root := trieId.Root
if origin == nil && !diskMore {
stackTr := trie.NewStackTrie(nil)
for i, key := range keys {
if err := stackTr.Update(key, vals[i]); err != nil {
return nil, err
}
}
if gotRoot := stackTr.Hash(); gotRoot != root {
return &proofResult{
keys: keys,
vals: vals,
proofErr: fmt.Errorf("wrong root: have %#x want %#x", gotRoot, root),
}, nil
}
return &proofResult{keys: keys, vals: vals}, nil
}
// Snap state is chunked, generate edge proofs for verification.
tr, err := trie.New(trieId, &diskStore{db: g.db})
if err != nil {
log.Info("Trie missing, state snapshotting paused", "state", ctx.root, "kind", kind, "root", trieId.Root)
return nil, errMissingTrie
}
// Generate the Merkle proofs for the first and last element
if origin == nil {
origin = common.Hash{}.Bytes()
}
if err := tr.Prove(origin, proof); err != nil {
log.Debug("Failed to prove range", "kind", kind, "origin", origin, "err", err)
return &proofResult{
keys: keys,
vals: vals,
diskMore: diskMore,
proofErr: err,
tr: tr,
}, nil
}
if len(keys) > 0 {
if err := tr.Prove(keys[len(keys)-1], proof); err != nil {
log.Debug("Failed to prove range", "kind", kind, "last", keys[len(keys)-1], "err", err)
return &proofResult{
keys: keys,
vals: vals,
diskMore: diskMore,
proofErr: err,
tr: tr,
}, nil
}
}
// Verify the snapshot segment with the range prover, ensuring that all flat
// states in this range correspond to the merkle trie.
cont, err := trie.VerifyRangeProof(root, origin, keys, vals, proof)
return &proofResult{
keys: keys,
vals: vals,
diskMore: diskMore,
trieMore: cont,
proofErr: err,
tr: tr},
nil
}
// onStateCallback is a function that is called by generateRange when processing a range of
// accounts or storage slots. For each element, the callback is invoked.
//
// - If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot.
// - If 'write' is true, then this element needs to be updated with the 'val'.
// - If 'write' is false, then this element is already correct, and needs no update.
// The 'val' is the canonical encoding of the value (not the slim format for accounts).
//
// However, for accounts, the storage trie of the account needs to be checked. Also,
// dangling storages (storage exists but the corresponding account is missing) need to
// be cleaned up.
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error
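// A minimal illustrative callback (not used anywhere, purely to make the
// write/delete semantics above concrete; the real callbacks below additionally
// update metrics, flush batches and descend into storage tries):
//
//	var example onStateCallback = func(key []byte, val []byte, write bool, delete bool) error {
//		switch {
//		case delete:
//			// stale flat state, drop it from the snapshot
//		case write:
//			// missing or outdated flat state, write out 'val'
//		default:
//			// flat state already correct, nothing to do
//		}
//		return nil
//	}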
// generateRange generates the state segment with particular prefix. Generation can
// either verify the correctness of existing state through range-proof and skip
// generation, or iterate trie to regenerate state on demand.
func (g *generator) generateRange(ctx *generatorContext, trieId *trie.ID, prefix []byte, kind string, origin []byte, max int, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
// Use range prover to check the validity of the flat state in the range
result, err := g.proveRange(ctx, trieId, prefix, kind, origin, max, valueConvertFn)
if err != nil {
return false, nil, err
}
last := result.last()
// Construct contextual logger
logCtx := []interface{}{"kind", kind, "prefix", hexutil.Encode(prefix)}
if len(origin) > 0 {
logCtx = append(logCtx, "origin", hexutil.Encode(origin))
}
logger := log.New(logCtx...)
// The range prover says the range is correct, skip trie iteration
if result.valid() {
successfulRangeProofMeter.Mark(1)
logger.Trace("Proved state range", "last", hexutil.Encode(last))
// The verification is passed, process each state with the given
// callback function. If this state represents a contract, the
// corresponding storage check will be performed in the callback
if err := result.forEach(func(key []byte, val []byte) error { return onState(key, val, false, false) }); err != nil {
return false, nil, err
}
// Only abort the iteration when both database and trie are exhausted
return !result.diskMore && !result.trieMore, last, nil
}
logger.Trace("Detected outdated state range", "last", hexutil.Encode(last), "err", result.proofErr)
failedRangeProofMeter.Mark(1)
// Special case: the entire trie is missing. In the original trie scheme,
// all the duplicated subtries will be filtered out (only one copy of data
// will be stored), while in the snapshot model, all the storage tries
// belonging to different contracts are kept even if they are duplicated.
// Track it separately to remove the noise from the statistics.
if origin == nil && last == nil {
meter := missallAccountMeter
if kind == snapStorage {
meter = missallStorageMeter
}
meter.Mark(1)
}
// We use the snap data to build up a cache which can be used by the
// main account trie as a primary lookup when resolving hashes
var resolver trie.NodeResolver
if len(result.keys) > 0 {
tr := trie.NewEmpty(nil)
for i, key := range result.keys {
tr.Update(key, result.vals[i])
}
_, nodes := tr.Commit(false)
hashSet := nodes.HashSet()
resolver = func(owner common.Hash, path []byte, hash common.Hash) []byte {
return hashSet[hash]
}
}
// Construct the trie for state iteration, reuse the trie
// if it's already opened with some nodes resolved.
tr := result.tr
if tr == nil {
tr, err = trie.New(trieId, &diskStore{db: g.db})
if err != nil {
log.Info("Trie missing, state snapshotting paused", "state", ctx.root, "kind", kind, "root", trieId.Root)
return false, nil, errMissingTrie
}
}
var (
trieMore bool
kvkeys, kvvals = result.keys, result.vals
// counters
count = 0 // number of states delivered by iterator
created = 0 // states created from the trie
updated = 0 // states updated from the trie
deleted = 0 // states not in trie, but were in snapshot
untouched = 0 // states already correct
// timers
start = time.Now()
internal time.Duration
)
nodeIt, err := tr.NodeIterator(origin)
if err != nil {
return false, nil, err
}
nodeIt.AddResolver(resolver)
iter := trie.NewIterator(nodeIt)
for iter.Next() {
if last != nil && bytes.Compare(iter.Key, last) > 0 {
trieMore = true
break
}
count++
write := true
created++
for len(kvkeys) > 0 {
if cmp := bytes.Compare(kvkeys[0], iter.Key); cmp < 0 {
// delete the key
istart := time.Now()
if err := onState(kvkeys[0], nil, false, true); err != nil {
return false, nil, err
}
kvkeys = kvkeys[1:]
kvvals = kvvals[1:]
deleted++
internal += time.Since(istart)
continue
} else if cmp == 0 {
// the snapshot key can be overwritten
created--
if write = !bytes.Equal(kvvals[0], iter.Value); write {
updated++
} else {
untouched++
}
kvkeys = kvkeys[1:]
kvvals = kvvals[1:]
}
break
}
istart := time.Now()
if err := onState(iter.Key, iter.Value, write, false); err != nil {
return false, nil, err
}
internal += time.Since(istart)
}
if iter.Err != nil {
// Trie errors should never happen. Still, in case of a bug, expose the
// error here, as the outer code will presume errors are interrupts, not
// some deeper issues.
log.Error("State snapshotter failed to iterate trie", "err", iter.Err)
return false, nil, iter.Err
}
// Delete all stale snapshot states remaining
istart := time.Now()
for _, key := range kvkeys {
if err := onState(key, nil, false, true); err != nil {
return false, nil, err
}
deleted += 1
}
internal += time.Since(istart)
// Update metrics for counting trie iteration
if kind == snapStorage {
storageTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
} else {
accountTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
}
logger.Debug("Regenerated state range", "root", trieId.Root, "last", hexutil.Encode(last),
"count", count, "created", created, "updated", updated, "untouched", untouched, "deleted", deleted)
// If there are either more trie items, or there are more snap items
// (in the next segment), then we need to keep working
return !trieMore && !result.diskMore, last, nil
}
// checkAndFlush checks if an interruption signal is received or the
// batch size has exceeded the allowance.
func (g *generator) checkAndFlush(ctx *generatorContext, current []byte) error {
var abort chan struct{}
select {
case abort = <-g.abort:
default:
}
if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(current, g.progress) < 0 {
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", g.progress))
}
// Persist the progress marker regardless of whether the batch is empty or not.
// It may happen that all the flat states in the database are correct, so the
// generator indeed makes progress even if there is nothing to commit.
journalProgress(ctx.batch, current, g.stats)
// Flush out the database writes atomically
if err := ctx.batch.Write(); err != nil {
return err
}
ctx.batch.Reset()
// Update the generation progress marker
g.lock.Lock()
g.progress = current
g.lock.Unlock()
// Abort the generation if it's required
if abort != nil {
g.stats.log("Aborting state snapshot generation", ctx.root, g.progress)
return newAbortErr(abort) // bubble up an error for interruption
}
// Don't hold the iterators too long, release them to let the compactor work
ctx.reopenIterator(snapAccount)
ctx.reopenIterator(snapStorage)
}
if time.Since(ctx.logged) > 8*time.Second {
g.stats.log("Generating state snapshot", ctx.root, g.progress)
ctx.logged = time.Now()
}
return nil
}
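// For orientation (restating the call sites below): the account pass flushes with
// the bare account hash as the progress marker, whereas the storage pass flushes
// with the composite accountHash ++ storageKey marker:
//
//	g.checkAndFlush(ctx, marker)                     // generateAccounts, marker == account[:] (or the resumed g.progress)
//	g.checkAndFlush(ctx, append(account[:], key...)) // generateStorages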
// generateStorages generates the missing storage slots of the specific contract.
// It's supposed to restart the generation from the given origin position.
func (g *generator) generateStorages(ctx *generatorContext, account common.Hash, storageRoot common.Hash, storeMarker []byte) error {
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
storageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())
if delete {
rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key))
wipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val)
generatedStorageMeter.Mark(1)
} else {
recoveredStorageMeter.Mark(1)
}
g.stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
g.stats.slots++
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := g.checkAndFlush(ctx, append(account[:], key...)); err != nil {
return err
}
return nil
}
// Loop for re-generating the missing storage slots.
var origin = common.CopyBytes(storeMarker)
for {
id := trie.StorageTrieID(ctx.root, account, storageRoot)
exhausted, last, err := g.generateRange(ctx, id, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), snapStorage, origin, storageCheckRange, onStorage, nil)
if err != nil {
return err // The procedure is aborted, either by an external signal or an internal error
}
// Abort the procedure if the entire contract storage is generated
if exhausted {
break
}
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
}
return nil
}
// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func (g *generator) generateAccounts(ctx *generatorContext, accMarker []byte) error {
onAccount := func(key []byte, val []byte, write bool, delete bool) error {
// Make sure to clear all dangling storages before this account
account := common.BytesToHash(key)
g.stats.dangling += ctx.removeStorageBefore(account)
start := time.Now()
if delete {
rawdb.DeleteAccountSnapshot(ctx.batch, account)
wipedAccountMeter.Mark(1)
accountWriteCounter.Inc(time.Since(start).Nanoseconds())
ctx.removeStorageAt(account)
return nil
}
// Retrieve the current account and flatten it into the internal format
var acc types.StateAccount
if err := rlp.DecodeBytes(val, &acc); err != nil {
log.Crit("Invalid account encountered during snapshot creation", "err", err)
}
// If the account is not yet in-progress, write it out
if accMarker == nil || !bytes.Equal(account[:], accMarker) {
dataLen := len(val) // Approximate size, saves us a round of RLP-encoding
if !write {
if bytes.Equal(acc.CodeHash, types.EmptyCodeHash[:]) {
dataLen -= 32
}
if acc.Root == types.EmptyRootHash {
dataLen -= 32
}
recoveredAccountMeter.Mark(1)
} else {
data := types.SlimAccountRLP(acc)
dataLen = len(data)
rawdb.WriteAccountSnapshot(ctx.batch, account, data)
generatedAccountMeter.Mark(1)
}
g.stats.storage += common.StorageSize(1 + common.HashLength + dataLen)
g.stats.accounts++
}
// If snap generation reaches here after an interruption, genMarker may go backward
// when the last genMarker consisted of accountHash and storageHash
marker := account[:]
if accMarker != nil && bytes.Equal(marker, accMarker) && len(g.progress) > common.HashLength {
marker = g.progress
}
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := g.checkAndFlush(ctx, marker); err != nil {
return err
}
accountWriteCounter.Inc(time.Since(start).Nanoseconds()) // let's count flush time as well
// If the iterated account is a contract, create a further loop to
// verify or regenerate the contract storage.
if acc.Root == types.EmptyRootHash {
ctx.removeStorageAt(account)
} else {
var storeMarker []byte
if accMarker != nil && bytes.Equal(account[:], accMarker) && len(g.progress) > common.HashLength {
storeMarker = g.progress[common.HashLength:]
}
if err := g.generateStorages(ctx, account, acc.Root, storeMarker); err != nil {
return err
}
}
// Some account processed, unmark the marker
accMarker = nil
return nil
}
origin := common.CopyBytes(accMarker)
for {
id := trie.StateTrieID(ctx.root)
exhausted, last, err := g.generateRange(ctx, id, rawdb.SnapshotAccountPrefix, snapAccount, origin, accountCheckRange, onAccount, types.FullAccountRLP)
if err != nil {
return err // The procedure is aborted, either by an external signal or an internal error
}
origin = increaseKey(last)
// Last step, clean up the storages after the last account.
// All the remaining storages should be treated as dangling.
if origin == nil || exhausted {
g.stats.dangling += ctx.removeStorageLeft()
break
}
}
return nil
}
// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. The method will be interrupted and restarted
// frequently as the chain progresses, resuming from the persisted progress marker.
func (g *generator) generate(ctx *generatorContext) {
g.stats.log("Resuming state snapshot generation", ctx.root, g.progress)
defer ctx.close()
// Persist the initial marker and state snapshot root if progress is none
if len(g.progress) == 0 {
batch := g.db.NewBatch()
rawdb.WriteSnapshotRoot(batch, ctx.root)
journalProgress(batch, g.progress, g.stats)
if err := batch.Write(); err != nil {
log.Crit("Failed to write initialized state marker", "err", err)
}
}
// Initialize the global generator context. The snapshot iterators are
// opened at the interrupted position, based on the assumption that all
// the snapshot data before the marker has been generated correctly.
// Even if the snapshot data is updated during the interruption (before
// or at the marker), the assumption still holds.
// The account or storage slot at the interruption point will be processed
// twice by the generator (it was already processed in the last run), but
// that's fine.
var (
accMarker, _ = splitMarker(g.progress)
abort chan struct{}
)
if err := g.generateAccounts(ctx, accMarker); err != nil {
// Extract the received interruption signal if exists
var aerr *abortErr
if errors.As(err, &aerr) {
abort = aerr.abort
}
// Aborted by an internal error, wait for the signal
if abort == nil {
abort = <-g.abort
}
close(abort)
return
}
// Snapshot fully generated, set the marker to nil.
// Note even if there is nothing to commit, persist the
// generator anyway to mark the snapshot as complete.
journalProgress(ctx.batch, nil, g.stats)
if err := ctx.batch.Write(); err != nil {
log.Error("Failed to flush batch", "err", err)
abort = <-g.abort
close(abort)
return
}
ctx.batch.Reset()
log.Info("Generated state snapshot", "accounts", g.stats.accounts, "slots", g.stats.slots,
"storage", g.stats.storage, "dangling", g.stats.dangling, "elapsed", common.PrettyDuration(time.Since(g.stats.start)))
// Update the generation progress marker
g.lock.Lock()
g.progress = nil
g.lock.Unlock()
close(g.done)
// Someone will be looking for us, wait it out
abort = <-g.abort
close(abort)
}
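// The abort handshake above follows the legacy snapshot generator: an external
// caller hands over a signal channel and waits for it to be closed once the
// generator has fully wound down. A hedged sketch of the caller side (the real
// implementation lives in the generator's stop method, not shown in this diff):
//
//	ch := make(chan struct{})
//	g.abort <- ch // request interruption
//	<-ch          // wait until the generator acknowledges by closing it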
// increaseKey increases the input key by one. It returns nil if the entire
// addition operation overflows.
func increaseKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
if key[i] != 0x0 {
return key
}
}
return nil
}
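// Illustrative behaviour (note the key is modified in place):
//
//	increaseKey([]byte{0x01, 0xff}) // -> []byte{0x02, 0x00}
//	increaseKey([]byte{0xff, 0xff}) // -> nil, the addition overflows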
// abortErr wraps the interruption signal received to indicate that the
// generation was aborted by an external process.
type abortErr struct {
abort chan struct{}
}
func newAbortErr(abort chan struct{}) error {
return &abortErr{abort: abort}
}
func (err *abortErr) Error() string {
return "aborted"
}

View File

@ -0,0 +1,766 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"fmt"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/testrand"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/holiman/uint256"
)
func hashData(input []byte) common.Hash {
return crypto.Keccak256Hash(input)
}
type genTester struct {
diskdb ethdb.Database
db *Database
acctTrie *trie.Trie
nodes *trienode.MergedNodeSet
states *StateSetWithOrigin
}
func newGenTester() *genTester {
disk := rawdb.NewMemoryDatabase()
config := *Defaults
config.SnapshotNoBuild = true // no background generation
db := New(disk, &config, false)
tr, _ := trie.New(trie.StateTrieID(types.EmptyRootHash), db)
return &genTester{
diskdb: disk,
db: db,
acctTrie: tr,
nodes: trienode.NewMergedNodeSet(),
states: NewStateSetWithOrigin(nil, nil, nil, nil),
}
}
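// Typical usage in the tests below (sketch only): seed trie/flat state through the
// helpers, commit everything into pathdb, then run generation against the resulting
// disk layer and wait for it to finish or time out.
//
//	helper := newGenTester()
//	helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
//	_, dl := helper.CommitAndGenerate()
//	<-dl.generator.done
//	dl.generator.stop()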
func (t *genTester) addTrieAccount(acckey string, acc *types.StateAccount) {
var (
addr = common.BytesToAddress([]byte(acckey))
key = hashData([]byte(acckey))
val, _ = rlp.EncodeToBytes(acc)
)
t.acctTrie.MustUpdate(key.Bytes(), val)
t.states.accountData[key] = val
t.states.accountOrigin[addr] = nil
}
func (t *genTester) addSnapAccount(acckey string, acc *types.StateAccount) {
key := hashData([]byte(acckey))
rawdb.WriteAccountSnapshot(t.diskdb, key, types.SlimAccountRLP(*acc))
}
func (t *genTester) addAccount(acckey string, acc *types.StateAccount) {
t.addTrieAccount(acckey, acc)
t.addSnapAccount(acckey, acc)
}
func (t *genTester) addSnapStorage(accKey string, keys []string, vals []string) {
accHash := hashData([]byte(accKey))
for i, key := range keys {
rawdb.WriteStorageSnapshot(t.diskdb, accHash, hashData([]byte(key)), []byte(vals[i]))
}
}
func (t *genTester) makeStorageTrie(accKey string, keys []string, vals []string, commit bool) common.Hash {
var (
owner = hashData([]byte(accKey))
addr = common.BytesToAddress([]byte(accKey))
id = trie.StorageTrieID(types.EmptyRootHash, owner, types.EmptyRootHash)
tr, _ = trie.New(id, t.db)
storages = make(map[common.Hash][]byte)
storageOrigins = make(map[common.Hash][]byte)
)
for i, k := range keys {
key := hashData([]byte(k))
tr.MustUpdate(key.Bytes(), []byte(vals[i]))
storages[key] = []byte(vals[i])
storageOrigins[key] = nil
}
if !commit {
return tr.Hash()
}
root, nodes := tr.Commit(false)
if nodes != nil {
t.nodes.Merge(nodes)
}
t.states.storageData[owner] = storages
t.states.storageOrigin[addr] = storageOrigins
return root
}
func (t *genTester) Commit() common.Hash {
root, nodes := t.acctTrie.Commit(true)
if nodes != nil {
t.nodes.Merge(nodes)
}
t.db.Update(root, types.EmptyRootHash, 0, t.nodes, t.states)
t.db.Commit(root, false)
return root
}
func (t *genTester) CommitAndGenerate() (common.Hash, *diskLayer) {
root := t.Commit()
dl := generateSnapshot(t.db, root)
return root, dl
}
// Tests snapshot generation from an empty database.
func TestGeneration(t *testing.T) {
helper := newGenTester()
stRoot := helper.makeStorageTrie("", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, false)
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
root, dl := helper.CommitAndGenerate()
if have, want := root, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"); have != want {
t.Fatalf("have %#x want %#x", have, want)
}
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests snapshot generation with existing flat state, where the flat state
// contains some errors:
// - the contract with an empty storage root but storage entries on disk
// - the contract with a non-empty storage root but no flat storage slots
// - the contract (non-empty storage) missing some storage slots
// - miss in the beginning
// - miss in the middle
// - miss in the end
//
// - the contract (non-empty storage) has wrong storage slots
// - wrong slots in the beginning
// - wrong slots in the middle
// - wrong slots in the end
//
// - the contract (non-empty storage) has extra storage slots
// - extra slots in the beginning
// - extra slots in the middle
// - extra slots in the end
func TestGenerateExistentStateWithWrongStorage(t *testing.T) {
helper := newGenTester()
// Account one, empty storage trie root but non-empty flat states
helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Account two, non-empty storage trie root but empty flat states
stRoot := helper.makeStorageTrie("acc-2", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
// Miss slots
{
// Account three, non-empty root but misses slots in the beginning
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-3", []string{"key-2", "key-3"}, []string{"val-2", "val-3"})
// Account four, non-empty root but misses slots in the middle
helper.makeStorageTrie("acc-4", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-4", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-4", []string{"key-1", "key-3"}, []string{"val-1", "val-3"})
// Account five, non-empty root but misses slots in the end
helper.makeStorageTrie("acc-5", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-5", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-5", []string{"key-1", "key-2"}, []string{"val-1", "val-2"})
}
// Wrong storage slots
{
// Account six, non-empty root but wrong slots in the beginning
helper.makeStorageTrie("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-6", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"badval-1", "val-2", "val-3"})
// Account seven, non-empty root but wrong slots in the middle
helper.makeStorageTrie("acc-7", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-7", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-7", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "badval-2", "val-3"})
// Account eight, non-empty root but wrong slots in the end
helper.makeStorageTrie("acc-8", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-8", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-8", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "badval-3"})
// Account 9, non-empty root but rotated slots
helper.makeStorageTrie("acc-9", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-9", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-9", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-3", "val-2"})
}
// Extra storage slots
{
// Account 10, non-empty root but extra slots in the beginning
helper.makeStorageTrie("acc-10", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-10", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-10", []string{"key-0", "key-1", "key-2", "key-3"}, []string{"val-0", "val-1", "val-2", "val-3"})
// Account 11, non-empty root but extra slots in the middle
helper.makeStorageTrie("acc-11", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-11", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-11", []string{"key-1", "key-2", "key-2-1", "key-3"}, []string{"val-1", "val-2", "val-2-1", "val-3"})
// Account 12, non-empty root but extra slots in the end
helper.makeStorageTrie("acc-12", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-12", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-12", []string{"key-1", "key-2", "key-3", "key-4"}, []string{"val-1", "val-2", "val-3", "val-4"})
}
root, dl := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root = 0x8746cce9fd9c658b2cfd639878ed6584b7a2b3e73bb40f607fcfa156002429a0
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests snapshot generation with existing flat state, where the flat state
// contains some errors:
// - missing accounts
// - wrong accounts
// - extra accounts
func TestGenerateExistentStateWithWrongAccounts(t *testing.T) {
helper := newGenTester()
// Trie accounts [acc-1, acc-2, acc-3, acc-4, acc-6]
helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-2", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-4", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
stRoot := helper.makeStorageTrie("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
// Missing accounts, only in the trie
{
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // Beginning
helper.addTrieAccount("acc-4", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // Middle
helper.addTrieAccount("acc-6", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // End
}
// Wrong accounts
{
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: common.Hex2Bytes("0x1234")})
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
}
// Extra accounts, only in the snap
{
helper.addSnapAccount("acc-0", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // before the beginning
helper.addSnapAccount("acc-5", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: common.Hex2Bytes("0x1234")}) // Middle
helper.addSnapAccount("acc-7", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // after the end
}
root, dl := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root = 0x825891472281463511e7ebcc7f109e4f9200c20fa384754e11fd605cd98464e8
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateCorruptAccountTrie(t *testing.T) {
helper := newGenTester()
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0xc7a30f39aff471c95d8a837497ad0e49b65be475cc0953540f80cfcdbdcd9074
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x19ead688e907b0fab07176120dceec244a72aff2f0aa51e8b827584e378772f4
root := helper.Commit() // Root: 0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978
// Delete an account trie node and ensure the generator chokes
path := []byte{0xc}
if !rawdb.HasAccountTrieNode(helper.diskdb, path) {
t.Logf("Invalid node path to delete, %v", path)
}
rawdb.DeleteAccountTrieNode(helper.diskdb, path)
helper.db.tree.bottom().resetCache()
dl := generateSnapshot(helper.db, root)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
t.Errorf("Snapshot generated against corrupt account trie")
case <-time.After(time.Second):
// Not generated fast enough, hopefully blocked on the missing trie node
}
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateMissingStorageTrie(t *testing.T) {
var (
acc1 = hashData([]byte("acc-1"))
acc3 = hashData([]byte("acc-3"))
helper = newGenTester()
)
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) // 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
stRoot = helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
root := helper.Commit()
// Delete storage trie root of account one and three.
rawdb.DeleteStorageTrieNode(helper.diskdb, acc1, nil)
rawdb.DeleteStorageTrieNode(helper.diskdb, acc3, nil)
helper.db.tree.bottom().resetCache()
dl := generateSnapshot(helper.db, root)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
t.Errorf("Snapshot generated against corrupt storage trie")
case <-time.After(time.Second):
// Not generated fast enough, hopefully blocked on the missing trie node
}
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateCorruptStorageTrie(t *testing.T) {
helper := newGenTester()
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) // 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
stRoot = helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
root := helper.Commit()
// Delete a node in the storage trie.
path := []byte{0x4}
if !rawdb.HasStorageTrieNode(helper.diskdb, hashData([]byte("acc-1")), path) {
t.Logf("Invalid node path to delete, %v", path)
}
rawdb.DeleteStorageTrieNode(helper.diskdb, hashData([]byte("acc-1")), []byte{0x4})
if !rawdb.HasStorageTrieNode(helper.diskdb, hashData([]byte("acc-3")), path) {
t.Logf("Invalid node path to delete, %v", path)
}
rawdb.DeleteStorageTrieNode(helper.diskdb, hashData([]byte("acc-3")), []byte{0x4})
helper.db.tree.bottom().resetCache()
dl := generateSnapshot(helper.db, root)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
t.Errorf("Snapshot generated against corrupt storage trie")
case <-time.After(time.Second):
// Not generated fast enough, hopefully blocked on the missing trie node
}
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateWithExtraAccounts(t *testing.T) {
helper := newGenTester()
// Account one in the trie
stRoot := helper.makeStorageTrie("acc-1",
[]string{"key-1", "key-2", "key-3", "key-4", "key-5"},
[]string{"val-1", "val-2", "val-3", "val-4", "val-5"},
true,
)
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
helper.acctTrie.MustUpdate(hashData([]byte("acc-1")).Bytes(), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
// Identical in the snap
key := hashData([]byte("acc-1"))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-1")), []byte("val-1"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-2")), []byte("val-2"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-3")), []byte("val-3"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-4")), []byte("val-4"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-5")), []byte("val-5"))
// Account two exists only in the snapshot
stRoot = helper.makeStorageTrie("acc-2",
[]string{"key-1", "key-2", "key-3", "key-4", "key-5"},
[]string{"val-1", "val-2", "val-3", "val-4", "val-5"},
true,
)
acc = &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ = rlp.EncodeToBytes(acc)
key = hashData([]byte("acc-2"))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-1")), []byte("b-val-1"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-2")), []byte("b-val-2"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-3")), []byte("b-val-3"))
root := helper.Commit()
// To verify the test: If we now inspect the snap db, there should exist extraneous storage items
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data == nil {
t.Fatalf("expected snap storage to exist")
}
dl := generateSnapshot(helper.db, root)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
// If we now inspect the snap db, there should exist no extraneous storage items
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil {
t.Fatalf("expected slot to be removed, got %v", string(data))
}
}
func TestGenerateWithManyExtraAccounts(t *testing.T) {
helper := newGenTester()
// Account one in the trie
stRoot := helper.makeStorageTrie("acc-1",
[]string{"key-1", "key-2", "key-3"},
[]string{"val-1", "val-2", "val-3"},
true,
)
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
helper.acctTrie.MustUpdate(hashData([]byte("acc-1")).Bytes(), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
// Identical in the snap
key := hashData([]byte("acc-1"))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-1")), []byte("val-1"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-2")), []byte("val-2"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-3")), []byte("val-3"))
// 1000 accounts exist only in the snapshot
for i := 0; i < 1000; i++ {
acc := &types.StateAccount{Balance: uint256.NewInt(uint64(i)), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
key := hashData([]byte(fmt.Sprintf("acc-%d", i)))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
}
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateWithExtraBeforeAndAfter(t *testing.T) {
helper := newGenTester()
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
acctHashA := hashData([]byte("acc-1"))
acctHashB := hashData([]byte("acc-2"))
helper.acctTrie.MustUpdate(acctHashA.Bytes(), val)
helper.acctTrie.MustUpdate(acctHashB.Bytes(), val)
rawdb.WriteAccountSnapshot(helper.diskdb, acctHashA, val)
rawdb.WriteAccountSnapshot(helper.diskdb, acctHashB, val)
for i := 0; i < 16; i++ {
rawdb.WriteAccountSnapshot(helper.diskdb, common.Hash{byte(i)}, val)
}
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateWithMalformedStateData(t *testing.T) {
helper := newGenTester()
acctHash := hashData([]byte("acc"))
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
helper.acctTrie.MustUpdate(acctHash.Bytes(), val)
junk := make([]byte, 100)
copy(junk, []byte{0xde, 0xad})
rawdb.WriteAccountSnapshot(helper.diskdb, acctHash, junk)
for i := 0; i < 16; i++ {
rawdb.WriteAccountSnapshot(helper.diskdb, common.Hash{byte(i)}, junk)
}
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateFromEmptySnap(t *testing.T) {
helper := newGenTester()
for i := 0; i < 400; i++ {
stRoot := helper.makeStorageTrie(fmt.Sprintf("acc-%d", i), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount(fmt.Sprintf("acc-%d", i), &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
}
root, snap := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root: 0x6f7af6d2e1a1bf2b84a3beb3f8b64388465fbc1e274ca5d5d3fc787ca78f59e4
select {
case <-snap.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
snap.generator.stop()
}
func TestGenerateWithIncompleteStorage(t *testing.T) {
helper := newGenTester()
stKeys := []string{"1", "2", "3", "4", "5", "6", "7", "8"}
stVals := []string{"v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8"}
// We add 8 accounts, each missing exactly one of the storage slots. This means
// we don't have to order the keys and figure out exactly which hash-key winds up
// on the sensitive spots at the boundaries.
for i := 0; i < 8; i++ {
accKey := fmt.Sprintf("acc-%d", i)
stRoot := helper.makeStorageTrie(accKey, stKeys, stVals, true)
helper.addAccount(accKey, &types.StateAccount{Balance: uint256.NewInt(uint64(i)), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
var moddedKeys []string
var moddedVals []string
for ii := 0; ii < 8; ii++ {
if ii != i {
moddedKeys = append(moddedKeys, stKeys[ii])
moddedVals = append(moddedVals, stVals[ii])
}
}
helper.addSnapStorage(accKey, moddedKeys, moddedVals)
}
root, dl := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root: 0xca73f6f05ba4ca3024ef340ef3dfca8fdabc1b677ff13f5a9571fd49c16e67ff
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func incKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
if key[i] != 0x0 {
break
}
}
return key
}
func decKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]--
if key[i] != 0xff {
break
}
}
return key
}
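// Illustrative behaviour of the two helpers above (both modify the key in place
// and are used below to bracket account hashes with dangling storage):
//
//	incKey([]byte{0x00, 0xff}) // -> []byte{0x01, 0x00}
//	decKey([]byte{0x01, 0x00}) // -> []byte{0x00, 0xff}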
func populateDangling(disk ethdb.KeyValueStore) {
populate := func(accountHash common.Hash, keys []string, vals []string) {
for i, key := range keys {
rawdb.WriteStorageSnapshot(disk, accountHash, hashData([]byte(key)), []byte(vals[i]))
}
}
// Dangling storages of the "first" account
populate(common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages of the "last" account
populate(common.HexToHash("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 1
hash := decKey(hashData([]byte("acc-1")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-1")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 2
hash = decKey(hashData([]byte("acc-2")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-2")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 3
hash = decKey(hashData([]byte("acc-3")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-3")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages of random accounts
populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
}
func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) {
var helper = newGenTester()
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
helper.addSnapStorage("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populateDangling(helper.diskdb)
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateBrokenSnapshotWithDanglingStorage(t *testing.T) {
var helper = newGenTester()
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
populateDangling(helper.diskdb)
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}

View File

@ -135,15 +135,12 @@ func TestAccountIteratorBasics(t *testing.T) {
it := newDiffAccountIterator(common.Hash{}, states, nil)
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
// TODO reenable these tests once the persistent state iteration
// is implemented.
//db := rawdb.NewMemoryDatabase()
//batch := db.NewBatch()
//states.write(db, batch, nil, nil)
//batch.Write()
//it = newDiskAccountIterator(db, common.Hash{})
//verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(batch, nil, nil)
batch.Write()
it = newDiskAccountIterator(db, common.Hash{})
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
}
// TestStorageIteratorBasics tests some simple single-layer(diff and disk) iteration for storage
@ -177,17 +174,14 @@ func TestStorageIteratorBasics(t *testing.T) {
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
}
// TODO reenable these tests once the persistent state iteration
// is implemented.
//db := rawdb.NewMemoryDatabase()
//batch := db.NewBatch()
//states.write(db, batch, nil, nil)
//batch.Write()
//for account := range accounts {
// it := newDiskStorageIterator(db, account, common.Hash{})
// verifyIterator(t, 100-nilStorage[account], it, verifyNothing) // Nil is allowed for single layer iterator
//}
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(batch, nil, nil)
batch.Write()
for account := range accounts {
it := newDiskStorageIterator(db, account, common.Hash{})
verifyIterator(t, 100-nilStorage[account], it, verifyNothing) // Nil is allowed for single layer iterator
}
}
type testIterator struct {
@ -263,7 +257,7 @@ func TestAccountIteratorTraversal(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(),
@ -290,19 +284,16 @@ func TestAccountIteratorTraversal(t *testing.T) {
verifyIterator(t, 7, it, verifyAccount)
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that after persisting some bottom-most layers to disk,
// the functionality still works.
//db.tree.cap(common.HexToHash("0x04"), 2)
db.tree.cap(common.HexToHash("0x04"), 2)
//head = db.tree.get(common.HexToHash("0x04"))
//verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)
//
//it, _ = db.AccountIterator(common.HexToHash("0x04"), common.Hash{})
//verifyIterator(t, 7, it, verifyAccount)
//it.Release()
head = db.tree.get(common.HexToHash("0x04"))
verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(common.Hash{}), verifyAccount)
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.Hash{})
verifyIterator(t, 7, it, verifyAccount)
it.Release()
}
func TestStorageIteratorTraversal(t *testing.T) {
@ -310,7 +301,7 @@ func TestStorageIteratorTraversal(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(),
@ -337,17 +328,14 @@ func TestStorageIteratorTraversal(t *testing.T) {
verifyIterator(t, 6, it, verifyStorage)
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that after persisting some bottom-most layers to disk,
// the functionality still works.
//db.tree.cap(common.HexToHash("0x04"), 2)
//verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage)
//
//it, _ = db.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
//verifyIterator(t, 6, it, verifyStorage)
//it.Release()
db.tree.cap(common.HexToHash("0x04"), 2)
verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa"), common.Hash{}), verifyStorage)
it, _ = db.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
verifyIterator(t, 6, it, verifyStorage)
it.Release()
}
// TestAccountIteratorTraversalValues tests some multi-layer iteration, where we
@ -357,7 +345,7 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Create a batch of account sets to seed subsequent layers with
var (
@ -434,26 +422,38 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
}
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that after persisting some bottom-most layers to disk,
// the functionality still works.
//db.tree.cap(common.HexToHash("0x09"), 2)
//
//it, _ = db.AccountIterator(common.HexToHash("0x09"), common.Hash{})
//for it.Next() {
// hash := it.Hash()
// account, err := head.Account(hash)
// if err != nil {
// t.Fatalf("failed to retrieve expected account: %v", err)
// }
// want, _ := rlp.EncodeToBytes(account)
// if have := it.Account(); !bytes.Equal(want, have) {
// t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
// }
//}
//it.Release()
db.tree.cap(common.HexToHash("0x09"), 2)
// binaryIterator
head = db.tree.get(common.HexToHash("0x09"))
it = head.(*diffLayer).newBinaryAccountIterator(common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.(*reader).AccountRLP(hash)
if err != nil {
t.Fatalf("failed to retrieve expected account: %v", err)
}
if have := it.Account(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
// fastIterator
it, _ = db.AccountIterator(common.HexToHash("0x09"), common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.(*reader).AccountRLP(hash)
if err != nil {
t.Fatalf("failed to retrieve expected account: %v", err)
}
if have := it.Account(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
}
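Both iterator flavours are exercised above: the binary iterator walks a layer merged with its parent, while the fast iterator keeps one sub-iterator per layer and always yields the smallest pending hash, letting the topmost layer win on duplicates. The following stripped-down sketch of that merge rule over two pre-sorted hash lists is purely illustrative; mergeSorted is a hypothetical helper, not the pathdb implementation (assumes the bytes and go-ethereum/common imports).
// Sketch only: merge two sorted hash lists; entries in the child layer shadow
// identical entries in the parent layer.
func mergeSorted(child, parent []common.Hash) []common.Hash {
	var out []common.Hash
	i, j := 0, 0
	for i < len(child) && j < len(parent) {
		switch cmp := bytes.Compare(child[i][:], parent[j][:]); {
		case cmp < 0:
			out = append(out, child[i])
			i++
		case cmp > 0:
			out = append(out, parent[j])
			j++
		default: // same hash: the child (more recent) layer wins
			out = append(out, child[i])
			i++
			j++
		}
	}
	out = append(out, child[i:]...)
	return append(out, parent[j:]...)
}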
func TestStorageIteratorTraversalValues(t *testing.T) {
@ -461,7 +461,7 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
wrapStorage := func(storage map[common.Hash][]byte) map[common.Hash]map[common.Hash][]byte {
return map[common.Hash]map[common.Hash][]byte{
@ -543,25 +543,38 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
}
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that, after persisting some bottom-most layers into the disk,
// the functionality still works.
//db.tree.cap(common.HexToHash("0x09"), 2)
//
//it, _ = db.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
//for it.Next() {
// hash := it.Hash()
// want, err := head.Storage(common.HexToHash("0xaa"), hash)
// if err != nil {
// t.Fatalf("failed to retrieve expected slot: %v", err)
// }
// if have := it.Slot(); !bytes.Equal(want, have) {
// t.Fatalf("hash %x: slot mismatch: have %x, want %x", hash, have, want)
// }
//}
//it.Release()
db.tree.cap(common.HexToHash("0x09"), 2)
// binaryIterator
head = db.tree.get(common.HexToHash("0x09"))
it = head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.Storage(common.HexToHash("0xaa"), hash)
if err != nil {
t.Fatalf("failed to retrieve expected account: %v", err)
}
if have := it.Slot(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
// fastIterator
it, _ = db.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.Storage(common.HexToHash("0xaa"), hash)
if err != nil {
t.Fatalf("failed to retrieve expected storage slot: %v", err)
}
if have := it.Slot(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: slot mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
}
// This testcase is notorious: all layers contain the exact same 200 accounts.
@ -581,7 +594,8 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
for i := 1; i < 128; i++ {
parent := types.EmptyRootHash
if i == 1 {
@ -599,18 +613,15 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
verifyIterator(t, 200, it, verifyAccount)
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that, after persisting some bottom-most layers into the disk,
// the functionality still works.
//db.tree.cap(common.HexToHash("0x80"), 2)
//
//verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)
//
//it, _ = db.AccountIterator(common.HexToHash("0x80"), common.Hash{})
//verifyIterator(t, 200, it, verifyAccount)
//it.Release()
db.tree.cap(common.HexToHash("0x80"), 2)
verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(common.Hash{}), verifyAccount)
it, _ = db.AccountIterator(common.HexToHash("0x80"), common.Hash{})
verifyIterator(t, 200, it, verifyAccount)
it.Release()
}
// TestAccountIteratorFlattening tests what happens when we
@ -622,7 +633,7 @@ func TestAccountIteratorFlattening(t *testing.T) {
WriteBufferSize: 10 * 1024,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Create a stack of diffs on top
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -655,7 +666,7 @@ func TestAccountIteratorSeek(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
NewStateSetWithOrigin(randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil, nil))
@ -727,7 +738,7 @@ func testStorageIteratorSeek(t *testing.T, newIterator func(db *Database, root,
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -799,7 +810,7 @@ func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, r
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -839,7 +850,7 @@ func TestStorageIteratorDeletions(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -907,7 +918,7 @@ func testStaleIterator(t *testing.T, newIter func(db *Database, hash common.Hash
WriteBufferSize: 16 * 1024 * 1024,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// [02 (disk), 03]
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -962,7 +973,7 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
for i := 1; i <= 100; i++ {
parent := types.EmptyRootHash
@ -1057,7 +1068,7 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), NewStateSetWithOrigin(makeAccounts(2000), nil, nil, nil))
for i := 2; i <= 100; i++ {

View File

@ -26,6 +26,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
@ -89,6 +91,52 @@ func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
return head, nil
}
// journalGenerator is a disk layer entry containing the generator progress marker.
type journalGenerator struct {
// Indicates whether the database was in the process of being wiped.
// It's deprecated but kept here for backward compatibility.
Wiping bool
Done bool // Whether the generator finished creating the snapshot
Marker []byte
Accounts uint64
Slots uint64
Storage uint64
}
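The marker is stored as a single RLP blob under the snapshot-generator key, so an interrupted generation can be resumed after a restart. A minimal sketch of how such an entry could be written, assuming the journalGenerator type above and go-ethereum's ethdb, rawdb and rlp packages; journalProgressSketch is a hypothetical helper for illustration.
// Sketch only: encode the generator state and persist it under the snapshot
// generator key.
func journalProgressSketch(db ethdb.KeyValueWriter, marker []byte, done bool, accounts, slots, storage uint64) error {
	blob, err := rlp.EncodeToBytes(journalGenerator{
		Done:     done,
		Marker:   marker,
		Accounts: accounts,
		Slots:    slots,
		Storage:  storage,
	})
	if err != nil {
		return err
	}
	rawdb.WriteSnapshotGenerator(db, blob)
	return nil
}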
// loadGenerator loads the state generation progress marker from the database.
func loadGenerator(db ethdb.KeyValueReader) (*journalGenerator, common.Hash) {
trieRoot := types.EmptyRootHash
if blob := rawdb.ReadAccountTrieNode(db, nil); len(blob) > 0 {
trieRoot = crypto.Keccak256Hash(blob)
}
// If the state generation progress marker is missing, the snapshot must be rebuilt
blob := rawdb.ReadSnapshotGenerator(db)
if len(blob) == 0 {
log.Info("State snapshot generator is not found")
return nil, trieRoot
}
// If the state generation progress marker cannot be decoded, the snapshot must be rebuilt
var generator journalGenerator
if err := rlp.DecodeBytes(blob, &generator); err != nil {
log.Info("State snapshot generator is not compatible")
return nil, trieRoot
}
// If the state snapshot is inconsistent with the trie data, it needs to be
// rebuilt. Note: the SnapshotRoot and SnapshotGenerator are always kept
// consistent with each other, both in the legacy state snapshot and in the
// path database.
stateRoot := rawdb.ReadSnapshotRoot(db)
if trieRoot != stateRoot {
log.Info("State snapshot is not consistent with trie", "trie", trieRoot, "state", stateRoot)
return nil, trieRoot
}
// Slice nil-ness is lost after RLP decoding; reset it back to an empty slice
if !generator.Done && generator.Marker == nil {
generator.Marker = []byte{}
}
return &generator, trieRoot
}
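A nil generator (missing, undecodable or trie-inconsistent marker) tells the caller to regenerate the snapshot from scratch; otherwise generation can resume from the stored marker. A hedged sketch of such a caller, where rebuildSnapshot and resumeGeneration are hypothetical helpers used only for illustration:
// Sketch only: drive snapshot recovery from the loaded progress marker.
func setupGenerationSketch(db ethdb.KeyValueReader) {
	generator, trieRoot := loadGenerator(db)
	switch {
	case generator == nil:
		rebuildSnapshot(trieRoot) // no usable marker, start over
	case generator.Done:
		// Snapshot is complete, nothing to resume.
	default:
		resumeGeneration(trieRoot, generator.Marker) // continue from the marker
	}
}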
// loadLayers loads a pre-existing state layer backed by a key-value store.
func (db *Database) loadLayers() layer {
// Retrieve the root node of persistent state.
@ -108,7 +156,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))
}
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
@ -140,7 +188,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
if err := states.decode(r); err != nil {
return nil, err
}
return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil
return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil
}
// loadDiffLayer reads the next sections of a layer journal, reconstructing a new
@ -249,6 +297,10 @@ func (db *Database) Journal(root common.Hash) error {
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
}
// Terminate the background state generation if it's active
if disk.generator != nil {
disk.generator.stop()
}
start := time.Now()
// Run the journaling

View File

@ -24,6 +24,11 @@ var (
cleanNodeReadMeter = metrics.NewRegisteredMeter("pathdb/clean/node/read", nil)
cleanNodeWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/node/write", nil)
cleanStateHitMeter = metrics.NewRegisteredMeter("pathdb/clean/state/hit", nil)
cleanStateMissMeter = metrics.NewRegisteredMeter("pathdb/clean/state/miss", nil)
cleanStateReadMeter = metrics.NewRegisteredMeter("pathdb/clean/state/read", nil)
cleanStateWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/state/write", nil)
dirtyNodeHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/hit", nil)
dirtyNodeMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/miss", nil)
dirtyNodeReadMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/read", nil)
@ -32,8 +37,13 @@ var (
stateAccountInexMeter = metrics.NewRegisteredMeter("pathdb/state/account/inex/total", nil)
stateStorageInexMeter = metrics.NewRegisteredMeter("pathdb/state/storage/inex/total", nil)
stateAccountInexDiskMeter = metrics.NewRegisteredMeter("pathdb/state/account/inex/disk", nil)
stateStorageInexDiskMeter = metrics.NewRegisteredMeter("pathdb/state/storage/inex/disk", nil)
stateAccountExistMeter = metrics.NewRegisteredMeter("pathdb/state/account/exist/total", nil)
stateStorageExistMeter = metrics.NewRegisteredMeter("pathdb/state/storage/exist/total", nil)
stateAccountExistDiskMeter = metrics.NewRegisteredMeter("pathdb/state/account/exist/disk", nil)
stateStorageExistDiskMeter = metrics.NewRegisteredMeter("pathdb/state/storage/exist/disk", nil)
dirtyStateHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/state/hit", nil)
dirtyStateMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/state/miss", nil)
@ -48,6 +58,8 @@ var (
commitTimeTimer = metrics.NewRegisteredTimer("pathdb/commit/time", nil)
commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil)
commitAccountsMeter = metrics.NewRegisteredMeter("pathdb/commit/accounts", nil)
commitStoragesMeter = metrics.NewRegisteredMeter("pathdb/commit/slots", nil)
commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil)
gcTrieNodeMeter = metrics.NewRegisteredMeter("pathdb/gc/node/count", nil)
@ -61,3 +73,28 @@ var (
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)
)
// Metrics related to state snapshot generation
var (
generatedAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/generated", nil)
recoveredAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/recovered", nil)
wipedAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/wiped", nil)
missallAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/missall", nil)
generatedStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/generated", nil)
recoveredStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/recovered", nil)
wipedStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/wiped", nil)
missallStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/missall", nil)
danglingStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/dangling", nil)
successfulRangeProofMeter = metrics.NewRegisteredMeter("pathdb/generation/proof/success", nil)
failedRangeProofMeter = metrics.NewRegisteredMeter("pathdb/generation/proof/failure", nil)
accountProveCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/prove", nil)
accountTrieReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/trieread", nil)
accountSnapReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/snapread", nil)
accountWriteCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/write", nil)
storageProveCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/prove", nil)
storageTrieReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/trieread", nil)
storageSnapReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/snapread", nil)
storageWriteCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/write", nil)
storageCleanCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/clean", nil)
)
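These follow the usual go-ethereum metrics conventions: meters count generated or recovered items, counters accumulate the time spent in each phase in nanoseconds. A minimal illustrative sketch of bumping them from inside a generation loop; onAccountGeneratedSketch is a hypothetical hook and assumes the standard time import alongside the meters declared above.
// Sketch only: meters track item counts, counters accumulate durations.
func onAccountGeneratedSketch(start time.Time, recovered bool) {
	if recovered {
		recoveredAccountMeter.Mark(1) // account reused from the old snapshot
	} else {
		generatedAccountMeter.Mark(1) // account regenerated from the trie
	}
	accountWriteCounter.Inc(time.Since(start).Nanoseconds())
}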

View File

@ -22,8 +22,10 @@ import (
"slices"
"sync"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
@ -409,6 +411,11 @@ func (s *stateSet) decode(r *rlp.Stream) error {
return nil
}
// write flushes state mutations into the provided database batch as a whole.
func (s *stateSet) write(batch ethdb.Batch, genMarker []byte, clean *fastcache.Cache) (int, int) {
return writeStates(batch, genMarker, s.accountData, s.storageData, clean)
}
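writeStates (not shown in this hunk) is expected to persist only the entries already covered by the generation progress marker, so a background generator is never overtaken by newer data. A simplified, hypothetical sketch of that gating for account data, using the existing rawdb.WriteAccountSnapshot and rawdb.DeleteAccountSnapshot helpers; writeAccountsSketch itself is made up for illustration.
// Sketch only: flush account data into the batch, skipping anything beyond
// the generation marker; nil/empty values mark destructed accounts.
func writeAccountsSketch(batch ethdb.Batch, genMarker []byte, accounts map[common.Hash][]byte) int {
	var written int
	for hash, data := range accounts {
		// A non-nil marker means generation is still in progress; entries
		// past the marker are not yet covered by the snapshot.
		if genMarker != nil && bytes.Compare(hash[:], genMarker) > 0 {
			continue
		}
		if len(data) == 0 {
			rawdb.DeleteAccountSnapshot(batch, hash)
		} else {
			rawdb.WriteAccountSnapshot(batch, hash, data)
		}
		written++
	}
	return written
}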
// reset clears all cached state data, including any optional sorted lists that
// may have been generated.
func (s *stateSet) reset() {
@ -420,8 +427,6 @@ func (s *stateSet) reset() {
}
// dbsize returns the approximate size for db write.
//
// nolint:unused
func (s *stateSet) dbsize() int {
m := len(s.accountData) * len(rawdb.SnapshotAccountPrefix)
for _, slots := range s.storageData {