From fd39f722a3ff506aba9a993bb10ef176f8d654d8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Mon, 2 Dec 2019 13:27:20 +0200
Subject: [PATCH] core: journal the snapshot inside leveldb, not a flat file

---
 core/blockchain.go                    |  4 +-
 core/rawdb/accessors_snapshot.go      | 23 +++++++++
 core/rawdb/schema.go                  |  5 +-
 core/state/snapshot/difflayer_test.go |  5 +-
 core/state/snapshot/journal.go        | 70 +++++++++++----------------
 core/state/snapshot/snapshot.go       | 22 +++++----
 core/state/statedb.go                 |  4 +-
 7 files changed, 72 insertions(+), 61 deletions(-)

diff --git a/core/blockchain.go b/core/blockchain.go
index 3932baf557..f868f7301f 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -302,7 +302,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
 		}
 	}
 	// Load any existing snapshot, regenerating it if loading failed
-	bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), "snapshot.rlp", bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root())
+	bc.snaps = snapshot.New(bc.db, bc.stateCache.TrieDB(), bc.cacheConfig.SnapshotLimit, bc.CurrentBlock().Root())
 
 	// Take ownership of this particular state
 	go bc.update()
@@ -854,7 +854,7 @@ func (bc *BlockChain) Stop() {
 	bc.wg.Wait()
 
 	// Ensure that the entirety of the state snapshot is journalled to disk.
-	snapBase, err := bc.snaps.Journal(bc.CurrentBlock().Root(), "snapshot.rlp")
+	snapBase, err := bc.snaps.Journal(bc.CurrentBlock().Root())
 	if err != nil {
 		log.Error("Failed to journal state snapshot", "err", err)
 	}
diff --git a/core/rawdb/accessors_snapshot.go b/core/rawdb/accessors_snapshot.go
index 9388e857b7..3a8d6c779e 100644
--- a/core/rawdb/accessors_snapshot.go
+++ b/core/rawdb/accessors_snapshot.go
@@ -95,3 +95,26 @@ func DeleteStorageSnapshot(db ethdb.KeyValueWriter, accountHash, storageHash com
 func IterateStorageSnapshots(db ethdb.Iteratee, accountHash common.Hash) ethdb.Iterator {
 	return db.NewIteratorWithPrefix(storageSnapshotsKey(accountHash))
 }
+
+// ReadSnapshotJournal retrieves the serialized in-memory diff layers saved at
+// the last shutdown. The blob is expected to be at most a few tens of megabytes.
+func ReadSnapshotJournal(db ethdb.KeyValueReader) []byte {
+	data, _ := db.Get(snapshotJournalKey)
+	return data
+}
+
+// WriteSnapshotJournal stores the serialized in-memory diff layers to save at
+// shutdown. The blob is expected to be at most a few tens of megabytes.
+func WriteSnapshotJournal(db ethdb.KeyValueWriter, journal []byte) {
+	if err := db.Put(snapshotJournalKey, journal); err != nil {
+		log.Crit("Failed to store snapshot journal", "err", err)
+	}
+}
+
+// DeleteSnapshotJournal deletes the serialized in-memory diff layers saved at
+// the last shutdown.
+func DeleteSnapshotJournal(db ethdb.KeyValueWriter) {
+	if err := db.Delete(snapshotJournalKey); err != nil {
+		log.Crit("Failed to remove snapshot journal", "err", err)
+	}
+}
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go
index 1b8e53eb61..dc8faca32d 100644
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -41,9 +41,12 @@ var (
 	// fastTrieProgressKey tracks the number of trie entries imported during fast sync.
 	fastTrieProgressKey = []byte("TrieSync")
 
-	// snapshotRootKey tracks the number and hash of the last snapshot.
+	// snapshotRootKey tracks the hash of the last snapshot.
 	snapshotRootKey = []byte("SnapshotRoot")
 
+	// snapshotJournalKey tracks the in-memory diff layers across restarts.
+	snapshotJournalKey = []byte("SnapshotJournal")
+
 	// Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes).
 	headerPrefix       = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
 	headerTDSuffix     = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
diff --git a/core/state/snapshot/difflayer_test.go b/core/state/snapshot/difflayer_test.go
index 9029bb04b8..84220e359b 100644
--- a/core/state/snapshot/difflayer_test.go
+++ b/core/state/snapshot/difflayer_test.go
@@ -20,8 +20,6 @@ import (
 	"bytes"
 	"math/big"
 	"math/rand"
-	"os"
-	"path"
 	"testing"
 
 	"github.com/VictoriaMetrics/fastcache"
@@ -343,7 +341,6 @@ func BenchmarkJournal(b *testing.B) {
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		f, _, _ := layer.Journal(path.Join(os.TempDir(), "difflayer_journal.tmp"))
-		f.Close()
+		layer.Journal(new(bytes.Buffer))
 	}
 }
diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go
index 1c6c63a0b9..1c36e06233 100644
--- a/core/state/snapshot/journal.go
+++ b/core/state/snapshot/journal.go
@@ -17,12 +17,11 @@
 package snapshot
 
 import (
-	"bufio"
+	"bytes"
 	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
-	"os"
 	"time"
 
 	"github.com/VictoriaMetrics/fastcache"
@@ -58,7 +57,7 @@ type journalStorage struct {
 }
 
 // loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
-func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cache int, root common.Hash) (snapshot, error) {
+func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash) (snapshot, error) {
 	// Retrieve the block number and hash of the snapshot, failing if no snapshot
 	// is present in the database (or crashed mid-update).
 	baseRoot := rawdb.ReadSnapshotRoot(diskdb)
@@ -71,13 +70,13 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal str
 		cache:  fastcache.New(cache * 1024 * 1024),
 		root:   baseRoot,
 	}
-	// Open the journal, it must exist since even for 0 layer it stores whether
+	// Retrieve the journal, it must exist since even for a 0-layer snapshot it stores whether
 	// we've already generated the snapshot or are in progress only
-	file, err := os.Open(journal)
-	if err != nil {
-		return nil, err
+	journal := rawdb.ReadSnapshotJournal(diskdb)
+	if len(journal) == 0 {
+		return nil, errors.New("missing or corrupted snapshot journal")
 	}
-	r := rlp.NewStream(file, 0)
+	r := rlp.NewStream(bytes.NewReader(journal), 0)
 
 	// Read the snapshot generation progress for the disk layer
 	var generator journalGenerator
@@ -162,9 +161,9 @@ func loadDiffLayer(parent snapshot, r *rlp.Stream) (snapshot, error) {
 	return loadDiffLayer(newDiffLayer(parent, root, accountData, storageData), r)
}
 
-// Journal is the internal version of Journal that also returns the journal file
-// so subsequent layers know where to write to.
-func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
+// Journal writes the persistent layer generator stats into a buffer to be stored
+// in the database as the snapshot journal.
+func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
 	// If the snapshot is currently being generated, abort it
 	var stats *generatorStats
 	if dl.genAbort != nil {
@@ -180,12 +179,7 @@ func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
 	defer dl.lock.RUnlock()
 
 	if dl.stale {
-		return nil, common.Hash{}, ErrSnapshotStale
-	}
-	// We've reached the bottom, open the journal
-	file, err := os.Create(path)
-	if err != nil {
-		return nil, common.Hash{}, err
+		return common.Hash{}, ErrSnapshotStale
 	}
 	// Write out the generator marker
 	entry := journalGenerator{
@@ -198,44 +192,37 @@ func (dl *diskLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
 		entry.Slots = stats.slots
 		entry.Storage = uint64(stats.storage)
 	}
-	if err := rlp.Encode(file, entry); err != nil {
-		file.Close()
-		return nil, common.Hash{}, err
+	if err := rlp.Encode(buffer, entry); err != nil {
+		return common.Hash{}, err
 	}
-	return file, dl.root, nil
+	return dl.root, nil
 }
 
-// Journal is the internal version of Journal that also returns the journal file
-// so subsequent layers know where to write to.
-func (dl *diffLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
+// Journal writes the memory layer contents into a buffer to be stored in the
+// database as the snapshot journal.
+func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
 	// Journal the parent first
-	writer, base, err := dl.parent.Journal(path)
+	base, err := dl.parent.Journal(buffer)
 	if err != nil {
-		return nil, common.Hash{}, err
+		return common.Hash{}, err
 	}
 	// Ensure the layer didn't get stale
 	dl.lock.RLock()
 	defer dl.lock.RUnlock()
 
 	if dl.stale {
-		writer.Close()
-		return nil, common.Hash{}, ErrSnapshotStale
+		return common.Hash{}, ErrSnapshotStale
 	}
 	// Everything below was journalled, persist this layer too
-	buf := bufio.NewWriter(writer)
-	if err := rlp.Encode(buf, dl.root); err != nil {
-		buf.Flush()
-		writer.Close()
-		return nil, common.Hash{}, err
+	if err := rlp.Encode(buffer, dl.root); err != nil {
+		return common.Hash{}, err
 	}
 	accounts := make([]journalAccount, 0, len(dl.accountData))
 	for hash, blob := range dl.accountData {
 		accounts = append(accounts, journalAccount{Hash: hash, Blob: blob})
 	}
-	if err := rlp.Encode(buf, accounts); err != nil {
-		buf.Flush()
-		writer.Close()
-		return nil, common.Hash{}, err
+	if err := rlp.Encode(buffer, accounts); err != nil {
+		return common.Hash{}, err
 	}
 	storage := make([]journalStorage, 0, len(dl.storageData))
 	for hash, slots := range dl.storageData {
@@ -247,11 +234,8 @@ func (dl *diffLayer) Journal(path string) (io.WriteCloser, common.Hash, error) {
 		}
 		storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals})
 	}
-	if err := rlp.Encode(buf, storage); err != nil {
-		buf.Flush()
-		writer.Close()
-		return nil, common.Hash{}, err
+	if err := rlp.Encode(buffer, storage); err != nil {
+		return common.Hash{}, err
 	}
-	buf.Flush()
-	return writer, base, nil
+	return base, nil
 }
diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go
index 744d56c1bc..749f610789 100644
--- a/core/state/snapshot/snapshot.go
+++ b/core/state/snapshot/snapshot.go
@@ -21,7 +21,6 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
-	"io"
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -112,10 +111,10 @@ type snapshot interface {
 	// copying everything.
 	Update(blockRoot common.Hash, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) *diffLayer
 
-	// Journal commits an entire diff hierarchy to disk into a single journal file.
+	// Journal commits an entire diff hierarchy to disk into a single journal entry.
 	// This is meant to be used during shutdown to persist the snapshot without
 	// flattening everything down (bad for reorgs).
-	Journal(path string) (io.WriteCloser, common.Hash, error)
+	Journal(buffer *bytes.Buffer) (common.Hash, error)
 
 	// Stale returns whether this layer has become stale (was flattened across) or
 	// if it's still live.
@@ -146,7 +145,7 @@ type Tree struct {
 // If the snapshot is missing or inconsistent, the entirety is deleted and will
 // be reconstructed from scratch based on the tries in the key-value store, on a
 // background thread.
-func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cache int, root common.Hash) *Tree {
+func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, cache int, root common.Hash) *Tree {
 	// Create a new, empty snapshot tree
 	snap := &Tree{
 		diskdb: diskdb,
@@ -155,7 +154,7 @@ func New(diskdb ethdb.KeyValueStore, triedb *trie.Database, journal string, cach
 		layers: make(map[common.Hash]snapshot),
 	}
 	// Attempt to load a previously persisted snapshot and rebuild one if failed
-	head, err := loadSnapshot(diskdb, triedb, journal, cache, root)
+	head, err := loadSnapshot(diskdb, triedb, cache, root)
 	if err != nil {
 		log.Warn("Failed to load snapshot, regenerating", "err", err)
 		snap.Rebuild(root)
@@ -401,6 +400,7 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
 			// Account was updated, push to disk
 			rawdb.WriteAccountSnapshot(batch, hash, data)
 			base.cache.Set(hash[:], data)
+			snapshotCleanAccountWriteMeter.Mark(int64(len(data)))
 
 			if batch.ValueSize() > ethdb.IdealBatchSize {
 				if err := batch.Write(); err != nil {
@@ -445,6 +445,7 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
 			if len(data) > 0 {
 				rawdb.WriteStorageSnapshot(batch, accountHash, storageHash, data)
 				base.cache.Set(append(accountHash[:], storageHash[:]...), data)
+				snapshotCleanStorageWriteMeter.Mark(int64(len(data)))
 			} else {
 				rawdb.DeleteStorageSnapshot(batch, accountHash, storageHash)
 				base.cache.Set(append(accountHash[:], storageHash[:]...), nil)
@@ -484,13 +485,13 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
 	return res
 }
 
-// Journal commits an entire diff hierarchy to disk into a single journal file.
+// Journal commits an entire diff hierarchy to disk into a single journal entry.
 // This is meant to be used during shutdown to persist the snapshot without
 // flattening everything down (bad for reorgs).
 //
 // The method returns the root hash of the base layer that needs to be persisted
 // to disk as a trie too to allow continuing any pending generation op.
-func (t *Tree) Journal(root common.Hash, path string) (common.Hash, error) {
+func (t *Tree) Journal(root common.Hash) (common.Hash, error) {
 	// Retrieve the head snapshot to journal from
 	snap := t.Snapshot(root)
 	if snap == nil {
@@ -500,11 +501,14 @@ func (t *Tree) Journal(root common.Hash, path string) (common.Hash, error) {
 	t.lock.Lock()
 	defer t.lock.Unlock()
 
-	writer, base, err := snap.(snapshot).Journal(path)
+	journal := new(bytes.Buffer)
+	base, err := snap.(snapshot).Journal(journal)
 	if err != nil {
 		return common.Hash{}, err
 	}
-	return base, writer.Close()
+	// Store the journal into the database and return
+	rawdb.WriteSnapshotJournal(t.diskdb, journal.Bytes())
+	return base, nil
 }
 
 // Rebuild wipes all available snapshot data from the persistent database and
diff --git a/core/state/statedb.go b/core/state/statedb.go
index 1528b45aa3..b3ea95a464 100644
--- a/core/state/statedb.go
+++ b/core/state/statedb.go
@@ -845,8 +845,8 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
 		if err := s.snaps.Update(root, parent, s.snapAccounts, s.snapStorage); err != nil {
 			log.Warn("Failed to update snapshot tree", "from", parent, "to", root, "err", err)
 		}
-		if err := s.snaps.Cap(root, 128); err != nil {
-			log.Warn("Failed to cap snapshot tree", "root", root, "layers", 128, "err", err)
+		if err := s.snaps.Cap(root, 127); err != nil { // Persistent layer is 128th, the last available trie
+			log.Warn("Failed to cap snapshot tree", "root", root, "layers", 127, "err", err)
 		}
 	}
 	s.snap, s.snapAccounts, s.snapStorage = nil, nil, nil
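
Note: with this change the journal round-trips through the key-value store
instead of the "snapshot.rlp" flat file. A minimal sketch of that round trip,
assuming this patch is applied; the in-memory database and the stand-in payload
are purely illustrative (a real journal blob is the RLP stream produced by
Tree.Journal):

	package main

	import (
		"fmt"

		"github.com/ethereum/go-ethereum/core/rawdb"
	)

	func main() {
		db := rawdb.NewMemoryDatabase()

		// Shutdown path: Tree.Journal serializes the diff layers and stores
		// the blob under the "SnapshotJournal" key via WriteSnapshotJournal.
		rawdb.WriteSnapshotJournal(db, []byte("stand-in journal payload"))

		// Startup path: loadSnapshot reads the blob back; an empty result is
		// treated as a missing or corrupted journal.
		if blob := rawdb.ReadSnapshotJournal(db); len(blob) == 0 {
			fmt.Println("missing or corrupted snapshot journal")
		} else {
			fmt.Printf("journal blob restored: %d bytes\n", len(blob))
		}
		// Once the layers have been rebuilt, the journal can be dropped.
		rawdb.DeleteSnapshotJournal(db)
	}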
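
For reference, the journal blob itself is a plain RLP stream: one generator
entry for the disk layer, followed by, for each diff layer from bottom to top,
the layer root, the account list and the storage list. Below is a sketch of
encoding a single diff-layer record, mirroring the rlp.Encode calls in
diffLayer.Journal; the struct definitions are local stand-ins for the snapshot
package's unexported journalAccount/journalStorage types, with the field
layout taken from this patch:

	package main

	import (
		"bytes"
		"fmt"

		"github.com/ethereum/go-ethereum/common"
		"github.com/ethereum/go-ethereum/rlp"
	)

	// Stand-ins for the unexported journal types in core/state/snapshot.
	type journalAccount struct {
		Hash common.Hash
		Blob []byte
	}

	type journalStorage struct {
		Hash common.Hash
		Keys []common.Hash
		Vals [][]byte
	}

	func main() {
		buffer := new(bytes.Buffer)

		// One diff-layer record: root, then accounts, then storage slots.
		root := common.HexToHash("0x01")
		accounts := []journalAccount{{Hash: common.HexToHash("0x02"), Blob: []byte{0x42}}}
		storage := []journalStorage{}

		for _, item := range []interface{}{root, accounts, storage} {
			if err := rlp.Encode(buffer, item); err != nil {
				panic(err)
			}
		}
		fmt.Printf("encoded diff-layer record: %d bytes\n", buffer.Len())
	}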