From 686f7438d37db6cf1fea640df51f8f4a36b6d6c2 Mon Sep 17 00:00:00 2001
From: Gary Rong
Date: Mon, 9 Jan 2023 15:07:08 +0800
Subject: [PATCH 1/2] eth/downloader: fix unexpected skeleton header deletion

---
 eth/downloader/skeleton.go | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go
index 8dcec2292b..6fa565bd5b 100644
--- a/eth/downloader/skeleton.go
+++ b/eth/downloader/skeleton.go
@@ -977,8 +977,14 @@ func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged boo
 		// the expected new sync cycle after some propagated blocks. Log
 		// it for debugging purposes, explicitly clean and don't escalate.
 		case subchains == 2 && s.progress.Subchains[1].Head == s.progress.Subchains[1].Tail:
-			log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
-			rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
+			// Remove the leftover skeleton header associated with old
+			// skeleton chain only if it's not covered by the current
+			// skeleton range.
+			if s.progress.Subchains[1].Head < s.progress.Subchains[0].Tail {
+				log.Debug("Cleaning previous beacon sync state", "head", s.progress.Subchains[1].Head)
+				rawdb.DeleteSkeletonHeader(batch, s.progress.Subchains[1].Head)
+			}
+			// Drop the leftover skeleton chain since it's stale.
 			s.progress.Subchains = s.progress.Subchains[:1]

 		// If we have more than one header or more than one leftover chain,

From 71f7988b0f6abc6db84e0ef6ec80766b946229f6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?=
Date: Mon, 9 Jan 2023 12:12:25 +0200
Subject: [PATCH 2/2] eth/downloader: create repro testcase for beacon header loss

---
 eth/downloader/skeleton.go      |   2 +-
 eth/downloader/skeleton_test.go | 157 ++++++++++++++++++++++++--------
 2 files changed, 120 insertions(+), 39 deletions(-)

diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go
index 6fa565bd5b..15d473af26 100644
--- a/eth/downloader/skeleton.go
+++ b/eth/downloader/skeleton.go
@@ -520,7 +520,7 @@ func (s *skeleton) initSync(head *types.Header) {
 		}
 		break
 	}
-	// If the last subchain can be extended, we're lucky. Otherwise create
+	// If the last subchain can be extended, we're lucky. Otherwise, create
 	// a new subchain sync task.
 	var extended bool
 	if n := len(s.progress.Subchains); n > 0 {
diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go
index 41373d33a8..3b8e627cba 100644
--- a/eth/downloader/skeleton_test.go
+++ b/eth/downloader/skeleton_test.go
@@ -37,7 +37,7 @@ import (
 type hookedBackfiller struct {
 	// suspendHook is an optional hook to be called when the filler is requested
 	// to be suspended.
-	suspendHook func()
+	suspendHook func() *types.Header

 	// resumeHook is an optional hook to be called when the filler is requested
 	// to be resumed.
@@ -56,7 +56,7 @@ func newHookedBackfiller() backfiller {
 // on initial startup.
 func (hf *hookedBackfiller) suspend() *types.Header {
 	if hf.suspendHook != nil {
-		hf.suspendHook()
+		return hf.suspendHook()
 	}
 	return nil // we don't really care about header cleanups for now
 }
@@ -525,9 +525,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 			Number: big.NewInt(int64(i)),
 		})
 	}
+	// Some tests require a forking side chain to trigger cornercases.
+	var sidechain []*types.Header
+	for i := 0; i < len(chain)/2; i++ { // Fork at block #5000
+		sidechain = append(sidechain, chain[i])
+	}
+	for i := len(chain) / 2; i < len(chain); i++ {
+		sidechain = append(sidechain, &types.Header{
+			ParentHash: sidechain[i-1].Hash(),
+			Number:     big.NewInt(int64(i)),
+			Extra:      []byte("B"), // force a different hash
+		})
+	}
 	tests := []struct {
-		headers  []*types.Header // Database content (beside the genesis)
-		oldstate []*subchain     // Old sync state with various interrupted subchains
+		fill          bool // Whether to run a real backfiller in this test case
+		unpredictable bool // Whether to ignore drops/serves due to uncertain packet assignments

 		head     *types.Header       // New head header to announce to reorg to
 		peers    []*skeletonTestPeer // Initial peer set to start the sync with
@@ -760,11 +772,41 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 			endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}},
 			endserve: 4 * requestHeaders,
 		},
+		// This test reproduces a bug caught by (@rjl493456442) where a skeleton
+		// header goes missing, causing the sync to get stuck and/or panic.
+		//
+		// The setup requires a previously successfully synced chain up to a block
+		// height N. That results in a single skeleton header (block N) and a single
+		// subchain (Head N, Tail N) being stored on disk.
+		//
+		// The following step requires a new sync cycle to a new side chain of a
+		// height higher than N, and an ancestor lower than N (e.g. N-2, N+2).
+		// In this scenario, when processing a batch of headers, a link point of
+		// N-2 will be found, meaning that N-1 and N have been overwritten.
+		//
+		// The link event triggers an early exit, noticing that the previous sub-
+		// chain is a leftover and deletes it (with its skeleton header N). But
+		// since skeleton header N has been overwritten to the new side chain, we
+		// end up losing it and creating a gap.
+		{
+			fill:          true,
+			unpredictable: true, // We have good and bad peer too, bad may be dropped, test too short for certainty
+
+			head:     chain[len(chain)/2+1], // Sync up until the sidechain common ancestor + 2
+			peers:    []*skeletonTestPeer{newSkeletonTestPeer("test-peer-oldchain", chain)},
+			midstate: []*subchain{{Head: uint64(len(chain)/2 + 1), Tail: 1}},
+
+			newHead:  sidechain[len(sidechain)/2+3], // Sync up until the sidechain common ancestor + 4
+			newPeer:  newSkeletonTestPeer("test-peer-newchain", sidechain),
+			endstate: []*subchain{{Head: uint64(len(sidechain)/2 + 3), Tail: uint64(len(chain) / 2)}},
+		},
 	}
 	for i, tt := range tests {
 		// Create a fresh database and initialize it with the starting state
 		db := rawdb.NewMemoryDatabase()
-		rawdb.WriteHeader(db, chain[0])
+
+		rawdb.WriteBlock(db, types.NewBlockWithHeader(chain[0]))
+		rawdb.WriteReceipts(db, chain[0].Hash(), chain[0].Number.Uint64(), types.Receipts{})

 		// Create a peer set to feed headers through
 		peerset := newPeerSet()
@@ -780,8 +822,43 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 			peerset.Unregister(peer)
 			dropped[peer]++
 		}
+		// Create a backfiller if we need to run more advanced tests
+		filler := newHookedBackfiller()
+		if tt.fill {
+			var filled *types.Header
+
+			filler = &hookedBackfiller{
+				resumeHook: func() {
+					var progress skeletonProgress
+					json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress)
+
+					for progress.Subchains[0].Tail < progress.Subchains[0].Head {
+						header := rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)
+
+						rawdb.WriteBlock(db, types.NewBlockWithHeader(header))
+						rawdb.WriteReceipts(db, header.Hash(), header.Number.Uint64(), types.Receipts{})
+
+						rawdb.DeleteSkeletonHeader(db, header.Number.Uint64())
+
+						progress.Subchains[0].Tail++
+						progress.Subchains[0].Next = header.Hash()
+					}
+					filled = rawdb.ReadSkeletonHeader(db, progress.Subchains[0].Tail)
+
+					rawdb.WriteBlock(db, types.NewBlockWithHeader(filled))
+					rawdb.WriteReceipts(db, filled.Hash(), filled.Number.Uint64(), types.Receipts{})
+				},
+
+				suspendHook: func() *types.Header {
+					prev := filled
+					filled = nil
+
+					return prev
+				},
+			}
+		}
 		// Create a skeleton sync and run a cycle
-		skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller())
+		skeleton := newSkeleton(db, peerset, drop, filler)
 		skeleton.Sync(tt.head, true)

 		var progress skeletonProgress
@@ -815,19 +892,21 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 			t.Error(err)
 			continue
 		}
-		var served uint64
-		for _, peer := range tt.peers {
-			served += atomic.LoadUint64(&peer.served)
-		}
-		if served != tt.midserve {
-			t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
-		}
-		var drops uint64
-		for _, peer := range tt.peers {
-			drops += atomic.LoadUint64(&peer.dropped)
-		}
-		if drops != tt.middrop {
-			t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
+		if !tt.unpredictable {
+			var served uint64
+			for _, peer := range tt.peers {
+				served += atomic.LoadUint64(&peer.served)
+			}
+			if served != tt.midserve {
+				t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve)
+			}
+			var drops uint64
+			for _, peer := range tt.peers {
+				drops += atomic.LoadUint64(&peer.dropped)
+			}
+			if drops != tt.middrop {
+				t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
+			}
 		}
 		// Apply the post-init events if there's any
 		if tt.newHead != nil {
@@ -868,25 +947,27 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
 			continue
 		}
 		// Check that the peers served no more headers than we actually needed
-		served = 0
-		for _, peer := range tt.peers {
-			served += atomic.LoadUint64(&peer.served)
-		}
-		if tt.newPeer != nil {
-			served += atomic.LoadUint64(&tt.newPeer.served)
-		}
-		if served != tt.endserve {
-			t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
-		}
-		drops = 0
-		for _, peer := range tt.peers {
-			drops += atomic.LoadUint64(&peer.dropped)
-		}
-		if tt.newPeer != nil {
-			drops += atomic.LoadUint64(&tt.newPeer.dropped)
-		}
-		if drops != tt.middrop {
-			t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop)
+		if !tt.unpredictable {
+			served := uint64(0)
+			for _, peer := range tt.peers {
+				served += atomic.LoadUint64(&peer.served)
+			}
+			if tt.newPeer != nil {
+				served += atomic.LoadUint64(&tt.newPeer.served)
+			}
+			if served != tt.endserve {
+				t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve)
+			}
+			drops := uint64(0)
+			for _, peer := range tt.peers {
+				drops += atomic.LoadUint64(&peer.dropped)
+			}
+			if tt.newPeer != nil {
+				drops += atomic.LoadUint64(&tt.newPeer.dropped)
+			}
+			if drops != tt.enddrop {
+				t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.enddrop)
+			}
 		}
 		// Clean up any leftover skeleton sync resources
 		skeleton.Terminate()
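For reference, the guard added by the first patch boils down to a range check before the header delete: the leftover single-header subchain is always dropped from the progress, but its on-disk skeleton header is only removed when the live subchain's range does not already cover that slot. The sketch below is a minimal, self-contained illustration of that decision, not the real downloader code: the subchain struct and the deleteHeader callback are stand-ins for geth's skeletonProgress bookkeeping and rawdb.DeleteSkeletonHeader.

package main

import "fmt"

type subchain struct {
	Head uint64 // highest block number covered by the subchain
	Tail uint64 // lowest block number covered by the subchain
}

// pruneLeftover mirrors the patched branch of processResponse: subchains[0] is
// the live sync range, subchains[1] a single-header leftover of an old cycle.
func pruneLeftover(subchains []*subchain, deleteHeader func(uint64)) []*subchain {
	if len(subchains) == 2 && subchains[1].Head == subchains[1].Tail {
		// Only delete the on-disk header if the live range has not already
		// overwritten that slot with a header from the new chain.
		if subchains[1].Head < subchains[0].Tail {
			deleteHeader(subchains[1].Head)
		}
		// The stale subchain itself is dropped either way.
		subchains = subchains[:1]
	}
	return subchains
}

func main() {
	// Old head 5000 sits inside the new range [4998, 5002], so the header
	// stored at 5000 now belongs to the new chain and must not be deleted.
	chains := []*subchain{{Head: 5002, Tail: 4998}, {Head: 5000, Tail: 5000}}
	chains = pruneLeftover(chains, func(n uint64) {
		fmt.Println("deleting skeleton header", n)
	})
	fmt.Println("remaining subchains:", len(chains)) // 1, and nothing was deleted
}

Running the sketch shows the leftover subchain being dropped while the overwritten header slot is left untouched, which is the situation the new test case drives the real syncer into.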