Merge pull request #1287 from karalabe/fix-downloader-cancel-interrupt

eth, eth/downloader: fix processing interrupt caused by temp cancel
This commit is contained in:
Jeffrey Wilcke 2015-06-18 02:50:19 -07:00
commit 53a6145a2b
3 changed files with 60 additions and 64 deletions

View File

@ -33,23 +33,22 @@ var (
) )
var ( var (
errBusy = errors.New("busy") errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer is unknown or unhealthy") errUnknownPeer = errors.New("peer is unknown or unhealthy")
errBadPeer = errors.New("action from bad peer ignored") errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling") errStallingPeer = errors.New("peer is stalling")
errBannedHead = errors.New("peer head hash already banned") errBannedHead = errors.New("peer head hash already banned")
errNoPeers = errors.New("no peers to keep download active") errNoPeers = errors.New("no peers to keep download active")
errPendingQueue = errors.New("pending items in queue") errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout") errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer") errEmptyHashSet = errors.New("empty hash set by peer")
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process") errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool") errAlreadyInPool = errors.New("hash already in pool")
errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid")
errCrossCheckFailed = errors.New("block cross-check failed") errCrossCheckFailed = errors.New("block cross-check failed")
errCancelHashFetch = errors.New("hash fetching canceled (requested)") errCancelHashFetch = errors.New("hash fetching canceled (requested)")
errCancelBlockFetch = errors.New("block downloading canceled (requested)") errCancelBlockFetch = errors.New("block downloading canceled (requested)")
errCancelChainImport = errors.New("chain importing canceled (requested)") errNoSyncActive = errors.New("no sync active")
errNoSyncActive = errors.New("no sync active")
) )
// hashCheckFn is a callback type for verifying a hash's presence in the local chain. // hashCheckFn is a callback type for verifying a hash's presence in the local chain.
@ -87,6 +86,8 @@ type Downloader struct {
checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain checks map[common.Hash]*crossCheck // Pending cross checks to verify a hash chain
banned *set.Set // Set of hashes we've received and banned banned *set.Set // Set of hashes we've received and banned
interrupt int32 // Atomic boolean to signal termination
// Statistics // Statistics
importStart time.Time // Instance when the last blocks were taken from the cache importStart time.Time // Instance when the last blocks were taken from the cache
importQueue []*Block // Previously taken blocks to check import progress importQueue []*Block // Previously taken blocks to check import progress
@ -245,12 +246,6 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
glog.V(logger.Info).Infoln("Block synchronisation started") glog.V(logger.Info).Infoln("Block synchronisation started")
} }
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
d.cancelLock.Unlock()
// Abort if the queue still contains some leftover data // Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil { if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
return errPendingQueue return errPendingQueue
@ -260,12 +255,16 @@ func (d *Downloader) synchronise(id string, hash common.Hash) error {
d.peers.Reset() d.peers.Reset()
d.checks = make(map[common.Hash]*crossCheck) d.checks = make(map[common.Hash]*crossCheck)
// Create cancel channel for aborting mid-flight
d.cancelLock.Lock()
d.cancelCh = make(chan struct{})
d.cancelLock.Unlock()
// Retrieve the origin peer and initiate the downloading process // Retrieve the origin peer and initiate the downloading process
p := d.peers.Peer(id) p := d.peers.Peer(id)
if p == nil { if p == nil {
return errUnknownPeer return errUnknownPeer
} }
return d.syncWithPeer(p, hash) return d.syncWithPeer(p, hash)
} }
@ -282,7 +281,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
defer func() { defer func() {
// reset on error // reset on error
if err != nil { if err != nil {
d.Cancel() d.cancel()
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
} else { } else {
d.mux.Post(DoneEvent{}) d.mux.Post(DoneEvent{})
@ -301,9 +300,9 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
return nil return nil
} }
// Cancel cancels all of the operations and resets the queue. It returns true // cancel cancels all of the operations and resets the queue. It returns true
// if the cancel operation was completed. // if the cancel operation was completed.
func (d *Downloader) Cancel() { func (d *Downloader) cancel() {
// Close the current cancel channel // Close the current cancel channel
d.cancelLock.Lock() d.cancelLock.Lock()
if d.cancelCh != nil { if d.cancelCh != nil {
@ -320,6 +319,12 @@ func (d *Downloader) Cancel() {
d.queue.Reset() d.queue.Reset()
} }
// Terminate interrupts the downloader, canceling all pending operations.
func (d *Downloader) Terminate() {
atomic.StoreInt32(&d.interrupt, 1)
d.cancel()
}
// fetchHashes starts retrieving hashes backwards from a specific peer and hash, // fetchHashes starts retrieving hashes backwards from a specific peer and hash,
// up until it finds a common ancestor. If the source peer times out, alternative // up until it finds a common ancestor. If the source peer times out, alternative
// ones are tried for continuation. // ones are tried for continuation.
@ -713,7 +718,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
// between these state changes, a block may have arrived, but a processing // between these state changes, a block may have arrived, but a processing
// attempt denied, so we need to re-enter to ensure the block isn't left // attempt denied, so we need to re-enter to ensure the block isn't left
// to idle in the cache. // to idle in the cache.
func (d *Downloader) process() (err error) { func (d *Downloader) process() {
// Make sure only one goroutine is ever allowed to process blocks at once // Make sure only one goroutine is ever allowed to process blocks at once
if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) { if !atomic.CompareAndSwapInt32(&d.processing, 0, 1) {
return return
@ -723,8 +728,8 @@ func (d *Downloader) process() (err error) {
// the fresh blocks might have been rejected entry due to this present thread // the fresh blocks might have been rejected entry due to this present thread
// not yet releasing the `processing` state. // not yet releasing the `processing` state.
defer func() { defer func() {
if err == nil && d.queue.GetHeadBlock() != nil { if atomic.LoadInt32(&d.interrupt) == 0 && d.queue.GetHeadBlock() != nil {
err = d.process() d.process()
} }
}() }()
// Release the lock upon exit (note, before checking for reentry!), and set // Release the lock upon exit (note, before checking for reentry!), and set
@ -737,18 +742,12 @@ func (d *Downloader) process() (err error) {
atomic.StoreInt32(&d.processing, 0) atomic.StoreInt32(&d.processing, 0)
}() }()
// Fetch the current cancel channel to allow termination
d.cancelLock.RLock()
cancel := d.cancelCh
d.cancelLock.RUnlock()
// Repeat the processing as long as there are blocks to import // Repeat the processing as long as there are blocks to import
for { for {
// Fetch the next batch of blocks // Fetch the next batch of blocks
blocks := d.queue.TakeBlocks() blocks := d.queue.TakeBlocks()
if len(blocks) == 0 { if len(blocks) == 0 {
return nil return
} }
// Reset the import statistics // Reset the import statistics
d.importLock.Lock() d.importLock.Lock()
@ -759,12 +758,10 @@ func (d *Downloader) process() (err error) {
// Actually import the blocks // Actually import the blocks
glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number()) glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].RawBlock.Number(), blocks[len(blocks)-1].RawBlock.Number())
for len(blocks) != 0 { // TODO: quit for len(blocks) != 0 {
// Check for any termination requests // Check for any termination requests
select { if atomic.LoadInt32(&d.interrupt) == 1 {
case <-cancel: return
return errCancelChainImport
default:
} }
// Retrieve the first batch of blocks to insert // Retrieve the first batch of blocks to insert
max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess))) max := int(math.Min(float64(len(blocks)), float64(maxBlockProcess)))
@ -777,8 +774,8 @@ func (d *Downloader) process() (err error) {
if err != nil { if err != nil {
glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err) glog.V(logger.Debug).Infof("Block #%d import failed: %v", raw[index].NumberU64(), err)
d.dropPeer(blocks[index].OriginPeer) d.dropPeer(blocks[index].OriginPeer)
d.Cancel() d.cancel()
return errCancelChainImport return
} }
blocks = blocks[max:] blocks = blocks[max:]
} }

View File

@ -247,7 +247,7 @@ func TestCancel(t *testing.T) {
tester.newPeer("peer", hashes, blocks) tester.newPeer("peer", hashes, blocks)
// Make sure canceling works with a pristine downloader // Make sure canceling works with a pristine downloader
tester.downloader.Cancel() tester.downloader.cancel()
hashCount, blockCount := tester.downloader.queue.Size() hashCount, blockCount := tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 { if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
@ -256,7 +256,7 @@ func TestCancel(t *testing.T) {
if err := tester.sync("peer"); err != nil { if err := tester.sync("peer"); err != nil {
t.Fatalf("failed to synchronise blocks: %v", err) t.Fatalf("failed to synchronise blocks: %v", err)
} }
tester.downloader.Cancel() tester.downloader.cancel()
hashCount, blockCount = tester.downloader.queue.Size() hashCount, blockCount = tester.downloader.queue.Size()
if hashCount > 0 || blockCount > 0 { if hashCount > 0 || blockCount > 0 {
t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount)
@ -359,7 +359,7 @@ func TestSlowSynchronisation(t *testing.T) {
// Create a batch of blocks, with a slow and a full speed peer // Create a batch of blocks, with a slow and a full speed peer
targetCycles := 2 targetCycles := 2
targetBlocks := targetCycles*blockCacheLimit - 15 targetBlocks := targetCycles*blockCacheLimit - 15
targetIODelay := 500 * time.Millisecond targetIODelay := time.Second
hashes := createHashes(targetBlocks, knownHash) hashes := createHashes(targetBlocks, knownHash)
blocks := createBlocksFromHashes(hashes) blocks := createBlocksFromHashes(hashes)
@ -749,22 +749,21 @@ func TestHashAttackerDropping(t *testing.T) {
result error result error
drop bool drop bool
}{ }{
{nil, false}, // Sync succeeded, all is well {nil, false}, // Sync succeeded, all is well
{errBusy, false}, // Sync is already in progress, no problem {errBusy, false}, // Sync is already in progress, no problem
{errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop {errUnknownPeer, false}, // Peer is unknown, was already dropped, don't double drop
{errBadPeer, true}, // Peer was deemed bad for some reason, drop it {errBadPeer, true}, // Peer was deemed bad for some reason, drop it
{errStallingPeer, true}, // Peer was detected to be stalling, drop it {errStallingPeer, true}, // Peer was detected to be stalling, drop it
{errBannedHead, true}, // Peer's head hash is a known bad hash, drop it {errBannedHead, true}, // Peer's head hash is a known bad hash, drop it
{errNoPeers, false}, // No peers to download from, soft race, no issue {errNoPeers, false}, // No peers to download from, soft race, no issue
{errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue {errPendingQueue, false}, // There are blocks still cached, wait to exhaust, no issue
{errTimeout, true}, // No hashes received in due time, drop the peer {errTimeout, true}, // No hashes received in due time, drop the peer
{errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end {errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
{errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop {errInvalidChain, true}, // Hash chain was detected as invalid, definitely drop
{errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop {errCrossCheckFailed, true}, // Hash-origin failed to pass a block cross check, drop
{errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelChainImport, false}, // Synchronisation was canceled, origin may be innocent, don't drop
} }
// Run the tests and check disconnection status // Run the tests and check disconnection status
tester := newTester() tester := newTester()

View File

@ -251,7 +251,7 @@ func (pm *ProtocolManager) fetcher() {
// downloading hashes and blocks as well as retrieving cached ones. // downloading hashes and blocks as well as retrieving cached ones.
func (pm *ProtocolManager) syncer() { func (pm *ProtocolManager) syncer() {
// Abort any pending syncs if we terminate // Abort any pending syncs if we terminate
defer pm.downloader.Cancel() defer pm.downloader.Terminate()
forceSync := time.Tick(forceSyncCycle) forceSync := time.Tick(forceSyncCycle)
for { for {