Merge pull request #948 from karalabe/fix-downlaoder-activepeer-shadow
eth/downloader: fix active peer shadowing, polish func names
This commit is contained in:
commit
7cb0e24245
|
@ -55,10 +55,9 @@ type hashPack struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
queue *queue
|
queue *queue
|
||||||
peers *peerSet
|
peers *peerSet
|
||||||
activePeer string
|
|
||||||
|
|
||||||
// Callbacks
|
// Callbacks
|
||||||
hasBlock hashCheckFn
|
hasBlock hashCheckFn
|
||||||
|
@ -162,7 +161,6 @@ func (d *Downloader) Has(hash common.Hash) bool {
|
||||||
// syncWithPeer starts a block synchronization based on the hash chain from the
|
// syncWithPeer starts a block synchronization based on the hash chain from the
|
||||||
// specified peer and head hash.
|
// specified peer and head hash.
|
||||||
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
|
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
|
||||||
d.activePeer = p.id
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// reset on error
|
// reset on error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -416,32 +414,26 @@ out:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
|
// DeliverBlocks injects a new batch of blocks received from a remote node.
|
||||||
// the protocol handler.
|
// This is usually invoked through the BlocksMsg by the protocol handler.
|
||||||
func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error {
|
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) error {
|
||||||
// Make sure the downloader is active
|
// Make sure the downloader is active
|
||||||
if atomic.LoadInt32(&d.synchronising) == 0 {
|
if atomic.LoadInt32(&d.synchronising) == 0 {
|
||||||
return errNoSyncActive
|
return errNoSyncActive
|
||||||
}
|
}
|
||||||
|
|
||||||
d.blockCh <- blockPack{id, blocks}
|
d.blockCh <- blockPack{id, blocks}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
|
// DeliverHashes injects a new batch of hashes received from a remote node into
|
||||||
|
// the download schedule. This is usually invoked through the BlockHashesMsg by
|
||||||
|
// the protocol handler.
|
||||||
|
func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) error {
|
||||||
// Make sure the downloader is active
|
// Make sure the downloader is active
|
||||||
if atomic.LoadInt32(&d.synchronising) == 0 {
|
if atomic.LoadInt32(&d.synchronising) == 0 {
|
||||||
return errNoSyncActive
|
return errNoSyncActive
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure that the hashes that are being added are actually from the peer
|
|
||||||
// that's the current active peer. hashes that have been received from other
|
|
||||||
// peers are dropped and ignored.
|
|
||||||
if d.activePeer != id {
|
|
||||||
return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
|
|
||||||
}
|
|
||||||
|
|
||||||
if glog.V(logger.Debug) && len(hashes) != 0 {
|
if glog.V(logger.Debug) && len(hashes) != 0 {
|
||||||
from, to := hashes[0], hashes[len(hashes)-1]
|
from, to := hashes[0], hashes[len(hashes)-1]
|
||||||
glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
|
glog.V(logger.Debug).Infof("adding %d (T=%d) hashes [ %x / %x ] from: %s\n", len(hashes), d.queue.Pending(), from[:4], to[:4], id)
|
||||||
|
|
|
@ -76,7 +76,7 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) getHashes(hash common.Hash) error {
|
func (dl *downloadTester) getHashes(hash common.Hash) error {
|
||||||
dl.downloader.AddHashes(dl.activePeerId, dl.hashes)
|
dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,7 +87,7 @@ func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
|
||||||
blocks[i] = dl.blocks[hash]
|
blocks[i] = dl.blocks[hash]
|
||||||
}
|
}
|
||||||
|
|
||||||
go dl.downloader.DeliverChunk(id, blocks)
|
go dl.downloader.DeliverBlocks(id, blocks)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -188,12 +188,12 @@ func TestInactiveDownloader(t *testing.T) {
|
||||||
blocks := createBlocksFromHashSet(createHashSet(hashes))
|
blocks := createBlocksFromHashSet(createHashSet(hashes))
|
||||||
tester := newTester(t, hashes, nil)
|
tester := newTester(t, hashes, nil)
|
||||||
|
|
||||||
err := tester.downloader.AddHashes("bad peer 001", hashes)
|
err := tester.downloader.DeliverHashes("bad peer 001", hashes)
|
||||||
if err != errNoSyncActive {
|
if err != errNoSyncActive {
|
||||||
t.Error("expected no sync error, got", err)
|
t.Error("expected no sync error, got", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tester.downloader.DeliverChunk("bad peer 001", blocks)
|
err = tester.downloader.DeliverBlocks("bad peer 001", blocks)
|
||||||
if err != errNoSyncActive {
|
if err != errNoSyncActive {
|
||||||
t.Error("expected no sync error, got", err)
|
t.Error("expected no sync error, got", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,7 +224,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
||||||
if err := msgStream.Decode(&hashes); err != nil {
|
if err := msgStream.Decode(&hashes); err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
err := self.downloader.AddHashes(p.id, hashes)
|
err := self.downloader.DeliverHashes(p.id, hashes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(logger.Debug).Infoln(err)
|
glog.V(logger.Debug).Infoln(err)
|
||||||
}
|
}
|
||||||
|
@ -264,7 +264,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
|
||||||
glog.V(logger.Detail).Infoln("Decode error", err)
|
glog.V(logger.Detail).Infoln("Decode error", err)
|
||||||
blocks = nil
|
blocks = nil
|
||||||
}
|
}
|
||||||
self.downloader.DeliverChunk(p.id, blocks)
|
self.downloader.DeliverBlocks(p.id, blocks)
|
||||||
|
|
||||||
case NewBlockMsg:
|
case NewBlockMsg:
|
||||||
var request newBlockMsgData
|
var request newBlockMsgData
|
||||||
|
|
Loading…
Reference in New Issue