From 9dcf8aae4742cc4220065489a5bdcf045c398616 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 10 Apr 2024 17:02:45 +0800 Subject: [PATCH] eth/protocols/snap: skip retrieval for completed storages (#29378) * eth/protocols/snap: skip retrieval for completed storages * eth/protocols/snap: address comments from peter * eth/protocols/snap: add comments --- eth/protocols/snap/metrics.go | 5 + eth/protocols/snap/progress_test.go | 154 ++++++++++++++++++++++++++++ eth/protocols/snap/sync.go | 142 +++++++++++++++++++++---- 3 files changed, 281 insertions(+), 20 deletions(-) create mode 100644 eth/protocols/snap/progress_test.go diff --git a/eth/protocols/snap/metrics.go b/eth/protocols/snap/metrics.go index a8dc2b5824..19e9151824 100644 --- a/eth/protocols/snap/metrics.go +++ b/eth/protocols/snap/metrics.go @@ -54,4 +54,9 @@ var ( // skipStorageHealingGauge is the metric to track how many storages are retrieved // in multiple requests but healing is not necessary. skipStorageHealingGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/noheal", nil) + + // largeStorageDiscardGauge is the metric to track how many chunked storages are + // discarded during the snap sync. + largeStorageDiscardGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/chunk/discard", nil) + largeStorageResumedGauge = metrics.NewRegisteredGauge("eth/protocols/snap/sync/storage/chunk/resume", nil) ) diff --git a/eth/protocols/snap/progress_test.go b/eth/protocols/snap/progress_test.go new file mode 100644 index 0000000000..9d923bd2f5 --- /dev/null +++ b/eth/protocols/snap/progress_test.go @@ -0,0 +1,154 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snap + +import ( + "encoding/json" + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +// Legacy sync progress definitions +type legacyStorageTask struct { + Next common.Hash // Next account to sync in this interval + Last common.Hash // Last account to sync in this interval +} + +type legacyAccountTask struct { + Next common.Hash // Next account to sync in this interval + Last common.Hash // Last account to sync in this interval + SubTasks map[common.Hash][]*legacyStorageTask // Storage intervals needing fetching for large contracts +} + +type legacyProgress struct { + Tasks []*legacyAccountTask // The suspended account tasks (contract tasks within) +} + +func compareProgress(a legacyProgress, b SyncProgress) bool { + if len(a.Tasks) != len(b.Tasks) { + return false + } + for i := 0; i < len(a.Tasks); i++ { + if a.Tasks[i].Next != b.Tasks[i].Next { + return false + } + if a.Tasks[i].Last != b.Tasks[i].Last { + return false + } + // new fields are not checked here + + if len(a.Tasks[i].SubTasks) != len(b.Tasks[i].SubTasks) { + return false + } + for addrHash, subTasksA := range a.Tasks[i].SubTasks { + subTasksB, ok := b.Tasks[i].SubTasks[addrHash] + if !ok || len(subTasksB) != len(subTasksA) { + return false + } + for j := 0; j < len(subTasksA); j++ { + if subTasksA[j].Next != subTasksB[j].Next { + return false + } + if subTasksA[j].Last != subTasksB[j].Last { + return false + } + } + } + } + return true +} + +func makeLegacyProgress() legacyProgress { + return legacyProgress{ + Tasks: []*legacyAccountTask{ + { + Next: common.Hash{}, + Last: common.Hash{0x77}, + SubTasks: map[common.Hash][]*legacyStorageTask{ + common.Hash{0x1}: { + { + Next: common.Hash{}, + Last: common.Hash{0xff}, + }, + }, + }, + }, + { + Next: common.Hash{0x88}, + Last: common.Hash{0xff}, + }, + }, + } +} + +func convertLegacy(legacy legacyProgress) SyncProgress { + var progress SyncProgress + for i, task := range legacy.Tasks { + subTasks := make(map[common.Hash][]*storageTask) + for owner, list := range task.SubTasks { + var cpy []*storageTask + for i := 0; i < len(list); i++ { + cpy = append(cpy, &storageTask{ + Next: list[i].Next, + Last: list[i].Last, + }) + } + subTasks[owner] = cpy + } + accountTask := &accountTask{ + Next: task.Next, + Last: task.Last, + SubTasks: subTasks, + } + if i == 0 { + accountTask.StorageCompleted = []common.Hash{{0xaa}, {0xbb}} // fulfill new fields + } + progress.Tasks = append(progress.Tasks, accountTask) + } + return progress +} + +func TestSyncProgressCompatibility(t *testing.T) { + // Decode serialized bytes of legacy progress, backward compatibility + legacy := makeLegacyProgress() + blob, err := json.Marshal(legacy) + if err != nil { + t.Fatalf("Failed to marshal progress %v", err) + } + var dec SyncProgress + if err := json.Unmarshal(blob, &dec); err != nil { + t.Fatalf("Failed to unmarshal progress %v", err) + } + if !compareProgress(legacy, dec) { + t.Fatal("sync progress is not backward compatible") + } + + // Decode serialized bytes of new format progress + progress := convertLegacy(legacy) + blob, err = json.Marshal(progress) + if err != nil { + t.Fatalf("Failed to marshal progress %v", err) + } + var legacyDec legacyProgress + if err := json.Unmarshal(blob, &legacyDec); err != nil { + t.Fatalf("Failed to unmarshal progress %v", err) + } + if !compareProgress(legacyDec, progress) { + t.Fatal("sync progress is not forward compatible") + } +} diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 7915a8eba8..d5d6fd6d69 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -295,11 +295,19 @@ type bytecodeHealResponse struct { // accountTask represents the sync task for a chunk of the account snapshot. type accountTask struct { - // These fields get serialized to leveldb on shutdown + // These fields get serialized to key-value store on shutdown Next common.Hash // Next account to sync in this interval Last common.Hash // Last account to sync in this interval SubTasks map[common.Hash][]*storageTask // Storage intervals needing fetching for large contracts + // This is a list of account hashes whose storage are already completed + // in this cycle. This field is newly introduced in v1.14 and will be + // empty if the task is resolved from legacy progress data. Furthermore, + // this additional field will be ignored by legacy Geth. The only side + // effect is that these contracts might be resynced in the new cycle, + // retaining the legacy behavior. + StorageCompleted []common.Hash `json:",omitempty"` + // These fields are internals used during runtime req *accountRequest // Pending request to fill this task res *accountResponse // Validate response filling this task @@ -309,8 +317,9 @@ type accountTask struct { needState []bool // Flags whether the filling accounts need storage retrieval needHeal []bool // Flags whether the filling accounts's state was chunked and need healing - codeTasks map[common.Hash]struct{} // Code hashes that need retrieval - stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval + codeTasks map[common.Hash]struct{} // Code hashes that need retrieval + stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval + stateCompleted map[common.Hash]struct{} // Account hashes whose storage have been completed genBatch ethdb.Batch // Batch used by the node generator genTrie *trie.StackTrie // Node generator from storage slots @@ -318,6 +327,30 @@ type accountTask struct { done bool // Flag whether the task can be removed } +// activeSubTasks returns the set of storage tasks covered by the current account +// range. Normally this would be the entire subTask set, but on a sync interrupt +// and later resume it can happen that a shorter account range is retrieved. This +// method ensures that we only start up the subtasks covered by the latest account +// response. +// +// Nil is returned if the account range is empty. +func (task *accountTask) activeSubTasks() map[common.Hash][]*storageTask { + if len(task.res.hashes) == 0 { + return nil + } + var ( + tasks = make(map[common.Hash][]*storageTask) + last = task.res.hashes[len(task.res.hashes)-1] + ) + for hash, subTasks := range task.SubTasks { + subTasks := subTasks // closure + if hash.Cmp(last) <= 0 { + tasks[hash] = subTasks + } + } + return tasks +} + // storageTask represents the sync task for a chunk of the storage snapshot. type storageTask struct { Next common.Hash // Next account to sync in this interval @@ -745,6 +778,14 @@ func (s *Syncer) loadSyncStatus() { for _, task := range s.tasks { task := task // closure for task.genBatch in the stacktrie writer callback + // Restore the completed storages + task.stateCompleted = make(map[common.Hash]struct{}) + for _, hash := range task.StorageCompleted { + task.stateCompleted[hash] = struct{}{} + } + task.StorageCompleted = nil + + // Allocate batch for account trie generation task.genBatch = ethdb.HookedBatch{ Batch: s.db.NewBatch(), OnPut: func(key []byte, value []byte) { @@ -767,6 +808,8 @@ func (s *Syncer) loadSyncStatus() { options = options.WithSkipBoundary(task.Next != (common.Hash{}), task.Last != common.MaxHash, boundaryAccountNodesGauge) } task.genTrie = trie.NewStackTrie(options) + + // Restore leftover storage tasks for accountHash, subtasks := range task.SubTasks { for _, subtask := range subtasks { subtask := subtask // closure for subtask.genBatch in the stacktrie writer callback @@ -861,11 +904,12 @@ func (s *Syncer) loadSyncStatus() { options = options.WithSkipBoundary(next != common.Hash{}, last != common.MaxHash, boundaryAccountNodesGauge) } s.tasks = append(s.tasks, &accountTask{ - Next: next, - Last: last, - SubTasks: make(map[common.Hash][]*storageTask), - genBatch: batch, - genTrie: trie.NewStackTrie(options), + Next: next, + Last: last, + SubTasks: make(map[common.Hash][]*storageTask), + genBatch: batch, + stateCompleted: make(map[common.Hash]struct{}), + genTrie: trie.NewStackTrie(options), }) log.Debug("Created account sync task", "from", next, "last", last) next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1)) @@ -886,6 +930,14 @@ func (s *Syncer) saveSyncStatus() { } } } + // Save the account hashes of completed storage. + task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted)) + for hash := range task.stateCompleted { + task.StorageCompleted = append(task.StorageCompleted, hash) + } + if len(task.StorageCompleted) > 0 { + log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last) + } } // Store the actual progress markers progress := &SyncProgress{ @@ -970,6 +1022,10 @@ func (s *Syncer) cleanStorageTasks() { delete(task.SubTasks, account) task.pend-- + // Mark the state as complete to prevent resyncing, regardless + // if state healing is necessary. + task.stateCompleted[account] = struct{}{} + // If this was the last pending task, forward the account task if task.pend == 0 { s.forwardAccountTask(task) @@ -1209,7 +1265,8 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st continue } // Skip tasks that are already retrieving (or done with) all small states - if len(task.SubTasks) == 0 && len(task.stateTasks) == 0 { + storageTasks := task.activeSubTasks() + if len(storageTasks) == 0 && len(task.stateTasks) == 0 { continue } // Task pending retrieval, try to find an idle peer. If no such peer @@ -1253,7 +1310,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st roots = make([]common.Hash, 0, storageSets) subtask *storageTask ) - for account, subtasks := range task.SubTasks { + for account, subtasks := range storageTasks { for _, st := range subtasks { // Skip any subtasks already filling if st.req != nil { @@ -1850,11 +1907,11 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { res.task.res = res // Ensure that the response doesn't overflow into the subsequent task - last := res.task.Last.Big() + lastBig := res.task.Last.Big() for i, hash := range res.hashes { // Mark the range complete if the last is already included. // Keep iteration to delete the extra states if exists. - cmp := hash.Big().Cmp(last) + cmp := hash.Big().Cmp(lastBig) if cmp == 0 { res.cont = false continue @@ -1890,7 +1947,21 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { } // Check if the account is a contract with an unknown storage trie if account.Root != types.EmptyRootHash { - if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) { + // If the storage was already retrieved in the last cycle, there's no need + // to resync it again, regardless of whether the storage root is consistent + // or not. + if _, exist := res.task.stateCompleted[res.hashes[i]]; exist { + // The leftover storage tasks are not expected, unless system is + // very wrong. + if _, ok := res.task.SubTasks[res.hashes[i]]; ok { + panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", res.hashes[i])) + } + // Mark the healing tag if storage root node is inconsistent, or + // it's non-existent due to storage chunking. + if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) { + res.task.needHeal[i] = true + } + } else { // If there was a previous large state retrieval in progress, // don't restart it from scratch. This happens if a sync cycle // is interrupted and resumed later. However, *do* update the @@ -1902,7 +1973,12 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { } res.task.needHeal[i] = true resumed[res.hashes[i]] = struct{}{} + largeStorageResumedGauge.Inc(1) } else { + // It's possible that in the hash scheme, the storage, along + // with the trie nodes of the given root, is already present + // in the database. Schedule the storage task anyway to simplify + // the logic here. res.task.stateTasks[res.hashes[i]] = account.Root } res.task.needState[i] = true @@ -1910,13 +1986,29 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { } } } - // Delete any subtasks that have been aborted but not resumed. This may undo - // some progress if a new peer gives us less accounts than an old one, but for - // now we have to live with that. - for hash := range res.task.SubTasks { - if _, ok := resumed[hash]; !ok { - log.Debug("Aborting suspended storage retrieval", "account", hash) - delete(res.task.SubTasks, hash) + // Delete any subtasks that have been aborted but not resumed. It's essential + // as the corresponding contract might be self-destructed in this cycle(it's + // no longer possible in ethereum as self-destruction is disabled in Cancun + // Fork, but the condition is still necessary for other networks). + // + // Keep the leftover storage tasks if they are not covered by the responded + // account range which should be picked up in next account wave. + if len(res.hashes) > 0 { + // The hash of last delivered account in the response + last := res.hashes[len(res.hashes)-1] + for hash := range res.task.SubTasks { + // TODO(rjl493456442) degrade the log level before merging. + if hash.Cmp(last) > 0 { + log.Info("Keeping suspended storage retrieval", "account", hash) + continue + } + // TODO(rjl493456442) degrade the log level before merging. + // It should never happen in ethereum. + if _, ok := resumed[hash]; !ok { + log.Error("Aborting suspended storage retrieval", "account", hash) + delete(res.task.SubTasks, hash) + largeStorageDiscardGauge.Inc(1) + } } } // If the account range contained no contracts, or all have been fully filled @@ -2014,6 +2106,7 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) { res.mainTask.needState[j] = false res.mainTask.pend-- + res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed smallStorageGauge.Inc(1) } // If the last contract was chunked, mark it as needing healing @@ -2409,10 +2502,19 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { return } task.Next = incHash(hash) + + // Remove the completion flag once the account range is pushed + // forward. The leftover accounts will be skipped in the next + // cycle. + delete(task.stateCompleted, hash) } // All accounts marked as complete, track if the entire task is done task.done = !res.cont + // Error out if there is any leftover completion flag. + if task.done && len(task.stateCompleted) != 0 { + panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted))) + } // Stack trie could have generated trie nodes, push them to disk (we need to // flush after finalizing task.done. It's fine even if we crash and lose this // write as it will only cause more data to be downloaded during heal.