From 9b84caf3a5f55cc2a14b50291118b9fab668b8c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 22 Mar 2017 02:37:24 +0200 Subject: [PATCH] core, eth, les: support resuming fast sync on heavy rollback (#3743) --- core/blockchain.go | 73 +++++++++++++++++++++---------- eth/api_backend.go | 1 + eth/downloader/downloader.go | 16 +++---- eth/downloader/downloader_test.go | 4 +- eth/sync.go | 8 ++++ les/api_backend.go | 1 + 6 files changed, 69 insertions(+), 34 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 765a4b318f..207c21a659 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -182,16 +182,25 @@ func (self *BlockChain) loadLastState() error { head := GetHeadBlockHash(self.chainDb) if head == (common.Hash{}) { // Corrupt or empty database, init from scratch - self.Reset() - } else { - if block := self.GetBlockByHash(head); block != nil { - // Block found, set as the current head - self.currentBlock = block - } else { - // Corrupt or empty database, init from scratch - self.Reset() - } + log.Warn("Empty database, resetting chain") + return self.Reset() } + // Make sure the entire head block is available + currentBlock := self.GetBlockByHash(head) + if currentBlock == nil { + // Corrupt or empty database, init from scratch + log.Warn("Head block missing, resetting chain", "hash", head) + return self.Reset() + } + // Make sure the state associated with the block is available + if _, err := state.New(currentBlock.Root(), self.chainDb); err != nil { + // Dangling block without a state associated, init from scratch + log.Warn("Head state missing, resetting chain", "number", currentBlock.Number(), "hash", currentBlock.Hash()) + return self.Reset() + } + // Everything seems to be fine, set as the head block + self.currentBlock = currentBlock + // Restore the last known head header currentHeader := self.currentBlock.Header() if head := GetHeadHeaderHash(self.chainDb); head != (common.Hash{}) { @@ -200,6 +209,7 @@ func (self *BlockChain) loadLastState() error { } } self.hc.SetCurrentHeader(currentHeader) + // Restore the last known head fast block self.currentFastBlock = self.currentBlock if head := GetHeadFastBlockHash(self.chainDb); head != (common.Hash{}) { @@ -233,14 +243,18 @@ func (self *BlockChain) loadLastState() error { // above the new head will be deleted and the new one set. In the case of blocks // though, the head may be further rewound if block bodies are missing (non-archive // nodes after a fast sync). -func (bc *BlockChain) SetHead(head uint64) { +func (bc *BlockChain) SetHead(head uint64) error { + log.Warn("Rewinding blockchain", "target", head) + bc.mu.Lock() defer bc.mu.Unlock() + // Rewind the header chain, deleting all block bodies until then delFn := func(hash common.Hash, num uint64) { DeleteBody(bc.chainDb, hash, num) } bc.hc.SetHead(head, delFn) + currentHeader := bc.hc.CurrentHeader() // Clear out any stale content from the caches bc.bodyCache.Purge() @@ -248,29 +262,34 @@ func (bc *BlockChain) SetHead(head uint64) { bc.blockCache.Purge() bc.futureBlocks.Purge() - // Update all computed fields to the new head - currentHeader := bc.hc.CurrentHeader() + // Rewind the block chain, ensuring we don't end up with a stateless head block if bc.currentBlock != nil && currentHeader.Number.Uint64() < bc.currentBlock.NumberU64() { bc.currentBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()) } + if bc.currentBlock != nil { + if _, err := state.New(bc.currentBlock.Root(), bc.chainDb); err != nil { + // Rewound state missing, rolled back to before pivot, reset to genesis + bc.currentBlock = nil + } + } + // Rewind the fast block in a simpleton way to the target head if bc.currentFastBlock != nil && currentHeader.Number.Uint64() < bc.currentFastBlock.NumberU64() { bc.currentFastBlock = bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()) } - + // If either blocks reached nil, reset to the genesis state if bc.currentBlock == nil { bc.currentBlock = bc.genesisBlock } if bc.currentFastBlock == nil { bc.currentFastBlock = bc.genesisBlock } - if err := WriteHeadBlockHash(bc.chainDb, bc.currentBlock.Hash()); err != nil { log.Crit("Failed to reset head full block", "err", err) } if err := WriteHeadFastBlockHash(bc.chainDb, bc.currentFastBlock.Hash()); err != nil { log.Crit("Failed to reset head fast block", "err", err) } - bc.loadLastState() + return bc.loadLastState() } // FastSyncCommitHead sets the current head block to the one defined by the hash @@ -378,16 +397,17 @@ func (self *BlockChain) StateAt(root common.Hash) (*state.StateDB, error) { } // Reset purges the entire blockchain, restoring it to its genesis state. -func (bc *BlockChain) Reset() { - bc.ResetWithGenesisBlock(bc.genesisBlock) +func (bc *BlockChain) Reset() error { + return bc.ResetWithGenesisBlock(bc.genesisBlock) } // ResetWithGenesisBlock purges the entire blockchain, restoring it to the // specified genesis state. -func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) { +func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { // Dump the entire block chain and purge the caches - bc.SetHead(0) - + if err := bc.SetHead(0); err != nil { + return err + } bc.mu.Lock() defer bc.mu.Unlock() @@ -404,6 +424,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) { bc.hc.SetGenesis(bc.genesisBlock.Header()) bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) bc.currentFastBlock = bc.genesisBlock + + return nil } // Export writes the active chain to the given writer. @@ -790,12 +812,15 @@ func (self *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain } // Update the head fast sync block if better self.mu.Lock() + head := blockChain[len(errs)-1] - if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(self.GetTd(head.Hash(), head.NumberU64())) < 0 { - if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil { - log.Crit("Failed to update head fast block hash", "err", err) + if td := self.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case + if self.GetTd(self.currentFastBlock.Hash(), self.currentFastBlock.NumberU64()).Cmp(td) < 0 { + if err := WriteHeadFastBlockHash(self.chainDb, head.Hash()); err != nil { + log.Crit("Failed to update head fast block hash", "err", err) + } + self.currentFastBlock = head } - self.currentFastBlock = head } self.mu.Unlock() diff --git a/eth/api_backend.go b/eth/api_backend.go index 5a5c4c532e..bce772900d 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -51,6 +51,7 @@ func (b *EthApiBackend) CurrentBlock() *types.Block { } func (b *EthApiBackend) SetHead(number uint64) { + b.eth.protocolManager.downloader.Cancel() b.eth.blockchain.SetHead(number) } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f7aca031a9..d269957827 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -277,7 +277,7 @@ func (d *Downloader) UnregisterPeer(id string) error { d.cancelLock.RUnlock() if master { - d.cancel() + d.Cancel() } return nil } @@ -352,7 +352,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode d.cancelPeer = id d.cancelLock.Unlock() - defer d.cancel() // No matter what, we can't leave the cancel channel open + defer d.Cancel() // No matter what, we can't leave the cancel channel open // Set the requested sync mode, unless it's forbidden d.mode = mode @@ -473,7 +473,7 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { } } d.queue.Close() - d.cancel() + d.Cancel() wg.Wait() // If sync failed in the critical section, bump the fail counter @@ -483,9 +483,9 @@ func (d *Downloader) spawnSync(origin uint64, fetchers ...func() error) error { return err } -// cancel cancels all of the operations and resets the queue. It returns true +// Cancel cancels all of the operations and resets the queue. It returns true // if the cancel operation was completed. -func (d *Downloader) cancel() { +func (d *Downloader) Cancel() { // Close the current cancel channel d.cancelLock.Lock() if d.cancelCh != nil { @@ -512,7 +512,7 @@ func (d *Downloader) Terminate() { d.quitLock.Unlock() // Cancel any pending download requests - d.cancel() + d.Cancel() } // fetchHeight retrieves the head header of the remote peer to aid in estimating @@ -945,7 +945,7 @@ func (d *Downloader) fetchNodeData() error { if err != nil { // If the node data processing failed, the root hash is very wrong, abort log.Error("State processing failed", "peer", packet.PeerId(), "err", err) - d.cancel() + d.Cancel() return } // Processing succeeded, notify state fetcher of continuation @@ -1208,7 +1208,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if atomic.LoadUint32(&d.fsPivotFails) == 0 { for _, header := range rollback { if header.Number.Uint64() == pivot { - log.Warn("Fast-sync critical section failure, locked pivot to header", "number", pivot, "hash", header.Hash()) + log.Warn("Fast-sync pivot locked in", "number", pivot, "hash", header.Hash()) d.fsPivotLock = header } } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index a9ea797eac..267a0def94 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -982,7 +982,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) // Make sure canceling works with a pristine downloader - tester.downloader.cancel() + tester.downloader.Cancel() if !tester.downloader.queue.Idle() { t.Errorf("download queue not idle") } @@ -990,7 +990,7 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) { if err := tester.sync("peer", nil, mode); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } - tester.downloader.cancel() + tester.downloader.Cancel() if !tester.downloader.queue.Idle() { t.Errorf("download queue not idle") } diff --git a/eth/sync.go b/eth/sync.go index 6e2c7c4320..f2cae6c198 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -175,6 +175,14 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // Otherwise try to sync with the downloader mode := downloader.FullSync if atomic.LoadUint32(&pm.fastSync) == 1 { + // Fast sync was explicitly requested, and explicitly granted + mode = downloader.FastSync + } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 { + // The database seems empty as the current block is the genesis. Yet the fast + // block is ahead, so fast sync was enabled for this node at a certain point. + // The only scenario where this can happen is if the user manually (or via a + // bad block) rolled back a fast sync node below the sync point. In this case + // however it's safe to reenable fast sync. mode = downloader.FastSync } if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { diff --git a/les/api_backend.go b/les/api_backend.go index 264b381f51..006240369c 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -50,6 +50,7 @@ func (b *LesApiBackend) CurrentBlock() *types.Block { } func (b *LesApiBackend) SetHead(number uint64) { + b.eth.protocolManager.downloader.Cancel() b.eth.blockchain.SetHead(number) }