diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go index 661610840a..436fa4c593 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/generate_test.go @@ -240,6 +240,16 @@ func (t *testHelper) Commit() common.Hash { } t.triedb.Update(root, types.EmptyRootHash, 0, t.nodes, t.states) t.triedb.Commit(root, false) + + // re-open the trie database to ensure the frozen buffer + // is not referenced + config := &triedb.Config{} + if t.triedb.Scheme() == rawdb.PathScheme { + config.PathDB = &pathdb.Config{} // disable caching + } else { + config.HashDB = &hashdb.Config{} // disable caching + } + t.triedb = triedb.NewDatabase(t.triedb.Disk(), config) return root } diff --git a/core/state/statedb_test.go b/core/state/statedb_test.go index 57886e6e03..ee3d2b718d 100644 --- a/core/state/statedb_test.go +++ b/core/state/statedb_test.go @@ -978,19 +978,22 @@ func TestMissingTrieNodes(t *testing.T) { func testMissingTrieNodes(t *testing.T, scheme string) { // Create an initial state with a few accounts var ( - tdb *triedb.Database - memDb = rawdb.NewMemoryDatabase() + tdb *triedb.Database + memDb = rawdb.NewMemoryDatabase() + openDb = func() *triedb.Database { + if scheme == rawdb.PathScheme { + return triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ + CleanCacheSize: 0, + WriteBufferSize: 0, + }}) // disable caching + } else { + return triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{ + CleanCacheSize: 0, + }}) // disable caching + } + } ) - if scheme == rawdb.PathScheme { - tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{ - CleanCacheSize: 0, - WriteBufferSize: 0, - }}) // disable caching - } else { - tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{ - CleanCacheSize: 0, - }}) // disable caching - } + tdb = openDb() db := NewDatabase(tdb, nil) var root common.Hash @@ -1008,17 +1011,29 @@ func testMissingTrieNodes(t *testing.T, scheme string) { tdb.Commit(root, false) } // Create a new state on the old root - state, _ = New(root, db) // Now we clear out the memdb it := memDb.NewIterator(nil, nil) for it.Next() { k := it.Key() + // Leave the root intact - if !bytes.Equal(k, root[:]) { - t.Logf("key: %x", k) - memDb.Delete(k) + if scheme == rawdb.HashScheme { + if !bytes.Equal(k, root[:]) { + t.Logf("key: %x", k) + memDb.Delete(k) + } + } + if scheme == rawdb.PathScheme { + rk := k[len(rawdb.TrieNodeAccountPrefix):] + if len(rk) != 0 { + t.Logf("key: %x", k) + memDb.Delete(k) + } } } + tdb = openDb() + db = NewDatabase(tdb, nil) + state, _ = New(root, db) balance := state.GetBalance(addr) // The removed elem should lead to it returning zero balance if exp, got := uint64(0), balance.Uint64(); got != exp { diff --git a/triedb/pathdb/buffer.go b/triedb/pathdb/buffer.go index 68e136f193..ba98b35688 100644 --- a/triedb/pathdb/buffer.go +++ b/triedb/pathdb/buffer.go @@ -17,6 +17,7 @@ package pathdb import ( + "errors" "fmt" "time" @@ -37,6 +38,9 @@ type buffer struct { limit uint64 // The maximum memory allowance in bytes nodes *nodeSet // Aggregated trie node set states *stateSet // Aggregated state set + + done chan struct{} // notifier whether the content in buffer has been flushed or not + flushErr error // error if any exception occurs during flushing } // newBuffer initializes the buffer with the provided states and trie nodes. @@ -124,36 +128,61 @@ func (b *buffer) size() uint64 { // flush persists the in-memory dirty trie node into the disk if the configured // memory threshold is reached. Note, all data must be written atomically. -func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) error { - // Ensure the target state id is aligned with the internal counter. - head := rawdb.ReadPersistentStateID(db) - if head+b.layers != id { - return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id) +func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) { + if b.done != nil { + panic("duplicated flush operation") } - // Terminate the state snapshot generation if it's active - var ( - start = time.Now() - batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff - ) - // Explicitly sync the state freezer, ensuring that all written - // data is transferred to disk before updating the key-value store. - if freezer != nil { - if err := freezer.Sync(); err != nil { - return err - } - } - nodes := b.nodes.write(batch, nodesCache) - rawdb.WritePersistentStateID(batch, id) + b.done = make(chan struct{}) - // Flush all mutations in a single batch - size := batch.ValueSize() - if err := batch.Write(); err != nil { - return err - } - commitBytesMeter.Mark(int64(size)) - commitNodesMeter.Mark(int64(nodes)) - commitTimeTimer.UpdateSince(start) - b.reset() - log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) - return nil + go func() { + defer func() { + close(b.done) + }() + + // Ensure the target state id is aligned with the internal counter. + head := rawdb.ReadPersistentStateID(db) + if head+b.layers != id { + b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id) + return + } + // Terminate the state snapshot generation if it's active + var ( + start = time.Now() + batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff + ) + // Explicitly sync the state freezer, ensuring that all written + // data is transferred to disk before updating the key-value store. + if freezer != nil { + if err := freezer.Sync(); err != nil { + b.flushErr = err + return + } + } + nodes := b.nodes.write(batch, nodesCache) + rawdb.WritePersistentStateID(batch, id) + + // Flush all mutations in a single batch + size := batch.ValueSize() + if err := batch.Write(); err != nil { + b.flushErr = err + return + } + // The content in the frozen buffer is kept for consequent state access, + // TODO (rjl493456442) measure the gc overhead for holding this struct. + + commitBytesMeter.Mark(int64(size)) + commitNodesMeter.Mark(int64(nodes)) + commitTimeTimer.UpdateSince(start) + log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start))) + }() +} + +// waitFlush blocks until the buffer has been fully flushed and returns any +// stored errors that occurred during the process. +func (b *buffer) waitFlush() error { + if b.done == nil { + return errors.New("the buffer is not frozen") + } + <-b.done + return b.flushErr } diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index cfbdb01c49..403f56bd99 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -377,7 +377,7 @@ func (db *Database) Enable(root common.Hash) error { } // Re-construct a new disk layer backed by persistent state // with **empty clean cache and node buffer**. - db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))) + db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil)) // Re-enable the database as the final step. db.waitSync = false @@ -476,7 +476,13 @@ func (db *Database) Close() error { db.readOnly = true // Release the memory held by clean cache. - db.tree.bottom().resetCache() + disk := db.tree.bottom() + if disk.frozen != nil { + if err := disk.frozen.waitFlush(); err != nil { + return err + } + } + disk.resetCache() // Close the attached state history freezer. if db.freezer == nil { diff --git a/triedb/pathdb/difflayer.go b/triedb/pathdb/difflayer.go index c06026b6ca..8708424727 100644 --- a/triedb/pathdb/difflayer.go +++ b/triedb/pathdb/difflayer.go @@ -156,7 +156,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes *no } // persist flushes the diff layer and all its parent layers to disk layer. -func (dl *diffLayer) persist(force bool) (layer, error) { +func (dl *diffLayer) persist(force bool) (*diskLayer, error) { if parent, ok := dl.parentLayer().(*diffLayer); ok { // Hold the lock to prevent any read operation until the new // parent is linked correctly. @@ -183,7 +183,7 @@ func (dl *diffLayer) size() uint64 { // diffToDisk merges a bottom-most diff into the persistent disk layer underneath // it. The method will panic if called onto a non-bottom-most diff layer. -func diffToDisk(layer *diffLayer, force bool) (layer, error) { +func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) { disk, ok := layer.parentLayer().(*diskLayer) if !ok { panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer())) diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 003431b19b..9b7c14007b 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -34,13 +34,14 @@ type diskLayer struct { id uint64 // Immutable, corresponding state id db *Database // Path-based trie database nodes *fastcache.Cache // GC friendly memory cache of clean nodes - buffer *buffer // Dirty buffer to aggregate writes of nodes and states + buffer *buffer // Live buffer to aggregate writes + frozen *buffer // Frozen node buffer waiting for flushing stale bool // Signals that the layer became stale (state progressed) lock sync.RWMutex // Lock used to protect stale flag } // newDiskLayer creates a new disk layer based on the passing arguments. -func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer) *diskLayer { +func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer { // Initialize a clean cache if the memory allowance is not zero // or reuse the provided cache if it is not nil (inherited from // the original disk layer). @@ -53,6 +54,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Ca db: db, nodes: nodes, buffer: buffer, + frozen: frozen, } } @@ -101,16 +103,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co if dl.stale { return nil, common.Hash{}, nil, errSnapshotStale } - // Try to retrieve the trie node from the not-yet-written - // node buffer first. Note the buffer is lock free since - // it's impossible to mutate the buffer before tagging the - // layer as stale. - n, found := dl.buffer.node(owner, path) - if found { - dirtyNodeHitMeter.Mark(1) - dirtyNodeReadMeter.Mark(int64(len(n.Blob))) - dirtyNodeHitDepthHist.Update(int64(depth)) - return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil + // Try to retrieve the trie node from the not-yet-written node buffer first + // (both the live one and the frozen one). Note the buffer is lock free since + // it's impossible to mutate the buffer before tagging the layer as stale. + for _, buffer := range []*buffer{dl.buffer, dl.frozen} { + if buffer != nil { + n, found := buffer.node(owner, path) + if found { + dirtyNodeHitMeter.Mark(1) + dirtyNodeReadMeter.Mark(int64(len(n.Blob))) + dirtyNodeHitDepthHist.Update(int64(depth)) + return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil + } + } } dirtyNodeMissMeter.Mark(1) @@ -134,6 +139,11 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co } else { blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path) } + // Store the resolved data in the clean cache. The background buffer flusher + // may also write to the clean cache concurrently, but two writers cannot + // write the same item with different content. If the item already exists, + // it will be found in the frozen buffer, eliminating the need to check the + // database. if dl.nodes != nil && len(blob) > 0 { dl.nodes.Set(key, blob) cleanNodeWriteMeter.Mark(int64(len(blob))) @@ -152,24 +162,27 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) { if dl.stale { return nil, errSnapshotStale } - // Try to retrieve the account from the not-yet-written - // node buffer first. Note the buffer is lock free since - // it's impossible to mutate the buffer before tagging the - // layer as stale. - blob, found := dl.buffer.account(hash) - if found { - dirtyStateHitMeter.Mark(1) - dirtyStateReadMeter.Mark(int64(len(blob))) - dirtyStateHitDepthHist.Update(int64(depth)) + // Try to retrieve the trie node from the not-yet-written node buffer first + // (both the live one and the frozen one). Note the buffer is lock free since + // it's impossible to mutate the buffer before tagging the layer as stale. + for _, buffer := range []*buffer{dl.buffer, dl.frozen} { + if buffer != nil { + blob, found := buffer.account(hash) + if found { + dirtyStateHitMeter.Mark(1) + dirtyStateReadMeter.Mark(int64(len(blob))) + dirtyStateHitDepthHist.Update(int64(depth)) - if len(blob) == 0 { - stateAccountInexMeter.Mark(1) - } else { - stateAccountExistMeter.Mark(1) + if len(blob) == 0 { + stateAccountInexMeter.Mark(1) + } else { + stateAccountExistMeter.Mark(1) + } + return blob, nil + } } - return blob, nil } - dirtyStateMissMeter.Mark(1) + dirtyNodeMissMeter.Mark(1) // TODO(rjl493456442) support persistent state retrieval return nil, errors.New("not supported") @@ -188,22 +201,27 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([ if dl.stale { return nil, errSnapshotStale } + // Try to retrieve the trie node from the not-yet-written node buffer first + // (both the live one and the frozen one). Note the buffer is lock free since + // it's impossible to mutate the buffer before tagging the layer as stale. + for _, buffer := range []*buffer{dl.buffer, dl.frozen} { + if blob, found := buffer.storage(accountHash, storageHash); found { + dirtyStateHitMeter.Mark(1) + dirtyStateReadMeter.Mark(int64(len(blob))) + dirtyStateHitDepthHist.Update(int64(depth)) + + if len(blob) == 0 { + stateStorageInexMeter.Mark(1) + } else { + stateStorageExistMeter.Mark(1) + } + return blob, nil + } + } // Try to retrieve the storage slot from the not-yet-written // node buffer first. Note the buffer is lock free since // it's impossible to mutate the buffer before tagging the // layer as stale. - if blob, found := dl.buffer.storage(accountHash, storageHash); found { - dirtyStateHitMeter.Mark(1) - dirtyStateReadMeter.Mark(int64(len(blob))) - dirtyStateHitDepthHist.Update(int64(depth)) - - if len(blob) == 0 { - stateStorageInexMeter.Mark(1) - } else { - stateStorageExistMeter.Mark(1) - } - return blob, nil - } dirtyStateMissMeter.Mark(1) // TODO(rjl493456442) support persistent state retrieval @@ -250,7 +268,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // Mark the diskLayer as stale before applying any mutations on top. dl.stale = true - // Store the root->id lookup afterwards. All stored lookups are identified + // Store the root->id lookup afterward. All stored lookups are identified // by the **unique** state root. It's impossible that in the same chain // blocks are not adjacent but have the same root. if dl.id == 0 { @@ -262,18 +280,40 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { // truncation) surpasses the persisted state ID, we take the necessary action // of forcibly committing the cached dirty states to ensure that the persisted // state ID remains higher. - if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest { + persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb) + if !force && persistedID < oldest { force = true } - // Merge the trie nodes and flat states of the bottom-most diff layer into the - // buffer as the combined layer. + // Merge the nodes of the bottom-most diff layer into the buffer as the combined one combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet) if combined.full() || force { - if err := combined.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID()); err != nil { - return nil, err + // Wait until the previous frozen buffer is fully flushed + if dl.frozen != nil { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } } + // Release the frozen buffer and the internally referenced maps will + // be reclaimed by GC. + dl.frozen = nil + + // Freeze the live buffer and schedule background flushing + dl.frozen = combined + dl.frozen.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID()) + + // Block until the frozen buffer is fully flushed out if the oldest history + // surpasses the persisted state ID. + if persistedID < oldest { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } + } + combined = newBuffer(dl.db.config.WriteBufferSize, nil, nil, 0) } - ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined) + // Construct a new disk layer by merging the nodes from the provided diff + // layer, and flush the content in disk layer if there are too many nodes + // cached. The clean cache is inherited from the original disk layer. + ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined, dl.frozen) // To remove outdated history objects from the end, we set the 'tail' parameter // to 'oldest-1' due to the offset between the freezer index and the history ID. @@ -337,6 +377,15 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { return nil, err } } else { + // Block until the frozen buffer is fully flushed + if dl.frozen != nil { + if err := dl.frozen.waitFlush(); err != nil { + return nil, err + } + // Unset the frozen buffer if it exists, otherwise these "reverted" + // states will still be accessible after revert in frozen buffer. + dl.frozen = nil + } batch := dl.db.diskdb.NewBatch() writeNodes(batch, nodes, dl.nodes) rawdb.WritePersistentStateID(batch, dl.id-1) @@ -344,7 +393,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { log.Crit("Failed to write states", "err", err) } } - return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer), nil + return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer, dl.frozen), nil } // size returns the approximate size of cached nodes in the disk layer. diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index 779a262fdd..6e46467ac9 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -109,7 +109,7 @@ func (db *Database) loadLayers() layer { log.Info("Failed to load journal, discard it", "err", err) } // Return single layer with persistent state. - return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)) + return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil) } // loadDiskLayer reads the binary blob from the layer journal, reconstructing @@ -141,7 +141,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) { if err := states.decode(r); err != nil { return nil, err } - return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil + return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored), nil), nil } // loadDiffLayer reads the next sections of a layer journal, reconstructing a new @@ -243,6 +243,11 @@ func (db *Database) Journal(root common.Hash) error { return fmt.Errorf("triedb layer [%#x] missing", root) } disk := db.tree.bottom() + if disk.frozen != nil { + if err := disk.frozen.waitFlush(); err != nil { + return err + } + } if l, ok := l.(*diffLayer); ok { log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers) } else { // disk layer only on noop runs (likely) or deep reorgs (unlikely) diff --git a/triedb/pathdb/layertree.go b/triedb/pathdb/layertree.go index cf6b14e744..292b3aed62 100644 --- a/triedb/pathdb/layertree.go +++ b/triedb/pathdb/layertree.go @@ -130,6 +130,12 @@ func (tree *layerTree) cap(root common.Hash, layers int) error { if err != nil { return err } + // Block until the frozen buffer is fully flushed + if base.frozen != nil { + if err := base.frozen.waitFlush(); err != nil { + return err + } + } // Replace the entire layer tree with the flat base tree.layers = map[common.Hash]layer{base.rootHash(): base} return nil