eth/downloader: add fast and light sync strategies

Author: Péter Szilágyi
Date:   2015-09-28 19:27:31 +03:00
parent  c33cc382b3
commit  f186b39018

11 changed files with 1456 additions and 828 deletions


@@ -140,13 +140,12 @@ func GetBlockReceipts(db ethdb.Database, hash common.Hash) types.Receipts {
 	if len(data) == 0 {
 		return nil
 	}
-	var receipts types.Receipts
-	err := rlp.DecodeBytes(data, &receipts)
-	if err != nil {
-		glog.V(logger.Core).Infoln("GetReceiptse err", err)
+	receipts := new(types.Receipts)
+	if err := rlp.DecodeBytes(data, receipts); err != nil {
+		glog.V(logger.Error).Infof("invalid receipt array RLP for hash %x: %v", hash, err)
+		return nil
 	}
-	return receipts
+	return *receipts
 }
 
 // PutBlockReceipts stores the block's transactions associated receipts
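The rewrite above makes GetBlockReceipts decode through a pointer and return nil on malformed input, instead of logging and returning a half-decoded value. A minimal, self-contained sketch of that decode pattern, with plain uints standing in for types.Receipts (illustration only, not part of the commit):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

// decodeList mirrors the fixed GetBlockReceipts flow: decode through a
// pointer, bail out with nil on bad input, dereference on success.
func decodeList(data []byte) []uint {
	items := new([]uint)
	if err := rlp.DecodeBytes(data, items); err != nil {
		fmt.Println("invalid RLP:", err)
		return nil
	}
	return *items
}

func main() {
	good, _ := rlp.EncodeToBytes([]uint{1, 2, 3})
	fmt.Println(decodeList(good))         // [1 2 3]
	fmt.Println(decodeList([]byte{0x01})) // nil, after the error is reported
}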


@@ -172,8 +172,8 @@ type storageblock struct {
 }
 
 var (
-	emptyRootHash  = DeriveSha(Transactions{})
-	emptyUncleHash = CalcUncleHash(nil)
+	EmptyRootHash  = DeriveSha(Transactions{})
+	EmptyUncleHash = CalcUncleHash(nil)
 )
 
 // NewBlock creates a new block. The input data is copied,
@@ -188,7 +188,7 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*
 	// TODO: panic if len(txs) != len(receipts)
 	if len(txs) == 0 {
-		b.header.TxHash = emptyRootHash
+		b.header.TxHash = EmptyRootHash
 	} else {
 		b.header.TxHash = DeriveSha(Transactions(txs))
 		b.transactions = make(Transactions, len(txs))
@@ -196,7 +196,7 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*
 	}
 
 	if len(receipts) == 0 {
-		b.header.ReceiptHash = emptyRootHash
+		b.header.ReceiptHash = EmptyRootHash
 	} else {
 		b.header.ReceiptHash = DeriveSha(Receipts(receipts))
 		b.header.Bloom = CreateBloom(receipts)
@@ -205,7 +205,7 @@ func NewBlock(header *Header, txs []*Transaction, uncles []*Header, receipts []*
 	}
 
 	if len(uncles) == 0 {
-		b.header.UncleHash = emptyUncleHash
+		b.header.UncleHash = EmptyUncleHash
 	} else {
 		b.header.UncleHash = CalcUncleHash(uncles)
 		b.uncles = make([]*Header, len(uncles))

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -42,4 +42,9 @@ var (
 	bodyReqTimer     = metrics.NewTimer("eth/downloader/bodies/req")
 	bodyDropMeter    = metrics.NewMeter("eth/downloader/bodies/drop")
 	bodyTimeoutMeter = metrics.NewMeter("eth/downloader/bodies/timeout")
+
+	receiptInMeter      = metrics.NewMeter("eth/downloader/receipts/in")
+	receiptReqTimer     = metrics.NewTimer("eth/downloader/receipts/req")
+	receiptDropMeter    = metrics.NewMeter("eth/downloader/receipts/drop")
+	receiptTimeoutMeter = metrics.NewMeter("eth/downloader/receipts/timeout")
 )

eth/downloader/modes.go (new file, 26 lines)

@@ -0,0 +1,26 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package downloader
+
+// SyncMode represents the synchronisation mode of the downloader.
+type SyncMode int
+
+const (
+	FullSync  SyncMode = iota // Synchronise the entire block-chain history from full blocks
+	FastSync                  // Quickly download the headers, full sync only at the chain head
+	LightSync                 // Download only the headers and terminate afterwards
+)
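The new file only declares the three strategies; the (suppressed) downloader.go diff consumes them, and the eth/handler.go hunk further down maps the protocol manager's Mode onto them. As a hypothetical usage sketch, a caller could translate a user-facing flag into a SyncMode like this (the flag names are invented; only the SyncMode constants come from the commit):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/eth/downloader"
)

// modeFromFlag maps an assumed CLI flag value onto a SyncMode constant.
func modeFromFlag(flag string) (downloader.SyncMode, error) {
	switch flag {
	case "full":
		return downloader.FullSync, nil
	case "fast":
		return downloader.FastSync, nil
	case "light":
		return downloader.LightSync, nil
	}
	return downloader.FullSync, fmt.Errorf("unknown sync mode %q", flag)
}

func main() {
	mode, err := modeFromFlag("fast")
	fmt.Println(mode, err) // 1 <nil> (SyncMode has no String method at this commit)
}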


@@ -36,10 +36,11 @@ type relativeHashFetcherFn func(common.Hash) error
 type absoluteHashFetcherFn func(uint64, int) error
 type blockFetcherFn func([]common.Hash) error
 
-// Block header and body fethers belonging to eth/62 and above
+// Block header and body fetchers belonging to eth/62 and above
 type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
 type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
 type blockBodyFetcherFn func([]common.Hash) error
+type receiptFetcherFn func([]common.Hash) error
 
 var (
 	errAlreadyFetching = errors.New("already fetching blocks from peer")
@@ -52,11 +53,14 @@ type peer struct {
 	id   string      // Unique identifier of the peer
 	head common.Hash // Hash of the peers latest known block
 
-	idle int32 // Current activity state of the peer (idle = 0, active = 1)
+	blockIdle   int32 // Current block activity state of the peer (idle = 0, active = 1)
+	receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1)
 	rep int32 // Simple peer reputation
 
-	capacity int32     // Number of blocks allowed to fetch per request
-	started  time.Time // Time instance when the last fetch was started
+	blockCapacity   int32 // Number of blocks (bodies) allowed to fetch per request
+	receiptCapacity int32 // Number of receipts allowed to fetch per request
+
+	blockStarted   time.Time // Time instance when the last block (body) fetch was started
+	receiptStarted time.Time // Time instance when the last receipt fetch was started
 
 	ignored *set.Set // Set of hashes not to request (didn't have previously)
@@ -68,6 +72,8 @@ type peer struct {
 	getAbsHeaders  absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
 	getBlockBodies blockBodyFetcherFn      // [eth/62] Method to retrieve a batch of block bodies
 
+	getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
+
 	version int // Eth protocol version number to switch strategies
 }
@@ -75,11 +81,13 @@ type peer struct {
 // mechanisms.
 func newPeer(id string, version int, head common.Hash,
 	getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
-	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn) *peer {
+	getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
+	getReceipts receiptFetcherFn) *peer {
 	return &peer{
 		id:   id,
 		head: head,
-		capacity: 1,
+		blockCapacity:   1,
+		receiptCapacity: 1,
 		ignored: set.New(),
 
 		getRelHashes: getRelHashes,
@@ -90,24 +98,28 @@ func newPeer(id string, version int, head common.Hash,
 		getAbsHeaders:  getAbsHeaders,
 		getBlockBodies: getBlockBodies,
 
+		getReceipts: getReceipts,
+
 		version: version,
 	}
 }
 
 // Reset clears the internal state of a peer entity.
 func (p *peer) Reset() {
-	atomic.StoreInt32(&p.idle, 0)
-	atomic.StoreInt32(&p.capacity, 1)
+	atomic.StoreInt32(&p.blockIdle, 0)
+	atomic.StoreInt32(&p.receiptIdle, 0)
+	atomic.StoreInt32(&p.blockCapacity, 1)
+	atomic.StoreInt32(&p.receiptCapacity, 1)
 	p.ignored.Clear()
 }
 
 // Fetch61 sends a block retrieval request to the remote peer.
 func (p *peer) Fetch61(request *fetchRequest) error {
 	// Short circuit if the peer is already fetching
-	if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
 		return errAlreadyFetching
 	}
-	p.started = time.Now()
+	p.blockStarted = time.Now()
 
 	// Convert the hash set to a retrievable slice
 	hashes := make([]common.Hash, 0, len(request.Hashes))
@@ -119,13 +131,13 @@ func (p *peer) Fetch61(request *fetchRequest) error {
 	return nil
 }
 
-// Fetch sends a block body retrieval request to the remote peer.
-func (p *peer) Fetch(request *fetchRequest) error {
+// FetchBodies sends a block body retrieval request to the remote peer.
+func (p *peer) FetchBodies(request *fetchRequest) error {
 	// Short circuit if the peer is already fetching
-	if !atomic.CompareAndSwapInt32(&p.idle, 0, 1) {
+	if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
 		return errAlreadyFetching
 	}
-	p.started = time.Now()
+	p.blockStarted = time.Now()
 
 	// Convert the header set to a retrievable slice
 	hashes := make([]common.Hash, 0, len(request.Headers))
@@ -137,55 +149,64 @@ func (p *peer) Fetch(request *fetchRequest) error {
 	return nil
 }
 
-// SetIdle61 sets the peer to idle, allowing it to execute new retrieval requests.
+// FetchReceipts sends a receipt retrieval request to the remote peer.
+func (p *peer) FetchReceipts(request *fetchRequest) error {
+	// Short circuit if the peer is already fetching
+	if !atomic.CompareAndSwapInt32(&p.receiptIdle, 0, 1) {
+		return errAlreadyFetching
+	}
+	p.receiptStarted = time.Now()
+
+	// Convert the header set to a retrievable slice
+	hashes := make([]common.Hash, 0, len(request.Headers))
+	for _, header := range request.Headers {
+		hashes = append(hashes, header.Hash())
+	}
+	go p.getReceipts(hashes)
+
+	return nil
+}
+
+// SetBlocksIdle sets the peer to idle, allowing it to execute new retrieval requests.
 // Its block retrieval allowance will also be updated either up- or downwards,
 // depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle61() {
-	// Update the peer's download allowance based on previous performance
-	scale := 2.0
-	if time.Since(p.started) > blockSoftTTL {
-		scale = 0.5
-		if time.Since(p.started) > blockHardTTL {
-			scale = 1 / float64(MaxBlockFetch) // reduces capacity to 1
-		}
-	}
-	for {
-		// Calculate the new download bandwidth allowance
-		prev := atomic.LoadInt32(&p.capacity)
-		next := int32(math.Max(1, math.Min(float64(MaxBlockFetch), float64(prev)*scale)))
-
-		// Try to update the old value
-		if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
-			// If we're having problems at 1 capacity, try to find better peers
-			if next == 1 {
-				p.Demote()
-			}
-			break
-		}
-	}
-	// Set the peer to idle to allow further block requests
-	atomic.StoreInt32(&p.idle, 0)
+func (p *peer) SetBlocksIdle() {
+	p.setIdle(p.blockStarted, blockSoftTTL, blockHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
 }
 
-// SetIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// SetBodiesIdle sets the peer to idle, allowing it to execute new retrieval requests.
 // Its block body retrieval allowance will also be updated either up- or downwards,
 // depending on whether the previous fetch completed in time or not.
-func (p *peer) SetIdle() {
+func (p *peer) SetBodiesIdle() {
+	p.setIdle(p.blockStarted, bodySoftTTL, bodyHardTTL, MaxBlockFetch, &p.blockCapacity, &p.blockIdle)
+}
+
+// SetReceiptsIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its receipt retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) SetReceiptsIdle() {
+	p.setIdle(p.receiptStarted, receiptSoftTTL, receiptHardTTL, MaxReceiptFetch, &p.receiptCapacity, &p.receiptIdle)
+}
+
+// setIdle sets the peer to idle, allowing it to execute new retrieval requests.
+// Its data retrieval allowance will also be updated either up- or downwards,
+// depending on whether the previous fetch completed in time or not.
+func (p *peer) setIdle(started time.Time, softTTL, hardTTL time.Duration, maxFetch int, capacity, idle *int32) {
 	// Update the peer's download allowance based on previous performance
 	scale := 2.0
-	if time.Since(p.started) > bodySoftTTL {
+	if time.Since(started) > softTTL {
 		scale = 0.5
-		if time.Since(p.started) > bodyHardTTL {
-			scale = 1 / float64(MaxBodyFetch) // reduces capacity to 1
+		if time.Since(started) > hardTTL {
+			scale = 1 / float64(maxFetch) // reduces capacity to 1
 		}
 	}
 	for {
 		// Calculate the new download bandwidth allowance
-		prev := atomic.LoadInt32(&p.capacity)
-		next := int32(math.Max(1, math.Min(float64(MaxBodyFetch), float64(prev)*scale)))
+		prev := atomic.LoadInt32(capacity)
+		next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
 
 		// Try to update the old value
-		if atomic.CompareAndSwapInt32(&p.capacity, prev, next) {
+		if atomic.CompareAndSwapInt32(capacity, prev, next) {
 			// If we're having problems at 1 capacity, try to find better peers
 			if next == 1 {
 				p.Demote()
@@ -193,14 +214,20 @@ func (p *peer) SetIdle() {
 			break
 		}
 	}
-	// Set the peer to idle to allow further block requests
-	atomic.StoreInt32(&p.idle, 0)
+	// Set the peer to idle to allow further fetch requests
+	atomic.StoreInt32(idle, 0)
 }
 
-// Capacity retrieves the peers block download allowance based on its previously
-// discovered bandwidth capacity.
-func (p *peer) Capacity() int {
-	return int(atomic.LoadInt32(&p.capacity))
+// BlockCapacity retrieves the peers block download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) BlockCapacity() int {
+	return int(atomic.LoadInt32(&p.blockCapacity))
+}
+
+// ReceiptCapacity retrieves the peers receipt download allowance based on its
+// previously discovered bandwidth capacity.
+func (p *peer) ReceiptCapacity() int {
+	return int(atomic.LoadInt32(&p.receiptCapacity))
 }
 
 // Promote increases the peer's reputation.
@@ -226,7 +253,8 @@ func (p *peer) Demote() {
 func (p *peer) String() string {
 	return fmt.Sprintf("Peer %s [%s]", p.id,
 		fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
-			fmt.Sprintf("capacity %3d, ", atomic.LoadInt32(&p.capacity))+
+			fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
+			fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
 			fmt.Sprintf("ignored %4d", p.ignored.Size()),
 	)
 }
@@ -310,26 +338,52 @@ func (ps *peerSet) AllPeers() []*peer {
 	return list
 }
 
-// IdlePeers retrieves a flat list of all the currently idle peers within the
+// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
 // active peer set, ordered by their reputation.
-func (ps *peerSet) IdlePeers(version int) []*peer {
+func (ps *peerSet) BlockIdlePeers(version int) ([]*peer, int) {
 	ps.lock.RLock()
 	defer ps.lock.RUnlock()
 
-	list := make([]*peer, 0, len(ps.peers))
+	idle, total := make([]*peer, 0, len(ps.peers)), 0
 	for _, p := range ps.peers {
-		if (version == eth61 && p.version == eth61) || (version >= eth62 && p.version >= eth62) {
-			if atomic.LoadInt32(&p.idle) == 0 {
-				list = append(list, p)
+		if (version == 61 && p.version == 61) || (version >= 62 && p.version >= 62) {
+			if atomic.LoadInt32(&p.blockIdle) == 0 {
+				idle = append(idle, p)
+			}
+			total++
+		}
+	}
+	for i := 0; i < len(idle); i++ {
+		for j := i + 1; j < len(idle); j++ {
+			if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+				idle[i], idle[j] = idle[j], idle[i]
 			}
 		}
 	}
-	for i := 0; i < len(list); i++ {
-		for j := i + 1; j < len(list); j++ {
-			if atomic.LoadInt32(&list[i].rep) < atomic.LoadInt32(&list[j].rep) {
-				list[i], list[j] = list[j], list[i]
-			}
-		}
-	}
-	return list
+	return idle, total
+}
+
+// ReceiptIdlePeers retrieves a flat list of all the currently idle peers within the
+// active peer set, ordered by their reputation.
+func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
+	ps.lock.RLock()
+	defer ps.lock.RUnlock()
+
+	idle, total := make([]*peer, 0, len(ps.peers)), 0
+	for _, p := range ps.peers {
+		if p.version >= 63 {
+			if atomic.LoadInt32(&p.receiptIdle) == 0 {
+				idle = append(idle, p)
+			}
+			total++
+		}
+	}
+	for i := 0; i < len(idle); i++ {
+		for j := i + 1; j < len(idle); j++ {
+			if atomic.LoadInt32(&idle[i].rep) < atomic.LoadInt32(&idle[j].rep) {
+				idle[i], idle[j] = idle[j], idle[i]
+			}
+		}
+	}
+	return idle, total
 }
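The setIdle helper shared by the three SetXxxIdle methods above is a small multiplicative rate controller: a reply within the soft TTL doubles the per-request allowance, a reply past it halves the allowance, and one past the hard TTL collapses it to a single item, all applied through a CAS loop so concurrent deliveries cannot lose updates. A standalone sketch of just that rule (names mirror the diff; the peer and reputation machinery is omitted):

package main

import (
	"fmt"
	"math"
	"sync/atomic"
	"time"
)

// updateCapacity applies the grow-fast/shrink-on-timeout rule from setIdle.
func updateCapacity(capacity *int32, elapsed, softTTL, hardTTL time.Duration, maxFetch int) {
	scale := 2.0
	if elapsed > softTTL {
		scale = 0.5
		if elapsed > hardTTL {
			scale = 1 / float64(maxFetch) // reduces capacity to 1
		}
	}
	for {
		prev := atomic.LoadInt32(capacity)
		next := int32(math.Max(1, math.Min(float64(maxFetch), float64(prev)*scale)))
		if atomic.CompareAndSwapInt32(capacity, prev, next) { // retry on concurrent update
			return
		}
	}
}

func main() {
	allowance := int32(1)
	for i := 0; i < 5; i++ { // five fast fetches in a row
		updateCapacity(&allowance, time.Second, 3*time.Second, 10*time.Second, 128)
	}
	fmt.Println(allowance) // 32: doubled on every timely reply
}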


@@ -29,11 +29,12 @@ import (
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/logger"
 	"github.com/ethereum/go-ethereum/logger/glog"
+	"github.com/rcrowley/go-metrics"
 	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
 )
 
 var (
-	blockCacheLimit = 8 * MaxBlockFetch // Maximum number of blocks to cache before throttling the download
+	blockCacheLimit = 1024 // Maximum number of blocks to cache before throttling the download
 )
 
 var (
@@ -41,29 +42,47 @@ var (
 	errStaleDelivery = errors.New("stale delivery")
 )
 
-// fetchRequest is a currently running block retrieval operation.
+// fetchRequest is a currently running data retrieval operation.
 type fetchRequest struct {
 	Peer    *peer               // Peer to which the request was sent
-	Hashes  map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
+	Hashes  map[common.Hash]int // [eth/61] Requested blocks with their insertion index (priority)
 	Headers []*types.Header     // [eth/62] Requested headers, sorted by request order
 	Time    time.Time           // Time when the request was made
 }
 
+// fetchResult is the assembly collecting partial results from potentially more
+// than one fetcher routines, until all outstanding retrievals complete and the
+// result as a whole can be processed.
+type fetchResult struct {
+	Pending int // Number of data fetches still pending
+
+	Header       *types.Header
+	Uncles       []*types.Header
+	Transactions types.Transactions
+	Receipts     types.Receipts
+}
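The Pending counter is what allows bodies and receipts to be fetched from different peers at different times: a fetchResult only becomes deliverable once every component required by the sync mode has arrived. A toy illustration of that accounting (the initial value 2 stands for body plus receipts; the real count is configured via Prepare at the end of this file):

package main

import "fmt"

type result struct {
	pending int // number of data fetches still outstanding
}

func main() {
	res := &result{pending: 2}    // body and receipts both missing
	res.pending--                 // body delivered by one peer
	fmt.Println(res.pending == 0) // false: still waiting on receipts
	res.pending--                 // receipts delivered by another peer
	fmt.Println(res.pending == 0) // true: ready for hand-over
}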
 // queue represents hashes that either need fetching or are being fetched
 type queue struct {
 	hashPool    map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
 	hashQueue   *prque.Prque        // [eth/61] Priority queue of the block hashes to fetch
 	hashCounter int                 // [eth/61] Counter indexing the added hashes to ensure retrieval order
 
-	headerPool  map[common.Hash]*types.Header // [eth/62] Pending headers, mapping from their hashes
-	headerQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the bodies for
 	headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
 
-	pendPool map[string]*fetchRequest // Currently pending block retrieval operations
+	blockTaskPool  map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
+	blockTaskQueue *prque.Prque                  // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
+	blockPendPool  map[string]*fetchRequest      // [eth/62] Currently pending block (body) retrieval operations
+	blockDonePool  map[common.Hash]struct{}      // [eth/62] Set of the completed block (body) fetches
 
-	blockPool   map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes
-	blockCache  []*Block               // Downloaded but not yet delivered blocks
-	blockOffset uint64                 // Offset of the first cached block in the block-chain
+	receiptTaskPool  map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
+	receiptTaskQueue *prque.Prque                  // [eth/63] Priority queue of the headers to fetch the receipts for
+	receiptPendPool  map[string]*fetchRequest      // [eth/63] Currently pending receipt retrieval operations
+	receiptDonePool  map[common.Hash]struct{}      // [eth/63] Set of the completed receipt fetches
+
+	resultCache  []*fetchResult // Downloaded but not yet delivered fetch results
+	resultOffset uint64         // Offset of the first cached fetch result in the block-chain
+	resultParts  int            // Number of fetch components required to complete an item
 
 	lock sync.RWMutex
 }
@@ -73,11 +92,15 @@ func newQueue() *queue {
 	return &queue{
 		hashPool:  make(map[common.Hash]int),
 		hashQueue: prque.New(),
-		headerPool:  make(map[common.Hash]*types.Header),
-		headerQueue: prque.New(),
-		pendPool:    make(map[string]*fetchRequest),
-		blockPool:   make(map[common.Hash]uint64),
-		blockCache:  make([]*Block, blockCacheLimit),
+		blockTaskPool:    make(map[common.Hash]*types.Header),
+		blockTaskQueue:   prque.New(),
+		blockPendPool:    make(map[string]*fetchRequest),
+		blockDonePool:    make(map[common.Hash]struct{}),
+		receiptTaskPool:  make(map[common.Hash]*types.Header),
+		receiptTaskQueue: prque.New(),
+		receiptPendPool:  make(map[string]*fetchRequest),
+		receiptDonePool:  make(map[common.Hash]struct{}),
+		resultCache:      make([]*fetchResult, blockCacheLimit),
 	}
 }
@@ -90,32 +113,37 @@ func (q *queue) Reset() {
 	q.hashQueue.Reset()
 	q.hashCounter = 0
 
-	q.headerPool = make(map[common.Hash]*types.Header)
-	q.headerQueue.Reset()
 	q.headerHead = common.Hash{}
 
-	q.pendPool = make(map[string]*fetchRequest)
+	q.blockTaskPool = make(map[common.Hash]*types.Header)
+	q.blockTaskQueue.Reset()
+	q.blockPendPool = make(map[string]*fetchRequest)
+	q.blockDonePool = make(map[common.Hash]struct{})
 
-	q.blockPool = make(map[common.Hash]uint64)
-	q.blockOffset = 0
-	q.blockCache = make([]*Block, blockCacheLimit)
+	q.receiptTaskPool = make(map[common.Hash]*types.Header)
+	q.receiptTaskQueue.Reset()
+	q.receiptPendPool = make(map[string]*fetchRequest)
+	q.receiptDonePool = make(map[common.Hash]struct{})
+
+	q.resultCache = make([]*fetchResult, blockCacheLimit)
+	q.resultOffset = 0
+	q.resultParts = 0
 }
 
-// Size retrieves the number of blocks in the queue, returning separately for
-// pending and already downloaded.
-func (q *queue) Size() (int, int) {
+// PendingBlocks retrieves the number of block (body) requests pending for retrieval.
+func (q *queue) PendingBlocks() int {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
-	return len(q.hashPool) + len(q.headerPool), len(q.blockPool)
+	return q.hashQueue.Size() + q.blockTaskQueue.Size()
 }
 
-// Pending retrieves the number of blocks pending for retrieval.
-func (q *queue) Pending() int {
+// PendingReceipts retrieves the number of block receipts pending for retrieval.
+func (q *queue) PendingReceipts() int {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
-	return q.hashQueue.Size() + q.headerQueue.Size()
+	return q.receiptTaskQueue.Size()
 }
 
 // InFlight retrieves the number of fetch requests currently in flight.
@@ -123,44 +151,55 @@ func (q *queue) InFlight() int {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
-	return len(q.pendPool)
+	return len(q.blockPendPool) + len(q.receiptPendPool)
 }
 
-// Throttle checks if the download should be throttled (active block fetches
-// exceed block cache).
-func (q *queue) Throttle() bool {
+// Idle returns if the queue is fully idle or has some data still inside. This
+// method is used by the tester to detect termination events.
+func (q *queue) Idle() bool {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
-	// Calculate the currently in-flight block requests
+	queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
+	pending := len(q.blockPendPool) + len(q.receiptPendPool)
+	cached := len(q.blockDonePool) + len(q.receiptDonePool)
+
+	return (queued + pending + cached) == 0
+}
+
+// ThrottleBlocks checks if the download should be throttled (active block (body)
+// fetches exceed block cache).
+func (q *queue) ThrottleBlocks() bool {
+	q.lock.RLock()
+	defer q.lock.RUnlock()
+
+	// Calculate the currently in-flight block (body) requests
 	pending := 0
-	for _, request := range q.pendPool {
+	for _, request := range q.blockPendPool {
 		pending += len(request.Hashes) + len(request.Headers)
 	}
-	// Throttle if more blocks are in-flight than free space in the cache
-	return pending >= len(q.blockCache)-len(q.blockPool)
+	// Throttle if more blocks (bodies) are in-flight than free space in the cache
+	return pending >= len(q.resultCache)-len(q.blockDonePool)
 }
 
-// Has checks if a hash is within the download queue or not.
-func (q *queue) Has(hash common.Hash) bool {
+// ThrottleReceipts checks if the download should be throttled (active receipt
+// fetches exceed block cache).
+func (q *queue) ThrottleReceipts() bool {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
-	if _, ok := q.hashPool[hash]; ok {
-		return true
-	}
-	if _, ok := q.headerPool[hash]; ok {
-		return true
-	}
-	if _, ok := q.blockPool[hash]; ok {
-		return true
-	}
-	return false
+	// Calculate the currently in-flight receipt requests
+	pending := 0
+	for _, request := range q.receiptPendPool {
+		pending += len(request.Headers)
+	}
+	// Throttle if more receipts are in-flight than free space in the cache
+	return pending >= len(q.resultCache)-len(q.receiptDonePool)
 }
 
-// Insert61 adds a set of hashes for the download queue for scheduling, returning
+// Schedule61 adds a set of hashes for the download queue for scheduling, returning
 // the new hashes encountered.
-func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
+func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -186,22 +225,17 @@ func (q *queue) Insert61(hashes []common.Hash, fifo bool) []common.Hash {
 	return inserts
 }
 
-// Insert adds a set of headers for the download queue for scheduling, returning
+// Schedule adds a set of headers for the download queue for scheduling, returning
 // the new headers encountered.
-func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
+func (q *queue) Schedule(headers []*types.Header, from uint64, receipts bool) []*types.Header {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Insert all the headers prioritized by the contained block number
 	inserts := make([]*types.Header, 0, len(headers))
 	for _, header := range headers {
-		// Make sure no duplicate requests are executed
-		hash := header.Hash()
-		if _, ok := q.headerPool[hash]; ok {
-			glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled", header.Number.Uint64(), hash[:4])
-			continue
-		}
 		// Make sure chain order is honored and preserved throughout
+		hash := header.Hash()
 		if header.Number == nil || header.Number.Uint64() != from {
 			glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ordering, expected %d", header.Number, hash[:4], from)
 			break
@@ -210,69 +244,72 @@ func (q *queue) Insert(headers []*types.Header, from uint64) []*types.Header {
 			glog.V(logger.Warn).Infof("Header #%v [%x] broke chain ancestry", header.Number, hash[:4])
 			break
 		}
-		// Queue the header for body retrieval
+		// Make sure no duplicate requests are executed
+		if _, ok := q.blockTaskPool[hash]; ok {
+			glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for block fetch", header.Number.Uint64(), hash[:4])
+			continue
+		}
+		if _, ok := q.receiptTaskPool[hash]; ok {
+			glog.V(logger.Warn).Infof("Header #%d [%x] already scheduled for receipt fetch", header.Number.Uint64(), hash[:4])
+			continue
+		}
+		// Queue the header for content retrieval
+		q.blockTaskPool[hash] = header
+		q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+		if receipts {
+			q.receiptTaskPool[hash] = header
+			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+		}
 		inserts = append(inserts, header)
-		q.headerPool[hash] = header
-		q.headerQueue.Push(header, -float32(header.Number.Uint64()))
 		q.headerHead = hash
 		from++
 	}
 	return inserts
 }
 
-// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
+// GetHeadResult retrieves the first fetch result from the cache, or nil if it hasn't
 // been downloaded yet (or simply non existent).
-func (q *queue) GetHeadBlock() *Block {
+func (q *queue) GetHeadResult() *fetchResult {
 	q.lock.RLock()
 	defer q.lock.RUnlock()
 
-	if len(q.blockCache) == 0 {
+	if len(q.resultCache) == 0 || q.resultCache[0] == nil {
 		return nil
 	}
-	return q.blockCache[0]
+	if q.resultCache[0].Pending > 0 {
+		return nil
+	}
+	return q.resultCache[0]
 }
 
-// GetBlock retrieves a downloaded block, or nil if non-existent.
-func (q *queue) GetBlock(hash common.Hash) *Block {
-	q.lock.RLock()
-	defer q.lock.RUnlock()
-
-	// Short circuit if the block hasn't been downloaded yet
-	index, ok := q.blockPool[hash]
-	if !ok {
-		return nil
-	}
-	// Return the block if it's still available in the cache
-	if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) {
-		return q.blockCache[index-q.blockOffset]
-	}
-	return nil
-}
-
-// TakeBlocks retrieves and permanently removes a batch of blocks from the cache.
-func (q *queue) TakeBlocks() []*Block {
+// TakeResults retrieves and permanently removes a batch of fetch results from
+// the cache.
+func (q *queue) TakeResults() []*fetchResult {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	// Accumulate all available blocks
-	blocks := []*Block{}
-	for _, block := range q.blockCache {
-		if block == nil {
+	// Accumulate all available results
+	results := []*fetchResult{}
+	for _, result := range q.resultCache {
+		if result == nil || result.Pending > 0 {
 			break
 		}
-		blocks = append(blocks, block)
-		delete(q.blockPool, block.RawBlock.Hash())
-	}
-	// Delete the blocks from the slice and let them be garbage collected;
-	// without this slice trick the blocks would stay in memory until nil
-	// would be assigned to q.blocks
-	copy(q.blockCache, q.blockCache[len(blocks):])
-	for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ {
-		q.blockCache[k] = nil
-	}
-	q.blockOffset += uint64(len(blocks))
-
-	return blocks
+		results = append(results, result)
+
+		hash := result.Header.Hash()
+		delete(q.blockDonePool, hash)
+		delete(q.receiptDonePool, hash)
+	}
+	// Delete the results from the slice and let them be garbage collected;
+	// without this slice trick the results would stay in memory until nil
+	// would be assigned to them.
+	copy(q.resultCache, q.resultCache[len(results):])
+	for k, n := len(q.resultCache)-len(results), len(q.resultCache); k < n; k++ {
+		q.resultCache[k] = nil
+	}
+	q.resultOffset += uint64(len(results))
+
+	return results
 }
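TakeResults keeps the fixed-size resultCache array alive but must let delivered entries be garbage collected, hence the copy-then-nil idiom called out in the comment above. An isolated demonstration of the idiom (strings stand in for fetch results; illustration only):

package main

import "fmt"

// takeCompleted drains the leading non-nil entries of a fixed cache slice,
// nilling the vacated cells so the items can be garbage collected while the
// backing array stays in use.
func takeCompleted(cache []*string) []*string {
	results := []*string{}
	for _, item := range cache {
		if item == nil {
			break
		}
		results = append(results, item)
	}
	copy(cache, cache[len(results):])
	for k := len(cache) - len(results); k < len(cache); k++ {
		cache[k] = nil
	}
	return results
}

func main() {
	a, b := "result-1", "result-2"
	cache := []*string{&a, &b, nil, nil}
	fmt.Println(len(takeCompleted(cache))) // 2
	fmt.Println(cache[0] == nil)           // true: cell released for reuse/GC
}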
 // Reserve61 reserves a set of hashes for the given peer, skipping any previously
 
@@ -286,12 +323,12 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
 	if q.hashQueue.Empty() {
 		return nil
 	}
-	if _, ok := q.pendPool[p.id]; ok {
+	if _, ok := q.blockPendPool[p.id]; ok {
 		return nil
 	}
 	// Calculate an upper limit on the hashes we might fetch (i.e. throttling)
-	space := len(q.blockCache) - len(q.blockPool)
-	for _, request := range q.pendPool {
+	space := len(q.resultCache) - len(q.blockDonePool)
+	for _, request := range q.blockPendPool {
 		space -= len(request.Hashes)
 	}
 	// Retrieve a batch of hashes, skipping previously failed ones
@@ -319,49 +356,82 @@ func (q *queue) Reserve61(p *peer, count int) *fetchRequest {
 		Hashes: send,
 		Time:   time.Now(),
 	}
-	q.pendPool[p.id] = request
+	q.blockPendPool[p.id] = request
 
 	return request
 }
 
-// Reserve reserves a set of headers for the given peer, skipping any previously
-// failed download. Beside the next batch of needed fetches, it also returns a
-// flag whether empty blocks were queued requiring processing.
-func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) {
+// ReserveBlocks reserves a set of body fetches for the given peer, skipping any
+// previously failed downloads. Beside the next batch of needed fetches, it also
+// returns a flag whether empty blocks were queued requiring processing.
+func (q *queue) ReserveBlocks(p *peer, count int) (*fetchRequest, bool, error) {
+	noop := func(header *types.Header) bool {
+		return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
+	}
+	return q.reserveFetch(p, count, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, noop)
+}
+
+// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
+// any previously failed downloads. Beside the next batch of needed fetches, it
+// also returns a flag whether empty receipts were queued requiring importing.
+func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) {
+	noop := func(header *types.Header) bool {
+		return header.ReceiptHash == types.EmptyRootHash
+	}
+	return q.reserveFetch(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, noop)
+}
+
+// reserveFetch reserves a set of data download operations for a given peer,
+// skipping any previously failed ones. This method is a generic version used
+// by the individual special reservation functions.
+func (q *queue) reserveFetch(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
+	pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, noop func(*types.Header) bool) (*fetchRequest, bool, error) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Short circuit if the pool has been depleted, or if the peer's already
 	// downloading something (sanity check not to corrupt state)
-	if q.headerQueue.Empty() {
+	if taskQueue.Empty() {
 		return nil, false, nil
 	}
-	if _, ok := q.pendPool[p.id]; ok {
+	if _, ok := pendPool[p.id]; ok {
 		return nil, false, nil
 	}
-	// Calculate an upper limit on the bodies we might fetch (i.e. throttling)
-	space := len(q.blockCache) - len(q.blockPool)
-	for _, request := range q.pendPool {
+	// Calculate an upper limit on the items we might fetch (i.e. throttling)
+	space := len(q.resultCache) - len(donePool)
+	for _, request := range pendPool {
 		space -= len(request.Headers)
 	}
-	// Retrieve a batch of headers, skipping previously failed ones
+	// Retrieve a batch of tasks, skipping previously failed ones
 	send := make([]*types.Header, 0, count)
 	skip := make([]*types.Header, 0)
 
-	process := false
-	for proc := 0; proc < space && len(send) < count && !q.headerQueue.Empty(); proc++ {
-		header := q.headerQueue.PopItem().(*types.Header)
+	progress := false
+	for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
+		header := taskQueue.PopItem().(*types.Header)
 
-		// If the header defines an empty block, deliver straight
-		if header.TxHash == types.DeriveSha(types.Transactions{}) && header.UncleHash == types.CalcUncleHash([]*types.Header{}) {
-			if err := q.enqueue("", types.NewBlockWithHeader(header)); err != nil {
-				return nil, false, errInvalidChain
-			}
-			delete(q.headerPool, header.Hash())
-			process, space, proc = true, space-1, proc-1
+		// If we're the first to request this task, initialize the result container
+		index := int(header.Number.Int64() - int64(q.resultOffset))
+		if index >= len(q.resultCache) || index < 0 {
+			return nil, false, errInvalidChain
+		}
+		if q.resultCache[index] == nil {
+			q.resultCache[index] = &fetchResult{
+				Pending: q.resultParts,
+				Header:  header,
+			}
+		}
+		// If this fetch task is a noop, skip this fetch operation
+		if noop(header) {
+			donePool[header.Hash()] = struct{}{}
+			delete(taskPool, header.Hash())
+
+			space, proc = space-1, proc-1
+			q.resultCache[index].Pending--
+			progress = true
 			continue
 		}
-		// If it's a content block, add to the body fetch request
+		// Otherwise if not a known unknown block, add to the retrieve list
 		if p.ignored.Has(header.Hash()) {
 			skip = append(skip, header)
 		} else {
@@ -370,24 +440,41 @@ func (q *queue) Reserve(p *peer, count int) (*fetchRequest, bool, error) {
 		}
 	}
 	// Merge all the skipped headers back
 	for _, header := range skip {
-		q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+		taskQueue.Push(header, -float32(header.Number.Uint64()))
 	}
 	// Assemble and return the block download request
 	if len(send) == 0 {
-		return nil, process, nil
+		return nil, progress, nil
 	}
 	request := &fetchRequest{
 		Peer:    p,
 		Headers: send,
 		Time:    time.Now(),
 	}
-	q.pendPool[p.id] = request
+	pendPool[p.id] = request
 
-	return request, process, nil
+	return request, progress, nil
 }
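ReserveBlocks and ReserveReceipts differ only in the pools they pass and in the noop predicate: a header by itself already proves that a block has no body, or no receipts, so such tasks complete on the spot without a network round trip. A small sketch of the two predicates, using the EmptyRootHash and EmptyUncleHash values this commit exports from core/types:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/types"
)

func main() {
	// A header whose content hashes equal the empty-trie/empty-list hashes.
	header := &types.Header{
		TxHash:      types.EmptyRootHash,
		UncleHash:   types.EmptyUncleHash,
		ReceiptHash: types.EmptyRootHash,
	}
	// The body noop from ReserveBlocks and the receipt noop from ReserveReceipts.
	emptyBody := header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
	emptyReceipts := header.ReceiptHash == types.EmptyRootHash

	fmt.Println(emptyBody, emptyReceipts) // true true: nothing to download
}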
-// Cancel aborts a fetch request, returning all pending hashes to the queue.
-func (q *queue) Cancel(request *fetchRequest) {
+// Cancel61 aborts a fetch request, returning all pending hashes to the queue.
+func (q *queue) Cancel61(request *fetchRequest) {
+	q.cancel(request, nil, q.blockPendPool)
+}
+
+// CancelBlocks aborts a body fetch request, returning all pending hashes to the
+// task queue.
+func (q *queue) CancelBlocks(request *fetchRequest) {
+	q.cancel(request, q.blockTaskQueue, q.blockPendPool)
+}
+
+// CancelReceipts aborts a receipt fetch request, returning all pending hashes to
+// the task queue.
+func (q *queue) CancelReceipts(request *fetchRequest) {
+	q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
+}
+
+// cancel aborts a fetch request, returning all pending hashes to the task queue.
+func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
@@ -395,20 +482,62 @@ func (q *queue) Cancel(request *fetchRequest) {
 		q.hashQueue.Push(hash, float32(index))
 	}
 	for _, header := range request.Headers {
-		q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+		taskQueue.Push(header, -float32(header.Number.Uint64()))
 	}
-	delete(q.pendPool, request.Peer.id)
+	delete(pendPool, request.Peer.id)
 }
 
-// Expire checks for in flight requests that exceeded a timeout allowance,
+// Revoke cancels all pending requests belonging to a given peer. This method is
+// meant to be called during a peer drop to quickly reassign owned data fetches
+// to remaining nodes.
+func (q *queue) Revoke(peerId string) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if request, ok := q.blockPendPool[peerId]; ok {
+		for hash, index := range request.Hashes {
+			q.hashQueue.Push(hash, float32(index))
+		}
+		for _, header := range request.Headers {
+			q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
+		}
+		delete(q.blockPendPool, peerId)
+	}
+	if request, ok := q.receiptPendPool[peerId]; ok {
+		for _, header := range request.Headers {
+			q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
+		}
+		delete(q.receiptPendPool, peerId)
+	}
+}
+
+// Expire61 checks for in flight requests that exceeded a timeout allowance,
 // canceling them and returning the responsible peers for penalization.
-func (q *queue) Expire(timeout time.Duration) []string {
+func (q *queue) Expire61(timeout time.Duration) []string {
+	return q.expire(timeout, q.blockPendPool, nil)
+}
+
+// ExpireBlocks checks for in flight block body requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalization.
+func (q *queue) ExpireBlocks(timeout time.Duration) []string {
+	return q.expire(timeout, q.blockPendPool, q.blockTaskQueue)
+}
+
+// ExpireReceipts checks for in flight receipt requests that exceeded a timeout
+// allowance, canceling them and returning the responsible peers for penalization.
+func (q *queue) ExpireReceipts(timeout time.Duration) []string {
+	return q.expire(timeout, q.receiptPendPool, q.receiptTaskQueue)
+}
+
+// expire is the generic check that moves expired tasks from a pending pool back
+// into a task pool, returning all entities caught with expired tasks.
+func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) []string {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
 	// Iterate over the expired requests and return each to the queue
 	peers := []string{}
-	for id, request := range q.pendPool {
+	for id, request := range pendPool {
 		if time.Since(request.Time) > timeout {
 			// Update the metrics with the timeout
 			if len(request.Hashes) > 0 {
@@ -421,14 +550,14 @@ func (q *queue) Expire(timeout time.Duration) []string {
 				q.hashQueue.Push(hash, float32(index))
 			}
 			for _, header := range request.Headers {
-				q.headerQueue.Push(header, -float32(header.Number.Uint64()))
+				taskQueue.Push(header, -float32(header.Number.Uint64()))
 			}
 			peers = append(peers, id)
 		}
 	}
 	// Remove the expired requests from the pending pool
 	for _, id := range peers {
-		delete(q.pendPool, id)
+		delete(pendPool, id)
 	}
 	return peers
 }
@@ -439,12 +568,12 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
 	defer q.lock.Unlock()
 
 	// Short circuit if the blocks were never requested
-	request := q.pendPool[id]
+	request := q.blockPendPool[id]
 	if request == nil {
 		return errNoFetchesPending
 	}
 	blockReqTimer.UpdateSince(request.Time)
-	delete(q.pendPool, id)
+	delete(q.blockPendPool, id)
 
 	// If no blocks were retrieved, mark them as unavailable for the origin peer
 	if len(blocks) == 0 {
@@ -461,10 +590,19 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
 			errs = append(errs, fmt.Errorf("non-requested block %x", hash))
 			continue
 		}
-		// Queue the block up for processing
-		if err := q.enqueue(id, block); err != nil {
-			return err
+		// Reconstruct the next result if contents match up
+		index := int(block.Number().Int64() - int64(q.resultOffset))
+		if index >= len(q.resultCache) || index < 0 {
+			errs = []error{errInvalidChain}
+			break
 		}
+		q.resultCache[index] = &fetchResult{
+			Header:       block.Header(),
+			Transactions: block.Transactions(),
+			Uncles:       block.Uncles(),
+		}
+		q.blockDonePool[block.Hash()] = struct{}{}
+
 		delete(request.Hashes, hash)
 		delete(q.hashPool, hash)
 	}
@@ -473,72 +611,12 @@ func (q *queue) Deliver61(id string, blocks []*types.Block) (err error) {
 		q.hashQueue.Push(hash, float32(index))
 	}
 	// If none of the blocks were good, it's a stale delivery
-	if len(errs) != 0 {
-		if len(errs) == len(blocks) {
-			return errStaleDelivery
-		}
-		return fmt.Errorf("multiple failures: %v", errs)
-	}
-	return nil
-}
-
-// Deliver injects a block body retrieval response into the download queue.
-func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
-	q.lock.Lock()
-	defer q.lock.Unlock()
-
-	// Short circuit if the block bodies were never requested
-	request := q.pendPool[id]
-	if request == nil {
-		return errNoFetchesPending
-	}
-	bodyReqTimer.UpdateSince(request.Time)
-	delete(q.pendPool, id)
-
-	// If no block bodies were retrieved, mark them as unavailable for the origin peer
-	if len(txLists) == 0 || len(uncleLists) == 0 {
-		for hash, _ := range request.Headers {
-			request.Peer.ignored.Add(hash)
-		}
-	}
-	// Assemble each of the block bodies with their headers and queue for processing
-	errs := make([]error, 0)
-	for i, header := range request.Headers {
-		// Short circuit block assembly if no more bodies are found
-		if i >= len(txLists) || i >= len(uncleLists) {
-			break
-		}
-		// Reconstruct the next block if contents match up
-		if types.DeriveSha(types.Transactions(txLists[i])) != header.TxHash || types.CalcUncleHash(uncleLists[i]) != header.UncleHash {
-			errs = []error{errInvalidBody}
-			break
-		}
-		block := types.NewBlockWithHeader(header).WithBody(txLists[i], uncleLists[i])
-
-		// Queue the block up for processing
-		if err := q.enqueue(id, block); err != nil {
-			errs = []error{err}
-			break
-		}
-		request.Headers[i] = nil
-		delete(q.headerPool, header.Hash())
-	}
-	// Return all failed or missing fetches to the queue
-	for _, header := range request.Headers {
-		if header != nil {
-			q.headerQueue.Push(header, -float32(header.Number.Uint64()))
-		}
-	}
-	// If none of the blocks were good, it's a stale delivery
 	switch {
 	case len(errs) == 0:
 		return nil
-	case len(errs) == 1 && errs[0] == errInvalidBody:
-		return errInvalidBody
-	case len(errs) == 1 && errs[0] == errInvalidChain:
-		return errInvalidChain
+	case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
+		return errs[0]
 	case len(errs) == len(request.Headers):
 		return errStaleDelivery
@@ -548,29 +626,105 @@ func (q *queue) Deliver(id string, txLists [][]*types.Transaction, uncleLists []
 	}
 }
 
-// enqueue inserts a new block into the final delivery queue, waiting for pickup
-// by the processor.
-func (q *queue) enqueue(origin string, block *types.Block) error {
-	// If a requested block falls out of the range, the hash chain is invalid
-	index := int(int64(block.NumberU64()) - int64(q.blockOffset))
-	if index >= len(q.blockCache) || index < 0 {
-		return errInvalidChain
-	}
-	// Otherwise merge the block and mark the hash done
-	q.blockCache[index] = &Block{
-		RawBlock:   block,
-		OriginPeer: origin,
-	}
-	q.blockPool[block.Header().Hash()] = block.NumberU64()
-	return nil
+// DeliverBlocks injects a block (body) retrieval response into the results queue.
+func (q *queue) DeliverBlocks(id string, txLists [][]*types.Transaction, uncleLists [][]*types.Header) error {
+	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+		if types.DeriveSha(types.Transactions(txLists[index])) != header.TxHash || types.CalcUncleHash(uncleLists[index]) != header.UncleHash {
+			return errInvalidBody
+		}
+		result.Transactions = txLists[index]
+		result.Uncles = uncleLists[index]
+		return nil
+	}
+	return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, q.blockDonePool, bodyReqTimer, len(txLists), reconstruct)
 }
 
-// Prepare configures the block cache offset to allow accepting inbound blocks.
-func (q *queue) Prepare(offset uint64) {
+// DeliverReceipts injects a receipt retrieval response into the results queue.
+func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) error {
+	reconstruct := func(header *types.Header, index int, result *fetchResult) error {
+		if types.DeriveSha(types.Receipts(receiptList[index])) != header.ReceiptHash {
+			return errInvalidReceipt
+		}
+		result.Receipts = receiptList[index]
+		return nil
+	}
+	return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, q.receiptDonePool, receiptReqTimer, len(receiptList), reconstruct)
+}
+
+// deliver injects a data retrieval response into the results queue.
+func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
+	donePool map[common.Hash]struct{}, reqTimer metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) error {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 
-	if q.blockOffset < offset {
-		q.blockOffset = offset
+	// Short circuit if the data was never requested
+	request := pendPool[id]
+	if request == nil {
+		return errNoFetchesPending
+	}
+	reqTimer.UpdateSince(request.Time)
+	delete(pendPool, id)
+
+	// If no data items were retrieved, mark them as unavailable for the origin peer
+	if results == 0 {
+		for hash, _ := range request.Headers {
+			request.Peer.ignored.Add(hash)
+		}
+	}
+	// Assemble each of the results with their headers and retrieved data parts
+	errs := make([]error, 0)
+	for i, header := range request.Headers {
+		// Short circuit assembly if no more fetch results are found
+		if i >= results {
+			break
+		}
+		// Reconstruct the next result if contents match up
+		index := int(header.Number.Int64() - int64(q.resultOffset))
+		if index >= len(q.resultCache) || index < 0 || q.resultCache[index] == nil {
+			errs = []error{errInvalidChain}
+			break
+		}
+		if err := reconstruct(header, i, q.resultCache[index]); err != nil {
+			errs = []error{err}
+			break
+		}
+		donePool[header.Hash()] = struct{}{}
+		q.resultCache[index].Pending--
+
+		// Clean up a successful fetch
+		request.Headers[i] = nil
+		delete(taskPool, header.Hash())
+	}
+	// Return all failed or missing fetches to the queue
+	for _, header := range request.Headers {
+		if header != nil {
+			taskQueue.Push(header, -float32(header.Number.Uint64()))
+		}
+	}
+	// If none of the blocks were good, it's a stale delivery
+	switch {
+	case len(errs) == 0:
+		return nil
+	case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBody || errs[0] == errInvalidReceipt):
+		return errs[0]
+	case len(errs) == len(request.Headers):
+		return errStaleDelivery
+	default:
+		return fmt.Errorf("multiple failures: %v", errs)
 	}
 }
+
+// Prepare configures the result cache to allow accepting and caching inbound
+// fetch results.
+func (q *queue) Prepare(offset uint64, parts int) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if q.resultOffset < offset {
+		q.resultOffset = offset
+	}
+	q.resultParts = parts
+}
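Prepare is where a sync strategy meets the result accounting: parts is the number of deliveries each fetchResult waits for before it is handed over. The actual values passed by the downloader live in the suppressed downloader.go diff, so the mapping below is an assumption for illustration only:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/eth/downloader"
)

// fetchParts guesses the per-result component count per sync mode; treat the
// numbers as an assumption, not as the commit's actual wiring.
func fetchParts(mode downloader.SyncMode) int {
	switch mode {
	case downloader.FastSync:
		return 2 // block body plus receipts
	case downloader.FullSync:
		return 1 // block body only
	}
	return 0 // LightSync: headers only, results complete immediately
}

func main() {
	fmt.Println(fetchParts(downloader.FastSync)) // 2
}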


@@ -120,15 +120,25 @@ func NewProtocolManager(mode Mode, networkId int, mux *event.TypeMux, txpool txP
 		return nil, errIncompatibleConfig
 	}
 	// Construct the different synchronisation mechanisms
-	manager.downloader = downloader.New(manager.eventMux, manager.blockchain.HasBlock, manager.blockchain.GetBlock, manager.blockchain.CurrentBlock, manager.blockchain.GetTd, manager.blockchain.InsertChain, manager.removePeer)
+	var syncMode downloader.SyncMode
+	switch mode {
+	case ArchiveMode:
+		syncMode = downloader.FullSync
+	case FullMode:
+		syncMode = downloader.FastSync
+	case LightMode:
+		syncMode = downloader.LightSync
+	}
+	manager.downloader = downloader.New(syncMode, manager.eventMux, blockchain.HasHeader, blockchain.HasBlock, blockchain.GetHeader, blockchain.GetBlock,
+		blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.GetTd, blockchain.InsertHeaderChain, blockchain.InsertChain, nil, manager.removePeer)
 
 	validator := func(block *types.Block, parent *types.Block) error {
 		return core.ValidateHeader(pow, block.Header(), parent.Header(), true, false)
 	}
 	heighter := func() uint64 {
-		return manager.blockchain.CurrentBlock().NumberU64()
+		return blockchain.CurrentBlock().NumberU64()
 	}
-	manager.fetcher = fetcher.New(manager.blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, manager.blockchain.InsertChain, manager.removePeer)
+	manager.fetcher = fetcher.New(blockchain.GetBlock, validator, manager.BroadcastBlock, heighter, blockchain.InsertChain, manager.removePeer)
 
 	return manager, nil
 }
@@ -210,7 +220,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
 	// Register the peer in the downloader. If the downloader considers it banned, we disconnect
 	if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
 		p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks,
-		p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies); err != nil {
+		p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts); err != nil {
 		return err
 	}
 	// Propagate existing transactions. new transactions appearing
@@ -514,22 +524,31 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
 		var (
 			hash     common.Hash
 			bytes    int
-			receipts []*types.Receipt
+			receipts []rlp.RawValue
 		)
-		for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch {
-			// Retrieve the hash of the next transaction receipt
+		for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
+			// Retrieve the hash of the next block
 			if err := msgStream.Decode(&hash); err == rlp.EOL {
 				break
 			} else if err != nil {
 				return errResp(ErrDecode, "msg %v: %v", msg, err)
 			}
-			// Retrieve the requested receipt, stopping if enough was found
-			if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil {
-				receipts = append(receipts, receipt)
-				bytes += len(receipt.RlpEncode())
+			// Retrieve the requested block's receipts, skipping if unknown to us
+			results := core.GetBlockReceipts(pm.chaindb, hash)
+			if results == nil {
+				if header := pm.blockchain.GetHeader(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
+					continue
+				}
 			}
+			// If known, encode and queue for response packet
+			if encoded, err := rlp.EncodeToBytes(results); err != nil {
+				glog.V(logger.Error).Infof("failed to encode receipt: %v", err)
+			} else {
+				receipts = append(receipts, encoded)
+				bytes += len(encoded)
+			}
 		}
-		return p.SendReceipts(receipts)
+		return p.SendReceiptsRLP(receipts)
 
 	case msg.Code == NewBlockHashesMsg:
 		// Retrieve and deserialize the remote new block hashes notification
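Note how the handler now answers GetReceipts from the per-block receipt table and ships each list as an rlp.RawValue: the data is encoded once and embedded verbatim into the ReceiptsMsg instead of decoding stored receipts only to re-encode them. A minimal demonstration of the raw-value pattern (uint slices stand in for receipt lists; illustration only):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"
)

func main() {
	var (
		replies []rlp.RawValue
		bytes   int
	)
	for _, item := range [][]uint{{1, 2}, {3}} {
		encoded, err := rlp.EncodeToBytes(item)
		if err != nil {
			continue // the handler logs and skips on encoding failures
		}
		replies = append(replies, encoded) // raw bytes, embedded as-is on send
		bytes += len(encoded)
	}
	fmt.Println(len(replies), bytes) // number of items and payload size so far
}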


@@ -535,15 +535,12 @@ func testGetReceipt(t *testing.T, protocol int) {
 	defer peer.close()
 
 	// Collect the hashes to request, and the response to expect
-	hashes := []common.Hash{}
+	hashes, receipts := []common.Hash{}, []types.Receipts{}
 	for i := uint64(0); i <= pm.blockchain.CurrentBlock().NumberU64(); i++ {
-		for _, tx := range pm.blockchain.GetBlockByNumber(i).Transactions() {
-			hashes = append(hashes, tx.Hash())
-		}
-	}
-	receipts := make([]*types.Receipt, len(hashes))
-	for i, hash := range hashes {
-		receipts[i] = core.GetReceipt(pm.chaindb, hash)
+		block := pm.blockchain.GetBlockByNumber(i)
+
+		hashes = append(hashes, block.Hash())
+		receipts = append(receipts, core.GetBlockReceipts(pm.chaindb, block.Hash()))
 	}
 	// Send the hash request and verify the response
 	p2p.Send(peer.app, 0x0f, hashes)


@@ -197,9 +197,9 @@ func (p *peer) SendNodeData(data [][]byte) error {
 	return p2p.Send(p.rw, NodeDataMsg, data)
 }
 
-// SendReceipts sends a batch of transaction receipts, corresponding to the ones
-// requested.
-func (p *peer) SendReceipts(receipts []*types.Receipt) error {
+// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
+// ones requested from an already RLP encoded format.
+func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
 	return p2p.Send(p.rw, ReceiptsMsg, receipts)
 }