eth, les: Refactor downloader peer to use structs

This commit is contained in:
Nick Johnson 2017-06-28 13:25:08 +01:00
parent 0550957989
commit ae11545bc5
7 changed files with 316 additions and 297 deletions

View File

@ -205,27 +205,26 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC
} }
dl := &Downloader{ dl := &Downloader{
mode: mode, mode: mode,
mux: mux, stateDB: stateDb,
queue: newQueue(), mux: mux,
peers: newPeerSet(), queue: newQueue(),
stateDB: stateDb, peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate), rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000), rttConfidence: uint64(1000000),
chain: chain, chain: chain,
lightchain: lightchain, lightchain: lightchain,
dropPeer: dropPeer, dropPeer: dropPeer,
headerCh: make(chan dataPack, 1), headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1), bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1), headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}), quitCh: make(chan struct{}),
// for stateFetcher stateCh: make(chan dataPack),
stateSyncStart: make(chan *stateSync), stateSyncStart: make(chan *stateSync),
trackStateReq: make(chan *stateReq), trackStateReq: make(chan *stateReq),
stateCh: make(chan dataPack),
} }
go dl.qosTuner() go dl.qosTuner()
go dl.stateFetcher() go dl.stateFetcher()
@ -269,13 +268,11 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from. // used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn, func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error {
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
logger := log.New("peer", id) logger := log.New("peer", id)
logger.Trace("Registering sync peer") logger.Trace("Registering sync peer")
if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil { if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
logger.Error("Failed to register sync peer", "err", err) logger.Error("Failed to register sync peer", "err", err)
return err return err
} }
@ -395,7 +392,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
// syncWithPeer starts a block synchronization based on the hash chain from the // syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash. // specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) { func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
d.mux.Post(StartEvent{}) d.mux.Post(StartEvent{})
defer func() { defer func() {
// reset on error // reset on error
@ -548,12 +545,12 @@ func (d *Downloader) Terminate() {
// fetchHeight retrieves the head header of the remote peer to aid in estimating // fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take. // the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) {
p.log.Debug("Retrieving remote chain height") p.log.Debug("Retrieving remote chain height")
// Request the advertised remote head block and wait for the response // Request the advertised remote head block and wait for the response
head, _ := p.currentHead() head, _ := p.peer.Head()
go p.getRelHeaders(head, 1, 0, false) go p.peer.RequestHeadersByHash(head, 1, 0, false)
ttl := d.requestTTL() ttl := d.requestTTL()
timeout := time.After(ttl) timeout := time.After(ttl)
@ -594,7 +591,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
// on the correct chain, checking the top N links should already get us a match. // on the correct chain, checking the top N links should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of // In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head links match), we do a binary search to find the common ancestor. // the head links match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) {
// Figure out the valid ancestor range to prevent rewrite attacks // Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64()
@ -622,7 +619,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
if count > limit { if count > limit {
count = limit count = limit
} }
go p.getAbsHeaders(uint64(from), count, 15, false) go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false)
// Wait for the remote response to the head fetch // Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{} number, hash := uint64(0), common.Hash{}
@ -704,7 +701,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
ttl := d.requestTTL() ttl := d.requestTTL()
timeout := time.After(ttl) timeout := time.After(ttl)
go p.getAbsHeaders(uint64(check), 1, 0, false) go p.peer.RequestHeadersByNumber(uint64(check), 1, 0, false)
// Wait until a reply arrives to this request // Wait until a reply arrives to this request
for arrived := false; !arrived; { for arrived := false; !arrived; {
@ -765,7 +762,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
// other peers are only accepted if they map cleanly to the skeleton. If no one // other peers are only accepted if they map cleanly to the skeleton. If no one
// can fill in the skeleton - not even the origin peer - it's assumed invalid and // can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped. // the origin is dropped.
func (d *Downloader) fetchHeaders(p *peer, from uint64) error { func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
p.log.Debug("Directing header downloads", "origin", from) p.log.Debug("Directing header downloads", "origin", from)
defer p.log.Debug("Header download terminated") defer p.log.Debug("Header download terminated")
@ -785,10 +782,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
if skeleton { if skeleton {
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
} else { } else {
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
} }
} }
// Start pulling the header chain skeleton until all is done // Start pulling the header chain skeleton until all is done
@ -890,12 +887,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) (
} }
expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) }
throttle = func() bool { return false } throttle = func() bool { return false }
reserve = func(p *peer, count int) (*fetchRequest, bool, error) { reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) {
return d.queue.ReserveHeaders(p, count), false, nil return d.queue.ReserveHeaders(p, count), false, nil
} }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) } capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) }
) )
err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire,
d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve,
@ -919,9 +916,9 @@ func (d *Downloader) fetchBodies(from uint64) error {
return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles)
} }
expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) } capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) }
) )
err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire,
d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies,
@ -943,9 +940,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return d.queue.DeliverReceipts(pack.peerId, pack.receipts) return d.queue.DeliverReceipts(pack.peerId, pack.receipts)
} }
expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) } capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) }
) )
err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire,
d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts,
@ -981,9 +978,9 @@ func (d *Downloader) fetchReceipts(from uint64) error {
// - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
// - kind: textual label of the type being downloaded to display in log mesages // - kind: textual label of the type being downloaded to display in log mesages
func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error),
fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error { idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error {
// Create a ticker to detect expired retrieval tasks // Create a ticker to detect expired retrieval tasks
ticker := time.NewTicker(100 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
@ -1261,7 +1258,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1 frequency = 1
} }
if n, err := d.chain.InsertHeaderChain(chunk, frequency); err != nil { if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
// If some headers were inserted, add them too to the rollback list // If some headers were inserted, add them too to the rollback list
if n > 0 { if n > 0 {
rollback = append(rollback, chunk[:n]...) rollback = append(rollback, chunk[:n]...)

View File

@ -404,14 +404,7 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
defer dl.lock.Unlock() defer dl.lock.Unlock()
var err error var err error
switch version { err = dl.downloader.RegisterPeer(id, version, &downloadTesterPeer{dl, id, delay})
case 62:
err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
case 63:
err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
case 64:
err = dl.downloader.RegisterPeer(id, version, dl.peerCurrentHeadFn(id), dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
}
if err == nil { if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy) // Assign the owned hashes, headers and blocks to the peer (deep copy)
dl.peerHashes[id] = make([]common.Hash, len(hashes)) dl.peerHashes[id] = make([]common.Hash, len(hashes))
@ -469,139 +462,133 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id) dl.downloader.UnregisterPeer(id)
} }
// peerCurrentHeadFn constructs a function to retrieve a peer's current head hash type downloadTesterPeer struct {
dl *downloadTester
id string
delay time.Duration
}
// Head constructs a function to retrieve a peer's current head hash
// and total difficulty. // and total difficulty.
func (dl *downloadTester) peerCurrentHeadFn(id string) func() (common.Hash, *big.Int) { func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) {
return func() (common.Hash, *big.Int) { dlp.dl.lock.RLock()
dl.lock.RLock() defer dlp.dl.lock.RUnlock()
defer dl.lock.RUnlock()
return dl.peerHashes[id][0], nil return dlp.dl.peerHashes[dlp.id][0], nil
}
} }
// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed // RequestHeadersByHash constructs a GetBlockHeaders function based on a hashed
// origin; associated with a particular peer in the download tester. The returned // origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer. // function can be used to retrieve batches of headers from the particular peer.
func (dl *downloadTester) peerGetRelHeadersFn(id string, delay time.Duration) func(common.Hash, int, int, bool) error { func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
return func(origin common.Hash, amount int, skip int, reverse bool) error { // Find the canonical number of the hash
// Find the canonical number of the hash dlp.dl.lock.RLock()
dl.lock.RLock() number := uint64(0)
number := uint64(0) for num, hash := range dlp.dl.peerHashes[dlp.id] {
for num, hash := range dl.peerHashes[id] { if hash == origin {
if hash == origin { number = uint64(len(dlp.dl.peerHashes[dlp.id]) - num - 1)
number = uint64(len(dl.peerHashes[id]) - num - 1) break
break
}
} }
dl.lock.RUnlock()
// Use the absolute header fetcher to satisfy the query
return dl.peerGetAbsHeadersFn(id, delay)(number, amount, skip, reverse)
} }
dlp.dl.lock.RUnlock()
// Use the absolute header fetcher to satisfy the query
return dlp.RequestHeadersByNumber(number, amount, skip, reverse)
} }
// peerGetAbsHeadersFn constructs a GetBlockHeaders function based on a numbered // RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
// origin; associated with a particular peer in the download tester. The returned // origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer. // function can be used to retrieve batches of headers from the particular peer.
func (dl *downloadTester) peerGetAbsHeadersFn(id string, delay time.Duration) func(uint64, int, int, bool) error { func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
return func(origin uint64, amount int, skip int, reverse bool) error { time.Sleep(dlp.delay)
time.Sleep(delay)
dl.lock.RLock() dlp.dl.lock.RLock()
defer dl.lock.RUnlock() defer dlp.dl.lock.RUnlock()
// Gather the next batch of headers // Gather the next batch of headers
hashes := dl.peerHashes[id] hashes := dlp.dl.peerHashes[dlp.id]
headers := dl.peerHeaders[id] headers := dlp.dl.peerHeaders[dlp.id]
result := make([]*types.Header, 0, amount) result := make([]*types.Header, 0, amount)
for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ { for i := 0; i < amount && len(hashes)-int(origin)-1-i*(skip+1) >= 0; i++ {
if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok { if header, ok := headers[hashes[len(hashes)-int(origin)-1-i*(skip+1)]]; ok {
result = append(result, header) result = append(result, header)
}
} }
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
dl.downloader.DeliverHeaders(id, result)
}()
return nil
} }
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
dlp.dl.downloader.DeliverHeaders(dlp.id, result)
}()
return nil
} }
// peerGetBodiesFn constructs a getBlockBodies method associated with a particular // RequestBodies constructs a getBlockBodies method associated with a particular
// peer in the download tester. The returned function can be used to retrieve // peer in the download tester. The returned function can be used to retrieve
// batches of block bodies from the particularly requested peer. // batches of block bodies from the particularly requested peer.
func (dl *downloadTester) peerGetBodiesFn(id string, delay time.Duration) func([]common.Hash) error { func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
return func(hashes []common.Hash) error { time.Sleep(dlp.delay)
time.Sleep(delay)
dl.lock.RLock() dlp.dl.lock.RLock()
defer dl.lock.RUnlock() defer dlp.dl.lock.RUnlock()
blocks := dl.peerBlocks[id] blocks := dlp.dl.peerBlocks[dlp.id]
transactions := make([][]*types.Transaction, 0, len(hashes)) transactions := make([][]*types.Transaction, 0, len(hashes))
uncles := make([][]*types.Header, 0, len(hashes)) uncles := make([][]*types.Header, 0, len(hashes))
for _, hash := range hashes { for _, hash := range hashes {
if block, ok := blocks[hash]; ok { if block, ok := blocks[hash]; ok {
transactions = append(transactions, block.Transactions()) transactions = append(transactions, block.Transactions())
uncles = append(uncles, block.Uncles()) uncles = append(uncles, block.Uncles())
}
} }
go dl.downloader.DeliverBodies(id, transactions, uncles)
return nil
} }
go dlp.dl.downloader.DeliverBodies(dlp.id, transactions, uncles)
return nil
} }
// peerGetReceiptsFn constructs a getReceipts method associated with a particular // RequestReceipts constructs a getReceipts method associated with a particular
// peer in the download tester. The returned function can be used to retrieve // peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer. // batches of block receipts from the particularly requested peer.
func (dl *downloadTester) peerGetReceiptsFn(id string, delay time.Duration) func([]common.Hash) error { func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
return func(hashes []common.Hash) error { time.Sleep(dlp.delay)
time.Sleep(delay)
dl.lock.RLock() dlp.dl.lock.RLock()
defer dl.lock.RUnlock() defer dlp.dl.lock.RUnlock()
receipts := dl.peerReceipts[id] receipts := dlp.dl.peerReceipts[dlp.id]
results := make([][]*types.Receipt, 0, len(hashes)) results := make([][]*types.Receipt, 0, len(hashes))
for _, hash := range hashes { for _, hash := range hashes {
if receipt, ok := receipts[hash]; ok { if receipt, ok := receipts[hash]; ok {
results = append(results, receipt) results = append(results, receipt)
}
} }
go dl.downloader.DeliverReceipts(id, results)
return nil
} }
go dlp.dl.downloader.DeliverReceipts(dlp.id, results)
return nil
} }
// peerGetNodeDataFn constructs a getNodeData method associated with a particular // RequestNodeData constructs a getNodeData method associated with a particular
// peer in the download tester. The returned function can be used to retrieve // peer in the download tester. The returned function can be used to retrieve
// batches of node state data from the particularly requested peer. // batches of node state data from the particularly requested peer.
func (dl *downloadTester) peerGetNodeDataFn(id string, delay time.Duration) func([]common.Hash) error { func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
return func(hashes []common.Hash) error { time.Sleep(dlp.delay)
time.Sleep(delay)
dl.lock.RLock() dlp.dl.lock.RLock()
defer dl.lock.RUnlock() defer dlp.dl.lock.RUnlock()
results := make([][]byte, 0, len(hashes)) results := make([][]byte, 0, len(hashes))
for _, hash := range hashes { for _, hash := range hashes {
if data, err := dl.peerDb.Get(hash.Bytes()); err == nil { if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
if !dl.peerMissingStates[id][hash] { if !dlp.dl.peerMissingStates[dlp.id][hash] {
results = append(results, data) results = append(results, data)
}
} }
} }
go dl.downloader.DeliverNodeData(id, results)
return nil
} }
go dlp.dl.downloader.DeliverNodeData(dlp.id, results)
return nil
} }
// assertOwnChain checks if the local chain contains the correct number of items // assertOwnChain checks if the local chain contains the correct number of items
@ -1668,6 +1655,48 @@ func TestDeliverHeadersHang64Full(t *testing.T) { testDeliverHeadersHang(t, 64,
func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) } func TestDeliverHeadersHang64Fast(t *testing.T) { testDeliverHeadersHang(t, 64, FastSync) }
func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) } func TestDeliverHeadersHang64Light(t *testing.T) { testDeliverHeadersHang(t, 64, LightSync) }
type floodingTestPeer struct {
peer Peer
tester *downloadTester
}
func (ftp *floodingTestPeer) Head() (common.Hash, *big.Int) { return ftp.peer.Head() }
func (ftp *floodingTestPeer) RequestHeadersByHash(hash common.Hash, count int, skip int, reverse bool) error {
return ftp.peer.RequestHeadersByHash(hash, count, skip, reverse)
}
func (ftp *floodingTestPeer) RequestBodies(hashes []common.Hash) error {
return ftp.peer.RequestBodies(hashes)
}
func (ftp *floodingTestPeer) RequestReceipts(hashes []common.Hash) error {
return ftp.peer.RequestReceipts(hashes)
}
func (ftp *floodingTestPeer) RequestNodeData(hashes []common.Hash) error {
return ftp.peer.RequestNodeData(hashes)
}
func (ftp *floodingTestPeer) RequestHeadersByNumber(from uint64, count, skip int, reverse bool) error {
deliveriesDone := make(chan struct{}, 500)
for i := 0; i < cap(deliveriesDone); i++ {
peer := fmt.Sprintf("fake-peer%d", i)
go func() {
ftp.tester.downloader.DeliverHeaders(peer, []*types.Header{{}, {}, {}, {}})
deliveriesDone <- struct{}{}
}()
}
// Deliver the actual requested headers.
go ftp.peer.RequestHeadersByNumber(from, count, skip, reverse)
// None of the extra deliveries should block.
timeout := time.After(15 * time.Second)
for i := 0; i < cap(deliveriesDone); i++ {
select {
case <-deliveriesDone:
case <-timeout:
panic("blocked")
}
}
return nil
}
func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) { func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
t.Parallel() t.Parallel()
@ -1675,7 +1704,6 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
defer master.terminate() defer master.terminate()
hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false) hashes, headers, blocks, receipts := master.makeChain(5, 0, master.genesis, nil, false)
fakeHeads := []*types.Header{{}, {}, {}, {}}
for i := 0; i < 200; i++ { for i := 0; i < 200; i++ {
tester := newTester() tester := newTester()
tester.peerDb = master.peerDb tester.peerDb = master.peerDb
@ -1683,29 +1711,11 @@ func testDeliverHeadersHang(t *testing.T, protocol int, mode SyncMode) {
tester.newPeer("peer", protocol, hashes, headers, blocks, receipts) tester.newPeer("peer", protocol, hashes, headers, blocks, receipts)
// Whenever the downloader requests headers, flood it with // Whenever the downloader requests headers, flood it with
// a lot of unrequested header deliveries. // a lot of unrequested header deliveries.
tester.downloader.peers.peers["peer"].getAbsHeaders = func(from uint64, count, skip int, reverse bool) error { tester.downloader.peers.peers["peer"].peer = &floodingTestPeer{
deliveriesDone := make(chan struct{}, 500) tester.downloader.peers.peers["peer"].peer,
for i := 0; i < cap(deliveriesDone); i++ { tester,
peer := fmt.Sprintf("fake-peer%d", i)
go func() {
tester.downloader.DeliverHeaders(peer, fakeHeads)
deliveriesDone <- struct{}{}
}()
}
// Deliver the actual requested headers.
impl := tester.peerGetAbsHeadersFn("peer", 0)
go impl(from, count, skip, reverse)
// None of the extra deliveries should block.
timeout := time.After(15 * time.Second)
for i := 0; i < cap(deliveriesDone); i++ {
select {
case <-deliveriesDone:
case <-timeout:
panic("blocked")
}
}
return nil
} }
if err := tester.sync("peer", nil, mode); err != nil { if err := tester.sync("peer", nil, mode); err != nil {
t.Errorf("sync failed: %v", err) t.Errorf("sync failed: %v", err)
} }
@ -1737,7 +1747,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
for i := 0; i < fsPivotInterval; i++ { for i := 0; i < fsPivotInterval; i++ {
tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true tester.peerMissingStates["peer"][headers[hashes[fsMinFullBlocks+i]].Root] = true
} }
tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 500*time.Millisecond) // Enough to reach the critical section (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).delay = 500 * time.Millisecond // Enough to reach the critical section
// Synchronise with the peer a few times and make sure they fail until the retry limit // Synchronise with the peer a few times and make sure they fail until the retry limit
for i := 0; i < int(fsCriticalTrials)-1; i++ { for i := 0; i < int(fsCriticalTrials)-1; i++ {
@ -1756,7 +1766,7 @@ func testFastCriticalRestarts(t *testing.T, protocol int, progress bool) {
tester.lock.Lock() tester.lock.Lock()
tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]] tester.peerHeaders["peer"][hashes[fsMinFullBlocks-1]] = headers[hashes[fsMinFullBlocks-1]]
tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true} tester.peerMissingStates["peer"] = map[common.Hash]bool{tester.downloader.fsPivotLock.Root: true}
tester.downloader.peers.peers["peer"].getNodeData = tester.peerGetNodeDataFn("peer", 0) (tester.downloader.peers.peers["peer"].peer).(*downloadTesterPeer).delay = 0
tester.lock.Unlock() tester.lock.Unlock()
} }
} }

View File

@ -39,16 +39,6 @@ const (
measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
) )
// Head hash and total difficulty retriever for
type currentHeadRetrievalFn func() (common.Hash, *big.Int)
// Block header and body fetchers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
type blockBodyFetcherFn func([]common.Hash) error
type receiptFetcherFn func([]common.Hash) error
type stateFetcherFn func([]common.Hash) error
var ( var (
errAlreadyFetching = errors.New("already fetching blocks from peer") errAlreadyFetching = errors.New("already fetching blocks from peer")
errAlreadyRegistered = errors.New("peer is already registered") errAlreadyRegistered = errors.New("peer is already registered")
@ -56,7 +46,7 @@ var (
) )
// peer represents an active peer from which hashes and blocks are retrieved. // peer represents an active peer from which hashes and blocks are retrieved.
type peer struct { type peerConnection struct {
id string // Unique identifier of the peer id string // Unique identifier of the peer
headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1)
@ -78,37 +68,31 @@ type peer struct {
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
currentHead currentHeadRetrievalFn // Method to fetch the currently known head of the peer peer Peer
getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
getReceipts receiptFetcherFn // [eth/63] Method to retrieve a batch of block transaction receipts
getNodeData stateFetcherFn // [eth/63] Method to retrieve a batch of state trie data
version int // Eth protocol version number to switch strategies version int // Eth protocol version number to switch strategies
log log.Logger // Contextual logger to add extra infos to peer logs log log.Logger // Contextual logger to add extra infos to peer logs
lock sync.RWMutex lock sync.RWMutex
} }
type Peer interface {
Head() (common.Hash, *big.Int)
RequestHeadersByHash(common.Hash, int, int, bool) error
RequestHeadersByNumber(uint64, int, int, bool) error
RequestBodies([]common.Hash) error
RequestReceipts([]common.Hash) error
RequestNodeData([]common.Hash) error
}
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms. // mechanisms.
func newPeer(id string, version int, currentHead currentHeadRetrievalFn, func newPeerConnection(id string, version int, peer Peer, logger log.Logger) *peerConnection {
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn, logger log.Logger) *peer {
return &peer{ return &peerConnection{
id: id, id: id,
lacking: make(map[common.Hash]struct{}), lacking: make(map[common.Hash]struct{}),
currentHead: currentHead, peer: peer,
getRelHeaders: getRelHeaders,
getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies,
getReceipts: getReceipts,
getNodeData: getNodeData,
version: version, version: version,
log: logger, log: logger,
@ -116,7 +100,7 @@ func newPeer(id string, version int, currentHead currentHeadRetrievalFn,
} }
// Reset clears the internal state of a peer entity. // Reset clears the internal state of a peer entity.
func (p *peer) Reset() { func (p *peerConnection) Reset() {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@ -134,7 +118,7 @@ func (p *peer) Reset() {
} }
// FetchHeaders sends a header retrieval request to the remote peer. // FetchHeaders sends a header retrieval request to the remote peer.
func (p *peer) FetchHeaders(from uint64, count int) error { func (p *peerConnection) FetchHeaders(from uint64, count int) error {
// Sanity check the protocol version // Sanity check the protocol version
if p.version < 62 { if p.version < 62 {
panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version)) panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version))
@ -146,13 +130,13 @@ func (p *peer) FetchHeaders(from uint64, count int) error {
p.headerStarted = time.Now() p.headerStarted = time.Now()
// Issue the header retrieval request (absolut upwards without gaps) // Issue the header retrieval request (absolut upwards without gaps)
go p.getAbsHeaders(from, count, 0, false) go p.peer.RequestHeadersByNumber(from, count, 0, false)
return nil return nil
} }
// FetchBodies sends a block body retrieval request to the remote peer. // FetchBodies sends a block body retrieval request to the remote peer.
func (p *peer) FetchBodies(request *fetchRequest) error { func (p *peerConnection) FetchBodies(request *fetchRequest) error {
// Sanity check the protocol version // Sanity check the protocol version
if p.version < 62 { if p.version < 62 {
panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version)) panic(fmt.Sprintf("body fetch [eth/62+] requested on eth/%d", p.version))
@ -168,13 +152,13 @@ func (p *peer) FetchBodies(request *fetchRequest) error {
for _, header := range request.Headers { for _, header := range request.Headers {
hashes = append(hashes, header.Hash()) hashes = append(hashes, header.Hash())
} }
go p.getBlockBodies(hashes) go p.peer.RequestBodies(hashes)
return nil return nil
} }
// FetchReceipts sends a receipt retrieval request to the remote peer. // FetchReceipts sends a receipt retrieval request to the remote peer.
func (p *peer) FetchReceipts(request *fetchRequest) error { func (p *peerConnection) FetchReceipts(request *fetchRequest) error {
// Sanity check the protocol version // Sanity check the protocol version
if p.version < 63 { if p.version < 63 {
panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version)) panic(fmt.Sprintf("body fetch [eth/63+] requested on eth/%d", p.version))
@ -190,13 +174,13 @@ func (p *peer) FetchReceipts(request *fetchRequest) error {
for _, header := range request.Headers { for _, header := range request.Headers {
hashes = append(hashes, header.Hash()) hashes = append(hashes, header.Hash())
} }
go p.getReceipts(hashes) go p.peer.RequestReceipts(hashes)
return nil return nil
} }
// FetchNodeData sends a node state data retrieval request to the remote peer. // FetchNodeData sends a node state data retrieval request to the remote peer.
func (p *peer) FetchNodeData(hashes []common.Hash) error { func (p *peerConnection) FetchNodeData(hashes []common.Hash) error {
// Sanity check the protocol version // Sanity check the protocol version
if p.version < 63 { if p.version < 63 {
panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version)) panic(fmt.Sprintf("node data fetch [eth/63+] requested on eth/%d", p.version))
@ -206,48 +190,50 @@ func (p *peer) FetchNodeData(hashes []common.Hash) error {
return errAlreadyFetching return errAlreadyFetching
} }
p.stateStarted = time.Now() p.stateStarted = time.Now()
go p.getNodeData(hashes)
go p.peer.RequestNodeData(hashes)
return nil return nil
} }
// SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval // SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval
// requests. Its estimated header retrieval throughput is updated with that measured // requests. Its estimated header retrieval throughput is updated with that measured
// just now. // just now.
func (p *peer) SetHeadersIdle(delivered int) { func (p *peerConnection) SetHeadersIdle(delivered int) {
p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle)
} }
// SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval // SetBlocksIdle sets the peer to idle, allowing it to execute new block retrieval
// requests. Its estimated block retrieval throughput is updated with that measured // requests. Its estimated block retrieval throughput is updated with that measured
// just now. // just now.
func (p *peer) SetBlocksIdle(delivered int) { func (p *peerConnection) SetBlocksIdle(delivered int) {
p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
} }
// SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval // SetBodiesIdle sets the peer to idle, allowing it to execute block body retrieval
// requests. Its estimated body retrieval throughput is updated with that measured // requests. Its estimated body retrieval throughput is updated with that measured
// just now. // just now.
func (p *peer) SetBodiesIdle(delivered int) { func (p *peerConnection) SetBodiesIdle(delivered int) {
p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle) p.setIdle(p.blockStarted, delivered, &p.blockThroughput, &p.blockIdle)
} }
// SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt // SetReceiptsIdle sets the peer to idle, allowing it to execute new receipt
// retrieval requests. Its estimated receipt retrieval throughput is updated // retrieval requests. Its estimated receipt retrieval throughput is updated
// with that measured just now. // with that measured just now.
func (p *peer) SetReceiptsIdle(delivered int) { func (p *peerConnection) SetReceiptsIdle(delivered int) {
p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle) p.setIdle(p.receiptStarted, delivered, &p.receiptThroughput, &p.receiptIdle)
} }
// SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie // SetNodeDataIdle sets the peer to idle, allowing it to execute new state trie
// data retrieval requests. Its estimated state retrieval throughput is updated // data retrieval requests. Its estimated state retrieval throughput is updated
// with that measured just now. // with that measured just now.
func (p *peer) SetNodeDataIdle(delivered int) { func (p *peerConnection) SetNodeDataIdle(delivered int) {
p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle) p.setIdle(p.stateStarted, delivered, &p.stateThroughput, &p.stateIdle)
} }
// setIdle sets the peer to idle, allowing it to execute new retrieval requests. // setIdle sets the peer to idle, allowing it to execute new retrieval requests.
// Its estimated retrieval throughput is updated with that measured just now. // Its estimated retrieval throughput is updated with that measured just now.
func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) {
// Irrelevant of the scaling, make sure the peer ends up idle // Irrelevant of the scaling, make sure the peer ends up idle
defer atomic.StoreInt32(idle, 0) defer atomic.StoreInt32(idle, 0)
@ -274,7 +260,7 @@ func (p *peer) setIdle(started time.Time, delivered int, throughput *float64, id
// HeaderCapacity retrieves the peers header download allowance based on its // HeaderCapacity retrieves the peers header download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) HeaderCapacity(targetRTT time.Duration) int { func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@ -283,7 +269,7 @@ func (p *peer) HeaderCapacity(targetRTT time.Duration) int {
// BlockCapacity retrieves the peers block download allowance based on its // BlockCapacity retrieves the peers block download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) BlockCapacity(targetRTT time.Duration) int { func (p *peerConnection) BlockCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@ -292,7 +278,7 @@ func (p *peer) BlockCapacity(targetRTT time.Duration) int {
// ReceiptCapacity retrieves the peers receipt download allowance based on its // ReceiptCapacity retrieves the peers receipt download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) ReceiptCapacity(targetRTT time.Duration) int { func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@ -301,7 +287,7 @@ func (p *peer) ReceiptCapacity(targetRTT time.Duration) int {
// NodeDataCapacity retrieves the peers state download allowance based on its // NodeDataCapacity retrieves the peers state download allowance based on its
// previously discovered throughput. // previously discovered throughput.
func (p *peer) NodeDataCapacity(targetRTT time.Duration) int { func (p *peerConnection) NodeDataCapacity(targetRTT time.Duration) int {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@ -311,7 +297,7 @@ func (p *peer) NodeDataCapacity(targetRTT time.Duration) int {
// MarkLacking appends a new entity to the set of items (blocks, receipts, states) // MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the // that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off. // set reaches its maximum allowed capacity, items are randomly dropped off.
func (p *peer) MarkLacking(hash common.Hash) { func (p *peerConnection) MarkLacking(hash common.Hash) {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()
@ -326,7 +312,7 @@ func (p *peer) MarkLacking(hash common.Hash) {
// Lacks retrieves whether the hash of a blockchain item is on the peers lacking // Lacks retrieves whether the hash of a blockchain item is on the peers lacking
// list (i.e. whether we know that the peer does not have it). // list (i.e. whether we know that the peer does not have it).
func (p *peer) Lacks(hash common.Hash) bool { func (p *peerConnection) Lacks(hash common.Hash) bool {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
@ -337,7 +323,7 @@ func (p *peer) Lacks(hash common.Hash) bool {
// peerSet represents the collection of active peer participating in the chain // peerSet represents the collection of active peer participating in the chain
// download procedure. // download procedure.
type peerSet struct { type peerSet struct {
peers map[string]*peer peers map[string]*peerConnection
newPeerFeed event.Feed newPeerFeed event.Feed
lock sync.RWMutex lock sync.RWMutex
} }
@ -345,11 +331,11 @@ type peerSet struct {
// newPeerSet creates a new peer set top track the active download sources. // newPeerSet creates a new peer set top track the active download sources.
func newPeerSet() *peerSet { func newPeerSet() *peerSet {
return &peerSet{ return &peerSet{
peers: make(map[string]*peer), peers: make(map[string]*peerConnection),
} }
} }
func (ps *peerSet) SubscribeNewPeers(ch chan<- *peer) event.Subscription { func (ps *peerSet) SubscribeNewPeers(ch chan<- *peerConnection) event.Subscription {
return ps.newPeerFeed.Subscribe(ch) return ps.newPeerFeed.Subscribe(ch)
} }
@ -370,7 +356,7 @@ func (ps *peerSet) Reset() {
// The method also sets the starting throughput values of the new peer to the // The method also sets the starting throughput values of the new peer to the
// average of all existing peers, to give it a realistic chance of being used // average of all existing peers, to give it a realistic chance of being used
// for data retrievals. // for data retrievals.
func (ps *peerSet) Register(p *peer) error { func (ps *peerSet) Register(p *peerConnection) error {
// Retrieve the current median RTT as a sane default // Retrieve the current median RTT as a sane default
p.rtt = ps.medianRTT() p.rtt = ps.medianRTT()
@ -417,7 +403,7 @@ func (ps *peerSet) Unregister(id string) error {
} }
// Peer retrieves the registered peer with the given id. // Peer retrieves the registered peer with the given id.
func (ps *peerSet) Peer(id string) *peer { func (ps *peerSet) Peer(id string) *peerConnection {
ps.lock.RLock() ps.lock.RLock()
defer ps.lock.RUnlock() defer ps.lock.RUnlock()
@ -433,11 +419,11 @@ func (ps *peerSet) Len() int {
} }
// AllPeers retrieves a flat list of all the peers within the set. // AllPeers retrieves a flat list of all the peers within the set.
func (ps *peerSet) AllPeers() []*peer { func (ps *peerSet) AllPeers() []*peerConnection {
ps.lock.RLock() ps.lock.RLock()
defer ps.lock.RUnlock() defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers)) list := make([]*peerConnection, 0, len(ps.peers))
for _, p := range ps.peers { for _, p := range ps.peers {
list = append(list, p) list = append(list, p)
} }
@ -446,11 +432,11 @@ func (ps *peerSet) AllPeers() []*peer {
// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers // HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
// within the active peer set, ordered by their reputation. // within the active peer set, ordered by their reputation.
func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) { func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
idle := func(p *peer) bool { idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.headerIdle) == 0 return atomic.LoadInt32(&p.headerIdle) == 0
} }
throughput := func(p *peer) float64 { throughput := func(p *peerConnection) float64 {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return p.headerThroughput return p.headerThroughput
@ -460,11 +446,11 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
// the active peer set, ordered by their reputation. // the active peer set, ordered by their reputation.
func (ps *peerSet) BodyIdlePeers() ([]*peer, int) { func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
idle := func(p *peer) bool { idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.blockIdle) == 0 return atomic.LoadInt32(&p.blockIdle) == 0
} }
throughput := func(p *peer) float64 { throughput := func(p *peerConnection) float64 {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return p.blockThroughput return p.blockThroughput
@ -474,11 +460,11 @@ func (ps *peerSet) BodyIdlePeers() ([]*peer, int) {
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
// within the active peer set, ordered by their reputation. // within the active peer set, ordered by their reputation.
func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) { func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
idle := func(p *peer) bool { idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.receiptIdle) == 0 return atomic.LoadInt32(&p.receiptIdle) == 0
} }
throughput := func(p *peer) float64 { throughput := func(p *peerConnection) float64 {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return p.receiptThroughput return p.receiptThroughput
@ -488,11 +474,11 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peer, int) {
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
// peers within the active peer set, ordered by their reputation. // peers within the active peer set, ordered by their reputation.
func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) { func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
idle := func(p *peer) bool { idle := func(p *peerConnection) bool {
return atomic.LoadInt32(&p.stateIdle) == 0 return atomic.LoadInt32(&p.stateIdle) == 0
} }
throughput := func(p *peer) float64 { throughput := func(p *peerConnection) float64 {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return p.stateThroughput return p.stateThroughput
@ -503,11 +489,11 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peer, int) {
// idlePeers retrieves a flat list of all currently idle peers satisfying the // idlePeers retrieves a flat list of all currently idle peers satisfying the
// protocol version constraints, using the provided function to check idleness. // protocol version constraints, using the provided function to check idleness.
// The resulting set of peers are sorted by their measure throughput. // The resulting set of peers are sorted by their measure throughput.
func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peer) bool, throughput func(*peer) float64) ([]*peer, int) { func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) {
ps.lock.RLock() ps.lock.RLock()
defer ps.lock.RUnlock() defer ps.lock.RUnlock()
idle, total := make([]*peer, 0, len(ps.peers)), 0 idle, total := make([]*peerConnection, 0, len(ps.peers)), 0
for _, p := range ps.peers { for _, p := range ps.peers {
if p.version >= minProtocol && p.version <= maxProtocol { if p.version >= minProtocol && p.version <= maxProtocol {
if idleCheck(p) { if idleCheck(p) {

View File

@ -41,7 +41,7 @@ var (
// fetchRequest is a currently running data retrieval operation. // fetchRequest is a currently running data retrieval operation.
type fetchRequest struct { type fetchRequest struct {
Peer *peer // Peer to which the request was sent Peer *peerConnection // Peer to which the request was sent
From uint64 // [eth/62] Requested chain element index (used for skeleton fills only) From uint64 // [eth/62] Requested chain element index (used for skeleton fills only)
Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority) Hashes map[common.Hash]int // [eth/61] Requested hashes with their insertion index (priority)
Headers []*types.Header // [eth/62] Requested headers, sorted by request order Headers []*types.Header // [eth/62] Requested headers, sorted by request order
@ -391,7 +391,7 @@ func (q *queue) countProcessableItems() int {
// ReserveHeaders reserves a set of headers for the given peer, skipping any // ReserveHeaders reserves a set of headers for the given peer, skipping any
// previously failed batches. // previously failed batches.
func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest { func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -432,7 +432,7 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
// ReserveBodies reserves a set of body fetches for the given peer, skipping any // ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also // previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing. // returns a flag whether empty blocks were queued requiring processing.
func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) { func (q *queue) ReserveBodies(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool { isNoop := func(header *types.Header) bool {
return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash return header.TxHash == types.EmptyRootHash && header.UncleHash == types.EmptyUncleHash
} }
@ -445,7 +445,7 @@ func (q *queue) ReserveBodies(p *peer, count int) (*fetchRequest, bool, error) {
// ReserveReceipts reserves a set of receipt fetches for the given peer, skipping // ReserveReceipts reserves a set of receipt fetches for the given peer, skipping
// any previously failed downloads. Beside the next batch of needed fetches, it // any previously failed downloads. Beside the next batch of needed fetches, it
// also returns a flag whether empty receipts were queued requiring importing. // also returns a flag whether empty receipts were queued requiring importing.
func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error) { func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bool, error) {
isNoop := func(header *types.Header) bool { isNoop := func(header *types.Header) bool {
return header.ReceiptHash == types.EmptyRootHash return header.ReceiptHash == types.EmptyRootHash
} }
@ -462,7 +462,7 @@ func (q *queue) ReserveReceipts(p *peer, count int) (*fetchRequest, bool, error)
// Note, this method expects the queue lock to be already held for writing. The // Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need // reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway. // to access the queue, so they already need a lock anyway.
func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
// Short circuit if the pool has been depleted, or if the peer's already // Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state) // downloading something (sanity check not to corrupt state)

View File

@ -37,7 +37,7 @@ type stateReq struct {
tasks map[common.Hash]*stateTask // Download tasks to track previous attempts tasks map[common.Hash]*stateTask // Download tasks to track previous attempts
timeout time.Duration // Maximum round trip time for this to complete timeout time.Duration // Maximum round trip time for this to complete
timer *time.Timer // Timer to fire when the RTT timeout expires timer *time.Timer // Timer to fire when the RTT timeout expires
peer *peer // Peer that we're requesting from peer *peerConnection // Peer that we're requesting from
response [][]byte // Response data of the peer (nil for timeouts) response [][]byte // Response data of the peer (nil for timeouts)
} }
@ -246,7 +246,7 @@ func (s *stateSync) Cancel() error {
// and timeouts. // and timeouts.
func (s *stateSync) loop() error { func (s *stateSync) loop() error {
// Listen for new peer events to assign tasks to them // Listen for new peer events to assign tasks to them
newPeer := make(chan *peer, 1024) newPeer := make(chan *peerConnection, 1024)
peerSub := s.d.peers.SubscribeNewPeers(newPeer) peerSub := s.d.peers.SubscribeNewPeers(newPeer)
defer peerSub.Unsubscribe() defer peerSub.Unsubscribe()

View File

@ -265,7 +265,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head, p.RequestHeadersByHash, p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing

View File

@ -838,57 +838,83 @@ func (self *ProtocolManager) NodeInfo() *eth.EthNodeInfo {
// downloaderPeerNotify implements peerSetNotify // downloaderPeerNotify implements peerSetNotify
type downloaderPeerNotify ProtocolManager type downloaderPeerNotify ProtocolManager
type peerConnection struct {
manager *ProtocolManager
peer *peer
}
func (pc *peerConnection) Head() (common.Hash, *big.Int) {
return pc.peer.HeadAndTd()
}
func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
return ErrNoPeers
}
return nil
}
func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.manager.reqDist.queue(rq)
if !ok {
return ErrNoPeers
}
return nil
}
func (pc *peerConnection) RequestBodies(hashes []common.Hash) error {
panic("RequestBodies not supported in light client mode sync")
}
func (pc *peerConnection) RequestReceipts(hashes []common.Hash) error {
panic("RequestReceipts not supported in light client mode sync")
}
func (pc *peerConnection) RequestNodeData(hashes []common.Hash) error {
panic("RequestNodeData not supported in light client mode sync")
}
func (d *downloaderPeerNotify) registerPeer(p *peer) { func (d *downloaderPeerNotify) registerPeer(p *peer) {
pm := (*ProtocolManager)(d) pm := (*ProtocolManager)(d)
pc := &peerConnection{
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { manager: pm,
reqID := genReqID() peer: p,
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == p
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pm.reqDist.queue(rq)
if !ok {
return ErrNoPeers
}
return nil
} }
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { pm.downloader.RegisterPeer(p.id, ethVersion, pc)
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == p
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pm.reqDist.queue(rq)
if !ok {
return ErrNoPeers
}
return nil
}
pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, requestHeadersByHash, requestHeadersByNumber, nil, nil, nil)
} }
func (d *downloaderPeerNotify) unregisterPeer(p *peer) { func (d *downloaderPeerNotify) unregisterPeer(p *peer) {