core/rawdb, triedb/pathdb: implement history indexer
parent a07beeb2c8
commit 53691eec4e

@@ -0,0 +1,170 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package rawdb

import (
	"encoding/binary"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

// ReadLastStateHistoryIndex retrieves the ID of the latest indexed state history.
func ReadLastStateHistoryIndex(db ethdb.KeyValueReader) *uint64 {
	data, _ := db.Get(headStateHistoryIndexKey)
	if len(data) != 8 {
		return nil
	}
	number := binary.BigEndian.Uint64(data)
	return &number
}

// WriteLastStateHistoryIndex stores the ID of the latest indexed state history
// into the database.
func WriteLastStateHistoryIndex(db ethdb.KeyValueWriter, number uint64) {
	if err := db.Put(headStateHistoryIndexKey, encodeBlockNumber(number)); err != nil {
		log.Crit("Failed to store the state index tail", "err", err)
	}
}

// DeleteLastStateHistoryIndex removes the ID of the latest indexed state history.
func DeleteLastStateHistoryIndex(db ethdb.KeyValueWriter) {
	if err := db.Delete(headStateHistoryIndexKey); err != nil {
		log.Crit("Failed to delete the state index tail", "err", err)
	}
}
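
For orientation, a minimal sketch of how this progress pointer round-trips, using the in-memory database as a stand-in for the node's key-value store (illustrative only, not part of the diff; it assumes the helpers above are available in core/rawdb):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/rawdb"
)

func main() {
	db := rawdb.NewMemoryDatabase() // stand-in for the node's disk database

	// No pointer yet: indexing would have to start from scratch.
	fmt.Println(rawdb.ReadLastStateHistoryIndex(db) == nil) // true

	// Record that histories up to ID 42 have been indexed, then read it back.
	rawdb.WriteLastStateHistoryIndex(db, 42)
	fmt.Println(*rawdb.ReadLastStateHistoryIndex(db)) // 42
}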

// ReadAccountHistoryIndex retrieves the account history index with the provided
// account address.
func ReadAccountHistoryIndex(db ethdb.KeyValueReader, address common.Address) []byte {
	data, err := db.Get(accountHistoryIndexKey(address))
	if err != nil || len(data) == 0 {
		return nil
	}
	return data
}

// WriteAccountHistoryIndex writes the provided account history index into database.
func WriteAccountHistoryIndex(db ethdb.KeyValueWriter, address common.Address, data []byte) {
	if err := db.Put(accountHistoryIndexKey(address), data); err != nil {
		log.Crit("Failed to store account history index", "err", err)
	}
}

// DeleteAccountHistoryIndex deletes the specified account history index from
// the database.
func DeleteAccountHistoryIndex(db ethdb.KeyValueWriter, address common.Address) {
	if err := db.Delete(accountHistoryIndexKey(address)); err != nil {
		log.Crit("Failed to delete account history index", "err", err)
	}
}

// ReadStorageHistoryIndex retrieves the storage history index with the provided
// account address and storage key hash.
func ReadStorageHistoryIndex(db ethdb.KeyValueReader, address common.Address, storageHash common.Hash) []byte {
	data, err := db.Get(storageHistoryIndexKey(address, storageHash))
	if err != nil || len(data) == 0 {
		return nil
	}
	return data
}

// WriteStorageHistoryIndex writes the provided storage history index into database.
func WriteStorageHistoryIndex(db ethdb.KeyValueWriter, address common.Address, storageHash common.Hash, data []byte) {
	if err := db.Put(storageHistoryIndexKey(address, storageHash), data); err != nil {
		log.Crit("Failed to store storage history index", "err", err)
	}
}

// DeleteStorageHistoryIndex deletes the specified state index from the database.
func DeleteStorageHistoryIndex(db ethdb.KeyValueWriter, address common.Address, storageHash common.Hash) {
	if err := db.Delete(storageHistoryIndexKey(address, storageHash)); err != nil {
		log.Crit("Failed to delete storage history index", "err", err)
	}
}

// ReadAccountHistoryIndexBlock retrieves the index block with the provided
// account address along with the block id.
func ReadAccountHistoryIndexBlock(db ethdb.KeyValueReader, address common.Address, blockID uint32) []byte {
	data, err := db.Get(accountHistoryIndexBlockKey(address, blockID))
	if err != nil || len(data) == 0 {
		return nil
	}
	return data
}

// WriteAccountHistoryIndexBlock writes the provided index block into database.
func WriteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, blockID uint32, data []byte) {
	if err := db.Put(accountHistoryIndexBlockKey(address, blockID), data); err != nil {
		log.Crit("Failed to store account index block", "err", err)
	}
}

// DeleteAccountHistoryIndexBlock deletes the specified index block from the database.
func DeleteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, blockID uint32) {
	if err := db.Delete(accountHistoryIndexBlockKey(address, blockID)); err != nil {
		log.Crit("Failed to delete account index block", "err", err)
	}
}

// ReadStorageHistoryIndexBlock retrieves the index block with the provided state
// identifier along with the block id.
func ReadStorageHistoryIndexBlock(db ethdb.KeyValueReader, address common.Address, storageHash common.Hash, blockID uint32) []byte {
	data, err := db.Get(storageHistoryIndexBlockKey(address, storageHash, blockID))
	if err != nil || len(data) == 0 {
		return nil
	}
	return data
}

// WriteStorageHistoryIndexBlock writes the provided index block into database.
func WriteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, storageHash common.Hash, id uint32, data []byte) {
	if err := db.Put(storageHistoryIndexBlockKey(address, storageHash, id), data); err != nil {
		log.Crit("Failed to store storage index block", "err", err)
	}
}

// DeleteStorageHistoryIndexBlock deletes the specified index block from the database.
func DeleteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, address common.Address, state common.Hash, id uint32) {
	if err := db.Delete(storageHistoryIndexBlockKey(address, state, id)); err != nil {
		log.Crit("Failed to delete storage index block", "err", err)
	}
}

// increaseKey increases the input key by one. Returns nil if the entire
// addition operation overflows.
func increaseKey(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		key[i]++
		if key[i] != 0x0 {
			return key
		}
	}
	return nil
}

// DeleteHistoryIndex completely removes all history indexing data, including indexes
// for accounts and storages.
//
// Note, this method assumes the storage space with prefix `StateHistoryIndexPrefix`
// is exclusively occupied by the history indexing data!
func DeleteHistoryIndex(db ethdb.KeyValueRangeDeleter) {
	if err := db.DeleteRange(StateHistoryIndexPrefix, increaseKey(StateHistoryIndexPrefix)); err != nil {
		log.Crit("Failed to delete history index range", "err", err)
	}
}
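
One caveat worth seeing concretely: increaseKey increments its input in place and returns it, so a caller that wants to keep the original prefix intact should pass a copy. A small sketch of the [start, end) bound computation this enables (illustrative only, not part of the diff):

package main

import (
	"bytes"
	"fmt"
)

// increaseKey increments a big-endian key in place, as in the diff above.
func increaseKey(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		key[i]++
		if key[i] != 0 {
			return key
		}
	}
	return nil
}

func main() {
	prefix := []byte("m")                       // StateHistoryIndexPrefix in the diff
	end := increaseKey(bytes.Clone(prefix))     // increment a copy, so prefix stays "m"
	fmt.Printf("range [%q, %q)\n", prefix, end) // range ["m", "n")
}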

@@ -18,6 +18,7 @@ package rawdb
 
 import (
 	"encoding/binary"
+	"errors"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/ethdb"

@@ -251,6 +252,36 @@ func ReadStateHistory(db ethdb.AncientReaderOp, id uint64) ([]byte, []byte, []by
 	return meta, accountIndex, storageIndex, accountData, storageData, nil
 }
 
+// ReadStateHistoryList retrieves a list of state histories from the database
+// within the specified range. The freezer position of a state history is its
+// ID minus one, since the ID of the first state history starts at one (zero
+// is reserved for the initial state).
+func ReadStateHistoryList(db ethdb.AncientReaderOp, start uint64, count uint64) ([][]byte, [][]byte, [][]byte, [][]byte, [][]byte, error) {
+	metaList, err := db.AncientRange(stateHistoryMeta, start-1, count, 0)
+	if err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	aIndexList, err := db.AncientRange(stateHistoryAccountIndex, start-1, count, 0)
+	if err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	sIndexList, err := db.AncientRange(stateHistoryStorageIndex, start-1, count, 0)
+	if err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	aDataList, err := db.AncientRange(stateHistoryAccountData, start-1, count, 0)
+	if err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	sDataList, err := db.AncientRange(stateHistoryStorageData, start-1, count, 0)
+	if err != nil {
+		return nil, nil, nil, nil, nil, err
+	}
+	if len(metaList) != len(aIndexList) || len(metaList) != len(sIndexList) || len(metaList) != len(aDataList) || len(metaList) != len(sDataList) {
+		return nil, nil, nil, nil, nil, errors.New("state history is corrupted")
+	}
+	return metaList, aIndexList, sIndexList, aDataList, sDataList, nil
+}
+
 // WriteStateHistory writes the provided state history to database. Compute the
 // position of state history in freezer by minus one since the id of first state
 // history starts from one(zero for initial state).
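
The only subtlety above is the off-by-one mapping between history IDs and freezer positions; a tiny illustration (hypothetical IDs, not from the diff):

package main

import "fmt"

// historyFreezerPosition mirrors the minus-one mapping used by the accessors:
// history IDs are one-based (zero denotes the initial state), freezer slots
// are zero-based.
func historyFreezerPosition(id uint64) uint64 {
	return id - 1
}

func main() {
	// ReadStateHistoryList(db, 5, 3) would therefore read freezer slots 4, 5, 6.
	fmt.Println(historyFreezerPosition(5)) // 4
}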

@@ -379,6 +379,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
 		beaconHeaders stat
 		cliqueSnaps   stat
 
+		// Path-mode archive data
+		stateIndex stat
+
 		// Verkle statistics
 		verkleTries        stat
 		verkleStateLookups stat

@@ -452,6 +455,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
 			bytes.HasPrefix(key, BloomTrieIndexPrefix) ||
 			bytes.HasPrefix(key, BloomTriePrefix): // Bloomtrie sub
 			bloomTrieNodes.Add(size)
+		case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.AddressLength:
+			stateIndex.Add(size)
 
 		// Verkle trie data is detected, determine the sub-category
 		case bytes.HasPrefix(key, VerklePrefix):

@@ -510,6 +515,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
 		{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},
 		{"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()},
 		{"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()},
+		{"Key-Value store", "Path state history indexes", stateIndex.Size(), stateIndex.Count()},
 		{"Key-Value store", "Verkle trie nodes", verkleTries.Size(), verkleTries.Count()},
 		{"Key-Value store", "Verkle trie state lookups", verkleStateLookups.Size(), verkleStateLookups.Count()},
 		{"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()},

@@ -76,6 +76,10 @@ var (
 	// trieJournalKey tracks the in-memory trie node layers across restarts.
 	trieJournalKey = []byte("TrieJournal")
 
+	// headStateHistoryIndexKey tracks the ID of the latest state history that has
+	// been indexed.
+	headStateHistoryIndexKey = []byte("LastStateHistoryIndex")
+
 	// txIndexTailKey tracks the oldest block whose transactions have been indexed.
 	txIndexTailKey = []byte("TransactionIndexTail")

@@ -117,6 +121,9 @@ var (
 	TrieNodeStoragePrefix = []byte("O") // TrieNodeStoragePrefix + accountHash + hexPath -> trie node
 	stateIDPrefix         = []byte("L") // stateIDPrefix + state root -> state id
 
+	// State history indexing within path-based storage scheme
+	StateHistoryIndexPrefix = []byte("m") // StateHistoryIndexPrefix + account address or (account address + slotHash) -> index
+
 	// VerklePrefix is the database prefix for Verkle trie data, which includes:
 	// (a) Trie nodes
 	// (b) In-memory trie node journal

@@ -341,3 +348,27 @@ func IsStorageTrieNode(key []byte) bool {
 	ok, _, _ := ResolveStorageTrieNode(key)
 	return ok
 }
+
+// accountHistoryIndexKey = StateHistoryIndexPrefix + address
+func accountHistoryIndexKey(address common.Address) []byte {
+	return append(StateHistoryIndexPrefix, address.Bytes()...)
+}
+
+// storageHistoryIndexKey = StateHistoryIndexPrefix + address + storageHash
+func storageHistoryIndexKey(address common.Address, storageHash common.Hash) []byte {
+	return append(append(StateHistoryIndexPrefix, address.Bytes()...), storageHash.Bytes()...)
+}
+
+// accountHistoryIndexBlockKey = StateHistoryIndexPrefix + address + blockID
+func accountHistoryIndexBlockKey(address common.Address, blockID uint32) []byte {
+	var buf [4]byte
+	binary.BigEndian.PutUint32(buf[:], blockID)
+	return append(append(StateHistoryIndexPrefix, address.Bytes()...), buf[:]...)
+}
+
+// storageHistoryIndexBlockKey = StateHistoryIndexPrefix + address + storageHash + blockID
+func storageHistoryIndexBlockKey(address common.Address, storageHash common.Hash, blockID uint32) []byte {
+	var buf [4]byte
+	binary.BigEndian.PutUint32(buf[:], blockID)
+	return append(append(append(StateHistoryIndexPrefix, address.Bytes()...), storageHash.Bytes()...), buf[:]...)
+}
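
The four key shapes above differ in total length (the prefix is one byte, addresses are 20 bytes, hashes 32, block IDs 4), so they stay disjoint within the shared "m" namespace; a quick arithmetic check (illustrative only):

package main

import "fmt"

func main() {
	fmt.Println(1 + 20)          // account index key:       21 bytes
	fmt.Println(1 + 20 + 4)      // account index block key: 25 bytes
	fmt.Println(1 + 20 + 32)     // storage index key:       53 bytes
	fmt.Println(1 + 20 + 32 + 4) // storage index block key: 57 bytes
}

This is presumably also why the InspectDatabase hunk earlier only needs len(key) >= len(StateHistoryIndexPrefix)+common.AddressLength to classify an entry as index data.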

@@ -209,6 +209,7 @@ type Database struct {
 	tree    *layerTree                   // The group for all known layers
 	freezer ethdb.ResettableAncientStore // Freezer for storing trie histories, nil possible in tests
 	lock    sync.RWMutex                 // Lock to prevent mutations from happening at the same time
+	indexer *historyIndexer              // History indexer
 }
 
 // New attempts to load an already existing layer from a persistent key-value

@@ -257,6 +258,10 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
 	// might be scheduled.
 	db.setStateGenerator()
 
+	// TODO (rjl493456442) disable the background indexing in read-only mode
+	if db.freezer != nil {
+		db.indexer = newHistoryIndexer(db.diskdb, db.freezer, db.tree.bottom().stateID())
+	}
 	fields := config.fields()
 	if db.isVerkle {
 		fields = append(fields, "verkle", true)

@@ -294,6 +299,11 @@ func (db *Database) repairHistory() error {
 		log.Crit("Failed to retrieve head of state history", "err", err)
 	}
 	if frozen != 0 {
+		// TODO(rjl493456442) would be better to group them into a batch.
+		//
+		// Purge all state history indexing data first
+		rawdb.DeleteLastStateHistoryIndex(db.diskdb)
+		rawdb.DeleteHistoryIndex(db.diskdb)
 		err := db.freezer.Reset()
 		if err != nil {
 			log.Crit("Failed to reset state histories", "err", err)

@@ -467,6 +477,11 @@ func (db *Database) Enable(root common.Hash) error {
 		// mappings can be huge and might take a while to clear
 		// them, just leave them in disk and wait for overwriting.
 		if db.freezer != nil {
+			// TODO(rjl493456442) would be better to group them into a batch.
+			//
+			// Purge all state history indexing data first
+			rawdb.DeleteLastStateHistoryIndex(db.diskdb)
+			rawdb.DeleteHistoryIndex(db.diskdb)
 			if err := db.freezer.Reset(); err != nil {
 				return err
 			}

@@ -580,6 +595,10 @@ func (db *Database) Close() error {
 	}
 	disk.resetCache() // release the memory held by clean cache
 
+	// Terminate the background state history indexer
+	if db.indexer != nil {
+		db.indexer.close()
+	}
 	// Close the attached state history freezer.
 	if db.freezer == nil {
 		return nil

@@ -163,6 +163,20 @@ func (t *tester) hashPreimage(hash common.Hash) common.Hash {
 	return common.BytesToHash(t.preimages[hash])
 }
 
+func (t *tester) extend(layers int) {
+	for i := 0; i < layers; i++ {
+		var parent = types.EmptyRootHash
+		if len(t.roots) != 0 {
+			parent = t.roots[len(t.roots)-1]
+		}
+		root, nodes, states := t.generate(parent, true)
+		if err := t.db.Update(root, parent, uint64(i), nodes, states); err != nil {
+			panic(fmt.Errorf("failed to update state changes, err: %w", err))
+		}
+		t.roots = append(t.roots, root)
+	}
+}
+
 func (t *tester) release() {
 	t.db.Close()
 	t.db.diskdb.Close()

@@ -319,6 +319,12 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 			overflow = true
 			oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
 		}
+		// Notify the state history indexer for newly created history
+		if dl.db.indexer != nil {
+			if err := dl.db.indexer.extend(bottom.stateID()); err != nil {
+				return nil, err
+			}
+		}
 	}
 	// Mark the diskLayer as stale before applying any mutations on top.
 	dl.stale = true

@@ -411,6 +417,12 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
 
 	dl.stale = true
 
+	// Unindex the corresponding state history
+	if dl.db.indexer != nil {
+		if err := dl.db.indexer.shorten(dl.id); err != nil {
+			return nil, err
+		}
+	}
 	// State change may be applied to node buffer, or the persistent
 	// state, depends on if node buffer is empty or not. If the node
 	// buffer is not empty, it means that the state transition that

@@ -738,7 +738,7 @@ func (g *generator) generateAccounts(ctx *generatorContext, accMarker []byte) er
 			return err
 		}
 	}
-	// Some account processed, unmark the marker
+	// Some account counter, unmark the marker
 	accMarker = nil
 	return nil
 }

@@ -784,7 +784,7 @@ func (g *generator) generate(ctx *generatorContext) {
 	// Even if the snapshot data is updated during the interruption (before
 	// or at the marker), the assumption is still held.
 	// For the account or storage slot at the interruption, they will be
-	// processed twice by the generator(they are already processed in the
+	// counter twice by the generator(they are already counter in the
 	// last run) but it's fine.
 	var (
 		accMarker, _ = splitMarker(g.progress)

@@ -510,25 +510,41 @@ func (h *history) decode(accountData, storageData, accountIndexes, storageIndexe
 
 // readHistory reads and decodes the state history object by the given id.
 func readHistory(reader ethdb.AncientReader, id uint64) (*history, error) {
-	blob := rawdb.ReadStateHistoryMeta(reader, id)
-	if len(blob) == 0 {
-		return nil, fmt.Errorf("state history not found %d", id)
+	mData, accountIndexes, storageIndexes, accountData, storageData, err := rawdb.ReadStateHistory(reader, id)
+	if err != nil {
+		return nil, err
 	}
 	var m meta
-	if err := m.decode(blob); err != nil {
+	if err := m.decode(mData); err != nil {
 		return nil, err
 	}
-	var (
-		dec            = history{meta: &m}
-		accountData    = rawdb.ReadStateAccountHistory(reader, id)
-		storageData    = rawdb.ReadStateStorageHistory(reader, id)
-		accountIndexes = rawdb.ReadStateAccountIndex(reader, id)
-		storageIndexes = rawdb.ReadStateStorageIndex(reader, id)
-	)
-	if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil {
+	h := history{meta: &m}
+	if err := h.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil {
 		return nil, err
 	}
-	return &dec, nil
+	return &h, nil
 }
+
+// readHistories reads and decodes a list of state histories with the specific
+// history range.
+func readHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]*history, error) {
+	var histories []*history
+	metaList, aIndexList, sIndexList, aDataList, sDataList, err := rawdb.ReadStateHistoryList(freezer, start, count)
+	if err != nil {
+		return nil, err
+	}
+	for i := 0; i < len(metaList); i++ {
+		var m meta
+		if err := m.decode(metaList[i]); err != nil {
+			return nil, err
+		}
+		h := history{meta: &m}
+		if err := h.decode(aDataList[i], sDataList[i], aIndexList[i], sIndexList[i]); err != nil {
+			return nil, err
+		}
+		histories = append(histories, &h)
+	}
+	return histories, nil
+}
 
 // writeHistory persists the state history with the provided state set.
@@ -0,0 +1,433 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"errors"
	"fmt"
	"math"
	"sort"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// parseIndex parses the index data with the supplied byte stream. The index data
// is a list of fixed-sized metadata. Empty metadata is regarded as invalid.
func parseIndex(blob []byte) ([]*indexBlockDesc, error) {
	if len(blob) == 0 {
		return nil, errors.New("empty state history index")
	}
	if len(blob)%indexBlockDescSize != 0 {
		return nil, fmt.Errorf("corrupted state index, len: %d", len(blob))
	}
	var (
		lastID   uint32
		lastMax  uint64
		descList []*indexBlockDesc
	)
	for i := 0; i < len(blob)/indexBlockDescSize; i++ {
		var desc indexBlockDesc
		desc.decode(blob[i*indexBlockDescSize : (i+1)*indexBlockDescSize])
		if desc.empty() {
			return nil, errors.New("empty state history index block")
		}
		if desc.min > desc.max {
			return nil, fmt.Errorf("indexBlockDesc: min %d > max %d", desc.min, desc.max)
		}
		if lastID != 0 {
			if lastID+1 != desc.id {
				return nil, fmt.Errorf("index block id is out of order, last-id: %d, this-id: %d", lastID, desc.id)
			}
			if desc.min <= lastMax {
				return nil, fmt.Errorf("index block range is out of order, last-max: %d, this-min: %d", lastMax, desc.min)
			}
		}
		lastID = desc.id
		lastMax = desc.max
		descList = append(descList, &desc)
	}
	return descList, nil
}

// indexReader is the structure to look up the state history index records
// associated with the specific state element.
type indexReader struct {
	db       ethdb.KeyValueReader
	descList []*indexBlockDesc
	readers  map[uint32]*blockReader
	state    stateIdent
}

// loadIndexData loads the index data associated with the specified state.
func loadIndexData(db ethdb.KeyValueReader, state stateIdent) ([]*indexBlockDesc, error) {
	var blob []byte
	if state.account {
		blob = rawdb.ReadAccountHistoryIndex(db, state.address)
	} else {
		blob = rawdb.ReadStorageHistoryIndex(db, state.address, state.storageHash)
	}
	if len(blob) == 0 {
		return nil, nil
	}
	return parseIndex(blob)
}

// newIndexReader constructs an index reader for the specified state. A reader
// with empty data is allowed.
func newIndexReader(db ethdb.KeyValueReader, state stateIdent) (*indexReader, error) {
	descList, err := loadIndexData(db, state)
	if err != nil {
		return nil, err
	}
	return &indexReader{
		descList: descList,
		readers:  make(map[uint32]*blockReader),
		db:       db,
		state:    state,
	}, nil
}

// refresh reloads the last section of index data to account for any additional
// elements that may have been written to disk.
func (r *indexReader) refresh() error {
	// Release the reader for the last section of index data, as its content
	// may have been modified by additional elements written to the disk.
	if len(r.descList) != 0 {
		last := r.descList[len(r.descList)-1]
		if !last.full() {
			delete(r.readers, last.id)
		}
	}
	descList, err := loadIndexData(r.db, r.state)
	if err != nil {
		return err
	}
	r.descList = descList
	return nil
}

// readGreaterThan locates the first element that is greater than the specified
// value. If no such element is found, MaxUint64 is returned.
func (r *indexReader) readGreaterThan(id uint64) (uint64, error) {
	index := sort.Search(len(r.descList), func(i int) bool {
		return id < r.descList[i].max
	})
	if index == len(r.descList) {
		return math.MaxUint64, nil
	}
	desc := r.descList[index]

	br, ok := r.readers[desc.id]
	if !ok {
		var (
			err  error
			blob []byte
		)
		if r.state.account {
			blob = rawdb.ReadAccountHistoryIndexBlock(r.db, r.state.address, desc.id)
		} else {
			blob = rawdb.ReadStorageHistoryIndexBlock(r.db, r.state.address, r.state.storageHash, desc.id)
		}
		br, err = newBlockReader(blob)
		if err != nil {
			return 0, err
		}
		r.readers[desc.id] = br
	}
	// The supplied ID is not greater than block.max, ensuring that an element
	// satisfying the condition can be found.
	return br.readGreaterThan(id)
}
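
To make the read path concrete, here is a hedged sketch of answering "which indexed history after a given ID next touched this account?"; newAccountIdent is assumed from the tests at the end of this diff, and the helper itself is illustrative, not part of the commit:

package pathdb

import (
	"math"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
)

// nextModification is an illustrative helper, not part of the diff: it reports
// the first indexed state history after id that touched the given account.
func nextModification(db ethdb.KeyValueReader, addr common.Address, id uint64) (uint64, bool, error) {
	r, err := newIndexReader(db, newAccountIdent(addr))
	if err != nil {
		return 0, false, err
	}
	next, err := r.readGreaterThan(id)
	if err != nil {
		return 0, false, err
	}
	return next, next != math.MaxUint64, nil
}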

// indexWriter is responsible for writing index data for a specific state (either
// an account or a storage slot). The state index follows a two-layer structure:
// the first layer consists of a list of fixed-size metadata, each linked to a
// second-layer block. The index data (monotonically increasing list of state
// history ids) is stored in these second-layer index blocks, which are size
// limited.
type indexWriter struct {
	descList []*indexBlockDesc // The list of index block descriptions
	bw       *blockWriter      // The live index block writer
	frozen   []*blockWriter    // The finalized index block writers, waiting for flush
	lastID   uint64            // The ID of the latest tracked history
	state    stateIdent
	db       ethdb.KeyValueReader
}

// newIndexWriter constructs the index writer for the specified state.
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) {
	var blob []byte
	if state.account {
		blob = rawdb.ReadAccountHistoryIndex(db, state.address)
	} else {
		blob = rawdb.ReadStorageHistoryIndex(db, state.address, state.storageHash)
	}
	if len(blob) == 0 {
		desc := newIndexBlockDesc(0)
		bw, _ := newBlockWriter(nil, desc)
		return &indexWriter{
			descList: []*indexBlockDesc{desc},
			bw:       bw,
			state:    state,
			db:       db,
		}, nil
	}
	descList, err := parseIndex(blob)
	if err != nil {
		return nil, err
	}
	var (
		indexBlock []byte
		lastDesc   = descList[len(descList)-1]
	)
	if state.account {
		indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.address, lastDesc.id)
	} else {
		indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.address, state.storageHash, lastDesc.id)
	}
	bw, err := newBlockWriter(indexBlock, lastDesc)
	if err != nil {
		return nil, err
	}
	return &indexWriter{
		descList: descList,
		lastID:   lastDesc.max,
		bw:       bw,
		state:    state,
		db:       db,
	}, nil
}

// append adds the new element into the index writer.
func (w *indexWriter) append(id uint64) error {
	if id <= w.lastID {
		return fmt.Errorf("append element out of order, last: %d, this: %d", w.lastID, id)
	}
	if w.bw.full() {
		if err := w.rotate(); err != nil {
			return err
		}
	}
	if err := w.bw.append(id); err != nil {
		return err
	}
	w.lastID = id

	return nil
}

// rotate creates a new index block for storing index records from scratch
// and caches the current full index block for finalization.
func (w *indexWriter) rotate() error {
	var (
		err  error
		desc = newIndexBlockDesc(w.bw.desc.id + 1)
	)
	w.frozen = append(w.frozen, w.bw)
	w.bw, err = newBlockWriter(nil, desc)
	if err != nil {
		return err
	}
	w.descList = append(w.descList, desc)
	return nil
}

// finish finalizes all the frozen index block writers along with the live one
// if it's not empty, committing the index block data and the index meta into
// the supplied batch.
//
// This function is safe to be called multiple times.
func (w *indexWriter) finish(batch ethdb.Batch) {
	var (
		writers  = append(w.frozen, w.bw)
		descList = w.descList
	)
	// The live index block writer might be empty if the entire index writer
	// is created from scratch, remove it from committing.
	if w.bw.empty() {
		writers = writers[:len(writers)-1]
		descList = descList[:len(descList)-1]
	}
	if len(writers) == 0 {
		return // nothing to commit
	}
	for _, bw := range writers {
		if w.state.account {
			rawdb.WriteAccountHistoryIndexBlock(batch, w.state.address, bw.desc.id, bw.finish())
		} else {
			rawdb.WriteStorageHistoryIndexBlock(batch, w.state.address, w.state.storageHash, bw.desc.id, bw.finish())
		}
	}
	w.frozen = nil // release all the frozen writers

	buf := make([]byte, 0, indexBlockDescSize*len(descList))
	for _, desc := range descList {
		buf = append(buf, desc.encode()...)
	}
	if w.state.account {
		rawdb.WriteAccountHistoryIndex(batch, w.state.address, buf)
	} else {
		rawdb.WriteStorageHistoryIndex(batch, w.state.address, w.state.storageHash, buf)
	}
}
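
Pieced together from the tests at the end of this diff, the intended write path looks roughly like this (a sketch, assuming an ethdb.Database so a batch can be created; the helper is illustrative, not part of the commit):

package pathdb

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
)

// indexAccount is an illustrative helper, not part of the diff, showing the
// append-then-finish contract: IDs must be strictly increasing.
func indexAccount(db ethdb.Database, addr common.Address, ids []uint64) error {
	w, err := newIndexWriter(db, newAccountIdent(addr))
	if err != nil {
		return err
	}
	for _, id := range ids {
		if err := w.append(id); err != nil {
			return err // zero or out-of-order IDs are rejected
		}
	}
	batch := db.NewBatch()
	w.finish(batch) // flushes frozen blocks, the live block and the metadata
	return batch.Write()
}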

// indexDeleter is responsible for deleting index data for a specific state.
type indexDeleter struct {
	descList []*indexBlockDesc // The list of index block descriptions
	bw       *blockWriter      // The live index block writer
	dropped  []uint32          // The list of index block ids waiting for deletion
	lastID   uint64            // The ID of the latest tracked history
	state    stateIdent
	db       ethdb.KeyValueReader
}

// newIndexDeleter constructs the index deleter for the specified state.
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) {
	var blob []byte
	if state.account {
		blob = rawdb.ReadAccountHistoryIndex(db, state.address)
	} else {
		blob = rawdb.ReadStorageHistoryIndex(db, state.address, state.storageHash)
	}
	if len(blob) == 0 {
		// TODO(rjl493456442) we can probably return an error here,
		// deleter with no data is meaningless.
		desc := newIndexBlockDesc(0)
		bw, _ := newBlockWriter(nil, desc)
		return &indexDeleter{
			descList: []*indexBlockDesc{desc},
			bw:       bw,
			state:    state,
			db:       db,
		}, nil
	}
	descList, err := parseIndex(blob)
	if err != nil {
		return nil, err
	}
	var (
		indexBlock []byte
		lastDesc   = descList[len(descList)-1]
	)
	if state.account {
		indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.address, lastDesc.id)
	} else {
		indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.address, state.storageHash, lastDesc.id)
	}
	bw, err := newBlockWriter(indexBlock, lastDesc)
	if err != nil {
		return nil, err
	}
	return &indexDeleter{
		descList: descList,
		lastID:   lastDesc.max,
		bw:       bw,
		state:    state,
		db:       db,
	}, nil
}

// empty returns a flag indicating whether the state index is empty.
func (d *indexDeleter) empty() bool {
	return d.bw.empty() && len(d.descList) == 1
}

// pop removes the last written element from the state index.
func (d *indexDeleter) pop(id uint64) error {
	if id == 0 {
		return fmt.Errorf("zero history ID is not valid")
	}
	if id != d.lastID {
		return fmt.Errorf("pop element out of order, last: %d, this: %d", d.lastID, id)
	}
	if err := d.bw.pop(id); err != nil {
		return err
	}
	if !d.bw.empty() {
		d.lastID = d.bw.desc.max
		return nil
	}
	// Discard the last block writer if it becomes empty after popping an element
	d.dropped = append(d.dropped, d.descList[len(d.descList)-1].id)

	// Reset the entire index writer if it becomes empty after popping an element
	if d.empty() {
		d.lastID = 0
		return nil
	}
	d.descList = d.descList[:len(d.descList)-1]

	// Open the previous block writer for deleting
	var (
		indexBlock []byte
		lastDesc   = d.descList[len(d.descList)-1]
	)
	if d.state.account {
		indexBlock = rawdb.ReadAccountHistoryIndexBlock(d.db, d.state.address, lastDesc.id)
	} else {
		indexBlock = rawdb.ReadStorageHistoryIndexBlock(d.db, d.state.address, d.state.storageHash, lastDesc.id)
	}
	bw, err := newBlockWriter(indexBlock, lastDesc)
	if err != nil {
		return err
	}
	d.bw = bw
	d.lastID = bw.desc.max
	return nil
}

// finish deletes the empty index blocks and updates the index meta.
//
// This function is safe to be called multiple times.
func (d *indexDeleter) finish(batch ethdb.Batch) {
	for _, id := range d.dropped {
		if d.state.account {
			rawdb.DeleteAccountHistoryIndexBlock(batch, d.state.address, id)
		} else {
			rawdb.DeleteStorageHistoryIndexBlock(batch, d.state.address, d.state.storageHash, id)
		}
	}
	d.dropped = nil

	// Flush the content of the last block writer, regardless of whether it's dirty
	if !d.bw.empty() {
		if d.state.account {
			rawdb.WriteAccountHistoryIndexBlock(batch, d.state.address, d.bw.desc.id, d.bw.finish())
		} else {
			rawdb.WriteStorageHistoryIndexBlock(batch, d.state.address, d.state.storageHash, d.bw.desc.id, d.bw.finish())
		}
	}
	// Flush the index metadata into the supplied batch
	if d.empty() {
		if d.state.account {
			rawdb.DeleteAccountHistoryIndex(batch, d.state.address)
		} else {
			rawdb.DeleteStorageHistoryIndex(batch, d.state.address, d.state.storageHash)
		}
	} else {
		buf := make([]byte, 0, indexBlockDescSize*len(d.descList))
		for _, desc := range d.descList {
			buf = append(buf, desc.encode()...)
		}
		if d.state.account {
			rawdb.WriteAccountHistoryIndex(batch, d.state.address, buf)
		} else {
			rawdb.WriteStorageHistoryIndex(batch, d.state.address, d.state.storageHash, buf)
		}
	}
}
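
The deleter is the writer's mirror image: IDs are popped newest-first and finish reconciles the remaining blocks and metadata. A hedged sketch of unwinding recent entries (illustrative helper, not part of the diff):

package pathdb

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethdb"
)

// unindexAccount is an illustrative helper: it removes the given history IDs
// from an account's index, newest first.
func unindexAccount(db ethdb.Database, addr common.Address, ids []uint64) error {
	d, err := newIndexDeleter(db, newAccountIdent(addr))
	if err != nil {
		return err
	}
	for i := len(ids) - 1; i >= 0; i-- {
		if err := d.pop(ids[i]); err != nil {
			return err // each popped ID must equal the current maximum
		}
	}
	batch := db.NewBatch()
	d.finish(batch) // drops emptied blocks, rewrites the tail block and the metadata
	return batch.Write()
}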

@@ -0,0 +1,380 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"sort"
)

const (
	indexBlockDescSize   = 24    // The size of index block descriptor
	indexBlockEntriesCap = 4096  // The maximum number of entries that can be grouped in a block
	indexBlockRestartLen = 256   // The restart interval length of index block
	historyIndexBatch    = 65536 // The number of state histories for constructing or deleting indexes together
)

// indexBlockDesc represents a descriptor for an index block, which contains a
// list of state mutation records associated with a specific state (either an
// account or a storage slot).
type indexBlockDesc struct {
	min     uint64 // The minimum state ID retained within the block
	max     uint64 // The maximum state ID retained within the block
	entries uint32 // The number of state mutation records retained within the block
	id      uint32 // The id of the index block
}

func newIndexBlockDesc(id uint32) *indexBlockDesc {
	return &indexBlockDesc{id: id}
}

// empty indicates whether the block is empty with no element retained.
func (d *indexBlockDesc) empty() bool {
	return d.entries == 0
}

// full indicates whether the number of elements in the block exceeds the
// preconfigured limit.
func (d *indexBlockDesc) full() bool {
	return d.entries >= indexBlockEntriesCap
}

// encode packs index block descriptor into byte stream.
func (d *indexBlockDesc) encode() []byte {
	var buf [indexBlockDescSize]byte
	binary.BigEndian.PutUint64(buf[:8], d.min)
	binary.BigEndian.PutUint64(buf[8:16], d.max)
	binary.BigEndian.PutUint32(buf[16:20], d.entries)
	binary.BigEndian.PutUint32(buf[20:24], d.id)
	return buf[:]
}

// decode unpacks index block descriptor from byte stream.
func (d *indexBlockDesc) decode(blob []byte) {
	d.min = binary.BigEndian.Uint64(blob[:8])
	d.max = binary.BigEndian.Uint64(blob[8:16])
	d.entries = binary.BigEndian.Uint32(blob[16:20])
	d.id = binary.BigEndian.Uint32(blob[20:24])
}
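
The descriptor is thus a fixed 24-byte big-endian record: min at [0:8), max at [8:16), entries at [16:20) and id at [20:24). A round-trip sketch (illustrative only, not part of the diff):

package pathdb

// descRoundTrip demonstrates that encode and decode are inverses for the
// fixed 24-byte descriptor layout.
func descRoundTrip() bool {
	d := indexBlockDesc{min: 1, max: 99, entries: 7, id: 3}
	var got indexBlockDesc
	got.decode(d.encode())
	return got == d // true
}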

// parseIndexBlock parses the index block with the supplied byte stream.
// The index block format can be illustrated as below:
//
//	+---->+------------------+
//	|     |      Chunk1      |
//	|     +------------------+
//	|     |      ......      |
//	| +-->+------------------+
//	| |   |      ChunkN      |
//	| |   +------------------+
//	+-|---|     Restart1     |
//	  |   |     Restart...   | 4N bytes
//	  +---|     RestartN     |
//	      +------------------+
//	      |  Restart count   | 4 bytes
//	      +------------------+
//
// - Chunk list: A list of data chunks
// - Restart list: A list of 4-byte pointers, each pointing to the start position of a chunk
// - Restart count: The number of restarts in the block, stored at the end of the block (4 bytes)
//
// Each chunk begins with the full value of the first integer, followed by
// subsequent integers representing the differences between the current value
// and the preceding one. Integers are encoded with variable-size for best
// storage efficiency.
//
// Empty index block is regarded as invalid.
func parseIndexBlock(blob []byte) ([]uint32, []byte, error) {
	if len(blob) < 4 {
		return nil, nil, fmt.Errorf("corrupted index block, len: %d", len(blob))
	}
	restartLen := binary.BigEndian.Uint32(blob[len(blob)-4:])
	if restartLen == 0 {
		return nil, nil, errors.New("corrupted index block, no restart")
	}
	tailLen := int(restartLen+1) * 4
	if len(blob) < tailLen {
		return nil, nil, fmt.Errorf("truncated restarts, size: %d, restarts: %d", len(blob), restartLen)
	}
	restarts := make([]uint32, 0, restartLen)
	for i := restartLen; i > 0; i-- {
		restart := binary.BigEndian.Uint32(blob[len(blob)-int(i+1)*4:])
		restarts = append(restarts, restart)
	}
	var prev uint32
	for i := 0; i < len(restarts); i++ {
		if i != 0 {
			if restarts[i] <= prev {
				return nil, nil, fmt.Errorf("restart out of order, prev: %d, next: %d", prev, restarts[i])
			}
		}
		if int(restarts[i]) >= len(blob)-tailLen {
			return nil, nil, fmt.Errorf("invalid restart position, restart: %d, size: %d", restarts[i], len(blob)-tailLen)
		}
		prev = restarts[i]
	}
	return restarts, blob[:len(blob)-tailLen], nil
}
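
For a concrete picture of that layout: appending the IDs 3, 4 and 6 to a fresh block (via the blockWriter defined below) yields three varints, the section head in full and the rest as deltas, followed by one restart offset and the restart count. A sketch with invented IDs (illustrative only, not part of the diff):

package pathdb

import "fmt"

// exampleBlockLayout is illustrative only; the IDs are made up.
func exampleBlockLayout() {
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
	for _, id := range []uint64{3, 4, 6} {
		bw.append(id)
	}
	// Chunk: varint(3), varint(4-3), varint(6-4)              -> 03 01 02
	// Tail:  restart offset 0 and restart count 1 (4+4 bytes) -> 00000000 00000001
	fmt.Printf("% x\n", bw.finish()) // 03 01 02 00 00 00 00 00 00 00 01
}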

// blockReader is the reader to access the elements within a block.
type blockReader struct {
	restarts []uint32
	data     []byte
}

// newBlockReader constructs the block reader with the supplied block data.
func newBlockReader(blob []byte) (*blockReader, error) {
	restarts, data, err := parseIndexBlock(blob)
	if err != nil {
		return nil, err
	}
	return &blockReader{
		restarts: restarts,
		data:     data, // safe to own the slice
	}, nil
}

// readGreaterThan locates the first element in the block that is greater than
// the specified value. If no such element is found, MaxUint64 is returned.
func (br *blockReader) readGreaterThan(id uint64) (uint64, error) {
	var err error
	index := sort.Search(len(br.restarts), func(i int) bool {
		item, n := binary.Uvarint(br.data[br.restarts[i]:])
		if n <= 0 {
			err = fmt.Errorf("failed to decode item at restart %d", br.restarts[i])
		}
		return item > id
	})
	if err != nil {
		return 0, err
	}
	if index == 0 {
		item, _ := binary.Uvarint(br.data[br.restarts[0]:])
		return item, nil
	}
	var (
		start  int
		limit  int
		result uint64
	)
	if index == len(br.restarts) {
		// The element being searched falls within the last restart section,
		// there is no guarantee such element can be found.
		start = int(br.restarts[len(br.restarts)-1])
		limit = len(br.data)
	} else {
		// The element being searched falls within a non-last restart section,
		// such element can be found for sure.
		start = int(br.restarts[index-1])
		limit = int(br.restarts[index])
	}
	pos := start
	for pos < limit {
		x, n := binary.Uvarint(br.data[pos:])
		if pos == start {
			result = x
		} else {
			result += x
		}
		if result > id {
			return result, nil
		}
		pos += n
	}
	if index == len(br.restarts) {
		return math.MaxUint64, nil
	}
	item, _ := binary.Uvarint(br.data[br.restarts[index]:])
	return item, nil
}

type blockWriter struct {
	desc     *indexBlockDesc
	restarts []uint32
	scratch  []byte
	data     []byte
}

func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) {
	scratch := make([]byte, binary.MaxVarintLen64)
	if len(blob) == 0 {
		return &blockWriter{
			desc:    desc,
			scratch: scratch,
			data:    make([]byte, 0, 1024),
		}, nil
	}
	restarts, data, err := parseIndexBlock(blob)
	if err != nil {
		return nil, err
	}
	return &blockWriter{
		desc:     desc,
		restarts: restarts,
		scratch:  scratch,
		data:     data, // safe to own the slice
	}, nil
}

// append adds a new element to the block. The new element must be greater than
// the previous one.
func (b *blockWriter) append(id uint64) error {
	if id == 0 {
		return errors.New("invalid zero id")
	}
	if id <= b.desc.max {
		return fmt.Errorf("append element out of order, last: %d, this: %d", b.desc.max, id)
	}
	if b.desc.entries%indexBlockRestartLen == 0 {
		// Rotate the current restart range if it's full
		b.restarts = append(b.restarts, uint32(len(b.data)))

		// The restart point item can either be encoded in variable
		// size or fixed size. Although variable-size encoding is
		// slightly slower (2ns per operation), it is still relatively
		// fast, therefore, it's picked for better space efficiency.
		//
		// The first element in a restart range is encoded using its
		// full value.
		n := binary.PutUvarint(b.scratch[0:], id)
		b.data = append(b.data, b.scratch[:n]...)
	} else {
		// Non-head elements within a restart range are encoded using
		// their difference from the preceding element.
		n := binary.PutUvarint(b.scratch[0:], id-b.desc.max)
		b.data = append(b.data, b.scratch[:n]...)
	}
	b.desc.entries++

	// The state history ID must be greater than 0.
	if b.desc.min == 0 {
		b.desc.min = id
	}
	b.desc.max = id
	return nil
}

// scanSection traverses the specified section and terminates if fn returns true.
func (b *blockWriter) scanSection(section int, fn func(uint64, int) bool) {
	var (
		value uint64
		start = int(b.restarts[section])
		pos   = start
		limit int
	)
	if section == len(b.restarts)-1 {
		limit = len(b.data)
	} else {
		limit = int(b.restarts[section+1])
	}
	for pos < limit {
		x, n := binary.Uvarint(b.data[pos:])
		if pos == start {
			value = x
		} else {
			value += x
		}
		if fn(value, pos) {
			return
		}
		pos += n
	}
}

// sectionLast returns the last element in the specified section.
func (b *blockWriter) sectionLast(section int) uint64 {
	var n uint64
	b.scanSection(section, func(v uint64, _ int) bool {
		n = v
		return false
	})
	return n
}

// sectionSearch looks up the specified value in the given section,
// the position and the preceding value will be returned if found.
func (b *blockWriter) sectionSearch(section int, n uint64) (prev uint64, pos int) {
	b.scanSection(section, func(v uint64, p int) bool {
		if n == v {
			pos = p
			return true
		}
		prev = v
		return false
	})
	return prev, pos
}

// pop removes the last element from the block. It is assumed that the block
// writer is non-empty.
func (b *blockWriter) pop(id uint64) error {
	if id == 0 {
		return errors.New("invalid zero id")
	}
	if id != b.desc.max {
		return fmt.Errorf("pop element out of order, last: %d, this: %d", b.desc.max, id)
	}
	// If there is only one entry left, the entire block should be reset
	if b.desc.entries == 1 {
		b.desc.min = 0
		b.desc.max = 0
		b.desc.entries = 0
		b.restarts = nil
		b.data = b.data[:0]
		return nil
	}
	// Pop the last restart section if the section becomes empty after removing
	// one element.
	if b.desc.entries%indexBlockRestartLen == 1 {
		b.data = b.data[:b.restarts[len(b.restarts)-1]]
		b.restarts = b.restarts[:len(b.restarts)-1]
		b.desc.max = b.sectionLast(len(b.restarts) - 1)
		b.desc.entries -= 1
		return nil
	}
	// Look up the element preceding the one to be popped, in order to update
	// the maximum element in the block.
	prev, pos := b.sectionSearch(len(b.restarts)-1, id)
	b.desc.max = prev
	b.data = b.data[:pos]
	b.desc.entries -= 1
	return nil
}
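
pop therefore has three regimes: resetting a single-entry block, dropping a restart section that became empty, and (the common case) truncating within the last section while rewinding desc.max to the preceding element. A sketch of the common case, with invented IDs (illustrative only, not part of the diff):

package pathdb

// examplePop shows that popping the maximum rewinds desc.max to the
// preceding element and truncates the encoded data.
func examplePop() (uint64, uint64) {
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
	for _, id := range []uint64{3, 4, 6} {
		bw.append(id)
	}
	before := bw.desc.max // 6
	_ = bw.pop(6)
	return before, bw.desc.max // 6, 4
}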

func (b *blockWriter) empty() bool {
	return b.desc.empty()
}

func (b *blockWriter) full() bool {
	return b.desc.full()
}

// finish finalizes the index block encoding by appending the encoded restart points
// and the restart counter to the end of the block.
//
// This function is safe to be called multiple times.
func (b *blockWriter) finish() []byte {
	var buf []byte
	for _, number := range append(b.restarts, uint32(len(b.restarts))) {
		binary.BigEndian.PutUint32(b.scratch[:4], number)
		buf = append(buf, b.scratch[:4]...)
	}
	return append(b.data, buf...)
}
@ -0,0 +1,217 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBlockReaderBasic(t *testing.T) {
|
||||
elements := []uint64{
|
||||
1, 5, 10, 11, 20,
|
||||
}
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
|
||||
br, err := newBlockReader(bw.finish())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block reader, %v", err)
|
||||
}
|
||||
cases := []struct {
|
||||
value uint64
|
||||
result uint64
|
||||
}{
|
||||
{0, 1},
|
||||
{1, 5},
|
||||
{10, 11},
|
||||
{19, 20},
|
||||
{20, math.MaxUint64},
|
||||
{21, math.MaxUint64},
|
||||
}
|
||||
for _, c := range cases {
|
||||
got, err := br.readGreaterThan(c.value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
if got != c.result {
|
||||
t.Fatalf("Unexpected result, got %v, wanted %v", got, c.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockReaderLarge(t *testing.T) {
|
||||
var elements []uint64
|
||||
for i := 0; i < 1000; i++ {
|
||||
elements = append(elements, rand.Uint64())
|
||||
}
|
||||
slices.Sort(elements)
|
||||
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
|
||||
br, err := newBlockReader(bw.finish())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block reader, %v", err)
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
value := rand.Uint64()
|
||||
pos := sort.Search(len(elements), func(i int) bool {
|
||||
return elements[i] > value
|
||||
})
|
||||
got, err := br.readGreaterThan(value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
		if pos == len(elements) {
			if got != math.MaxUint64 {
				t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got)
			}
		} else if got != elements[pos] {
			t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos])
		}
	}
}

func TestBlockWriterBasic(t *testing.T) {
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
	if !bw.empty() {
		t.Fatal("expected empty block")
	}
	bw.append(2)
	if err := bw.append(1); err == nil {
		t.Fatal("out-of-order insertion is not expected")
	}
	for i := 0; i < 10; i++ {
		bw.append(uint64(i + 3))
	}

	bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0))
	if err != nil {
		t.Fatalf("Failed to construct the block writer, %v", err)
	}
	for i := 0; i < 10; i++ {
		if err := bw.append(uint64(i + 100)); err != nil {
			t.Fatalf("Failed to append value %d: %v", i, err)
		}
	}
	bw.finish()
}

func TestBlockWriterDelete(t *testing.T) {
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
	for i := 0; i < 10; i++ {
		bw.append(uint64(i + 1))
	}
	// Pop unknown id, the request should be rejected
	if err := bw.pop(100); err == nil {
		t.Fatal("Expect error to occur for unknown id")
	}
	for i := 10; i >= 1; i-- {
		if err := bw.pop(uint64(i)); err != nil {
			t.Fatalf("Unexpected error for element popping, %v", err)
		}
		empty := i == 1
		if empty != bw.empty() {
			t.Fatalf("Emptiness is not matched, want: %t, got: %t", empty, bw.empty())
		}
		newMax := uint64(i - 1)
		if bw.desc.max != newMax {
			t.Fatalf("Maximum element is not matched, want: %d, got: %d", newMax, bw.desc.max)
		}
	}
}

func TestBlockWriterDeleteWithData(t *testing.T) {
	elements := []uint64{
		1, 5, 10, 11, 20,
	}
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
	for i := 0; i < len(elements); i++ {
		bw.append(elements[i])
	}

	// Re-construct the block writer with data
	desc := &indexBlockDesc{
		id:      0,
		min:     1,
		max:     20,
		entries: 5,
	}
	bw, err := newBlockWriter(bw.finish(), desc)
	if err != nil {
		t.Fatalf("Failed to construct block writer, %v", err)
	}
	for i := len(elements) - 1; i > 0; i-- {
		if err := bw.pop(elements[i]); err != nil {
			t.Fatalf("Failed to pop element, %v", err)
		}
		newTail := elements[i-1]

		// Ensure the element can still be queried with no issue
		br, err := newBlockReader(bw.finish())
		if err != nil {
			t.Fatalf("Failed to construct the block reader, %v", err)
		}
		cases := []struct {
			value  uint64
			result uint64
		}{
			{0, 1},
			{1, 5},
			{10, 11},
			{19, 20},
			{20, math.MaxUint64},
			{21, math.MaxUint64},
		}
		for _, c := range cases {
			want := c.result
			if c.value >= newTail {
				want = math.MaxUint64
			}
			got, err := br.readGreaterThan(c.value)
			if err != nil {
				t.Fatalf("Unexpected error, got %v", err)
			}
			if got != want {
				t.Fatalf("Unexpected result, got %v, wanted %v", got, want)
			}
		}
	}
}

func TestCorruptedIndexBlock(t *testing.T) {
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
	for i := 0; i < 10; i++ {
		bw.append(uint64(i + 1))
	}
	buf := bw.finish()

	// Mutate the buffer manually
	buf[len(buf)-1]++
	_, err := newBlockWriter(buf, newIndexBlockDesc(0))
	if err == nil {
		t.Fatal("Corrupted index block data is not detected")
	}
}
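Taken together, the tests above pin down the contract of the low-level index block: strictly increasing IDs go in through append, finish serializes the block, and a reader answers strictly-greater-than queries against the serialized form. A minimal round-trip sketch using the same unexported helpers (it only compiles inside the pathdb package):

	func blockRoundTrip() {
		bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
		for _, id := range []uint64{1, 5, 10} {
			if err := bw.append(id); err != nil { // IDs must be strictly increasing
				panic(err)
			}
		}
		br, err := newBlockReader(bw.finish())
		if err != nil {
			panic(err)
		}
		res, _ := br.readGreaterThan(5) // res == 10; math.MaxUint64 if nothing greater exists
		_ = res
	}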
@ -0,0 +1,290 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"math"
	"math/rand"
	"slices"
	"sort"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

func TestIndexReaderBasic(t *testing.T) {
	elements := []uint64{
		1, 5, 10, 11, 20,
	}
	db := rawdb.NewMemoryDatabase()
	bw, _ := newIndexWriter(db, newAccountIdent(common.Address{0xa}))
	for i := 0; i < len(elements); i++ {
		bw.append(elements[i])
	}
	batch := db.NewBatch()
	bw.finish(batch)
	batch.Write()

	br, err := newIndexReader(db, newAccountIdent(common.Address{0xa}))
	if err != nil {
		t.Fatalf("Failed to construct the index reader, %v", err)
	}
	cases := []struct {
		value  uint64
		result uint64
	}{
		{0, 1},
		{1, 5},
		{10, 11},
		{19, 20},
		{20, math.MaxUint64},
		{21, math.MaxUint64},
	}
	for _, c := range cases {
		got, err := br.readGreaterThan(c.value)
		if err != nil {
			t.Fatalf("Unexpected error, got %v", err)
		}
		if got != c.result {
			t.Fatalf("Unexpected result, got %v, wanted %v", got, c.result)
		}
	}
}

func TestIndexReaderLarge(t *testing.T) {
	var elements []uint64
	for i := 0; i < 10*indexBlockEntriesCap; i++ {
		elements = append(elements, rand.Uint64())
	}
	slices.Sort(elements)

	db := rawdb.NewMemoryDatabase()
	bw, _ := newIndexWriter(db, newAccountIdent(common.Address{0xa}))
	for i := 0; i < len(elements); i++ {
		bw.append(elements[i])
	}
	batch := db.NewBatch()
	bw.finish(batch)
	batch.Write()

	br, err := newIndexReader(db, newAccountIdent(common.Address{0xa}))
	if err != nil {
		t.Fatalf("Failed to construct the index reader, %v", err)
	}
	for i := 0; i < 100; i++ {
		value := rand.Uint64()
		pos := sort.Search(len(elements), func(i int) bool {
			return elements[i] > value
		})
		got, err := br.readGreaterThan(value)
		if err != nil {
			t.Fatalf("Unexpected error, got %v", err)
		}
		if pos == len(elements) {
			if got != math.MaxUint64 {
				t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got)
			}
		} else if got != elements[pos] {
			t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos])
		}
	}
}

func TestEmptyIndexReader(t *testing.T) {
	br, err := newIndexReader(rawdb.NewMemoryDatabase(), newAccountIdent(common.Address{0xa}))
	if err != nil {
		t.Fatalf("Failed to construct the index reader, %v", err)
	}
	res, err := br.readGreaterThan(100)
	if err != nil {
		t.Fatalf("Failed to query, %v", err)
	}
	if res != math.MaxUint64 {
		t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", res)
	}
}

func TestIndexWriterBasic(t *testing.T) {
	db := rawdb.NewMemoryDatabase()
	iw, _ := newIndexWriter(db, newAccountIdent(common.Address{0xa}))
	iw.append(2)
	if err := iw.append(1); err == nil {
		t.Fatal("out-of-order insertion is not expected")
	}
	for i := 0; i < 10; i++ {
		iw.append(uint64(i + 3))
	}
	batch := db.NewBatch()
	iw.finish(batch)
	batch.Write()

	iw, err := newIndexWriter(db, newAccountIdent(common.Address{0xa}))
	if err != nil {
		t.Fatalf("Failed to construct the index writer, %v", err)
	}
	for i := 0; i < 10; i++ {
		if err := iw.append(uint64(i + 100)); err != nil {
			t.Fatalf("Failed to append item, %v", err)
		}
	}
	iw.finish(db.NewBatch())
}

func TestIndexWriterDelete(t *testing.T) {
	db := rawdb.NewMemoryDatabase()
	iw, _ := newIndexWriter(db, newAccountIdent(common.Address{0xa}))
	for i := 0; i < indexBlockEntriesCap*4; i++ {
		iw.append(uint64(i + 1))
	}
	batch := db.NewBatch()
	iw.finish(batch)
	batch.Write()

	// Delete unknown id, the request should be rejected
	id, _ := newIndexDeleter(db, newAccountIdent(common.Address{0xa}))
	if err := id.pop(indexBlockEntriesCap * 5); err == nil {
		t.Fatal("Expect error to occur for unknown id")
	}
	for i := indexBlockEntriesCap * 4; i >= 1; i-- {
		if err := id.pop(uint64(i)); err != nil {
			t.Fatalf("Unexpected error for element popping, %v", err)
		}
		if id.lastID != uint64(i-1) {
			t.Fatalf("Unexpected lastID, want: %d, got: %d", uint64(i-1), id.lastID)
		}
		if rand.Intn(10) == 0 {
			batch := db.NewBatch()
			id.finish(batch)
			batch.Write()
		}
	}
}

func TestBatchIndexerWrite(t *testing.T) {
	var (
		db        = rawdb.NewMemoryDatabase()
		batch     = newBatchIndexer(db, false)
		histories = makeHistories(10)
	)
	for i, h := range histories {
		if err := batch.process(h, uint64(i+1)); err != nil {
			t.Fatalf("Failed to process history, %v", err)
		}
	}
	if err := batch.finish(true); err != nil {
		t.Fatalf("Failed to finish batch indexer, %v", err)
	}
	indexed := rawdb.ReadLastStateHistoryIndex(db)
	if indexed == nil || *indexed != uint64(10) {
		t.Fatal("Unexpected index position")
	}
	var (
		accounts = make(map[common.Address][]uint64)
		storages = make(map[common.Address]map[common.Hash][]uint64)
	)
	for i, h := range histories {
		for _, addr := range h.accountList {
			accounts[addr] = append(accounts[addr], uint64(i+1))

			if _, ok := storages[addr]; !ok {
				storages[addr] = make(map[common.Hash][]uint64)
			}
			for _, slot := range h.storageList[addr] {
				storages[addr][slot] = append(storages[addr][slot], uint64(i+1))
			}
		}
	}
	for addr, indexes := range accounts {
		ir, _ := newIndexReader(db, newAccountIdent(addr))
		for i := 0; i < len(indexes)-1; i++ {
			n, err := ir.readGreaterThan(indexes[i])
			if err != nil {
				t.Fatalf("Failed to read index, %v", err)
			}
			if n != indexes[i+1] {
				t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n)
			}
		}
		n, err := ir.readGreaterThan(indexes[len(indexes)-1])
		if err != nil {
			t.Fatalf("Failed to read index, %v", err)
		}
		if n != math.MaxUint64 {
			t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n)
		}
	}
	for addr, slots := range storages {
		for slotHash, indexes := range slots {
			ir, _ := newIndexReader(db, newStorageIdent(addr, slotHash))
			for i := 0; i < len(indexes)-1; i++ {
				n, err := ir.readGreaterThan(indexes[i])
				if err != nil {
					t.Fatalf("Failed to read index, %v", err)
				}
				if n != indexes[i+1] {
					t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n)
				}
			}
			n, err := ir.readGreaterThan(indexes[len(indexes)-1])
			if err != nil {
				t.Fatalf("Failed to read index, %v", err)
			}
			if n != math.MaxUint64 {
				t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n)
			}
		}
	}
}

func TestBatchIndexerDelete(t *testing.T) {
	var (
		db        = rawdb.NewMemoryDatabase()
		bw        = newBatchIndexer(db, false)
		histories = makeHistories(10)
	)
	// Index histories
	for i, h := range histories {
		if err := bw.process(h, uint64(i+1)); err != nil {
			t.Fatalf("Failed to process history, %v", err)
		}
	}
	if err := bw.finish(true); err != nil {
		t.Fatalf("Failed to finish batch indexer, %v", err)
	}

	// Unindex histories
	bd := newBatchIndexer(db, true)
	for i := len(histories) - 1; i >= 0; i-- {
		if err := bd.process(histories[i], uint64(i+1)); err != nil {
			t.Fatalf("Failed to process history, %v", err)
		}
	}
	if err := bd.finish(true); err != nil {
		t.Fatalf("Failed to finish batch indexer, %v", err)
	}

	indexed := rawdb.ReadLastStateHistoryIndex(db)
	if indexed != nil {
		t.Fatal("Unexpected index position")
	}
	it := db.NewIterator(rawdb.StateHistoryIndexPrefix, nil)
	for it.Next() {
		t.Fatal("Leftover history index data")
	}
	it.Release()
}
@ -0,0 +1,508 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

// The batch size for reading state histories
const historyReadBatch = 1000

// batchIndexer is a structure designed to perform batch indexing or unindexing
// of state histories atomically.
type batchIndexer struct {
	accounts map[common.Address][]uint64                 // History ID list, keyed by account address
	storages map[common.Address]map[common.Hash][]uint64 // History ID list, keyed by account address and the hash of the raw storage key
	counter  int    // The counter of processed states
	delete   bool   // Index or unindex mode
	lastID   uint64 // The ID of the latest processed history
	db       ethdb.KeyValueStore
}

// newBatchIndexer constructs the batch indexer with the supplied mode.
func newBatchIndexer(db ethdb.KeyValueStore, delete bool) *batchIndexer {
	return &batchIndexer{
		accounts: make(map[common.Address][]uint64),
		storages: make(map[common.Address]map[common.Hash][]uint64),
		delete:   delete,
		db:       db,
	}
}

// process iterates through the accounts and their associated storage slots in the
// state history, tracking the mapping between state and history IDs.
func (b *batchIndexer) process(h *history, historyID uint64) error {
	buf := crypto.NewKeccakState()
	for _, address := range h.accountList {
		b.counter += 1
		b.accounts[address] = append(b.accounts[address], historyID)

		for _, slotKey := range h.storageList[address] {
			b.counter += 1
			if _, ok := b.storages[address]; !ok {
				b.storages[address] = make(map[common.Hash][]uint64)
			}
			// The hash of the storage slot key is used as the identifier, as the
			// legacy (v0) history records only the slot key hash; for newer
			// histories the raw storage key must be hashed to match.
			slotHash := slotKey
			if h.meta.version != stateHistoryV0 {
				slotHash = crypto.HashData(buf, slotKey.Bytes())
			}
			b.storages[address][slotHash] = append(b.storages[address][slotHash], historyID)
		}
	}
	b.lastID = historyID
	return b.finish(false)
}

// finish writes the accumulated state indexes into the disk if either the
// memory limitation is reached or it's requested forcibly.
func (b *batchIndexer) finish(force bool) error {
	if b.counter == 0 {
		return nil
	}
	if !force && b.counter < historyIndexBatch {
		return nil
	}
	batch := b.db.NewBatch()
	for address, idList := range b.accounts {
		if !b.delete {
			iw, err := newIndexWriter(b.db, newAccountIdent(address))
			if err != nil {
				return err
			}
			for _, n := range idList {
				if err := iw.append(n); err != nil {
					return err
				}
			}
			iw.finish(batch)
		} else {
			id, err := newIndexDeleter(b.db, newAccountIdent(address))
			if err != nil {
				return err
			}
			for _, n := range idList {
				if err := id.pop(n); err != nil {
					return err
				}
			}
			id.finish(batch)
		}
	}
	for address, slots := range b.storages {
		for storageHash, idList := range slots {
			if !b.delete {
				iw, err := newIndexWriter(b.db, newStorageIdent(address, storageHash))
				if err != nil {
					return err
				}
				for _, n := range idList {
					if err := iw.append(n); err != nil {
						return err
					}
				}
				iw.finish(batch)
			} else {
				id, err := newIndexDeleter(b.db, newStorageIdent(address, storageHash))
				if err != nil {
					return err
				}
				for _, n := range idList {
					if err := id.pop(n); err != nil {
						return err
					}
				}
				id.finish(batch)
			}
		}
	}
	// Update the position of the last indexed state history
	if !b.delete {
		rawdb.WriteLastStateHistoryIndex(batch, b.lastID)
	} else {
		if b.lastID == 1 {
			rawdb.DeleteLastStateHistoryIndex(batch)
		} else {
			rawdb.WriteLastStateHistoryIndex(batch, b.lastID-1)
		}
	}
	if err := batch.Write(); err != nil {
		return err
	}
	b.counter = 0
	b.accounts = make(map[common.Address][]uint64)
	b.storages = make(map[common.Address]map[common.Hash][]uint64)
	return nil
}
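In indexing mode the intended flow is: feed histories in ascending ID order through process (which flushes to disk by itself once enough entries have accumulated), then force a final flush with finish(true). A minimal sketch, assuming it runs inside the pathdb package and that a contiguous range firstID..lastID is available in the freezer:

	func indexRange(db ethdb.KeyValueStore, freezer ethdb.AncientReader, firstID, lastID uint64) error {
		b := newBatchIndexer(db, false) // false: index mode, true: unindex mode
		for id := firstID; id <= lastID; id++ {
			h, err := readHistory(freezer, id)
			if err != nil {
				return err
			}
			if err := b.process(h, id); err != nil {
				return err
			}
		}
		return b.finish(true) // flush whatever is still buffered
	}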

// indexSingle processes the state history with the specified ID for indexing.
func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
	defer func(start time.Time) {
		indexHistoryTimer.UpdateSince(start)
	}(time.Now())

	indexed := rawdb.ReadLastStateHistoryIndex(db)
	if indexed == nil || *indexed+1 != historyID {
		last := "null"
		if indexed != nil {
			last = fmt.Sprintf("%v", *indexed)
		}
		return fmt.Errorf("history indexing is out of order, last: %s, requested: %d", last, historyID)
	}
	h, err := readHistory(freezer, historyID)
	if err != nil {
		return err
	}
	b := newBatchIndexer(db, false)
	if err := b.process(h, historyID); err != nil {
		return err
	}
	if err := b.finish(true); err != nil {
		return err
	}
	log.Debug("Indexed state history", "id", historyID)
	return nil
}

// unindexSingle processes the state history with the specified ID for unindexing.
func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
	defer func(start time.Time) {
		unindexHistoryTimer.UpdateSince(start)
	}(time.Now())

	indexed := rawdb.ReadLastStateHistoryIndex(db)
	if indexed == nil || *indexed != historyID {
		last := "null"
		if indexed != nil {
			last = fmt.Sprintf("%v", *indexed)
		}
		return fmt.Errorf("history unindexing is out of order, last: %s, requested: %d", last, historyID)
	}
	h, err := readHistory(freezer, historyID)
	if err != nil {
		return err
	}
	b := newBatchIndexer(db, true)
	if err := b.process(h, historyID); err != nil {
		return err
	}
	if err := b.finish(true); err != nil {
		return err
	}
	log.Debug("Unindexed state history", "id", historyID)
	return nil
}
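Both helpers enforce strict contiguity against the persisted marker: indexSingle only accepts the history directly after the last indexed one, and unindexSingle only the last indexed one itself. A hypothetical trace:

	// Suppose rawdb.ReadLastStateHistoryIndex(db) currently returns 41.
	_ = indexSingle(43, db, freezer)   // rejected: out of order, 42 is expected
	_ = indexSingle(42, db, freezer)   // accepted: the marker advances to 42
	_ = unindexSingle(41, db, freezer) // rejected: only 42 may be unindexed now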

type interruptSignal struct {
	newLastID uint64
	result    chan error
}

type indexIniter struct {
	disk      ethdb.KeyValueStore
	freezer   ethdb.AncientStore
	interrupt chan *interruptSignal
	done      chan struct{}
	closed    chan struct{}
	wg        sync.WaitGroup
}

func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID uint64) *indexIniter {
	initer := &indexIniter{
		disk:      disk,
		freezer:   freezer,
		interrupt: make(chan *interruptSignal),
		done:      make(chan struct{}),
		closed:    make(chan struct{}),
	}
	initer.wg.Add(1)
	go initer.run(lastID)
	return initer
}

func (i *indexIniter) close() {
	select {
	case <-i.closed:
		return
	default:
		close(i.closed)
		i.wg.Wait()
	}
}

func (i *indexIniter) inited() bool {
	select {
	case <-i.closed:
		return false
	case <-i.done:
		return true
	default:
		return false
	}
}

func (i *indexIniter) run(lastID uint64) {
	defer i.wg.Done()

	// Launch the background indexing thread
	var (
		done      = make(chan struct{})
		interrupt = new(atomic.Int32)

		// checkDone indicates whether all requested state histories
		// have been fully indexed.
		checkDone = func() bool {
			indexed := rawdb.ReadLastStateHistoryIndex(i.disk)
			return indexed != nil && *indexed == lastID
		}
	)
	go i.index(done, interrupt, lastID)

	for {
		select {
		case signal := <-i.interrupt:
			// The indexing limit can only be extended or shortened continuously.
			if signal.newLastID != lastID+1 && signal.newLastID != lastID-1 {
				signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, signal.newLastID)
				continue
			}
			// The index limit is extended by one, update the limit without
			// interrupting the current background process.
			if signal.newLastID == lastID+1 {
				lastID = signal.newLastID
				signal.result <- nil
				continue
			}
			// The index limit is shortened by one, interrupt the current background
			// process and relaunch with the new target.
			interrupt.Store(1)
			<-done

			// If all state histories, including the one to be reverted, have
			// been fully indexed, unindex it here and shut down the initializer.
			if checkDone() {
				if err := unindexSingle(lastID, i.disk, i.freezer); err != nil {
					signal.result <- err
					return
				}
				close(i.done)
				signal.result <- nil
				return
			}
			// Adjust the indexing target and relaunch the process
			lastID = signal.newLastID
			done, interrupt = make(chan struct{}), new(atomic.Int32)
			go i.index(done, interrupt, lastID)

		case <-done:
			if checkDone() {
				close(i.done)
				return
			}
			// Relaunch the background runner if some tasks are left
			done, interrupt = make(chan struct{}), new(atomic.Int32)
			go i.index(done, interrupt, lastID)

		case <-i.closed:
			interrupt.Store(1)
			log.Info("Waiting for background history index initializer to exit")
			<-done

			if checkDone() {
				close(i.done)
			}
			return
		}
	}
}
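The run loop only ever moves the target by one in either direction: newLastID == lastID+1 extends the limit without disturbing the running index() pass, while newLastID == lastID-1 interrupts it and either unindexes the reverted history (if indexing had already caught up) or relaunches with the lower target. A sketch of the signalling handshake against a live initer, ignoring shutdown for brevity (the real callers also select on the closed and done channels):

	func extendTarget(initer *indexIniter, lastID uint64) error {
		sig := &interruptSignal{newLastID: lastID + 1, result: make(chan error, 1)}
		initer.interrupt <- sig // picked up by run()
		return <-sig.result
	}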

// next returns the ID of the next state history to be indexed.
func (i *indexIniter) next() (uint64, error) {
	tail, err := i.freezer.Tail()
	if err != nil {
		return 0, err
	}
	tailID := tail + 1 // compute the ID of the oldest history

	// Start indexing from scratch if nothing has been indexed
	lastIndexed := rawdb.ReadLastStateHistoryIndex(i.disk)
	if lastIndexed == nil {
		return tailID, nil
	}
	// Resume indexing from the last interrupted position
	if *lastIndexed+1 >= tailID {
		return *lastIndexed + 1, nil
	}
	// History has been shortened without indexing. Discard the gapped segment
	// in the history and shift to the first available element.
	//
	// The missing indexes corresponding to the gapped histories won't be visible.
	// It's fine to leave them unindexed.
	log.Info("History gap detected, discard old segment", "oldHead", *lastIndexed, "newHead", tailID)
	return tailID, nil
}
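Concretely, with a freezer tail of 100 the oldest preserved history is 101, and next() resolves as follows (hypothetical values):

	// tail = 100        -> tailID = 101 (oldest available history)
	// lastIndexed = nil -> next() = 101 (start from scratch)
	// lastIndexed = 105 -> next() = 106 (resume contiguously, since 105+1 >= 101)
	// lastIndexed = 90  -> next() = 101 (histories 91..100 were pruned unindexed, skip the gap)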

func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID uint64) {
	defer close(done)

	beginID, err := i.next()
	if err != nil {
		log.Error("Failed to find next state history for indexing", "err", err)
		return
	}
	// All available state histories have been indexed, and the last indexed one
	// exceeds the most recent available state history. This situation may occur
	// when the state is reverted manually (chain.SetHead) or a deep reorg is
	// encountered. In such cases, no indexing should be scheduled.
	if beginID > lastID {
		return
	}
	log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)

	var (
		current = beginID
		start   = time.Now()
		logged  = time.Now()
		batch   = newBatchIndexer(i.disk, false)
	)
	for current <= lastID {
		count := lastID - current + 1
		if count > historyReadBatch {
			count = historyReadBatch
		}
		histories, err := readHistories(i.freezer, current, count)
		if err != nil {
			// The history read might fail if the history is truncated
			// from the head due to a revert operation.
			log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
			return
		}
		for _, h := range histories {
			if err := batch.process(h, current); err != nil {
				log.Error("Failed to index history", "err", err)
				return
			}
			current += 1

			// Occasionally report the indexing progress
			if time.Since(logged) > time.Second*8 {
				logged = time.Now()

				var (
					left  = lastID - current + 1
					done  = current - beginID
					speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1 to avoid division by zero
				)
				// Estimate the remaining time from the average speed so far
				eta := time.Duration(left/speed) * time.Millisecond
				log.Info("Indexing state history", "counter", done, "left", left, "eta", common.PrettyDuration(eta))
			}
		}
		// Check the interruption signal and abort the process if it's fired
		if interrupt != nil {
			if signal := interrupt.Load(); signal != 0 {
				if err := batch.finish(true); err != nil {
					log.Error("Failed to flush index", "err", err)
				}
				log.Info("State indexing interrupted")
				return
			}
		}
	}
	if err := batch.finish(true); err != nil {
		log.Error("Failed to flush index", "err", err)
	}
	log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
}

// historyIndexer manages the indexing and unindexing of state histories,
// providing access to historical states.
//
// Upon initialization, historyIndexer starts a one-time background process
// to complete the indexing of any remaining state histories. Once this
// process is finished, all state histories are marked as fully indexed,
// enabling handling of requests for historical states. Thereafter, any new
// state histories must be indexed or unindexed synchronously, ensuring that
// the history index is created or removed along with the corresponding
// state history.
type historyIndexer struct {
	initer  *indexIniter
	disk    ethdb.KeyValueStore
	freezer ethdb.AncientStore
}

func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer {
	return &historyIndexer{
		initer:  newIndexIniter(disk, freezer, lastHistoryID),
		disk:    disk,
		freezer: freezer,
	}
}

func (i *historyIndexer) close() {
	i.initer.close()
}

func (i *historyIndexer) inited() bool {
	return i.initer.inited()
}

// extend sends the notification that a new state history with the specified ID
// has been written into the database and is ready for indexing.
func (i *historyIndexer) extend(historyID uint64) error {
	signal := &interruptSignal{
		newLastID: historyID,
		result:    make(chan error, 1),
	}
	select {
	case <-i.initer.closed:
		return errors.New("indexer is closed")
	case <-i.initer.done:
		return indexSingle(historyID, i.disk, i.freezer)
	case i.initer.interrupt <- signal:
		return <-signal.result
	}
}

// shorten sends the notification that the state history with the specified ID
// is about to be deleted from the database and should be unindexed.
func (i *historyIndexer) shorten(historyID uint64) error {
	signal := &interruptSignal{
		newLastID: historyID - 1,
		result:    make(chan error, 1),
	}
	select {
	case <-i.initer.closed:
		return errors.New("indexer is closed")
	case <-i.initer.done:
		return unindexSingle(historyID, i.disk, i.freezer)
	case i.initer.interrupt <- signal:
		return <-signal.result
	}
}
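Once the initializer has caught up, extend and shorten degrade to the synchronous single-history paths; before that, they are funneled through the initializer's interrupt channel. A sketch of how an owning database might drive the indexer (the glue function and its arguments are hypothetical):

	func driveIndexer(diskdb ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID, n uint64) error {
		indexer := newHistoryIndexer(diskdb, freezer, lastHistoryID)
		defer indexer.close()

		// After persisting the state history with ID n:
		if err := indexer.extend(n); err != nil {
			return err
		}
		// Before deleting the state history with ID n again (e.g. on revert):
		return indexer.shorten(n)
	}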
@ -0,0 +1,339 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"math"
	"sort"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// stateIdent represents the identifier of a state element, which can be
// either an account or a storage slot.
type stateIdent struct {
	account     bool
	address     common.Address
	storageHash common.Hash // null if account is true; otherwise the hash of the raw storage slot key
}

// String returns the string-format state identifier.
func (ident stateIdent) String() string {
	if ident.account {
		return ident.address.Hex()
	}
	return ident.address.Hex() + ident.storageHash.Hex()
}

// newAccountIdent constructs a state identifier for an account.
func newAccountIdent(address common.Address) stateIdent {
	return stateIdent{
		account: true,
		address: address,
	}
}

// newStorageIdent constructs a state identifier for a storage slot.
// The address denotes the address of the associated account;
// the storageHash denotes the hash of the raw storage slot key.
func newStorageIdent(address common.Address, storageHash common.Hash) stateIdent {
	return stateIdent{
		address:     address,
		storageHash: storageHash,
	}
}

// stateIdentQuery is the extension of stateIdent, adding the raw storage key.
type stateIdentQuery struct {
	stateIdent
	storageKey common.Hash
}

// newAccountIdentQuery constructs a state identifier for an account.
func newAccountIdentQuery(address common.Address) stateIdentQuery {
	return stateIdentQuery{
		stateIdent: stateIdent{
			account: true,
			address: address,
		},
	}
}

// newStorageIdentQuery constructs a state identifier for a storage slot.
// The address denotes the address of the associated account;
// the storageKey denotes the raw storage slot key;
// the storageHash denotes the hash of the raw storage slot key.
func newStorageIdentQuery(address common.Address, storageKey common.Hash, storageHash common.Hash) stateIdentQuery {
	return stateIdentQuery{
		stateIdent: stateIdent{
			address:     address,
			storageHash: storageHash,
		},
		storageKey: storageKey,
	}
}

// indexReaderWithLimitTag is a wrapper around indexReader that includes an
// additional index position. This position represents the ID of the last
// indexed state history at the time the reader was created, implying that
// indexes beyond this position are unavailable.
type indexReaderWithLimitTag struct {
	reader *indexReader
	limit  uint64
	db     ethdb.KeyValueReader
}

// newIndexReaderWithLimitTag constructs an index reader with an indexing position.
func newIndexReaderWithLimitTag(db ethdb.KeyValueReader, state stateIdent) (*indexReaderWithLimitTag, error) {
	// Read the last indexed ID before the index reader construction
	indexed := rawdb.ReadLastStateHistoryIndex(db)
	if indexed == nil {
		return nil, errors.New("state history hasn't been indexed yet")
	}
	r, err := newIndexReader(db, state)
	if err != nil {
		return nil, err
	}
	return &indexReaderWithLimitTag{
		reader: r,
		limit:  *indexed,
		db:     db,
	}, nil
}

// readGreaterThan locates the first element that is greater than the specified
// value. If no such element is found, MaxUint64 is returned.
//
// Note: it is possible that additional histories have been indexed since the
// reader was created. The reader should be refreshed as needed to load the
// latest indexed data from disk.
func (r *indexReaderWithLimitTag) readGreaterThan(id uint64, lastID uint64) (uint64, error) {
	// Mark the index reader as stale if the tracked indexing position moves
	// backward. This can occur if the pathdb is reverted and certain state
	// histories are unindexed. For simplicity, the reader is marked as stale
	// instead of being refreshed, as this scenario is highly unlikely.
	if r.limit > lastID {
		return 0, fmt.Errorf("index reader is stale, limit: %d, last-state-id: %d", r.limit, lastID)
	}
	// Try to find the element which is greater than the specified target
	res, err := r.reader.readGreaterThan(id)
	if err != nil {
		return 0, err
	}
	// Short circuit if the element is found within the current index
	if res != math.MaxUint64 {
		return res, nil
	}
	// The element was not found, and no additional histories have been indexed.
	// Return a not-found result.
	if r.limit == lastID {
		return res, nil
	}
	// Refresh the index reader and make another attempt
	indexed := rawdb.ReadLastStateHistoryIndex(r.db)
	if indexed == nil || *indexed < lastID {
		return 0, errors.New("state history hasn't been indexed yet")
	}
	if err := r.reader.refresh(); err != nil {
		return 0, err
	}
	r.limit = *indexed

	return r.reader.readGreaterThan(id)
}
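The limit tag makes the common cases cheap: a hit in the already-loaded index needs no extra disk access, a miss with an unchanged limit is a definitive not-found, and only a miss with a raised limit pays for a refresh. Condensed (values hypothetical):

	// r.limit > lastID          -> error: reader is stale (histories were unindexed)
	// hit (res != MaxUint64)    -> return res
	// miss && r.limit == lastID -> return MaxUint64: not modified after id
	// miss && r.limit <  lastID -> refresh from disk, bump r.limit, retry once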

// historyReader is the structure to access historic state data.
type historyReader struct {
	disk    ethdb.KeyValueReader
	freezer ethdb.AncientReader
	readers map[string]*indexReaderWithLimitTag
}

// newHistoryReader constructs the history reader with the supplied db.
func newHistoryReader(disk ethdb.KeyValueReader, freezer ethdb.AncientReader) *historyReader {
	return &historyReader{
		disk:    disk,
		freezer: freezer,
		readers: make(map[string]*indexReaderWithLimitTag),
	}
}

// readAccountMetadata resolves the account metadata within the specified
// state history.
func (r *historyReader) readAccountMetadata(address common.Address, historyID uint64) ([]byte, error) {
	blob := rawdb.ReadStateAccountIndex(r.freezer, historyID)
	if len(blob)%accountIndexSize != 0 {
		return nil, fmt.Errorf("account index is corrupted, historyID: %d", historyID)
	}
	n := len(blob) / accountIndexSize

	pos := sort.Search(n, func(i int) bool {
		h := blob[accountIndexSize*i : accountIndexSize*i+common.HashLength]
		return bytes.Compare(h, address.Bytes()) >= 0
	})
	if pos == n {
		return nil, fmt.Errorf("account %#x is not found", address)
	}
	offset := accountIndexSize * pos
	if address != common.BytesToAddress(blob[offset:offset+common.AddressLength]) {
		return nil, fmt.Errorf("account %#x is not found", address)
	}
	return blob[offset : accountIndexSize*(pos+1)], nil
}

// readStorageMetadata resolves the storage slot metadata within the specified
// state history.
func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash common.Hash, historyID uint64, slotOffset, slotNumber int) ([]byte, error) {
	// TODO(rj493456442) optimize it with partial read
	blob := rawdb.ReadStateStorageIndex(r.freezer, historyID)
	if len(blob)%slotIndexSize != 0 {
		return nil, fmt.Errorf("storage indexes are corrupted, historyID: %d", historyID)
	}
	if slotIndexSize*(slotOffset+slotNumber) > len(blob) {
		return nil, errors.New("storage index out of range")
	}
	subSlice := blob[slotIndexSize*slotOffset : slotIndexSize*(slotOffset+slotNumber)]

	// TODO(rj493456442) get rid of the metadata resolution
	var (
		m      meta
		target common.Hash
	)
	blob = rawdb.ReadStateHistoryMeta(r.freezer, historyID)
	if err := m.decode(blob); err != nil {
		return nil, err
	}
	if m.version == stateHistoryV0 {
		target = storageHash
	} else {
		target = storageKey
	}
	pos := sort.Search(slotNumber, func(i int) bool {
		slotID := subSlice[slotIndexSize*i : slotIndexSize*i+common.HashLength]
		return bytes.Compare(slotID, target.Bytes()) >= 0
	})
	if pos == slotNumber {
		return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID)
	}
	offset := slotIndexSize * pos
	if target != common.BytesToHash(subSlice[offset:offset+common.HashLength]) {
		return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID)
	}
	return subSlice[offset : slotIndexSize*(pos+1)], nil
}

// readAccount retrieves the account data from the specified state history.
func (r *historyReader) readAccount(address common.Address, historyID uint64) ([]byte, error) {
	metadata, err := r.readAccountMetadata(address, historyID)
	if err != nil {
		return nil, err
	}
	length := int(metadata[common.AddressLength])                                                     // one byte for the account data length
	offset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+1 : common.AddressLength+5])) // four bytes for the account data offset

	// TODO(rj493456442) optimize it with partial read
	data := rawdb.ReadStateAccountHistory(r.freezer, historyID)
	if len(data) < length+offset {
		return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d", address, historyID)
	}
	return data[offset : offset+length], nil
}
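Reading the metadata helpers together with readStorage below gives the per-entry layout of the index blobs (sizes inferred from the offsets used in this file; accountIndexSize and slotIndexSize are defined alongside the history encoding):

	// account index entry (accountIndexSize bytes):
	//   [0:20]  account address
	//   [20]    account data length (one byte)
	//   [21:25] account data offset within the account-data section
	//   [25:29] offset of the account's slot indexes within the storage-index section
	//   [29:33] number of slot indexes belonging to the account
	//
	// slot index entry (slotIndexSize bytes):
	//   [0:32]  slot identifier (key hash for v0 histories, raw key afterwards)
	//   [32]    slot data length (one byte)
	//   [33:37] slot data offset within the storage-data section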

// readStorage retrieves the storage slot data from the specified state history.
func (r *historyReader) readStorage(address common.Address, storageKey common.Hash, storageHash common.Hash, historyID uint64) ([]byte, error) {
	metadata, err := r.readAccountMetadata(address, historyID)
	if err != nil {
		return nil, err
	}
	// slotIndexOffset:
	//   The offset of the storage indexes associated with the specified account.
	// slotIndexNumber:
	//   The number of storage indexes associated with the specified account.
	slotIndexOffset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+5 : common.AddressLength+9]))
	slotIndexNumber := int(binary.BigEndian.Uint32(metadata[common.AddressLength+9 : common.AddressLength+13]))

	slotMetadata, err := r.readStorageMetadata(storageKey, storageHash, historyID, slotIndexOffset, slotIndexNumber)
	if err != nil {
		return nil, err
	}
	length := int(slotMetadata[common.HashLength])                                                  // one byte for the slot data length
	offset := int(binary.BigEndian.Uint32(slotMetadata[common.HashLength+1 : common.HashLength+5])) // four bytes for the slot data offset

	// TODO(rj493456442) optimize it with partial read
	data := rawdb.ReadStateStorageHistory(r.freezer, historyID)
	if len(data) < offset+length {
		return nil, errors.New("corrupted storage data")
	}
	return data[offset : offset+length], nil
}

// read retrieves the state element data associated with the stateID.
// stateID: represents the ID of the state of the specified version;
// lastID: represents the ID of the latest/newest state history;
// latestValue: represents the state value at the current disk layer with ID == lastID.
func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint64, latestValue []byte) ([]byte, error) {
	tail, err := r.freezer.Tail()
	if err != nil {
		return nil, err
	}
	// stateID == tail is allowed, as the first history object preserved
	// is tail+1
	if stateID < tail {
		return nil, errors.New("historical state has been pruned")
	}
	lastIndexedID := rawdb.ReadLastStateHistoryIndex(r.disk)

	// To serve the request, all state histories from stateID+1 to lastID
	// must be indexed
	if lastIndexedID == nil || *lastIndexedID < lastID {
		indexed := "null"
		if lastIndexedID != nil {
			indexed = fmt.Sprintf("%d", *lastIndexedID)
		}
		return nil, fmt.Errorf("state history is not fully indexed, requested: %d, indexed: %s", stateID, indexed)
	}

	// Construct the index reader to locate the corresponding history for
	// state retrieval
	ir, ok := r.readers[state.String()]
	if !ok {
		ir, err = newIndexReaderWithLimitTag(r.disk, state.stateIdent)
		if err != nil {
			return nil, err
		}
		r.readers[state.String()] = ir
	}
	historyID, err := ir.readGreaterThan(stateID, lastID)
	if err != nil {
		return nil, err
	}
	// The state was not found in the state histories, as it has not been modified
	// since stateID. Use the data from the associated disk layer instead.
	if historyID == math.MaxUint64 {
		return latestValue, nil
	}
	if state.account {
		return r.readAccount(state.address, historyID)
	}
	return r.readStorage(state.address, state.storageKey, state.storageHash, historyID)
}
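The resolution rule deserves spelling out: the index records, per state element, the IDs of the histories in which the element was modified, and since a history records the pre-transition values, the value as of state S lives in the first history with ID strictly greater than S. A worked example with assumed values:

	// Account A was modified by histories {3, 7, 9}; lastID = 10.
	// read(A, stateID=5) -> readGreaterThan(5) = 7         -> value recorded in history 7
	// read(A, stateID=9) -> readGreaterThan(9) = MaxUint64 -> unmodified since, fall back to latestValue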
@ -0,0 +1,159 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package pathdb

import (
	"bytes"
	"fmt"
	"testing"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
)

func waitIndexing(db *Database) {
	for {
		id := rawdb.ReadLastStateHistoryIndex(db.diskdb)
		if id != nil && *id >= db.tree.bottom().stateID() {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func checkHistoricState(env *tester, root common.Hash, hr *historyReader) error {
	// Short circuit if the historical state is no longer available
	if rawdb.ReadStateID(env.db.diskdb, root) == nil {
		return nil
	}
	var (
		dl       = env.db.tree.bottom()
		stateID  = rawdb.ReadStateID(env.db.diskdb, root)
		accounts = env.snapAccounts[root]
		storages = env.snapStorages[root]
	)
	for addrHash, accountData := range accounts {
		latest, _ := dl.account(addrHash, 0)
		blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash)), *stateID, dl.stateID(), latest)
		if err != nil {
			return err
		}
		if !bytes.Equal(accountData, blob) {
			return fmt.Errorf("wrong account data, expected %x, got %x", accountData, blob)
		}
	}
	for i := 0; i < len(env.roots); i++ {
		if env.roots[i] == root {
			break
		}
		// Find all accounts deleted in the past, ensure the associated data is null
		for addrHash := range env.snapAccounts[env.roots[i]] {
			if _, ok := accounts[addrHash]; !ok {
				latest, _ := dl.account(addrHash, 0)
				blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash)), *stateID, dl.stateID(), latest)
				if err != nil {
					return err
				}
				if len(blob) != 0 {
					return fmt.Errorf("wrong account data, expected null, got %x", blob)
				}
			}
		}
	}
	for addrHash, slots := range storages {
		for slotHash, slotData := range slots {
			latest, _ := dl.storage(addrHash, slotHash, 0)
			blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest)
			if err != nil {
				return err
			}
			if !bytes.Equal(slotData, blob) {
				return fmt.Errorf("wrong storage data, expected %x, got %x", slotData, blob)
			}
		}
	}
	for i := 0; i < len(env.roots); i++ {
		if env.roots[i] == root {
			break
		}
		// Find all storage slots deleted in the past, ensure the associated data is null
		for addrHash, slots := range env.snapStorages[env.roots[i]] {
			for slotHash := range slots {
				_, ok := storages[addrHash]
				if ok {
					_, ok = storages[addrHash][slotHash]
				}
				if !ok {
					latest, _ := dl.storage(addrHash, slotHash, 0)
					blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest)
					if err != nil {
						return err
					}
					if len(blob) != 0 {
						return fmt.Errorf("wrong storage data, expected null, got %x", blob)
					}
				}
			}
		}
	}
	return nil
}

func TestHistoryReader(t *testing.T) {
	testHistoryReader(t, 0)  // with all histories reserved
	testHistoryReader(t, 10) // with the latest 10 histories reserved
}

func testHistoryReader(t *testing.T, historyLimit uint64) {
	maxDiffLayers = 4
	defer func() {
		maxDiffLayers = 128
	}()
	//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))

	env := newTester(t, historyLimit, false, 64)
	defer env.release()
	waitIndexing(env.db)

	var (
		roots = env.roots
		dRoot = env.db.tree.bottom().rootHash()
		hr    = newHistoryReader(env.db.diskdb, env.db.freezer)
	)
	for _, root := range roots {
		if root == dRoot {
			break
		}
		if err := checkHistoricState(env, root, hr); err != nil {
			t.Fatal(err)
		}
	}

	// Pile up more histories on top, ensuring the historic reader is not affected
	env.extend(4)
	waitIndexing(env.db)

	for _, root := range roots {
		if root == dRoot {
			break
		}
		if err := checkHistoricState(env, root, hr); err != nil {
			t.Fatal(err)
		}
	}
}
@ -73,8 +73,14 @@ var (
	historyDataBytesMeter  = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
	historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)

	indexHistoryTimer   = metrics.NewRegisteredResettingTimer("pathdb/history/index/time", nil)
	unindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/unindex/time", nil)

	lookupAddLayerTimer    = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
	lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)

	historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil)
	historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil)
)

// Metrics in generation
@ -19,10 +19,13 @@ package pathdb
import (
	"errors"
	"fmt"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/triedb/database"
@ -182,3 +185,109 @@ func (db *Database) StateReader(root common.Hash) (database.StateReader, error)
		layer: layer,
	}, nil
}

// HistoricalStateReader is a wrapper over the history reader, providing access
// to historical state.
type HistoricalStateReader struct {
	db     *Database
	reader *historyReader
	id     uint64
}

// HistoricReader constructs a reader for accessing the requested historic state.
func (db *Database) HistoricReader(root common.Hash) (*HistoricalStateReader, error) {
	// Bail out if the state histories haven't been fully indexed
	if db.indexer == nil || !db.indexer.inited() {
		return nil, errors.New("state histories haven't been fully indexed yet")
	}
	// States older than the current disk layer (disk layer included) are
	// available for access.
	id := rawdb.ReadStateID(db.diskdb, root)
	if id == nil {
		return nil, fmt.Errorf("state %#x is not available", root)
	}
	return &HistoricalStateReader{
		id:     *id,
		db:     db,
		reader: newHistoryReader(db.diskdb, db.freezer),
	}, nil
}

// AccountRLP directly retrieves the account RLP associated with a particular
// address in the slim data format. An error will be returned if the read
// operation exits abnormally, specifically if the layer is already stale.
//
// Note:
// - the returned account is not a copy, please don't modify it.
// - no error will be returned if the requested account is not found in the database.
func (r *HistoricalStateReader) AccountRLP(address common.Address) ([]byte, error) {
	defer func(start time.Time) {
		historicalAccountReadTimer.UpdateSince(start)
	}(time.Now())

	// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
	// within a very short time window.
	//
	// Reading the account data while holding `db.tree.lock` would resolve
	// this issue, but it would introduce heavy contention on the lock.
	//
	// Let's optimistically assume the situation is very unlikely to happen,
	// and try to define a finer-grained lock if the current approach doesn't
	// work later.
	dl := r.db.tree.bottom()
	latest, err := dl.account(crypto.Keccak256Hash(address.Bytes()), 0)
	if err != nil {
		return nil, err
	}
	return r.reader.read(newAccountIdentQuery(address), r.id, dl.stateID(), latest)
}

// Account directly retrieves the account associated with a particular address in
// the slim data format. An error will be returned if the read operation exits
// abnormally, specifically if the layer is already stale.
//
// No error will be returned if the requested account is not found in the database.
func (r *HistoricalStateReader) Account(address common.Address) (*types.SlimAccount, error) {
	blob, err := r.AccountRLP(address)
	if err != nil {
		return nil, err
	}
	if len(blob) == 0 {
		return nil, nil
	}
	account := new(types.SlimAccount)
	if err := rlp.DecodeBytes(blob, account); err != nil {
		panic(err)
	}
	return account, nil
}

// Storage directly retrieves the storage data associated with a particular key,
// within a particular account. An error will be returned if the read operation
// exits abnormally, specifically if the layer is already stale.
//
// Note:
// - the returned storage data is not a copy, please don't modify it.
// - no error will be returned if the requested slot is not found in the database.
func (r *HistoricalStateReader) Storage(address common.Address, key common.Hash) ([]byte, error) {
	defer func(start time.Time) {
		historicalStorageReadTimer.UpdateSince(start)
	}(time.Now())

	// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
	// within a very short time window.
	//
	// Reading the storage data while holding `db.tree.lock` would resolve
	// this issue, but it would introduce heavy contention on the lock.
	//
	// Let's optimistically assume the situation is very unlikely to happen,
	// and try to define a finer-grained lock if the current approach doesn't
	// work later.
	dl := r.db.tree.bottom()
	keyHash := crypto.Keccak256Hash(key.Bytes())
	latest, err := dl.storage(crypto.Keccak256Hash(address.Bytes()), keyHash, 0)
	if err != nil {
		return nil, err
	}
	return r.reader.read(newStorageIdentQuery(address, key, keyHash), r.id, dl.stateID(), latest)
}
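End to end, a caller resolves a historic root to a reader and then queries it much like a snapshot. A minimal usage sketch, assuming db is an opened pathdb *Database and root is an indexed historic state root (helper name hypothetical):

	func readHistoric(db *Database, root common.Hash, address common.Address, key common.Hash) error {
		reader, err := db.HistoricReader(root)
		if err != nil {
			return err
		}
		account, err := reader.Account(address) // nil if the account did not exist at root
		if err != nil {
			return err
		}
		slot, err := reader.Storage(address, key) // empty if the slot was unset at root
		if err != nil {
			return err
		}
		_, _ = account, slot
		return nil
	}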