From 66d3dc8690e0aa551e7b35a17006a2135b51c9bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 11 Jun 2015 15:56:08 +0300 Subject: [PATCH] eth, eth/downloader: move peer removal into downloader --- eth/backend.go | 8 ++- eth/downloader/downloader.go | 87 +++++++++++++++++++++---------- eth/downloader/downloader_test.go | 32 ++++++------ eth/downloader/queue.go | 2 +- eth/handler.go | 4 +- eth/sync.go | 32 +----------- 6 files changed, 83 insertions(+), 82 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index d2ec0cc62b..4ebf21811b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -193,7 +193,6 @@ type Ethereum struct { whisper *whisper.Whisper pow *ethash.Ethash protocolManager *ProtocolManager - downloader *downloader.Downloader SolcPath string solc *compiler.Solidity @@ -290,14 +289,13 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } - eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock) eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit) eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) + eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager) + eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner.SetGasPrice(config.GasPrice) - - eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader) if config.Shh { eth.whisper = whisper.New() eth.shhVersionId = int(eth.whisper.Version()) @@ -447,7 +445,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio func (s *Ethereum) EthVersion() int { return s.ethVersionId } func (s *Ethereum) NetVersion() int { return s.netVersionId } func (s *Ethereum) ShhVersion() int { return s.shhVersionId } -func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader } +func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader } // Start the ethereum func (s *Ethereum) Start() error { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index f0a515d12a..499b3a5854 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -32,28 +32,32 @@ var ( var ( errLowTd = errors.New("peers TD is too low") - ErrBusy = errors.New("busy") + errBusy = errors.New("busy") errUnknownPeer = errors.New("peer is unknown or unhealthy") - ErrBadPeer = errors.New("action from bad peer ignored") - ErrStallingPeer = errors.New("peer is stalling") + errBadPeer = errors.New("action from bad peer ignored") + errStallingPeer = errors.New("peer is stalling") errBannedHead = errors.New("peer head hash already banned") errNoPeers = errors.New("no peers to keep download active") - ErrPendingQueue = errors.New("pending items in queue") - ErrTimeout = errors.New("timeout") - ErrEmptyHashSet = errors.New("empty hash set by peer") + errPendingQueue = errors.New("pending items in queue") + errTimeout = errors.New("timeout") + errEmptyHashSet = errors.New("empty hash set by peer") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errAlreadyInPool = errors.New("hash already in pool") - ErrInvalidChain = errors.New("retrieved hash chain is invalid") - ErrCrossCheckFailed = errors.New("block cross-check failed") + errInvalidChain = errors.New("retrieved hash chain is invalid") + errCrossCheckFailed = errors.New("block cross-check failed") errCancelHashFetch = errors.New("hash fetching cancelled (requested)") errCancelBlockFetch = errors.New("block downloading cancelled (requested)") errNoSyncActive = errors.New("no sync active") ) +// hashCheckFn is a callback type for verifying a hash's presence in the local chain. type hashCheckFn func(common.Hash) bool -type getBlockFn func(common.Hash) *types.Block -type chainInsertFn func(types.Blocks) (int, error) -type hashIterFn func() (common.Hash, error) + +// blockRetrievalFn is a callback type for retrieving a block from the local chain. +type blockRetrievalFn func(common.Hash) *types.Block + +// peerDropFn is a callback type for dropping a peer detected as malicious. +type peerDropFn func(id string) type blockPack struct { peerId string @@ -85,8 +89,9 @@ type Downloader struct { importLock sync.Mutex // Callbacks - hasBlock hashCheckFn - getBlock getBlockFn + hasBlock hashCheckFn // Checks if a block is present in the chain + getBlock blockRetrievalFn // Retrieves a block from the chain + dropPeer peerDropFn // Retrieved the TD of our own chain // Status synchronising int32 @@ -107,7 +112,8 @@ type Block struct { OriginPeer string } -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader { +// New creates a new downloader to fetch hashes and blocks from remote peers. +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader { // Create the base downloader downloader := &Downloader{ mux: mux, @@ -115,6 +121,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa peers: newPeerSet(), hasBlock: hasBlock, getBlock: getBlock, + dropPeer: dropPeer, newPeerCh: make(chan *peer, 1), hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), @@ -183,19 +190,43 @@ func (d *Downloader) UnregisterPeer(id string) error { return nil } -// Synchronise will select the peer and use it for synchronising. If an empty string is given +// Synchronise tries to sync up our local block chain with a remote peer, both +// adding various sanity checks as well as wrapping it with various log entries. +func (d *Downloader) Synchronise(id string, head common.Hash) { + glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head) + + switch err := d.synchronise(id, head); err { + case nil: + glog.V(logger.Detail).Infof("Synchronisation completed") + + case errBusy: + glog.V(logger.Detail).Infof("Synchronisation already in progress") + + case errTimeout, errBadPeer, errEmptyHashSet, errInvalidChain, errCrossCheckFailed: + glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) + d.dropPeer(id) + + case errPendingQueue: + glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) + + default: + glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) + } +} + +// synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) Synchronise(id string, hash common.Hash) error { +func (d *Downloader) synchronise(id string, hash common.Hash) error { // Make sure only one goroutine is ever allowed past this point at once if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { - return ErrBusy + return errBusy } defer atomic.StoreInt32(&d.synchronising, 0) // If the head hash is banned, terminate immediately if d.banned.Has(hash) { - return ErrInvalidChain + return errInvalidChain } // Post a user notification of the sync (only once per session) if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { @@ -209,7 +240,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error { // Abort if the queue still contains some leftover data if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { - return ErrPendingQueue + return errPendingQueue } // Reset the queue and peer set to clean any internal leftover state d.queue.Reset() @@ -342,7 +373,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // Make sure the peer actually gave something valid if len(hashPack.hashes) == 0 { glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id) - return ErrEmptyHashSet + return errEmptyHashSet } for index, hash := range hashPack.hashes { if d.banned.Has(hash) { @@ -352,7 +383,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { if err := d.banBlocks(active.id, hash); err != nil { glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) } - return ErrInvalidChain + return errInvalidChain } } // Determine if we're done fetching hashes (queue up all pending), and continue if not done @@ -369,12 +400,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { inserts := d.queue.Insert(hashPack.hashes) if len(inserts) == 0 && !done { glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) - return ErrBadPeer + return errBadPeer } if !done { // Check that the peer is not stalling the sync if len(inserts) < MinHashFetch { - return ErrStallingPeer + return errStallingPeer } // Try and fetch a random block to verify the hash batch // Skip the last hash as the cross check races with the next hash fetch @@ -408,7 +439,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { block := blockPack.blocks[0] if check, ok := d.checks[block.Hash()]; ok { if block.ParentHash() != check.parent { - return ErrCrossCheckFailed + return errCrossCheckFailed } delete(d.checks, block.Hash()) } @@ -418,7 +449,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { for hash, check := range d.checks { if time.Now().After(check.expire) { glog.V(logger.Debug).Infof("Cross check timeout for %x", hash) - return ErrCrossCheckFailed + return errCrossCheckFailed } } @@ -438,7 +469,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { // if all peers have been tried, abort the process entirely or if the hash is // the zero hash. if p == nil || (head == common.Hash{}) { - return ErrTimeout + return errTimeout } // set p to the active peer. this will invalidate any hashes that may be returned // by our previous (delayed) peer. @@ -500,7 +531,7 @@ out: peer.SetIdle() glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) - case ErrInvalidChain: + case errInvalidChain: // The hash chain is invalid (blocks are not ordered properly), abort return err @@ -617,7 +648,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error { return errCancelBlockFetch case <-timeout: - return ErrTimeout + return errTimeout case <-d.hashCh: // Out of bounds hashes received, ignore them diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5f10fb41f7..5e79c10c95 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -73,7 +73,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types done: make(chan bool), } var mux event.TypeMux - downloader := New(&mux, tester.hasBlock, tester.getBlock) + downloader := New(&mux, tester.hasBlock, tester.getBlock, nil) tester.downloader = downloader return tester @@ -83,7 +83,7 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types // block until it returns func (dl *downloadTester) sync(peerId string, head common.Hash) error { dl.activePeerId = peerId - return dl.downloader.Synchronise(peerId, head) + return dl.downloader.synchronise(peerId, head) } // syncTake is starts synchronising with a remote peer, but concurrently it also @@ -415,8 +415,8 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, reverse, blocks) tester.newPeer("attack", big.NewInt(10000), reverse[0]) - if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if _, err := tester.syncTake("attack", reverse[0]); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Ensure that a valid chain can still pass sync tester.hashes = hashes @@ -438,8 +438,8 @@ func TestMadeupHashChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, hashes, nil) tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } } @@ -455,8 +455,8 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if _, err := tester.syncTake("attack", hashes[0]); err != ErrStallingPeer { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrStallingPeer) + if _, err := tester.syncTake("attack", hashes[0]); err != errStallingPeer { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } } @@ -480,8 +480,8 @@ func TestMadeupBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, gapped, blocks) tester.newPeer("attack", big.NewInt(10000), gapped[0]) - if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + if _, err := tester.syncTake("attack", gapped[0]); err != errCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL @@ -514,8 +514,8 @@ func TestMadeupParentBlockChainAttack(t *testing.T) { // Try and sync with the malicious node and check that it fails tester := newTester(t, hashes, forges) tester.newPeer("attack", big.NewInt(10000), hashes[0]) - if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed) + if _, err := tester.syncTake("attack", hashes[0]); err != errCrossCheckFailed { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } // Ensure that a valid chain can still pass sync blockSoftTTL = defaultBlockTTL @@ -547,8 +547,8 @@ func TestBannedChainStarvationAttack(t *testing.T) { tester.newPeer("attack", big.NewInt(10000), hashes[0]) for banned := tester.downloader.banned.Size(); ; { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Check that the ban list grew with at least 1 new item, or all banned bans := tester.downloader.banned.Size() @@ -592,8 +592,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { tester.newPeer("attack", big.NewInt(10000), hashes[0]) for { // Try to sync with the attacker, check hash chain failure - if _, err := tester.syncTake("attack", hashes[0]); err != ErrInvalidChain { - t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain) + if _, err := tester.syncTake("attack", hashes[0]); err != errInvalidChain { + t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errInvalidChain) } // Short circuit if the entire chain was banned if tester.downloader.banned.Has(hashes[0]) { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 7abbd42fd3..903f043eb3 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -320,7 +320,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { // If a requested block falls out of the range, the hash chain is invalid index := int(block.NumberU64()) - q.blockOffset if index >= len(q.blockCache) || index < 0 { - return ErrInvalidChain + return errInvalidChain } // Otherwise merge the block and mark the hash block q.blockCache[index] = &Block{ diff --git a/eth/handler.go b/eth/handler.go index f002727f3f..ac7fb8fcf0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -68,12 +68,11 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager { +func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager { manager := &ProtocolManager{ eventMux: mux, txpool: txpool, chainman: chainman, - downloader: downloader, peers: newPeerSet(), newPeerCh: make(chan *peer, 1), newHashCh: make(chan []*blockAnnounce, 1), @@ -81,6 +80,7 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.removePeer) manager.SubProtocol = p2p.Protocol{ Name: "eth", Version: uint(protocolVersion), diff --git a/eth/sync.go b/eth/sync.go index 8fee21d7b4..b127ca979f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p/discover" @@ -332,33 +331,6 @@ func (pm *ProtocolManager) synchronise(peer *peer) { if peer.Td().Cmp(pm.chainman.Td()) <= 0 { return } - // FIXME if we have the hash in our chain and the TD of the peer is - // much higher than ours, something is wrong with us or the peer. - // Check if the hash is on our own chain - head := peer.Head() - if pm.chainman.HasBlock(head) { - glog.V(logger.Debug).Infoln("Synchronisation canceled: head already known") - return - } - // Get the hashes from the peer (synchronously) - glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", peer.id, head) - - err := pm.downloader.Synchronise(peer.id, head) - switch err { - case nil: - glog.V(logger.Detail).Infof("Synchronisation completed") - - case downloader.ErrBusy: - glog.V(logger.Detail).Infof("Synchronisation already in progress") - - case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrEmptyHashSet, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed: - glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err) - pm.removePeer(peer.id) - - case downloader.ErrPendingQueue: - glog.V(logger.Debug).Infoln("Synchronisation aborted:", err) - - default: - glog.V(logger.Warn).Infof("Synchronisation failed: %v", err) - } + // Otherwise try to sync with the downloader + pm.downloader.Synchronise(peer.id, peer.Head()) }