core/rawdb: implement in-memory freezer (#29135)

This commit is contained in:
rjl493456442 2024-04-30 17:33:22 +08:00 committed by GitHub
parent c04b8e6d74
commit f46c878441
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 1014 additions and 151 deletions

View File

@ -16,7 +16,11 @@
package rawdb package rawdb
import "path/filepath" import (
"path/filepath"
"github.com/ethereum/go-ethereum/ethdb"
)
// The list of table names of chain freezer. // The list of table names of chain freezer.
const ( const (
@ -75,7 +79,15 @@ var (
// freezers the collections of all builtin freezers. // freezers the collections of all builtin freezers.
var freezers = []string{ChainFreezerName, StateFreezerName} var freezers = []string{ChainFreezerName, StateFreezerName}
// NewStateFreezer initializes the freezer for state history. // NewStateFreezer initializes the ancient store for state history.
func NewStateFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) { //
return NewResettableFreezer(filepath.Join(ancientDir, StateFreezerName), "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy) // - if the empty directory is given, initializes the pure in-memory
// state freezer (e.g. dev mode).
// - if non-empty directory is given, initializes the regular file-based
// state freezer.
func NewStateFreezer(ancientDir string, readOnly bool) (ethdb.ResettableAncientStore, error) {
if ancientDir == "" {
return NewMemoryFreezer(readOnly, stateFreezerNoSnappy), nil
}
return newResettableFreezer(filepath.Join(ancientDir, StateFreezerName), "eth/db/state", readOnly, stateHistoryTableSize, stateFreezerNoSnappy)
} }

View File

@ -0,0 +1,325 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package ancienttest
import (
"bytes"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/testrand"
)
// TestAncientSuite runs a suite of tests against an ancient database
// implementation.
func TestAncientSuite(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
// Test basic read methods
t.Run("BasicRead", func(t *testing.T) { basicRead(t, newFn) })
// Test batch read method
t.Run("BatchRead", func(t *testing.T) { batchRead(t, newFn) })
// Test basic write methods
t.Run("BasicWrite", func(t *testing.T) { basicWrite(t, newFn) })
// Test if data mutation is allowed after db write
t.Run("nonMutable", func(t *testing.T) { nonMutable(t, newFn) })
}
func basicRead(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
var (
db = newFn([]string{"a"})
data = makeDataset(100, 32)
)
defer db.Close()
db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < len(data); i++ {
op.AppendRaw("a", uint64(i), data[i])
}
return nil
})
db.TruncateTail(10)
db.TruncateHead(90)
// Test basic tail and head retrievals
tail, err := db.Tail()
if err != nil || tail != 10 {
t.Fatal("Failed to retrieve tail")
}
ancient, err := db.Ancients()
if err != nil || ancient != 90 {
t.Fatal("Failed to retrieve ancient")
}
// Test the deleted items shouldn't be reachable
var cases = []struct {
start int
limit int
}{
{0, 10},
{90, 100},
}
for _, c := range cases {
for i := c.start; i < c.limit; i++ {
exist, err := db.HasAncient("a", uint64(i))
if err != nil {
t.Fatalf("Failed to check presence, %v", err)
}
if exist {
t.Fatalf("Item %d is already truncated", uint64(i))
}
_, err = db.Ancient("a", uint64(i))
if err == nil {
t.Fatal("Error is expected for non-existent item")
}
}
}
// Test the items in range should be reachable
for i := 10; i < 90; i++ {
exist, err := db.HasAncient("a", uint64(i))
if err != nil {
t.Fatalf("Failed to check presence, %v", err)
}
if !exist {
t.Fatalf("Item %d is missing", uint64(i))
}
blob, err := db.Ancient("a", uint64(i))
if err != nil {
t.Fatalf("Failed to retrieve item, %v", err)
}
if !bytes.Equal(blob, data[i]) {
t.Fatalf("Unexpected item content, want: %v, got: %v", data[i], blob)
}
}
// Test the items in unknown table shouldn't be reachable
exist, err := db.HasAncient("b", uint64(0))
if err != nil {
t.Fatalf("Failed to check presence, %v", err)
}
if exist {
t.Fatal("Item in unknown table shouldn't be found")
}
_, err = db.Ancient("b", uint64(0))
if err == nil {
t.Fatal("Error is expected for unknown table")
}
}
func batchRead(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
var (
db = newFn([]string{"a"})
data = makeDataset(100, 32)
)
defer db.Close()
db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < 100; i++ {
op.AppendRaw("a", uint64(i), data[i])
}
return nil
})
db.TruncateTail(10)
db.TruncateHead(90)
// Test the items in range should be reachable
var cases = []struct {
start uint64
count uint64
maxSize uint64
expStart int
expLimit int
}{
// Items in range [10, 90) with no size limitation
{
10, 80, 0, 10, 90,
},
// Items in range [10, 90) with 32 size cap, single item is expected
{
10, 80, 32, 10, 11,
},
// Items in range [10, 90) with 31 size cap, single item is expected
{
10, 80, 31, 10, 11,
},
// Items in range [10, 90) with 32*80 size cap, all items are expected
{
10, 80, 32 * 80, 10, 90,
},
// Extra items above the last item are not returned
{
10, 90, 0, 10, 90,
},
}
for i, c := range cases {
batch, err := db.AncientRange("a", c.start, c.count, c.maxSize)
if err != nil {
t.Fatalf("Failed to retrieve item in range, %v", err)
}
if !reflect.DeepEqual(batch, data[c.expStart:c.expLimit]) {
t.Fatalf("Case %d, Batch content is not matched", i)
}
}
// Test out-of-range / zero-size retrieval should be rejected
_, err := db.AncientRange("a", 0, 1, 0)
if err == nil {
t.Fatal("Out-of-range retrieval should be rejected")
}
_, err = db.AncientRange("a", 90, 1, 0)
if err == nil {
t.Fatal("Out-of-range retrieval should be rejected")
}
_, err = db.AncientRange("a", 10, 0, 0)
if err == nil {
t.Fatal("Zero-size retrieval should be rejected")
}
// Test item in unknown table shouldn't be reachable
_, err = db.AncientRange("b", 10, 1, 0)
if err == nil {
t.Fatal("Item in unknown table shouldn't be found")
}
}
func basicWrite(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
var (
db = newFn([]string{"a", "b"})
dataA = makeDataset(100, 32)
dataB = makeDataset(100, 32)
)
defer db.Close()
// The ancient write to tables should be aligned
_, err := db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < 100; i++ {
op.AppendRaw("a", uint64(i), dataA[i])
}
return nil
})
if err == nil {
t.Fatal("Unaligned ancient write should be rejected")
}
// Test normal ancient write
size, err := db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < 100; i++ {
op.AppendRaw("a", uint64(i), dataA[i])
op.AppendRaw("b", uint64(i), dataB[i])
}
return nil
})
if err != nil {
t.Fatalf("Failed to write ancient data %v", err)
}
wantSize := int64(6400)
if size != wantSize {
t.Fatalf("Ancient write size is not expected, want: %d, got: %d", wantSize, size)
}
// Write should work after head truncating
db.TruncateHead(90)
_, err = db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 90; i < 100; i++ {
op.AppendRaw("a", uint64(i), dataA[i])
op.AppendRaw("b", uint64(i), dataB[i])
}
return nil
})
if err != nil {
t.Fatalf("Failed to write ancient data %v", err)
}
// Write should work after truncating everything
db.TruncateTail(0)
_, err = db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < 100; i++ {
op.AppendRaw("a", uint64(i), dataA[i])
op.AppendRaw("b", uint64(i), dataB[i])
}
return nil
})
if err != nil {
t.Fatalf("Failed to write ancient data %v", err)
}
}
func nonMutable(t *testing.T, newFn func(kinds []string) ethdb.AncientStore) {
db := newFn([]string{"a"})
defer db.Close()
// We write 100 zero-bytes to the freezer and immediately mutate the slice
db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
data := make([]byte, 100)
op.AppendRaw("a", uint64(0), data)
for i := range data {
data[i] = 0xff
}
return nil
})
// Now read it.
data, err := db.Ancient("a", uint64(0))
if err != nil {
t.Fatal(err)
}
for k, v := range data {
if v != 0 {
t.Fatalf("byte %d != 0: %x", k, v)
}
}
}
// TestResettableAncientSuite runs a suite of tests against a resettable ancient
// database implementation.
func TestResettableAncientSuite(t *testing.T, newFn func(kinds []string) ethdb.ResettableAncientStore) {
t.Run("Reset", func(t *testing.T) {
var (
db = newFn([]string{"a"})
data = makeDataset(100, 32)
)
defer db.Close()
db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < 100; i++ {
op.AppendRaw("a", uint64(i), data[i])
}
return nil
})
db.TruncateTail(10)
db.TruncateHead(90)
// Ancient write should work after resetting
db.Reset()
db.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for i := 0; i < 100; i++ {
op.AppendRaw("a", uint64(i), data[i])
}
return nil
})
})
}
func makeDataset(size, value int) [][]byte {
var vals [][]byte
for i := 0; i < size; i += 1 {
vals = append(vals, testrand.Bytes(value))
}
return vals
}

View File

@ -39,24 +39,38 @@ const (
freezerBatchLimit = 30000 freezerBatchLimit = 30000
) )
// chainFreezer is a wrapper of freezer with additional chain freezing feature. // chainFreezer is a wrapper of chain ancient store with additional chain freezing
// The background thread will keep moving ancient chain segments from key-value // feature. The background thread will keep moving ancient chain segments from
// database to flat files for saving space on live database. // key-value database to flat files for saving space on live database.
type chainFreezer struct { type chainFreezer struct {
*Freezer ethdb.AncientStore // Ancient store for storing cold chain segment
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
trigger chan chan struct{} // Manual blocking freeze trigger, test determinism trigger chan chan struct{} // Manual blocking freeze trigger, test determinism
} }
// newChainFreezer initializes the freezer for ancient chain data. // newChainFreezer initializes the freezer for ancient chain segment.
//
// - if the empty directory is given, initializes the pure in-memory
// state freezer (e.g. dev mode).
// - if non-empty directory is given, initializes the regular file-based
// state freezer.
func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFreezer, error) { func newChainFreezer(datadir string, namespace string, readonly bool) (*chainFreezer, error) {
freezer, err := NewChainFreezer(datadir, namespace, readonly) var (
err error
freezer ethdb.AncientStore
)
if datadir == "" {
freezer = NewMemoryFreezer(readonly, chainFreezerNoSnappy)
} else {
freezer, err = NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy)
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &chainFreezer{ return &chainFreezer{
Freezer: freezer, AncientStore: freezer,
quit: make(chan struct{}), quit: make(chan struct{}),
trigger: make(chan chan struct{}), trigger: make(chan chan struct{}),
}, nil }, nil
@ -70,7 +84,7 @@ func (f *chainFreezer) Close() error {
close(f.quit) close(f.quit)
} }
f.wg.Wait() f.wg.Wait()
return f.Freezer.Close() return f.AncientStore.Close()
} }
// readHeadNumber returns the number of chain head block. 0 is returned if the // readHeadNumber returns the number of chain head block. 0 is returned if the
@ -167,7 +181,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
log.Debug("Current full block not old enough to freeze", "err", err) log.Debug("Current full block not old enough to freeze", "err", err)
continue continue
} }
frozen := f.frozen.Load() frozen, _ := f.Ancients() // no error will occur, safe to ignore
// Short circuit if the blocks below threshold are already frozen. // Short circuit if the blocks below threshold are already frozen.
if frozen != 0 && frozen-1 >= threshold { if frozen != 0 && frozen-1 >= threshold {
@ -190,7 +204,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
backoff = true backoff = true
continue continue
} }
// Batch of blocks have been frozen, flush them before wiping from leveldb // Batch of blocks have been frozen, flush them before wiping from key-value store
if err := f.Sync(); err != nil { if err := f.Sync(); err != nil {
log.Crit("Failed to flush frozen tables", "err", err) log.Crit("Failed to flush frozen tables", "err", err)
} }
@ -210,7 +224,7 @@ func (f *chainFreezer) freeze(db ethdb.KeyValueStore) {
// Wipe out side chains also and track dangling side chains // Wipe out side chains also and track dangling side chains
var dangling []common.Hash var dangling []common.Hash
frozen = f.frozen.Load() // Needs reload after during freezeRange frozen, _ = f.Ancients() // Needs reload after during freezeRange
for number := first; number < frozen; number++ { for number := first; number < frozen; number++ {
// Always keep the genesis block in active database // Always keep the genesis block in active database
if number != 0 { if number != 0 {

View File

@ -34,11 +34,13 @@ import (
"github.com/olekukonko/tablewriter" "github.com/olekukonko/tablewriter"
) )
// freezerdb is a database wrapper that enables freezer data retrievals. // freezerdb is a database wrapper that enables ancient chain segment freezing.
type freezerdb struct { type freezerdb struct {
ancientRoot string
ethdb.KeyValueStore ethdb.KeyValueStore
ethdb.AncientStore *chainFreezer
readOnly bool
ancientRoot string
} }
// AncientDatadir returns the path of root ancient directory. // AncientDatadir returns the path of root ancient directory.
@ -50,7 +52,7 @@ func (frdb *freezerdb) AncientDatadir() (string, error) {
// the slow ancient tables. // the slow ancient tables.
func (frdb *freezerdb) Close() error { func (frdb *freezerdb) Close() error {
var errs []error var errs []error
if err := frdb.AncientStore.Close(); err != nil { if err := frdb.chainFreezer.Close(); err != nil {
errs = append(errs, err) errs = append(errs, err)
} }
if err := frdb.KeyValueStore.Close(); err != nil { if err := frdb.KeyValueStore.Close(); err != nil {
@ -66,12 +68,12 @@ func (frdb *freezerdb) Close() error {
// a freeze cycle completes, without having to sleep for a minute to trigger the // a freeze cycle completes, without having to sleep for a minute to trigger the
// automatic background run. // automatic background run.
func (frdb *freezerdb) Freeze() error { func (frdb *freezerdb) Freeze() error {
if frdb.AncientStore.(*chainFreezer).readonly { if frdb.readOnly {
return errReadOnly return errReadOnly
} }
// Trigger a freeze cycle and block until it's done // Trigger a freeze cycle and block until it's done
trigger := make(chan struct{}, 1) trigger := make(chan struct{}, 1)
frdb.AncientStore.(*chainFreezer).trigger <- trigger frdb.chainFreezer.trigger <- trigger
<-trigger <-trigger
return nil return nil
} }
@ -192,8 +194,13 @@ func resolveChainFreezerDir(ancient string) string {
// storage. The passed ancient indicates the path of root ancient directory // storage. The passed ancient indicates the path of root ancient directory
// where the chain freezer can be opened. // where the chain freezer can be opened.
func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace string, readonly bool) (ethdb.Database, error) { func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace string, readonly bool) (ethdb.Database, error) {
// Create the idle freezer instance // Create the idle freezer instance. If the given ancient directory is empty,
frdb, err := newChainFreezer(resolveChainFreezerDir(ancient), namespace, readonly) // in-memory chain freezer is used (e.g. dev mode); otherwise the regular
// file-based freezer is created.
if ancient != "" {
ancient = resolveChainFreezerDir(ancient)
}
frdb, err := newChainFreezer(ancient, namespace, readonly)
if err != nil { if err != nil {
printChainMetadata(db) printChainMetadata(db)
return nil, err return nil, err
@ -277,7 +284,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
} }
} }
// Freezer is consistent with the key-value database, permit combining the two // Freezer is consistent with the key-value database, permit combining the two
if !frdb.readonly { if !readonly {
frdb.wg.Add(1) frdb.wg.Add(1)
go func() { go func() {
frdb.freeze(db) frdb.freeze(db)
@ -287,7 +294,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
return &freezerdb{ return &freezerdb{
ancientRoot: ancient, ancientRoot: ancient,
KeyValueStore: db, KeyValueStore: db,
AncientStore: frdb, chainFreezer: frdb,
}, nil }, nil
} }

View File

@ -62,7 +62,7 @@ const freezerTableSize = 2 * 1000 * 1000 * 1000
// reserving it for go-ethereum. This would also reduce the memory requirements // reserving it for go-ethereum. This would also reduce the memory requirements
// of Geth, and thus also GC overhead. // of Geth, and thus also GC overhead.
type Freezer struct { type Freezer struct {
frozen atomic.Uint64 // Number of blocks already frozen frozen atomic.Uint64 // Number of items already frozen
tail atomic.Uint64 // Number of the first stored item in the freezer tail atomic.Uint64 // Number of the first stored item in the freezer
// This lock synchronizes writers and the truncate operation, as well as // This lock synchronizes writers and the truncate operation, as well as
@ -76,12 +76,6 @@ type Freezer struct {
closeOnce sync.Once closeOnce sync.Once
} }
// NewChainFreezer is a small utility method around NewFreezer that sets the
// default parameters for the chain storage.
func NewChainFreezer(datadir string, namespace string, readonly bool) (*Freezer, error) {
return NewFreezer(datadir, namespace, readonly, freezerTableSize, chainFreezerNoSnappy)
}
// NewFreezer creates a freezer instance for maintaining immutable ordered // NewFreezer creates a freezer instance for maintaining immutable ordered
// data according to the given parameters. // data according to the given parameters.
// //

View File

@ -0,0 +1,428 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
// memoryTable is used to store a list of sequential items in memory.
type memoryTable struct {
name string // Table name
items uint64 // Number of stored items in the table, including the deleted ones
offset uint64 // Number of deleted items from the table
data [][]byte // List of rlp-encoded items, sort in order
size uint64 // Total memory size occupied by the table
lock sync.RWMutex
}
// newMemoryTable initializes the memory table.
func newMemoryTable(name string) *memoryTable {
return &memoryTable{name: name}
}
// has returns an indicator whether the specified data exists.
func (t *memoryTable) has(number uint64) bool {
t.lock.RLock()
defer t.lock.RUnlock()
return number >= t.offset && number < t.items
}
// retrieve retrieves multiple items in sequence, starting from the index 'start'.
// It will return:
// - at most 'count' items,
// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present
func (t *memoryTable) retrieve(start uint64, count, maxBytes uint64) ([][]byte, error) {
t.lock.RLock()
defer t.lock.RUnlock()
var (
size uint64
batch [][]byte
)
// Ensure the start is written, not deleted from the tail, and that the
// caller actually wants something.
if t.items <= start || t.offset > start || count == 0 {
return nil, errOutOfBounds
}
// Cap the item count if the retrieval is out of bound.
if start+count > t.items {
count = t.items - start
}
for n := start; n < start+count; n++ {
index := n - t.offset
if len(batch) != 0 && maxBytes != 0 && size+uint64(len(t.data[index])) > maxBytes {
return batch, nil
}
batch = append(batch, t.data[index])
size += uint64(len(t.data[index]))
}
return batch, nil
}
// truncateHead discards any recent data above the provided threshold number.
func (t *memoryTable) truncateHead(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// Short circuit if nothing to delete.
if t.items <= items {
return nil
}
if items < t.offset {
return errors.New("truncation below tail")
}
t.data = t.data[:items-t.offset]
t.items = items
return nil
}
// truncateTail discards any recent data before the provided threshold number.
func (t *memoryTable) truncateTail(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
// Short circuit if nothing to delete.
if t.offset >= items {
return nil
}
if t.items < items {
return errors.New("truncation above head")
}
t.data = t.data[items-t.offset:]
t.offset = items
return nil
}
// commit merges the given item batch into table. It's presumed that the
// batch is ordered and continuous with table.
func (t *memoryTable) commit(batch [][]byte) error {
t.lock.Lock()
defer t.lock.Unlock()
for _, item := range batch {
t.size += uint64(len(item))
}
t.data = append(t.data, batch...)
t.items += uint64(len(batch))
return nil
}
// memoryBatch is the singleton batch used for ancient write.
type memoryBatch struct {
data map[string][][]byte
next map[string]uint64
size map[string]int64
}
func newMemoryBatch() *memoryBatch {
return &memoryBatch{
data: make(map[string][][]byte),
next: make(map[string]uint64),
size: make(map[string]int64),
}
}
func (b *memoryBatch) reset(freezer *MemoryFreezer) {
b.data = make(map[string][][]byte)
b.next = make(map[string]uint64)
b.size = make(map[string]int64)
for name, table := range freezer.tables {
b.next[name] = table.items
}
}
// Append adds an RLP-encoded item.
func (b *memoryBatch) Append(kind string, number uint64, item interface{}) error {
if b.next[kind] != number {
return errOutOrderInsertion
}
blob, err := rlp.EncodeToBytes(item)
if err != nil {
return err
}
b.data[kind] = append(b.data[kind], blob)
b.next[kind]++
b.size[kind] += int64(len(blob))
return nil
}
// AppendRaw adds an item without RLP-encoding it.
func (b *memoryBatch) AppendRaw(kind string, number uint64, blob []byte) error {
if b.next[kind] != number {
return errOutOrderInsertion
}
b.data[kind] = append(b.data[kind], common.CopyBytes(blob))
b.next[kind]++
b.size[kind] += int64(len(blob))
return nil
}
// commit is called at the end of a write operation and writes all remaining
// data to tables.
func (b *memoryBatch) commit(freezer *MemoryFreezer) (items uint64, writeSize int64, err error) {
// Check that count agrees on all batches.
items = math.MaxUint64
for name, next := range b.next {
if items < math.MaxUint64 && next != items {
return 0, 0, fmt.Errorf("table %s is at item %d, want %d", name, next, items)
}
items = next
}
// Commit all table batches.
for name, batch := range b.data {
table := freezer.tables[name]
if err := table.commit(batch); err != nil {
return 0, 0, err
}
writeSize += b.size[name]
}
return items, writeSize, nil
}
// MemoryFreezer is an ephemeral ancient store. It implements the ethdb.AncientStore
// interface and can be used along with ephemeral key-value store.
type MemoryFreezer struct {
items uint64 // Number of items stored
tail uint64 // Number of the first stored item in the freezer
readonly bool // Flag if the freezer is only for reading
lock sync.RWMutex // Lock to protect fields
tables map[string]*memoryTable // Tables for storing everything
writeBatch *memoryBatch // Pre-allocated write batch
}
// NewMemoryFreezer initializes an in-memory freezer instance.
func NewMemoryFreezer(readonly bool, tableName map[string]bool) *MemoryFreezer {
tables := make(map[string]*memoryTable)
for name := range tableName {
tables[name] = newMemoryTable(name)
}
return &MemoryFreezer{
writeBatch: newMemoryBatch(),
readonly: readonly,
tables: tables,
}
}
// HasAncient returns an indicator whether the specified data exists.
func (f *MemoryFreezer) HasAncient(kind string, number uint64) (bool, error) {
f.lock.RLock()
defer f.lock.RUnlock()
if table := f.tables[kind]; table != nil {
return table.has(number), nil
}
return false, nil
}
// Ancient retrieves an ancient binary blob from the in-memory freezer.
func (f *MemoryFreezer) Ancient(kind string, number uint64) ([]byte, error) {
f.lock.RLock()
defer f.lock.RUnlock()
t := f.tables[kind]
if t == nil {
return nil, errUnknownTable
}
data, err := t.retrieve(number, 1, 0)
if err != nil {
return nil, err
}
return data[0], nil
}
// AncientRange retrieves multiple items in sequence, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present
func (f *MemoryFreezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
f.lock.RLock()
defer f.lock.RUnlock()
t := f.tables[kind]
if t == nil {
return nil, errUnknownTable
}
return t.retrieve(start, count, maxBytes)
}
// Ancients returns the ancient item numbers in the freezer.
func (f *MemoryFreezer) Ancients() (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.items, nil
}
// Tail returns the number of first stored item in the freezer.
// This number can also be interpreted as the total deleted item numbers.
func (f *MemoryFreezer) Tail() (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
return f.tail, nil
}
// AncientSize returns the ancient size of the specified category.
func (f *MemoryFreezer) AncientSize(kind string) (uint64, error) {
f.lock.RLock()
defer f.lock.RUnlock()
if table := f.tables[kind]; table != nil {
return table.size, nil
}
return 0, errUnknownTable
}
// ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer.
func (f *MemoryFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
f.lock.RLock()
defer f.lock.RUnlock()
return fn(f)
}
// ModifyAncients runs the given write operation.
func (f *MemoryFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.readonly {
return 0, errReadOnly
}
// Roll back all tables to the starting position in case of error.
defer func(old uint64) {
if err == nil {
return
}
// The write operation has failed. Go back to the previous item position.
for name, table := range f.tables {
err := table.truncateHead(old)
if err != nil {
log.Error("Freezer table roll-back failed", "table", name, "index", old, "err", err)
}
}
}(f.items)
// Modify the ancients in batch.
f.writeBatch.reset(f)
if err := fn(f.writeBatch); err != nil {
return 0, err
}
item, writeSize, err := f.writeBatch.commit(f)
if err != nil {
return 0, err
}
f.items = item
return writeSize, nil
}
// TruncateHead discards any recent data above the provided threshold number.
// It returns the previous head number.
func (f *MemoryFreezer) TruncateHead(items uint64) (uint64, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.readonly {
return 0, errReadOnly
}
old := f.items
if old <= items {
return old, nil
}
for _, table := range f.tables {
if err := table.truncateHead(items); err != nil {
return 0, err
}
}
f.items = items
return old, nil
}
// TruncateTail discards any recent data below the provided threshold number.
func (f *MemoryFreezer) TruncateTail(tail uint64) (uint64, error) {
f.lock.Lock()
defer f.lock.Unlock()
if f.readonly {
return 0, errReadOnly
}
old := f.tail
if old >= tail {
return old, nil
}
for _, table := range f.tables {
if err := table.truncateTail(tail); err != nil {
return 0, err
}
}
f.tail = tail
return old, nil
}
// Sync flushes all data tables to disk.
func (f *MemoryFreezer) Sync() error {
return nil
}
// MigrateTable processes and migrates entries of a given table to a new format.
// The second argument is a function that takes a raw entry and returns it
// in the newest format.
func (f *MemoryFreezer) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errors.New("not implemented")
}
// Close releases all the sources held by the memory freezer. It will panic if
// any following invocation is made to a closed freezer.
func (f *MemoryFreezer) Close() error {
f.lock.Lock()
defer f.lock.Unlock()
f.tables = nil
f.writeBatch = nil
return nil
}
// Reset drops all the data cached in the memory freezer and reset itself
// back to default state.
func (f *MemoryFreezer) Reset() error {
f.lock.Lock()
defer f.lock.Unlock()
tables := make(map[string]*memoryTable)
for name := range f.tables {
tables[name] = newMemoryTable(name)
}
f.tables = tables
f.items, f.tail = 0, 0
return nil
}

View File

@ -0,0 +1,41 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rawdb
import (
"testing"
"github.com/ethereum/go-ethereum/core/rawdb/ancienttest"
"github.com/ethereum/go-ethereum/ethdb"
)
func TestMemoryFreezer(t *testing.T) {
ancienttest.TestAncientSuite(t, func(kinds []string) ethdb.AncientStore {
tables := make(map[string]bool)
for _, kind := range kinds {
tables[kind] = true
}
return NewMemoryFreezer(false, tables)
})
ancienttest.TestResettableAncientSuite(t, func(kinds []string) ethdb.ResettableAncientStore {
tables := make(map[string]bool)
for _, kind := range kinds {
tables[kind] = true
}
return NewMemoryFreezer(false, tables)
})
}

View File

@ -30,16 +30,16 @@ const tmpSuffix = ".tmp"
// freezerOpenFunc is the function used to open/create a freezer. // freezerOpenFunc is the function used to open/create a freezer.
type freezerOpenFunc = func() (*Freezer, error) type freezerOpenFunc = func() (*Freezer, error)
// ResettableFreezer is a wrapper of the freezer which makes the // resettableFreezer is a wrapper of the freezer which makes the
// freezer resettable. // freezer resettable.
type ResettableFreezer struct { type resettableFreezer struct {
freezer *Freezer freezer *Freezer
opener freezerOpenFunc opener freezerOpenFunc
datadir string datadir string
lock sync.RWMutex lock sync.RWMutex
} }
// NewResettableFreezer creates a resettable freezer, note freezer is // newResettableFreezer creates a resettable freezer, note freezer is
// only resettable if the passed file directory is exclusively occupied // only resettable if the passed file directory is exclusively occupied
// by the freezer. And also the user-configurable ancient root directory // by the freezer. And also the user-configurable ancient root directory
// is **not** supported for reset since it might be a mount and rename // is **not** supported for reset since it might be a mount and rename
@ -48,7 +48,7 @@ type ResettableFreezer struct {
// //
// The reset function will delete directory atomically and re-create the // The reset function will delete directory atomically and re-create the
// freezer from scratch. // freezer from scratch.
func NewResettableFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*ResettableFreezer, error) { func newResettableFreezer(datadir string, namespace string, readonly bool, maxTableSize uint32, tables map[string]bool) (*resettableFreezer, error) {
if err := cleanup(datadir); err != nil { if err := cleanup(datadir); err != nil {
return nil, err return nil, err
} }
@ -59,7 +59,7 @@ func NewResettableFreezer(datadir string, namespace string, readonly bool, maxTa
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ResettableFreezer{ return &resettableFreezer{
freezer: freezer, freezer: freezer,
opener: opener, opener: opener,
datadir: datadir, datadir: datadir,
@ -70,7 +70,7 @@ func NewResettableFreezer(datadir string, namespace string, readonly bool, maxTa
// recreate the freezer from scratch. The atomicity of directory deletion // recreate the freezer from scratch. The atomicity of directory deletion
// is guaranteed by the rename operation, the leftover directory will be // is guaranteed by the rename operation, the leftover directory will be
// cleaned up in next startup in case crash happens after rename. // cleaned up in next startup in case crash happens after rename.
func (f *ResettableFreezer) Reset() error { func (f *resettableFreezer) Reset() error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
@ -93,7 +93,7 @@ func (f *ResettableFreezer) Reset() error {
} }
// Close terminates the chain freezer, unmapping all the data files. // Close terminates the chain freezer, unmapping all the data files.
func (f *ResettableFreezer) Close() error { func (f *resettableFreezer) Close() error {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -102,7 +102,7 @@ func (f *ResettableFreezer) Close() error {
// HasAncient returns an indicator whether the specified ancient data exists // HasAncient returns an indicator whether the specified ancient data exists
// in the freezer // in the freezer
func (f *ResettableFreezer) HasAncient(kind string, number uint64) (bool, error) { func (f *resettableFreezer) HasAncient(kind string, number uint64) (bool, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -110,7 +110,7 @@ func (f *ResettableFreezer) HasAncient(kind string, number uint64) (bool, error)
} }
// Ancient retrieves an ancient binary blob from the append-only immutable files. // Ancient retrieves an ancient binary blob from the append-only immutable files.
func (f *ResettableFreezer) Ancient(kind string, number uint64) ([]byte, error) { func (f *resettableFreezer) Ancient(kind string, number uint64) ([]byte, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -123,7 +123,7 @@ func (f *ResettableFreezer) Ancient(kind string, number uint64) ([]byte, error)
// - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize), // - if maxBytes is specified: at least 1 item (even if exceeding the maxByteSize),
// but will otherwise return as many items as fit into maxByteSize. // but will otherwise return as many items as fit into maxByteSize.
// - if maxBytes is not specified, 'count' items will be returned if they are present. // - if maxBytes is not specified, 'count' items will be returned if they are present.
func (f *ResettableFreezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { func (f *resettableFreezer) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -131,7 +131,7 @@ func (f *ResettableFreezer) AncientRange(kind string, start, count, maxBytes uin
} }
// Ancients returns the length of the frozen items. // Ancients returns the length of the frozen items.
func (f *ResettableFreezer) Ancients() (uint64, error) { func (f *resettableFreezer) Ancients() (uint64, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -139,7 +139,7 @@ func (f *ResettableFreezer) Ancients() (uint64, error) {
} }
// Tail returns the number of first stored item in the freezer. // Tail returns the number of first stored item in the freezer.
func (f *ResettableFreezer) Tail() (uint64, error) { func (f *resettableFreezer) Tail() (uint64, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -147,7 +147,7 @@ func (f *ResettableFreezer) Tail() (uint64, error) {
} }
// AncientSize returns the ancient size of the specified category. // AncientSize returns the ancient size of the specified category.
func (f *ResettableFreezer) AncientSize(kind string) (uint64, error) { func (f *resettableFreezer) AncientSize(kind string) (uint64, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -156,7 +156,7 @@ func (f *ResettableFreezer) AncientSize(kind string) (uint64, error) {
// ReadAncients runs the given read operation while ensuring that no writes take place // ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer. // on the underlying freezer.
func (f *ResettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { func (f *resettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -164,7 +164,7 @@ func (f *ResettableFreezer) ReadAncients(fn func(ethdb.AncientReaderOp) error) (
} }
// ModifyAncients runs the given write operation. // ModifyAncients runs the given write operation.
func (f *ResettableFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) { func (f *resettableFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error) (writeSize int64, err error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -173,7 +173,7 @@ func (f *ResettableFreezer) ModifyAncients(fn func(ethdb.AncientWriteOp) error)
// TruncateHead discards any recent data above the provided threshold number. // TruncateHead discards any recent data above the provided threshold number.
// It returns the previous head number. // It returns the previous head number.
func (f *ResettableFreezer) TruncateHead(items uint64) (uint64, error) { func (f *resettableFreezer) TruncateHead(items uint64) (uint64, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -182,7 +182,7 @@ func (f *ResettableFreezer) TruncateHead(items uint64) (uint64, error) {
// TruncateTail discards any recent data below the provided threshold number. // TruncateTail discards any recent data below the provided threshold number.
// It returns the previous value // It returns the previous value
func (f *ResettableFreezer) TruncateTail(tail uint64) (uint64, error) { func (f *resettableFreezer) TruncateTail(tail uint64) (uint64, error) {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -190,7 +190,7 @@ func (f *ResettableFreezer) TruncateTail(tail uint64) (uint64, error) {
} }
// Sync flushes all data tables to disk. // Sync flushes all data tables to disk.
func (f *ResettableFreezer) Sync() error { func (f *resettableFreezer) Sync() error {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()
@ -199,7 +199,7 @@ func (f *ResettableFreezer) Sync() error {
// MigrateTable processes the entries in a given table in sequence // MigrateTable processes the entries in a given table in sequence
// converting them to a new format if they're of an old format. // converting them to a new format if they're of an old format.
func (f *ResettableFreezer) MigrateTable(kind string, convert convertLegacyFn) error { func (f *resettableFreezer) MigrateTable(kind string, convert convertLegacyFn) error {
f.lock.RLock() f.lock.RLock()
defer f.lock.RUnlock() defer f.lock.RUnlock()

View File

@ -33,7 +33,7 @@ func TestResetFreezer(t *testing.T) {
{1, bytes.Repeat([]byte{1}, 2048)}, {1, bytes.Repeat([]byte{1}, 2048)},
{2, bytes.Repeat([]byte{2}, 2048)}, {2, bytes.Repeat([]byte{2}, 2048)},
} }
f, _ := NewResettableFreezer(t.TempDir(), "", false, 2048, freezerTestTableDef) f, _ := newResettableFreezer(t.TempDir(), "", false, 2048, freezerTestTableDef)
defer f.Close() defer f.Close()
f.ModifyAncients(func(op ethdb.AncientWriteOp) error { f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
@ -87,7 +87,7 @@ func TestFreezerCleanup(t *testing.T) {
{2, bytes.Repeat([]byte{2}, 2048)}, {2, bytes.Repeat([]byte{2}, 2048)},
} }
datadir := t.TempDir() datadir := t.TempDir()
f, _ := NewResettableFreezer(datadir, "", false, 2048, freezerTestTableDef) f, _ := newResettableFreezer(datadir, "", false, 2048, freezerTestTableDef)
f.ModifyAncients(func(op ethdb.AncientWriteOp) error { f.ModifyAncients(func(op ethdb.AncientWriteOp) error {
for _, item := range items { for _, item := range items {
op.AppendRaw("test", item.id, item.blob) op.AppendRaw("test", item.id, item.blob)
@ -98,7 +98,7 @@ func TestFreezerCleanup(t *testing.T) {
os.Rename(datadir, tmpName(datadir)) os.Rename(datadir, tmpName(datadir))
// Open the freezer again, trigger cleanup operation // Open the freezer again, trigger cleanup operation
f, _ = NewResettableFreezer(datadir, "", false, 2048, freezerTestTableDef) f, _ = newResettableFreezer(datadir, "", false, 2048, freezerTestTableDef)
f.Close() f.Close()
if _, err := os.Lstat(tmpName(datadir)); !os.IsNotExist(err) { if _, err := os.Lstat(tmpName(datadir)); !os.IsNotExist(err) {

View File

@ -27,6 +27,7 @@ import (
"sync" "sync"
"testing" "testing"
"github.com/ethereum/go-ethereum/core/rawdb/ancienttest"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -480,3 +481,22 @@ func TestFreezerCloseSync(t *testing.T) {
t.Fatalf("want %v, have %v", have, want) t.Fatalf("want %v, have %v", have, want)
} }
} }
func TestFreezerSuite(t *testing.T) {
ancienttest.TestAncientSuite(t, func(kinds []string) ethdb.AncientStore {
tables := make(map[string]bool)
for _, kind := range kinds {
tables[kind] = true
}
f, _ := newFreezerForTesting(t, tables)
return f
})
ancienttest.TestResettableAncientSuite(t, func(kinds []string) ethdb.ResettableAncientStore {
tables := make(map[string]bool)
for _, kind := range kinds {
tables[kind] = true
}
f, _ := newResettableFreezer(t.TempDir(), "", false, 2048, tables)
return f
})
}

View File

@ -88,8 +88,8 @@ type AncientReaderOp interface {
// Ancients returns the ancient item numbers in the ancient store. // Ancients returns the ancient item numbers in the ancient store.
Ancients() (uint64, error) Ancients() (uint64, error)
// Tail returns the number of first stored item in the freezer. // Tail returns the number of first stored item in the ancient store.
// This number can also be interpreted as the total deleted item numbers. // This number can also be interpreted as the total deleted items.
Tail() (uint64, error) Tail() (uint64, error)
// AncientSize returns the ancient size of the specified category. // AncientSize returns the ancient size of the specified category.
@ -101,7 +101,7 @@ type AncientReader interface {
AncientReaderOp AncientReaderOp
// ReadAncients runs the given read operation while ensuring that no writes take place // ReadAncients runs the given read operation while ensuring that no writes take place
// on the underlying freezer. // on the underlying ancient store.
ReadAncients(fn func(AncientReaderOp) error) (err error) ReadAncients(fn func(AncientReaderOp) error) (err error)
} }
@ -141,11 +141,15 @@ type AncientWriteOp interface {
AppendRaw(kind string, number uint64, item []byte) error AppendRaw(kind string, number uint64, item []byte) error
} }
// AncientStater wraps the Stat method of a backing data store. // AncientStater wraps the Stat method of a backing ancient store.
type AncientStater interface { type AncientStater interface {
// AncientDatadir returns the path of root ancient directory. Empty string // AncientDatadir returns the path of the ancient store directory.
// will be returned if ancient store is not enabled at all. The returned //
// path can be used to construct the path of other freezers. // If the ancient store is not activated, an error is returned.
// If an ephemeral ancient store is used, an empty path is returned.
//
// The path returned by AncientDatadir can be used as the root path
// of the ancient store to construct paths for other sub ancient stores.
AncientDatadir() (string, error) AncientDatadir() (string, error)
} }
@ -171,15 +175,23 @@ type Stater interface {
} }
// AncientStore contains all the methods required to allow handling different // AncientStore contains all the methods required to allow handling different
// ancient data stores backing immutable chain data store. // ancient data stores backing immutable data store.
type AncientStore interface { type AncientStore interface {
AncientReader AncientReader
AncientWriter AncientWriter
io.Closer io.Closer
} }
// ResettableAncientStore extends the AncientStore interface by adding a Reset method.
type ResettableAncientStore interface {
AncientStore
// Reset is designed to reset the entire ancient store to its default state.
Reset() error
}
// Database contains all the methods required by the high level database to not // Database contains all the methods required by the high level database to not
// only access the key-value data store but also the chain freezer. // only access the key-value data store but also the ancient chain store.
type Database interface { type Database interface {
Reader Reader
Writer Writer

View File

@ -34,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -752,7 +753,7 @@ func (n *Node) OpenDatabaseWithFreezer(name string, cache, handles int, ancient
var db ethdb.Database var db ethdb.Database
var err error var err error
if n.config.DataDir == "" { if n.config.DataDir == "" {
db = rawdb.NewMemoryDatabase() db, err = rawdb.NewDatabaseWithFreezer(memorydb.New(), "", namespace, readonly)
} else { } else {
db, err = rawdb.Open(rawdb.OpenOptions{ db, err = rawdb.Open(rawdb.OpenOptions{
Type: n.config.DBEngine, Type: n.config.DBEngine,

View File

@ -138,7 +138,7 @@ type Database struct {
config *Config // Configuration for database config *Config // Configuration for database
diskdb ethdb.Database // Persistent storage for matured trie nodes diskdb ethdb.Database // Persistent storage for matured trie nodes
tree *layerTree // The group for all known layers tree *layerTree // The group for all known layers
freezer *rawdb.ResettableFreezer // Freezer for storing trie histories, nil possible in tests freezer ethdb.ResettableAncientStore // Freezer for storing trie histories, nil possible in tests
lock sync.RWMutex // Lock to prevent mutations from happening at the same time lock sync.RWMutex // Lock to prevent mutations from happening at the same time
} }
@ -162,23 +162,44 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
// and in-memory layer journal. // and in-memory layer journal.
db.tree = newLayerTree(db.loadLayers()) db.tree = newLayerTree(db.loadLayers())
// Open the freezer for state history if the passed database contains an // Repair the state history, which might not be aligned with the state
// ancient store. Otherwise, all the relevant functionalities are disabled. // in the key-value store due to an unclean shutdown.
// if err := db.repairHistory(); err != nil {
// Because the freezer can only be opened once at the same time, this log.Crit("Failed to repair pathdb", "err", err)
// mechanism also ensures that at most one **non-readOnly** database }
// is opened at the same time to prevent accidental mutation. // Disable database in case node is still in the initial state sync stage.
if ancient, err := diskdb.AncientDatadir(); err == nil && ancient != "" && !db.readOnly { if rawdb.ReadSnapSyncStatusFlag(diskdb) == rawdb.StateSyncRunning && !db.readOnly {
if err := db.Disable(); err != nil {
log.Crit("Failed to disable database", "err", err) // impossible to happen
}
}
return db
}
// repairHistory truncates leftover state history objects, which may occur due
// to an unclean shutdown or other unexpected reasons.
func (db *Database) repairHistory() error {
// Open the freezer for state history. This mechanism ensures that
// only one database instance can be opened at a time to prevent
// accidental mutation.
ancient, err := db.diskdb.AncientDatadir()
if err != nil {
// TODO error out if ancient store is disabled. A tons of unit tests
// disable the ancient store thus the error here will immediately fail
// all of them. Fix the tests first.
return nil
}
freezer, err := rawdb.NewStateFreezer(ancient, false) freezer, err := rawdb.NewStateFreezer(ancient, false)
if err != nil { if err != nil {
log.Crit("Failed to open state history freezer", "err", err) log.Crit("Failed to open state history freezer", "err", err)
} }
db.freezer = freezer db.freezer = freezer
diskLayerID := db.tree.bottom().stateID() // Reset the entire state histories if the trie database is not initialized
if diskLayerID == 0 { // yet. This action is necessary because these state histories are not
// Reset the entire state histories in case the trie database is // expected to exist without an initialized trie database.
// not initialized yet, as these state histories are not expected. id := db.tree.bottom().stateID()
if id == 0 {
frozen, err := db.freezer.Ancients() frozen, err := db.freezer.Ancients()
if err != nil { if err != nil {
log.Crit("Failed to retrieve head of state history", "err", err) log.Crit("Failed to retrieve head of state history", "err", err)
@ -190,25 +211,18 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
} }
log.Info("Truncated extraneous state history") log.Info("Truncated extraneous state history")
} }
} else { return nil
// Truncate the extra state histories above in freezer in case }
// it's not aligned with the disk layer. // Truncate the extra state histories above in freezer in case it's not
pruned, err := truncateFromHead(db.diskdb, freezer, diskLayerID) // aligned with the disk layer. It might happen after a unclean shutdown.
pruned, err := truncateFromHead(db.diskdb, db.freezer, id)
if err != nil { if err != nil {
log.Crit("Failed to truncate extra state histories", "err", err) log.Crit("Failed to truncate extra state histories", "err", err)
} }
if pruned != 0 { if pruned != 0 {
log.Warn("Truncated extra state histories", "number", pruned) log.Warn("Truncated extra state histories", "number", pruned)
} }
} return nil
}
// Disable database in case node is still in the initial state sync stage.
if rawdb.ReadSnapSyncStatusFlag(diskdb) == rawdb.StateSyncRunning && !db.readOnly {
if err := db.Disable(); err != nil {
log.Crit("Failed to disable database", "err", err) // impossible to happen
}
}
return db
} }
// Update adds a new layer into the tree, if that can be linked to an existing // Update adds a new layer into the tree, if that can be linked to an existing

View File

@ -472,8 +472,8 @@ func (h *history) decode(accountData, storageData, accountIndexes, storageIndexe
} }
// readHistory reads and decodes the state history object by the given id. // readHistory reads and decodes the state history object by the given id.
func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error) { func readHistory(reader ethdb.AncientReader, id uint64) (*history, error) {
blob := rawdb.ReadStateHistoryMeta(freezer, id) blob := rawdb.ReadStateHistoryMeta(reader, id)
if len(blob) == 0 { if len(blob) == 0 {
return nil, fmt.Errorf("state history not found %d", id) return nil, fmt.Errorf("state history not found %d", id)
} }
@ -483,10 +483,10 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error)
} }
var ( var (
dec = history{meta: &m} dec = history{meta: &m}
accountData = rawdb.ReadStateAccountHistory(freezer, id) accountData = rawdb.ReadStateAccountHistory(reader, id)
storageData = rawdb.ReadStateStorageHistory(freezer, id) storageData = rawdb.ReadStateStorageHistory(reader, id)
accountIndexes = rawdb.ReadStateAccountIndex(freezer, id) accountIndexes = rawdb.ReadStateAccountIndex(reader, id)
storageIndexes = rawdb.ReadStateStorageIndex(freezer, id) storageIndexes = rawdb.ReadStateStorageIndex(reader, id)
) )
if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil {
return nil, err return nil, err
@ -495,7 +495,7 @@ func readHistory(freezer *rawdb.ResettableFreezer, id uint64) (*history, error)
} }
// writeHistory persists the state history with the provided state set. // writeHistory persists the state history with the provided state set.
func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error { func writeHistory(writer ethdb.AncientWriter, dl *diffLayer) error {
// Short circuit if state set is not available. // Short circuit if state set is not available.
if dl.states == nil { if dl.states == nil {
return errors.New("state change set is not available") return errors.New("state change set is not available")
@ -509,7 +509,7 @@ func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error {
indexSize := common.StorageSize(len(accountIndex) + len(storageIndex)) indexSize := common.StorageSize(len(accountIndex) + len(storageIndex))
// Write history data into five freezer table respectively. // Write history data into five freezer table respectively.
rawdb.WriteStateHistory(freezer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData) rawdb.WriteStateHistory(writer, dl.stateID(), history.meta.encode(), accountIndex, storageIndex, accountData, storageData)
historyDataBytesMeter.Mark(int64(dataSize)) historyDataBytesMeter.Mark(int64(dataSize))
historyIndexBytesMeter.Mark(int64(indexSize)) historyIndexBytesMeter.Mark(int64(indexSize))
@ -521,13 +521,13 @@ func writeHistory(freezer *rawdb.ResettableFreezer, dl *diffLayer) error {
// checkHistories retrieves a batch of meta objects with the specified range // checkHistories retrieves a batch of meta objects with the specified range
// and performs the callback on each item. // and performs the callback on each item.
func checkHistories(freezer *rawdb.ResettableFreezer, start, count uint64, check func(*meta) error) error { func checkHistories(reader ethdb.AncientReader, start, count uint64, check func(*meta) error) error {
for count > 0 { for count > 0 {
number := count number := count
if number > 10000 { if number > 10000 {
number = 10000 // split the big read into small chunks number = 10000 // split the big read into small chunks
} }
blobs, err := rawdb.ReadStateHistoryMetaList(freezer, start, number) blobs, err := rawdb.ReadStateHistoryMetaList(reader, start, number)
if err != nil { if err != nil {
return err return err
} }
@ -548,12 +548,12 @@ func checkHistories(freezer *rawdb.ResettableFreezer, start, count uint64, check
// truncateFromHead removes the extra state histories from the head with the given // truncateFromHead removes the extra state histories from the head with the given
// parameters. It returns the number of items removed from the head. // parameters. It returns the number of items removed from the head.
func truncateFromHead(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, nhead uint64) (int, error) { func truncateFromHead(db ethdb.Batcher, store ethdb.AncientStore, nhead uint64) (int, error) {
ohead, err := freezer.Ancients() ohead, err := store.Ancients()
if err != nil { if err != nil {
return 0, err return 0, err
} }
otail, err := freezer.Tail() otail, err := store.Tail()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -566,7 +566,7 @@ func truncateFromHead(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, nhead
return 0, nil return 0, nil
} }
// Load the meta objects in range [nhead+1, ohead] // Load the meta objects in range [nhead+1, ohead]
blobs, err := rawdb.ReadStateHistoryMetaList(freezer, nhead+1, ohead-nhead) blobs, err := rawdb.ReadStateHistoryMetaList(store, nhead+1, ohead-nhead)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -581,7 +581,7 @@ func truncateFromHead(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, nhead
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return 0, err return 0, err
} }
ohead, err = freezer.TruncateHead(nhead) ohead, err = store.TruncateHead(nhead)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -590,12 +590,12 @@ func truncateFromHead(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, nhead
// truncateFromTail removes the extra state histories from the tail with the given // truncateFromTail removes the extra state histories from the tail with the given
// parameters. It returns the number of items removed from the tail. // parameters. It returns the number of items removed from the tail.
func truncateFromTail(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, ntail uint64) (int, error) { func truncateFromTail(db ethdb.Batcher, store ethdb.AncientStore, ntail uint64) (int, error) {
ohead, err := freezer.Ancients() ohead, err := store.Ancients()
if err != nil { if err != nil {
return 0, err return 0, err
} }
otail, err := freezer.Tail() otail, err := store.Tail()
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -608,7 +608,7 @@ func truncateFromTail(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, ntail
return 0, nil return 0, nil
} }
// Load the meta objects in range [otail+1, ntail] // Load the meta objects in range [otail+1, ntail]
blobs, err := rawdb.ReadStateHistoryMetaList(freezer, otail+1, ntail-otail) blobs, err := rawdb.ReadStateHistoryMetaList(store, otail+1, ntail-otail)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -623,7 +623,7 @@ func truncateFromTail(db ethdb.Batcher, freezer *rawdb.ResettableFreezer, ntail
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return 0, err return 0, err
} }
otail, err = freezer.TruncateTail(ntail) otail, err = store.TruncateTail(ntail)
if err != nil { if err != nil {
return 0, err return 0, err
} }

View File

@ -21,7 +21,7 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
) )
@ -34,7 +34,7 @@ type HistoryStats struct {
} }
// sanitizeRange limits the given range to fit within the local history store. // sanitizeRange limits the given range to fit within the local history store.
func sanitizeRange(start, end uint64, freezer *rawdb.ResettableFreezer) (uint64, uint64, error) { func sanitizeRange(start, end uint64, freezer ethdb.AncientReader) (uint64, uint64, error) {
// Load the id of the first history object in local store. // Load the id of the first history object in local store.
tail, err := freezer.Tail() tail, err := freezer.Tail()
if err != nil { if err != nil {
@ -60,7 +60,7 @@ func sanitizeRange(start, end uint64, freezer *rawdb.ResettableFreezer) (uint64,
return first, last, nil return first, last, nil
} }
func inspectHistory(freezer *rawdb.ResettableFreezer, start, end uint64, onHistory func(*history, *HistoryStats)) (*HistoryStats, error) { func inspectHistory(freezer ethdb.AncientReader, start, end uint64, onHistory func(*history, *HistoryStats)) (*HistoryStats, error) {
var ( var (
stats = &HistoryStats{} stats = &HistoryStats{}
init = time.Now() init = time.Now()
@ -96,7 +96,7 @@ func inspectHistory(freezer *rawdb.ResettableFreezer, start, end uint64, onHisto
} }
// accountHistory inspects the account history within the range. // accountHistory inspects the account history within the range.
func accountHistory(freezer *rawdb.ResettableFreezer, address common.Address, start, end uint64) (*HistoryStats, error) { func accountHistory(freezer ethdb.AncientReader, address common.Address, start, end uint64) (*HistoryStats, error) {
return inspectHistory(freezer, start, end, func(h *history, stats *HistoryStats) { return inspectHistory(freezer, start, end, func(h *history, stats *HistoryStats) {
blob, exists := h.accounts[address] blob, exists := h.accounts[address]
if !exists { if !exists {
@ -108,7 +108,7 @@ func accountHistory(freezer *rawdb.ResettableFreezer, address common.Address, st
} }
// storageHistory inspects the storage history within the range. // storageHistory inspects the storage history within the range.
func storageHistory(freezer *rawdb.ResettableFreezer, address common.Address, slot common.Hash, start uint64, end uint64) (*HistoryStats, error) { func storageHistory(freezer ethdb.AncientReader, address common.Address, slot common.Hash, start uint64, end uint64) (*HistoryStats, error) {
return inspectHistory(freezer, start, end, func(h *history, stats *HistoryStats) { return inspectHistory(freezer, start, end, func(h *history, stats *HistoryStats) {
slots, exists := h.storages[address] slots, exists := h.storages[address]
if !exists { if !exists {
@ -124,7 +124,7 @@ func storageHistory(freezer *rawdb.ResettableFreezer, address common.Address, sl
} }
// historyRange returns the block number range of local state histories. // historyRange returns the block number range of local state histories.
func historyRange(freezer *rawdb.ResettableFreezer) (uint64, uint64, error) { func historyRange(freezer ethdb.AncientReader) (uint64, uint64, error) {
// Load the id of the first history object in local store. // Load the id of the first history object in local store.
tail, err := freezer.Tail() tail, err := freezer.Tail()
if err != nil { if err != nil {

View File

@ -102,7 +102,7 @@ func TestEncodeDecodeHistory(t *testing.T) {
} }
} }
func checkHistory(t *testing.T, db ethdb.KeyValueReader, freezer *rawdb.ResettableFreezer, id uint64, root common.Hash, exist bool) { func checkHistory(t *testing.T, db ethdb.KeyValueReader, freezer ethdb.AncientReader, id uint64, root common.Hash, exist bool) {
blob := rawdb.ReadStateHistoryMeta(freezer, id) blob := rawdb.ReadStateHistoryMeta(freezer, id)
if exist && len(blob) == 0 { if exist && len(blob) == 0 {
t.Fatalf("Failed to load trie history, %d", id) t.Fatalf("Failed to load trie history, %d", id)
@ -118,7 +118,7 @@ func checkHistory(t *testing.T, db ethdb.KeyValueReader, freezer *rawdb.Resettab
} }
} }
func checkHistoriesInRange(t *testing.T, db ethdb.KeyValueReader, freezer *rawdb.ResettableFreezer, from, to uint64, roots []common.Hash, exist bool) { func checkHistoriesInRange(t *testing.T, db ethdb.KeyValueReader, freezer ethdb.AncientReader, from, to uint64, roots []common.Hash, exist bool) {
for i, j := from, 0; i <= to; i, j = i+1, j+1 { for i, j := from, 0; i <= to; i, j = i+1, j+1 {
checkHistory(t, db, freezer, i, roots[j], exist) checkHistory(t, db, freezer, i, roots[j], exist)
} }
@ -129,7 +129,7 @@ func TestTruncateHeadHistory(t *testing.T) {
roots []common.Hash roots []common.Hash
hs = makeHistories(10) hs = makeHistories(10)
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
freezer, _ = openFreezer(t.TempDir(), false) freezer, _ = rawdb.NewStateFreezer(t.TempDir(), false)
) )
defer freezer.Close() defer freezer.Close()
@ -157,7 +157,7 @@ func TestTruncateTailHistory(t *testing.T) {
roots []common.Hash roots []common.Hash
hs = makeHistories(10) hs = makeHistories(10)
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
freezer, _ = openFreezer(t.TempDir(), false) freezer, _ = rawdb.NewStateFreezer(t.TempDir(), false)
) )
defer freezer.Close() defer freezer.Close()
@ -200,7 +200,7 @@ func TestTruncateTailHistories(t *testing.T) {
roots []common.Hash roots []common.Hash
hs = makeHistories(10) hs = makeHistories(10)
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
freezer, _ = openFreezer(t.TempDir()+fmt.Sprintf("%d", i), false) freezer, _ = rawdb.NewStateFreezer(t.TempDir()+fmt.Sprintf("%d", i), false)
) )
defer freezer.Close() defer freezer.Close()
@ -228,7 +228,7 @@ func TestTruncateOutOfRange(t *testing.T) {
var ( var (
hs = makeHistories(10) hs = makeHistories(10)
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
freezer, _ = openFreezer(t.TempDir(), false) freezer, _ = rawdb.NewStateFreezer(t.TempDir(), false)
) )
defer freezer.Close() defer freezer.Close()
@ -268,11 +268,6 @@ func TestTruncateOutOfRange(t *testing.T) {
} }
} }
// openFreezer initializes the freezer instance for storing state histories.
func openFreezer(datadir string, readOnly bool) (*rawdb.ResettableFreezer, error) {
return rawdb.NewStateFreezer(datadir, readOnly)
}
func compareSet[k comparable](a, b map[k][]byte) bool { func compareSet[k comparable](a, b map[k][]byte) bool {
if len(a) != len(b) { if len(a) != len(b) {
return false return false