From a5eee8d1dc61352c29b9800eaf96609ba4184fd6 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Fri, 29 May 2020 11:12:43 +0200 Subject: [PATCH] eth/downloader: more context in errors (#21067) This PR makes use of go 1.13 error handling, wrapping errors and using errors.Is to check a wrapped root-cause. It also removes the travis builders for go 1.11 and go 1.12. --- .travis.yml | 20 ----------------- eth/downloader/downloader.go | 37 ++++++++++++++++++++++--------- eth/downloader/downloader_test.go | 27 +++++++++++++--------- eth/downloader/queue.go | 18 ++++++++------- 4 files changed, 52 insertions(+), 50 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6f4483c4b0..1b61667c88 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,26 +24,6 @@ jobs: script: - go run build/ci.go lint - - stage: build - os: linux - dist: xenial - go: 1.11.x - env: - - GO111MODULE=on - script: - - go run build/ci.go install - - go run build/ci.go test -coverage $TEST_PACKAGES - - - stage: build - os: linux - dist: xenial - go: 1.12.x - env: - - GO111MODULE=on - script: - - go run build/ci.go install - - go run build/ci.go test -coverage $TEST_PACKAGES - - stage: build os: linux dist: xenial diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 0a42a20986..71a626632f 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -321,13 +321,28 @@ func (d *Downloader) UnregisterPeer(id string) error { // adding various sanity checks as well as wrapping it with various log entries. func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { err := d.synchronise(id, head, td, mode) - switch err { - case nil: - case errBusy, errCanceled: + switch err { + case nil, errBusy, errCanceled: + return err + } + + if errors.Is(err, errInvalidChain) { + log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) + if d.dropPeer == nil { + // The dropPeer method is nil when `--copydb` is used for a local copy. + // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored + log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id) + } else { + d.dropPeer(id) + } + return err + } + + switch err { case errTimeout, errBadPeer, errStallingPeer, errUnsyncedPeer, errEmptyHeaderSet, errPeersUnavailable, errTooOld, - errInvalidAncestor, errInvalidChain: + errInvalidAncestor: log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err) if d.dropPeer == nil { // The dropPeer method is nil when `--copydb` is used for a local copy. @@ -774,7 +789,7 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) expectNumber := from + int64(i)*int64(skip+1) if number := header.Number.Int64(); number != expectNumber { p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number) - return 0, errInvalidChain + return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering")) } } // Check if a common ancestor was found @@ -988,7 +1003,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) filled, proced, err := d.fillHeaderSkeleton(from, headers) if err != nil { p.log.Debug("Skeleton chain invalid", "err", err) - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } headers = filled[proced:] from += uint64(proced) @@ -1207,13 +1222,13 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) if peer := d.peers.Peer(packet.PeerId()); peer != nil { // Deliver the received chunk of data and check chain validity accepted, err := deliver(packet) - if err == errInvalidChain { + if errors.Is(err, errInvalidChain) { return err } // Unless a peer delivered something completely else than requested (usually // caused by a timed out request which came through in the end), set it to // idle. If the delivery's stale, the peer should have already been idled. - if err != errStaleDelivery { + if !errors.Is(err, errStaleDelivery) { setIdle(peer, accepted) } // Issue a log to the user to see what's going on @@ -1473,7 +1488,7 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er rollback = append(rollback, chunk[:n]...) } log.Debug("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "err", err) - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } // All verifications passed, store newly found uncertain headers rollback = append(rollback, unknown...) @@ -1565,7 +1580,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // of the blocks delivered from the downloader, and the indexing will be off. log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err) } - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } return nil } @@ -1706,7 +1721,7 @@ func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *state } if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) - return errInvalidChain + return fmt.Errorf("%w: %v", errInvalidChain, err) } return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 05ab4f7810..1b9f00c695 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -242,27 +242,32 @@ func (dl *downloadTester) GetTd(hash common.Hash, number uint64) *big.Int { func (dl *downloadTester) InsertHeaderChain(headers []*types.Header, checkFreq int) (i int, err error) { dl.lock.Lock() defer dl.lock.Unlock() - // Do a quick check, as the blockchain.InsertHeaderChain doesn't insert anything in case of errors if _, ok := dl.ownHeaders[headers[0].ParentHash]; !ok { - return 0, errors.New("unknown parent") + return 0, errors.New("InsertHeaderChain: unknown parent at first position") } + var hashes []common.Hash for i := 1; i < len(headers); i++ { + hash := headers[i-1].Hash() if headers[i].ParentHash != headers[i-1].Hash() { - return i, errors.New("unknown parent") + return i, fmt.Errorf("non-contiguous import at position %d", i) } + hashes = append(hashes, hash) } + hashes = append(hashes, headers[len(headers)-1].Hash()) // Do a full insert if pre-checks passed for i, header := range headers { - if _, ok := dl.ownHeaders[header.Hash()]; ok { + hash := hashes[i] + if _, ok := dl.ownHeaders[hash]; ok { continue } if _, ok := dl.ownHeaders[header.ParentHash]; !ok { - return i, errors.New("unknown parent") + // This _should_ be impossible, due to precheck and induction + return i, fmt.Errorf("InsertHeaderChain: unknown parent at position %d", i) } - dl.ownHashes = append(dl.ownHashes, header.Hash()) - dl.ownHeaders[header.Hash()] = header - dl.ownChainTd[header.Hash()] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty) + dl.ownHashes = append(dl.ownHashes, hash) + dl.ownHeaders[hash] = header + dl.ownChainTd[hash] = new(big.Int).Add(dl.ownChainTd[header.ParentHash], header.Difficulty) } return len(headers), nil } @@ -274,9 +279,9 @@ func (dl *downloadTester) InsertChain(blocks types.Blocks) (i int, err error) { for i, block := range blocks { if parent, ok := dl.ownBlocks[block.ParentHash()]; !ok { - return i, errors.New("unknown parent") + return i, fmt.Errorf("InsertChain: unknown parent at position %d / %d", i, len(blocks)) } else if _, err := dl.stateDb.Get(parent.Root().Bytes()); err != nil { - return i, fmt.Errorf("unknown parent state %x: %v", parent.Root(), err) + return i, fmt.Errorf("InsertChain: unknown parent state %x: %v", parent.Root(), err) } if _, ok := dl.ownHeaders[block.Hash()]; !ok { dl.ownHashes = append(dl.ownHashes, block.Hash()) @@ -301,7 +306,7 @@ func (dl *downloadTester) InsertReceiptChain(blocks types.Blocks, receipts []typ } if _, ok := dl.ancientBlocks[blocks[i].ParentHash()]; !ok { if _, ok := dl.ownBlocks[blocks[i].ParentHash()]; !ok { - return i, errors.New("unknown parent") + return i, errors.New("InsertReceiptChain: unknown parent") } } if blocks[i].NumberU64() <= ancientLimit { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 6ce0cbd515..9aea102539 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -509,7 +509,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common index := int(header.Number.Int64() - int64(q.resultOffset)) if index >= len(q.resultCache) || index < 0 { common.Report("index allocation went beyond available resultCache space") - return nil, false, errInvalidChain + return nil, false, fmt.Errorf("%w: index allocation went beyond available resultCache space", errInvalidChain) } if q.resultCache[index] == nil { components := 1 @@ -863,14 +863,16 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ q.active.Signal() } // If none of the data was good, it's a stale delivery - switch { - case failure == nil || failure == errInvalidChain: - return accepted, failure - case useful: - return accepted, fmt.Errorf("partial failure: %v", failure) - default: - return accepted, errStaleDelivery + if failure == nil { + return accepted, nil } + if errors.Is(failure, errInvalidChain) { + return accepted, failure + } + if useful { + return accepted, fmt.Errorf("partial failure: %v", failure) + } + return accepted, fmt.Errorf("%w: %v", failure, errStaleDelivery) } // Prepare configures the result cache to allow accepting and caching inbound