// Copyright 2022 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package pathdb import ( "errors" "fmt" "io" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/trie/trienode" ) const ( // defaultCleanSize is the default memory allowance of clean cache. defaultCleanSize = 16 * 1024 * 1024 // maxBufferSize is the maximum memory allowance of node buffer. // Too large buffer will cause the system to pause for a long // time when write happens. Also, the largest batch that pebble can // support is 4GB, node will panic if batch size exceeds this limit. maxBufferSize = 256 * 1024 * 1024 // defaultBufferSize is the default memory allowance of node buffer // that aggregates the writes from above until it's flushed into the // disk. It's meant to be used once the initial sync is finished. // Do not increase the buffer size arbitrarily, otherwise the system // pause time will increase when the database writes happen. defaultBufferSize = 64 * 1024 * 1024 ) var ( // maxDiffLayers is the maximum diff layers allowed in the layer tree. maxDiffLayers = 128 ) // layer is the interface implemented by all state layers which includes some // public methods and some additional methods for internal usage. type layer interface { // node retrieves the trie node with the node info. An error will be returned // if the read operation exits abnormally. Specifically, if the layer is // already stale. // // Note: // - the returned node is not a copy, please don't modify it. // - no error will be returned if the requested node is not found in database. node(owner common.Hash, path []byte, depth int) ([]byte, common.Hash, *nodeLoc, error) // account directly retrieves the account RLP associated with a particular // hash in the slim data format. An error will be returned if the read // operation exits abnormally. Specifically, if the layer is already stale. // // Note: // - the returned account is not a copy, please don't modify it. // - no error will be returned if the requested account is not found in database. account(hash common.Hash, depth int) ([]byte, error) // storage directly retrieves the storage data associated with a particular hash, // within a particular account. An error will be returned if the read operation // exits abnormally. Specifically, if the layer is already stale. // // Note: // - the returned storage data is not a copy, please don't modify it. // - no error will be returned if the requested slot is not found in database. storage(accountHash, storageHash common.Hash, depth int) ([]byte, error) // rootHash returns the root hash for which this layer was made. rootHash() common.Hash // stateID returns the associated state id of layer. stateID() uint64 // parentLayer returns the subsequent layer of it, or nil if the disk was reached. parentLayer() layer // update creates a new layer on top of the existing layer diff tree with // the provided dirty trie nodes along with the state change set. // // Note, the maps are retained by the method to avoid copying everything. update(root common.Hash, id uint64, block uint64, nodes *nodeSet, states *StateSetWithOrigin) *diffLayer // journal commits an entire diff hierarchy to disk into a single journal entry. // This is meant to be used during shutdown to persist the layer without // flattening everything down (bad for reorgs). journal(w io.Writer) error } // Config contains the settings for database. type Config struct { StateHistory uint64 // Number of recent blocks to maintain state history for CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer ReadOnly bool // Flag whether the database is opened in read only mode. } // sanitize checks the provided user configurations and changes anything that's // unreasonable or unworkable. func (c *Config) sanitize() *Config { conf := *c if conf.WriteBufferSize > maxBufferSize { log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.WriteBufferSize), "updated", common.StorageSize(maxBufferSize)) conf.WriteBufferSize = maxBufferSize } return &conf } // fields returns a list of attributes of config for printing. func (c *Config) fields() []interface{} { var list []interface{} if c.ReadOnly { list = append(list, "readonly", true) } list = append(list, "cache", common.StorageSize(c.CleanCacheSize)) list = append(list, "buffer", common.StorageSize(c.WriteBufferSize)) list = append(list, "history", c.StateHistory) return list } // Defaults contains default settings for Ethereum mainnet. var Defaults = &Config{ StateHistory: params.FullImmutabilityThreshold, CleanCacheSize: defaultCleanSize, WriteBufferSize: defaultBufferSize, } // ReadOnly is the config in order to open database in read only mode. var ReadOnly = &Config{ReadOnly: true} // Database is a multiple-layered structure for maintaining in-memory states // along with its dirty trie nodes. It consists of one persistent base layer // backed by a key-value store, on top of which arbitrarily many in-memory diff // layers are stacked. The memory diffs can form a tree with branching, but the // disk layer is singleton and common to all. If a reorg goes deeper than the // disk layer, a batch of reverse diffs can be applied to rollback. The deepest // reorg that can be handled depends on the amount of state histories tracked // in the disk. // // At most one readable and writable database can be opened at the same time in // the whole system which ensures that only one database writer can operate the // persistent state. Unexpected open operations can cause the system to panic. type Database struct { // readOnly is the flag whether the mutation is allowed to be applied. // It will be set automatically when the database is journaled during // the shutdown to reject all following unexpected mutations. readOnly bool // Flag if database is opened in read only mode waitSync bool // Flag if database is deactivated due to initial state sync isVerkle bool // Flag if database is used for verkle tree config *Config // Configuration for database diskdb ethdb.Database // Persistent storage for matured trie nodes tree *layerTree // The group for all known layers freezer ethdb.ResettableAncientStore // Freezer for storing trie histories, nil possible in tests lock sync.RWMutex // Lock to prevent mutations from happening at the same time } // New attempts to load an already existing layer from a persistent key-value // store (with a number of memory layers from a journal). If the journal is not // matched with the base persistent layer, all the recorded diff layers are discarded. func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database { if config == nil { config = Defaults } config = config.sanitize() // Establish a dedicated database namespace tailored for verkle-specific // data, ensuring the isolation of both verkle and merkle tree data. It's // important to note that the introduction of a prefix won't lead to // substantial storage overhead, as the underlying database will efficiently // compress the shared key prefix. if isVerkle { diskdb = rawdb.NewTable(diskdb, string(rawdb.VerklePrefix)) } db := &Database{ readOnly: config.ReadOnly, isVerkle: isVerkle, config: config, diskdb: diskdb, } // Construct the layer tree by resolving the in-disk singleton state // and in-memory layer journal. db.tree = newLayerTree(db.loadLayers()) // Repair the state history, which might not be aligned with the state // in the key-value store due to an unclean shutdown. if err := db.repairHistory(); err != nil { log.Crit("Failed to repair state history", "err", err) } // Disable database in case node is still in the initial state sync stage. if rawdb.ReadSnapSyncStatusFlag(diskdb) == rawdb.StateSyncRunning && !db.readOnly { if err := db.Disable(); err != nil { log.Crit("Failed to disable database", "err", err) // impossible to happen } } fields := config.fields() if db.isVerkle { fields = append(fields, "verkle", true) } log.Info("Initialized path database", fields...) return db } // repairHistory truncates leftover state history objects, which may occur due // to an unclean shutdown or other unexpected reasons. func (db *Database) repairHistory() error { // Open the freezer for state history. This mechanism ensures that // only one database instance can be opened at a time to prevent // accidental mutation. ancient, err := db.diskdb.AncientDatadir() if err != nil { // TODO error out if ancient store is disabled. A tons of unit tests // disable the ancient store thus the error here will immediately fail // all of them. Fix the tests first. return nil } freezer, err := rawdb.NewStateFreezer(ancient, db.isVerkle, db.readOnly) if err != nil { log.Crit("Failed to open state history freezer", "err", err) } db.freezer = freezer // Reset the entire state histories if the trie database is not initialized // yet. This action is necessary because these state histories are not // expected to exist without an initialized trie database. id := db.tree.bottom().stateID() if id == 0 { frozen, err := db.freezer.Ancients() if err != nil { log.Crit("Failed to retrieve head of state history", "err", err) } if frozen != 0 { err := db.freezer.Reset() if err != nil { log.Crit("Failed to reset state histories", "err", err) } log.Info("Truncated extraneous state history") } return nil } // Truncate the extra state histories above in freezer in case it's not // aligned with the disk layer. It might happen after a unclean shutdown. pruned, err := truncateFromHead(db.diskdb, db.freezer, id) if err != nil { log.Crit("Failed to truncate extra state histories", "err", err) } if pruned != 0 { log.Warn("Truncated extra state histories", "number", pruned) } return nil } // Update adds a new layer into the tree, if that can be linked to an existing // old parent. It is disallowed to insert a disk layer (the origin of all). Apart // from that this function will flatten the extra diff layers at bottom into disk // to only keep 128 diff layers in memory by default. // // The passed in maps(nodes, states) will be retained to avoid copying everything. // Therefore, these maps must not be changed afterwards. func (db *Database) Update(root common.Hash, parentRoot common.Hash, block uint64, nodes *trienode.MergedNodeSet, states *StateSetWithOrigin) error { // Hold the lock to prevent concurrent mutations. db.lock.Lock() defer db.lock.Unlock() // Short circuit if the mutation is not allowed. if err := db.modifyAllowed(); err != nil { return err } if err := db.tree.add(root, parentRoot, block, nodes, states); err != nil { return err } // Keep 128 diff layers in the memory, persistent layer is 129th. // - head layer is paired with HEAD state // - head-1 layer is paired with HEAD-1 state // - head-127 layer(bottom-most diff layer) is paired with HEAD-127 state // - head-128 layer(disk layer) is paired with HEAD-128 state return db.tree.cap(root, maxDiffLayers) } // Commit traverses downwards the layer tree from a specified layer with the // provided state root and all the layers below are flattened downwards. It // can be used alone and mostly for test purposes. func (db *Database) Commit(root common.Hash, report bool) error { // Hold the lock to prevent concurrent mutations. db.lock.Lock() defer db.lock.Unlock() // Short circuit if the mutation is not allowed. if err := db.modifyAllowed(); err != nil { return err } return db.tree.cap(root, 0) } // Disable deactivates the database and invalidates all available state layers // as stale to prevent access to the persistent state, which is in the syncing // stage. func (db *Database) Disable() error { db.lock.Lock() defer db.lock.Unlock() // Short circuit if the database is in read only mode. if db.readOnly { return errDatabaseReadOnly } // Prevent duplicated disable operation. if db.waitSync { log.Error("Reject duplicated disable operation") return nil } db.waitSync = true // Mark the disk layer as stale to prevent access to persistent state. db.tree.bottom().markStale() // Write the initial sync flag to persist it across restarts. rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning) log.Info("Disabled trie database due to state sync") return nil } // Enable activates database and resets the state tree with the provided persistent // state root once the state sync is finished. func (db *Database) Enable(root common.Hash) error { db.lock.Lock() defer db.lock.Unlock() // Short circuit if the database is in read only mode. if db.readOnly { return errDatabaseReadOnly } // Ensure the provided state root matches the stored one. root = types.TrieRootHash(root) stored := types.EmptyRootHash if blob := rawdb.ReadAccountTrieNode(db.diskdb, nil); len(blob) > 0 { stored = crypto.Keccak256Hash(blob) } if stored != root { return fmt.Errorf("state root mismatch: stored %x, synced %x", stored, root) } // Drop the stale state journal in persistent database and // reset the persistent state id back to zero. batch := db.diskdb.NewBatch() rawdb.DeleteTrieJournal(batch) rawdb.WritePersistentStateID(batch, 0) if err := batch.Write(); err != nil { return err } // Clean up all state histories in freezer. Theoretically // all root->id mappings should be removed as well. Since // mappings can be huge and might take a while to clear // them, just leave them in disk and wait for overwriting. if db.freezer != nil { if err := db.freezer.Reset(); err != nil { return err } } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))) // Re-enable the database as the final step. db.waitSync = false rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished) log.Info("Rebuilt trie database", "root", root) return nil } // Recover rollbacks the database to a specified historical point. // The state is supported as the rollback destination only if it's // canonical state and the corresponding trie histories are existent. func (db *Database) Recover(root common.Hash) error { db.lock.Lock() defer db.lock.Unlock() // Short circuit if rollback operation is not supported if err := db.modifyAllowed(); err != nil { return err } if db.freezer == nil { return errors.New("state rollback is non-supported") } // Short circuit if the target state is not recoverable root = types.TrieRootHash(root) if !db.Recoverable(root) { return errStateUnrecoverable } // Apply the state histories upon the disk layer in order var ( start = time.Now() dl = db.tree.bottom() ) for dl.rootHash() != root { h, err := readHistory(db.freezer, dl.stateID()) if err != nil { return err } dl, err = dl.revert(h) if err != nil { return err } // reset layer with newly created disk layer. It must be // done after each revert operation, otherwise the new // disk layer won't be accessible from outside. db.tree.reset(dl) } rawdb.DeleteTrieJournal(db.diskdb) _, err := truncateFromHead(db.diskdb, db.freezer, dl.stateID()) if err != nil { return err } log.Debug("Recovered state", "root", root, "elapsed", common.PrettyDuration(time.Since(start))) return nil } // Recoverable returns the indicator if the specified state is recoverable. func (db *Database) Recoverable(root common.Hash) bool { // Ensure the requested state is a known state. root = types.TrieRootHash(root) id := rawdb.ReadStateID(db.diskdb, root) if id == nil { return false } // Recoverable state must below the disk layer. The recoverable // state only refers the state that is currently not available, // but can be restored by applying state history. dl := db.tree.bottom() if *id >= dl.stateID() { return false } // This is a temporary workaround for the unavailability of the freezer in // dev mode. As a consequence, the Pathdb loses the ability for deep reorg // in certain cases. // TODO(rjl493456442): Implement the in-memory ancient store. if db.freezer == nil { return false } // Ensure the requested state is a canonical state and all state // histories in range [id+1, disklayer.ID] are present and complete. return checkHistories(db.freezer, *id+1, dl.stateID()-*id, func(m *meta) error { if m.parent != root { return errors.New("unexpected state history") } root = m.root return nil }) == nil } // Close closes the trie database and the held freezer. func (db *Database) Close() error { db.lock.Lock() defer db.lock.Unlock() // Set the database to read-only mode to prevent all // following mutations. db.readOnly = true // Release the memory held by clean cache. db.tree.bottom().resetCache() // Close the attached state history freezer. if db.freezer == nil { return nil } return db.freezer.Close() } // Size returns the current storage size of the memory cache in front of the // persistent database layer. func (db *Database) Size() (diffs common.StorageSize, nodes common.StorageSize) { db.tree.forEach(func(layer layer) { if diff, ok := layer.(*diffLayer); ok { diffs += common.StorageSize(diff.size()) } if disk, ok := layer.(*diskLayer); ok { nodes += disk.size() } }) return diffs, nodes } // Initialized returns an indicator if the state data is already // initialized in path-based scheme. func (db *Database) Initialized(genesisRoot common.Hash) bool { var inited bool db.tree.forEach(func(layer layer) { if layer.rootHash() != types.EmptyRootHash { inited = true } }) if !inited { inited = rawdb.ReadSnapSyncStatusFlag(db.diskdb) != rawdb.StateSyncUnknown } return inited } // modifyAllowed returns the indicator if mutation is allowed. This function // assumes the db.lock is already held. func (db *Database) modifyAllowed() error { if db.readOnly { return errDatabaseReadOnly } if db.waitSync { return errDatabaseWaitSync } return nil } // AccountHistory inspects the account history within the specified range. // // Start: State ID of the first history object for the query. 0 implies the first // available object is selected as the starting point. // // End: State ID of the last history for the query. 0 implies the last available // object is selected as the ending point. Note end is included in the query. func (db *Database) AccountHistory(address common.Address, start, end uint64) (*HistoryStats, error) { return accountHistory(db.freezer, address, start, end) } // StorageHistory inspects the storage history within the specified range. // // Start: State ID of the first history object for the query. 0 implies the first // available object is selected as the starting point. // // End: State ID of the last history for the query. 0 implies the last available // object is selected as the ending point. Note end is included in the query. // // Note, slot refers to the hash of the raw slot key. func (db *Database) StorageHistory(address common.Address, slot common.Hash, start uint64, end uint64) (*HistoryStats, error) { return storageHistory(db.freezer, address, slot, start, end) } // HistoryRange returns the block numbers associated with earliest and latest // state history in the local store. func (db *Database) HistoryRange() (uint64, uint64, error) { return historyRange(db.freezer) } // AccountIterator creates a new account iterator for the specified root hash and // seeks to a starting account hash. func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) { return newFastAccountIterator(db, root, seek) } // StorageIterator creates a new storage iterator for the specified root hash and // account. The iterator will be moved to the specific start position. func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) { return newFastStorageIterator(db, root, account, seek) }