les: implement request distributor, fix blocking issues (#3660)

* les: implement request distributor, fix blocking issues
* core: moved header validation before chain mutex lock
This commit is contained in:
Felföldi Zsolt 2017-03-22 20:44:22 +01:00 committed by Felix Lange
parent 1c1dc0e0fc
commit 525116dbff
19 changed files with 875 additions and 320 deletions

View File

@ -1313,6 +1313,11 @@ Error: %v
// of the header retrieval mechanisms already need to verify nonces, as well as // of the header retrieval mechanisms already need to verify nonces, as well as
// because nonces can be verified sparsely, not needing to check each. // because nonces can be verified sparsely, not needing to check each.
func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
start := time.Now()
if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
return i, err
}
// Make sure only one thread manipulates the chain at once // Make sure only one thread manipulates the chain at once
self.chainmu.Lock() self.chainmu.Lock()
defer self.chainmu.Unlock() defer self.chainmu.Unlock()
@ -1328,7 +1333,7 @@ func (self *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
return err return err
} }
return self.hc.InsertHeaderChain(chain, checkFreq, whFunc) return self.hc.InsertHeaderChain(chain, whFunc, start)
} }
// writeHeader writes a header into the local chain, given that its parent is // writeHeader writes a header into the local chain, given that its parent is

View File

@ -219,7 +219,8 @@ type WhCallback func(*types.Header) error
// should be done or not. The reason behind the optional check is because some // should be done or not. The reason behind the optional check is because some
// of the header retrieval mechanisms already need to verfy nonces, as well as // of the header retrieval mechanisms already need to verfy nonces, as well as
// because nonces can be verified sparsely, not needing to check each. // because nonces can be verified sparsely, not needing to check each.
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, writeHeader WhCallback) (int, error) {
func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked // Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ { for i := 1; i < len(chain); i++ {
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() { if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 || chain[i].ParentHash != chain[i-1].Hash() {
@ -231,9 +232,6 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4]) chain[i-1].Hash().Bytes()[:4], i, chain[i].Number, chain[i].Hash().Bytes()[:4], chain[i].ParentHash[:4])
} }
} }
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
start := time.Now()
// Generate the list of headers that should be POW verified // Generate the list of headers that should be POW verified
verify := make([]bool, len(chain)) verify := make([]bool, len(chain))
@ -309,6 +307,13 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, checkFreq int, w
} }
} }
} }
return 0, nil
}
func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, writeHeader WhCallback, start time.Time) (int, error) {
// Collect some import statistics to report on
stats := struct{ processed, ignored int }{}
// All headers passed verification, import them into the database // All headers passed verification, import them into the database
for i, header := range chain { for i, header := range chain {
// Short circuit insertion if shutting down // Short circuit insertion if shutting down

View File

@ -107,6 +107,8 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.LightMode, config.NetworkId, eth.eventMux, eth.pow, eth.blockchain, nil, chainDb, odr, relay); err != nil { if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.LightMode, config.NetworkId, eth.eventMux, eth.pow, eth.blockchain, nil, chainDb, odr, relay); err != nil {
return nil, err return nil, err
} }
relay.ps = eth.protocolManager.peers
relay.reqDist = eth.protocolManager.reqDist
eth.ApiBackend = &LesApiBackend{eth, nil} eth.ApiBackend = &LesApiBackend{eth, nil}
eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend) eth.ApiBackend.gpo = gasprice.NewLightPriceOracle(eth.ApiBackend)

259
les/distributor.go Normal file
View File

@ -0,0 +1,259 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les
import (
"container/list"
"errors"
"sync"
"time"
)
// ErrNoPeers is returned if no peers capable of serving a queued request are available
var ErrNoPeers = errors.New("no suitable peers available")
// requestDistributor implements a mechanism that distributes requests to
// suitable peers, obeying flow control rules and prioritizing them in creation
// order (even when a resend is necessary).
type requestDistributor struct {
reqQueue *list.List
lastReqOrder uint64
stopChn, loopChn chan struct{}
loopNextSent bool
lock sync.Mutex
getAllPeers func() map[distPeer]struct{}
}
// distPeer is an LES server peer interface for the request distributor.
// waitBefore returns either the necessary waiting time before sending a request
// with the given upper estimated cost or the estimated remaining relative buffer
// value after sending such a request (in which case the request can be sent
// immediately). At least one of these values is always zero.
type distPeer interface {
waitBefore(uint64) (time.Duration, float64)
canQueue() bool
queueSend(f func())
}
// distReq is the request abstraction used by the distributor. It is based on
// three callback functions:
// - getCost returns the upper estimate of the cost of sending the request to a given peer
// - canSend tells if the server peer is suitable to serve the request
// - request prepares sending the request to the given peer and returns a function that
// does the actual sending. Request order should be preserved but the callback itself should not
// block until it is sent because other peers might still be able to receive requests while
// one of them is blocking. Instead, the returned function is put in the peer's send queue.
type distReq struct {
getCost func(distPeer) uint64
canSend func(distPeer) bool
request func(distPeer) func()
reqOrder uint64
sentChn chan distPeer
element *list.Element
}
// newRequestDistributor creates a new request distributor
func newRequestDistributor(getAllPeers func() map[distPeer]struct{}, stopChn chan struct{}) *requestDistributor {
r := &requestDistributor{
reqQueue: list.New(),
loopChn: make(chan struct{}, 2),
stopChn: stopChn,
getAllPeers: getAllPeers,
}
go r.loop()
return r
}
// distMaxWait is the maximum waiting time after which further necessary waiting
// times are recalculated based on new feedback from the servers
const distMaxWait = time.Millisecond * 10
// main event loop
func (d *requestDistributor) loop() {
for {
select {
case <-d.stopChn:
d.lock.Lock()
elem := d.reqQueue.Front()
for elem != nil {
close(elem.Value.(*distReq).sentChn)
elem = elem.Next()
}
d.lock.Unlock()
return
case <-d.loopChn:
d.lock.Lock()
d.loopNextSent = false
loop:
for {
peer, req, wait := d.nextRequest()
if req != nil && wait == 0 {
chn := req.sentChn // save sentChn because remove sets it to nil
d.remove(req)
send := req.request(peer)
if send != nil {
peer.queueSend(send)
}
chn <- peer
close(chn)
} else {
if wait == 0 {
// no request to send and nothing to wait for; the next
// queued request will wake up the loop
break loop
}
d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received
if wait > distMaxWait {
// waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically
wait = distMaxWait
}
go func() {
time.Sleep(wait)
d.loopChn <- struct{}{}
}()
break loop
}
}
d.lock.Unlock()
}
}
}
// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect
type selectPeerItem struct {
peer distPeer
req *distReq
weight int64
}
// Weight implements wrsItem interface
func (sp selectPeerItem) Weight() int64 {
return sp.weight
}
// nextRequest returns the next possible request from any peer, along with the
// associated peer and necessary waiting time
func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) {
peers := d.getAllPeers()
elem := d.reqQueue.Front()
var (
bestPeer distPeer
bestReq *distReq
bestWait time.Duration
sel *weightedRandomSelect
)
for (len(peers) > 0 || elem == d.reqQueue.Front()) && elem != nil {
req := elem.Value.(*distReq)
canSend := false
for peer, _ := range peers {
if peer.canQueue() && req.canSend(peer) {
canSend = true
cost := req.getCost(peer)
wait, bufRemain := peer.waitBefore(cost)
if wait == 0 {
if sel == nil {
sel = newWeightedRandomSelect()
}
sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1})
} else {
if bestReq == nil || wait < bestWait {
bestPeer = peer
bestReq = req
bestWait = wait
}
}
delete(peers, peer)
}
}
next := elem.Next()
if !canSend && elem == d.reqQueue.Front() {
close(req.sentChn)
d.remove(req)
}
elem = next
}
if sel != nil {
c := sel.choose().(selectPeerItem)
return c.peer, c.req, 0
}
return bestPeer, bestReq, bestWait
}
// queue adds a request to the distribution queue, returns a channel where the
// receiving peer is sent once the request has been sent (request callback returned).
// If the request is cancelled or timed out without suitable peers, the channel is
// closed without sending any peer references to it.
func (d *requestDistributor) queue(r *distReq) chan distPeer {
d.lock.Lock()
defer d.lock.Unlock()
if r.reqOrder == 0 {
d.lastReqOrder++
r.reqOrder = d.lastReqOrder
}
back := d.reqQueue.Back()
if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder {
r.element = d.reqQueue.PushBack(r)
} else {
before := d.reqQueue.Front()
for before.Value.(*distReq).reqOrder < r.reqOrder {
before = before.Next()
}
r.element = d.reqQueue.InsertBefore(r, before)
}
if !d.loopNextSent {
d.loopNextSent = true
d.loopChn <- struct{}{}
}
r.sentChn = make(chan distPeer, 1)
return r.sentChn
}
// cancel removes a request from the queue if it has not been sent yet (returns
// false if it has been sent already). It is guaranteed that the callback functions
// will not be called after cancel returns.
func (d *requestDistributor) cancel(r *distReq) bool {
d.lock.Lock()
defer d.lock.Unlock()
if r.sentChn == nil {
return false
}
close(r.sentChn)
d.remove(r)
return true
}
// remove removes a request from the queue
func (d *requestDistributor) remove(r *distReq) {
r.sentChn = nil
if r.element != nil {
d.reqQueue.Remove(r.element)
r.element = nil
}
}

192
les/distributor_test.go Normal file
View File

@ -0,0 +1,192 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package light implements on-demand retrieval capable state and chain objects
// for the Ethereum Light Client.
package les
import (
"math/rand"
"sync"
"testing"
"time"
)
type testDistReq struct {
cost, procTime, order uint64
canSendTo map[*testDistPeer]struct{}
}
func (r *testDistReq) getCost(dp distPeer) uint64 {
return r.cost
}
func (r *testDistReq) canSend(dp distPeer) bool {
_, ok := r.canSendTo[dp.(*testDistPeer)]
return ok
}
func (r *testDistReq) request(dp distPeer) func() {
return func() { dp.(*testDistPeer).send(r) }
}
type testDistPeer struct {
sent []*testDistReq
sumCost uint64
lock sync.RWMutex
}
func (p *testDistPeer) send(r *testDistReq) {
p.lock.Lock()
defer p.lock.Unlock()
p.sent = append(p.sent, r)
p.sumCost += r.cost
}
func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) {
var last uint64
for {
wait := time.Millisecond
p.lock.Lock()
if len(p.sent) > 0 {
rq := p.sent[0]
wait = time.Duration(rq.procTime)
p.sumCost -= rq.cost
if checkOrder {
if rq.order <= last {
t.Errorf("Requests processed in wrong order")
}
last = rq.order
}
p.sent = p.sent[1:]
}
p.lock.Unlock()
select {
case <-stop:
return
case <-time.After(wait):
}
}
}
const (
testDistBufLimit = 10000000
testDistMaxCost = 1000000
testDistPeerCount = 5
testDistReqCount = 50000
testDistMaxResendCount = 3
)
func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) {
p.lock.RLock()
sumCost := p.sumCost + cost
p.lock.RUnlock()
if sumCost < testDistBufLimit {
return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit)
} else {
return time.Duration(sumCost - testDistBufLimit), 0
}
}
func (p *testDistPeer) canQueue() bool {
return true
}
func (p *testDistPeer) queueSend(f func()) {
f()
}
func TestRequestDistributor(t *testing.T) {
testRequestDistributor(t, false)
}
func TestRequestDistributorResend(t *testing.T) {
testRequestDistributor(t, true)
}
func testRequestDistributor(t *testing.T, resend bool) {
stop := make(chan struct{})
defer close(stop)
var peers [testDistPeerCount]*testDistPeer
for i, _ := range peers {
peers[i] = &testDistPeer{}
go peers[i].worker(t, !resend, stop)
}
dist := newRequestDistributor(func() map[distPeer]struct{} {
m := make(map[distPeer]struct{})
for _, peer := range peers {
m[peer] = struct{}{}
}
return m
}, stop)
var wg sync.WaitGroup
for i := 1; i <= testDistReqCount; i++ {
cost := uint64(rand.Int63n(testDistMaxCost))
procTime := uint64(rand.Int63n(int64(cost + 1)))
rq := &testDistReq{
cost: cost,
procTime: procTime,
order: uint64(i),
canSendTo: make(map[*testDistPeer]struct{}),
}
for _, peer := range peers {
if rand.Intn(2) != 0 {
rq.canSendTo[peer] = struct{}{}
}
}
wg.Add(1)
req := &distReq{
getCost: rq.getCost,
canSend: rq.canSend,
request: rq.request,
}
chn := dist.queue(req)
go func() {
cnt := 1
if resend && len(rq.canSendTo) != 0 {
cnt = rand.Intn(testDistMaxResendCount) + 1
}
for i := 0; i < cnt; i++ {
if i != 0 {
chn = dist.queue(req)
}
p := <-chn
if p == nil {
if len(rq.canSendTo) != 0 {
t.Errorf("Request that could have been sent was dropped")
}
} else {
peer := p.(*testDistPeer)
if _, ok := rq.canSendTo[peer]; !ok {
t.Errorf("Request sent to wrong peer")
}
}
}
wg.Done()
}()
if rand.Intn(1000) == 0 {
time.Sleep(time.Duration(rand.Intn(5000000)))
}
}
wg.Wait()
}

71
les/execqueue.go Normal file
View File

@ -0,0 +1,71 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package les
import (
"sync/atomic"
)
// ExecQueue implements a queue that executes function calls in a single thread,
// in the same order as they have been queued.
type execQueue struct {
chn chan func()
cnt, stop, capacity int32
}
// NewExecQueue creates a new execution queue.
func newExecQueue(capacity int32) *execQueue {
q := &execQueue{
chn: make(chan func(), capacity),
capacity: capacity,
}
go q.loop()
return q
}
func (q *execQueue) loop() {
for f := range q.chn {
atomic.AddInt32(&q.cnt, -1)
if atomic.LoadInt32(&q.stop) != 0 {
return
}
f()
}
}
// CanQueue returns true if more function calls can be added to the execution queue.
func (q *execQueue) canQueue() bool {
return atomic.LoadInt32(&q.stop) == 0 && atomic.LoadInt32(&q.cnt) < q.capacity
}
// Queue adds a function call to the execution queue. Returns true if successful.
func (q *execQueue) queue(f func()) bool {
if atomic.LoadInt32(&q.stop) != 0 {
return false
}
if atomic.AddInt32(&q.cnt, 1) > q.capacity {
atomic.AddInt32(&q.cnt, -1)
return false
}
q.chn <- f
return true
}
// Stop stops the exec queue.
func (q *execQueue) quit() {
atomic.StoreInt32(&q.stop, 1)
}

View File

@ -135,11 +135,24 @@ func (f *lightFetcher) syncLoop() {
f.lock.Lock() f.lock.Lock()
s := requesting s := requesting
requesting = false requesting = false
var (
rq *distReq
reqID uint64
)
if !f.syncing && !(newAnnounce && s) { if !f.syncing && !(newAnnounce && s) {
reqID := getNextReqID() rq, reqID = f.nextRequest()
if peer, node, amount, retry := f.nextRequest(reqID); node != nil { }
syncing := f.syncing
f.lock.Unlock()
if rq != nil {
requesting = true requesting = true
if reqID, ok := f.request(peer, reqID, node, amount); ok { _, ok := <-f.pm.reqDist.queue(rq)
if !ok {
f.requestChn <- false
}
if !syncing {
go func() { go func() {
time.Sleep(softRequestTimeout) time.Sleep(softRequestTimeout)
f.reqMu.Lock() f.reqMu.Lock()
@ -153,17 +166,7 @@ func (f *lightFetcher) syncLoop() {
f.requestChn <- false f.requestChn <- false
}() }()
} }
} else {
if retry {
requesting = true
go func() {
time.Sleep(time.Millisecond * 100)
f.requestChn <- false
}()
} }
}
}
f.lock.Unlock()
case reqID := <-f.timeoutChn: case reqID := <-f.timeoutChn:
f.reqMu.Lock() f.reqMu.Lock()
req, ok := f.requested[reqID] req, ok := f.requested[reqID]
@ -334,6 +337,12 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
if f.syncing {
// always return true when syncing
// false positives are acceptable, a more sophisticated condition can be implemented later
return true
}
fp := f.peers[p] fp := f.peers[p]
if fp == nil || fp.root == nil { if fp == nil || fp.root == nil {
return false return false
@ -346,43 +355,13 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
f.chain.LockChain() f.chain.LockChain()
defer f.chain.UnlockChain() defer f.chain.UnlockChain()
// if it's older than the peer's block tree root but it's in the same canonical chain // if it's older than the peer's block tree root but it's in the same canonical chain
// than the root, we can still be sure the peer knows it // as the root, we can still be sure the peer knows it
//
// when syncing, just check if it is part of the known chain, there is nothing better we
// can do since we do not know the most recent block hash yet
return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash return core.GetCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && core.GetCanonicalHash(f.pm.chainDb, number) == hash
} }
// request initiates a header download request from a certain peer
func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
fp := f.peers[p]
if fp == nil {
p.Log().Debug("Requesting from unknown peer")
p.fcServer.DeassignRequest(reqID)
return 0, false
}
if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
f.syncing = true
go func() {
p.Log().Debug("Synchronisation started")
f.pm.synchronise(p)
f.syncDone <- p
}()
p.fcServer.DeassignRequest(reqID)
return 0, false
}
n.requested = true
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
p.fcServer.SendRequest(reqID, cost)
f.reqMu.Lock()
f.requested[reqID] = fetchRequest{hash: n.hash, amount: amount, peer: p, sent: mclock.Now()}
f.reqMu.Unlock()
go p.RequestHeadersByHash(reqID, cost, n.hash, int(amount), 0, true)
go func() {
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
return reqID, true
}
// requestAmount calculates the amount of headers to be downloaded starting // requestAmount calculates the amount of headers to be downloaded starting
// from a certain head backwards // from a certain head backwards
func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 { func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 {
@ -408,12 +387,13 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
// nextRequest selects the peer and announced head to be requested next, amount // nextRequest selects the peer and announced head to be requested next, amount
// to be downloaded starting from the head backwards is also returned // to be downloaded starting from the head backwards is also returned
func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) { func (f *lightFetcher) nextRequest() (*distReq, uint64) {
var ( var (
bestHash common.Hash bestHash common.Hash
bestAmount uint64 bestAmount uint64
) )
bestTd := f.maxConfirmedTd bestTd := f.maxConfirmedTd
bestSyncing := false
for p, fp := range f.peers { for p, fp := range f.peers {
for hash, n := range fp.nodeByHash { for hash, n := range fp.nodeByHash {
@ -423,29 +403,83 @@ func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint6
bestHash = hash bestHash = hash
bestAmount = amount bestAmount = amount
bestTd = n.td bestTd = n.td
bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root)
} }
} }
} }
} }
if bestTd == f.maxConfirmedTd { if bestTd == f.maxConfirmedTd {
return nil, nil, 0, false return nil, 0
} }
peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) { f.syncing = bestSyncing
var rq *distReq
reqID := getNextReqID()
if f.syncing {
rq = &distReq{
getCost: func(dp distPeer) uint64 {
return 0
},
canSend: func(dp distPeer) bool {
p := dp.(*peer)
fp := f.peers[p] fp := f.peers[p]
if fp == nil || fp.nodeByHash[bestHash] == nil { return fp != nil && fp.nodeByHash[bestHash] != nil
return false, 0 },
request: func(dp distPeer) func() {
go func() {
p := dp.(*peer)
p.Log().Debug("Synchronisation started")
f.pm.synchronise(p)
f.syncDone <- p
}()
return nil
},
} }
return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) } else {
}) rq = &distReq{
if !locked { getCost: func(dp distPeer) uint64 {
return nil, nil, 0, true p := dp.(*peer)
return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
},
canSend: func(dp distPeer) bool {
p := dp.(*peer)
f.lock.Lock()
defer f.lock.Unlock()
fp := f.peers[p]
if fp == nil {
return false
} }
var node *fetcherTreeNode n := fp.nodeByHash[bestHash]
if peer != nil { return n != nil && !n.requested
node = f.peers[peer].nodeByHash[bestHash] },
request: func(dp distPeer) func() {
p := dp.(*peer)
f.lock.Lock()
fp := f.peers[p]
if fp != nil {
n := fp.nodeByHash[bestHash]
if n != nil {
n.requested = true
} }
return peer, node, bestAmount, false }
f.lock.Unlock()
cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))
p.fcServer.QueueRequest(reqID, cost)
f.reqMu.Lock()
f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()}
f.reqMu.Unlock()
go func() {
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
},
}
}
return rq, reqID
} }
// deliverHeaders delivers header download request responses for processing // deliverHeaders delivers header download request responses for processing

View File

@ -99,8 +99,6 @@ type ServerNode struct {
params *ServerParams params *ServerParams
sumCost uint64 // sum of req costs sent to this server sumCost uint64 // sum of req costs sent to this server
pending map[uint64]uint64 // value = sumCost after sending the given req pending map[uint64]uint64 // value = sumCost after sending the given req
assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer
assignToken chan struct{} // send to this channel before assigning, read from it after deassigning
lock sync.RWMutex lock sync.RWMutex
} }
@ -110,7 +108,6 @@ func NewServerNode(params *ServerParams) *ServerNode {
lastTime: mclock.Now(), lastTime: mclock.Now(),
params: params, params: params,
pending: make(map[uint64]uint64), pending: make(map[uint64]uint64),
assignToken: make(chan struct{}, 1),
} }
} }
@ -127,94 +124,37 @@ func (peer *ServerNode) recalcBLE(time mclock.AbsTime) {
} }
// safetyMargin is added to the flow control waiting time when estimated buffer value is low // safetyMargin is added to the flow control waiting time when estimated buffer value is low
const safetyMargin = time.Millisecond * 200 const safetyMargin = time.Millisecond
func (peer *ServerNode) canSend(maxCost uint64) time.Duration { func (peer *ServerNode) canSend(maxCost uint64) (time.Duration, float64) {
peer.recalcBLE(mclock.Now())
maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst) maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst)
if maxCost > peer.params.BufLimit { if maxCost > peer.params.BufLimit {
maxCost = peer.params.BufLimit maxCost = peer.params.BufLimit
} }
if peer.bufEstimate >= maxCost { if peer.bufEstimate >= maxCost {
return 0 return 0, float64(peer.bufEstimate-maxCost) / float64(peer.params.BufLimit)
} }
return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge) return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge), 0
} }
// CanSend returns the minimum waiting time required before sending a request // CanSend returns the minimum waiting time required before sending a request
// with the given maximum estimated cost // with the given maximum estimated cost. Second return value is the relative
func (peer *ServerNode) CanSend(maxCost uint64) time.Duration { // estimated buffer level after sending the request (divided by BufLimit).
func (peer *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) {
peer.lock.RLock() peer.lock.RLock()
defer peer.lock.RUnlock() defer peer.lock.RUnlock()
return peer.canSend(maxCost) return peer.canSend(maxCost)
} }
// AssignRequest tries to assign the server node to the given request, guaranteeing // QueueRequest should be called when the request has been assigned to the given
// that once it returns true, no request will be sent to the node before this one // server node, before putting it in the send queue. It is mandatory that requests
func (peer *ServerNode) AssignRequest(reqID uint64) bool { // are sent in the same order as the QueueRequest calls are made.
select { func (peer *ServerNode) QueueRequest(reqID, maxCost uint64) {
case peer.assignToken <- struct{}{}:
default:
return false
}
peer.lock.Lock()
peer.assignedRequest = reqID
peer.lock.Unlock()
return true
}
// MustAssignRequest waits until the node can be assigned to the given request.
// It is always guaranteed that assignments are released in a short amount of time.
func (peer *ServerNode) MustAssignRequest(reqID uint64) {
peer.assignToken <- struct{}{}
peer.lock.Lock()
peer.assignedRequest = reqID
peer.lock.Unlock()
}
// DeassignRequest releases a request assignment in case the planned request
// is not being sent.
func (peer *ServerNode) DeassignRequest(reqID uint64) {
peer.lock.Lock()
if peer.assignedRequest == reqID {
peer.assignedRequest = 0
<-peer.assignToken
}
peer.lock.Unlock()
}
// IsAssigned returns true if the server node has already been assigned to a request
// (note that this function returning false does not guarantee that you can assign a request
// immediately afterwards, its only purpose is to help peer selection)
func (peer *ServerNode) IsAssigned() bool {
peer.lock.RLock()
locked := peer.assignedRequest != 0
peer.lock.RUnlock()
return locked
}
// blocks until request can be sent
func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
peer.lock.Lock() peer.lock.Lock()
defer peer.lock.Unlock() defer peer.lock.Unlock()
if peer.assignedRequest != reqID {
peer.lock.Unlock()
peer.MustAssignRequest(reqID)
peer.lock.Lock()
}
peer.recalcBLE(mclock.Now())
wait := peer.canSend(maxCost)
for wait > 0 {
peer.lock.Unlock()
time.Sleep(wait)
peer.lock.Lock()
peer.recalcBLE(mclock.Now())
wait = peer.canSend(maxCost)
}
peer.assignedRequest = 0
<-peer.assignToken
peer.bufEstimate -= maxCost peer.bufEstimate -= maxCost
peer.sumCost += maxCost peer.sumCost += maxCost
if reqID >= 0 { if reqID >= 0 {
@ -222,6 +162,8 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
} }
} }
// GotReply adjusts estimated buffer value according to the value included in
// the latest request reply.
func (peer *ServerNode) GotReply(reqID, bv uint64) { func (peer *ServerNode) GotReply(reqID, bv uint64) {
peer.lock.Lock() peer.lock.Lock()
@ -235,6 +177,10 @@ func (peer *ServerNode) GotReply(reqID, bv uint64) {
return return
} }
delete(peer.pending, reqID) delete(peer.pending, reqID)
peer.bufEstimate = bv - (peer.sumCost - sc) cc := peer.sumCost - sc
peer.bufEstimate = 0
if bv > cc {
peer.bufEstimate = bv - cc
}
peer.lastTime = mclock.Now() peer.lastTime = mclock.Now()
} }

View File

@ -102,6 +102,7 @@ type ProtocolManager struct {
odr *LesOdr odr *LesOdr
server *LesServer server *LesServer
serverPool *serverPool serverPool *serverPool
reqDist *requestDistributor
downloader *downloader.Downloader downloader *downloader.Downloader
fetcher *lightFetcher fetcher *lightFetcher
@ -203,8 +204,17 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, network
blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer) blockchain.InsertHeaderChain, nil, nil, blockchain.Rollback, removePeer)
} }
manager.reqDist = newRequestDistributor(func() map[distPeer]struct{} {
m := make(map[distPeer]struct{})
peers := manager.peers.AllPeers()
for _, peer := range peers {
m[peer] = struct{}{}
}
return m
}, manager.quitSync)
if odr != nil { if odr != nil {
odr.removePeer = removePeer odr.removePeer = removePeer
odr.reqDist = manager.reqDist
} }
/*validator := func(block *types.Block, parent *types.Block) error { /*validator := func(block *types.Block, parent *types.Block) error {
@ -334,17 +344,49 @@ func (pm *ProtocolManager) handle(p *peer) error {
if pm.lightSync { if pm.lightSync {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := getNextReqID() reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount) rq := &distReq{
p.fcServer.MustAssignRequest(reqID) getCost: func(dp distPeer) uint64 {
p.fcServer.SendRequest(reqID, cost) peer := dp.(*peer)
return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == p
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pm.reqDist.queue(rq)
if !ok {
return ErrNoPeers
}
return nil
} }
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
reqID := getNextReqID() reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount) rq := &distReq{
p.fcServer.MustAssignRequest(reqID) getCost: func(dp distPeer) uint64 {
p.fcServer.SendRequest(reqID, cost) peer := dp.(*peer)
return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == p
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
},
}
_, ok := <-pm.reqDist.queue(rq)
if !ok {
return ErrNoPeers
}
return nil
} }
if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd, if err := pm.downloader.RegisterPeer(p.id, ethVersion, p.HeadAndTd,
requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil { requestHeadersByHash, requestHeadersByNumber, nil, nil, nil); err != nil {
@ -884,7 +926,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
if deliverMsg != nil { if deliverMsg != nil {
return pm.odr.Deliver(p, deliverMsg) err := pm.odr.Deliver(p, deliverMsg)
if err != nil {
p.responseErrors++
if p.responseErrors > maxResponseErrors {
return err
}
}
} }
return nil return nil
} }

View File

@ -352,11 +352,15 @@ func (p *testServerPool) setPeer(peer *peer) {
p.peer = peer p.peer = peer
} }
func (p *testServerPool) selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer { func (p *testServerPool) getAllPeers() map[distPeer]struct{} {
p.lock.RLock() p.lock.RLock()
defer p.lock.RUnlock() defer p.lock.RUnlock()
return p.peer m := make(map[distPeer]struct{})
if p.peer != nil {
m[p.peer] = struct{}{}
}
return m
} }
func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) { func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {

View File

@ -32,14 +32,12 @@ import (
var ( var (
softRequestTimeout = time.Millisecond * 500 softRequestTimeout = time.Millisecond * 500
hardRequestTimeout = time.Second * 10 hardRequestTimeout = time.Second * 10
retryPeers = time.Second * 1
) )
// peerDropFn is a callback type for dropping a peer detected as malicious. // peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string) type peerDropFn func(id string)
type odrPeerSelector interface { type odrPeerSelector interface {
selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer
adjustResponseTime(*poolEntry, time.Duration, bool) adjustResponseTime(*poolEntry, time.Duration, bool)
} }
@ -51,6 +49,7 @@ type LesOdr struct {
mlock, clock sync.Mutex mlock, clock sync.Mutex
sentReqs map[uint64]*sentReq sentReqs map[uint64]*sentReq
serverPool odrPeerSelector serverPool odrPeerSelector
reqDist *requestDistributor
} }
func NewLesOdr(db ethdb.Database) *LesOdr { func NewLesOdr(db ethdb.Database) *LesOdr {
@ -165,18 +164,48 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error { func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) error {
answered := make(chan struct{}) answered := make(chan struct{})
req := &sentReq{ req := &sentReq{
valFunc: lreq.Valid, valFunc: lreq.Validate,
sentTo: make(map[*peer]chan struct{}), sentTo: make(map[*peer]chan struct{}),
answered: answered, // reply delivered by any peer answered: answered, // reply delivered by any peer
} }
reqID := getNextReqID()
self.mlock.Lock() exclude := make(map[*peer]struct{})
self.sentReqs[reqID] = req
self.mlock.Unlock()
reqWg := new(sync.WaitGroup) reqWg := new(sync.WaitGroup)
reqWg.Add(1) reqWg.Add(1)
defer reqWg.Done() defer reqWg.Done()
var timeout chan struct{}
reqID := getNextReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
return lreq.GetCost(dp.(*peer))
},
canSend: func(dp distPeer) bool {
p := dp.(*peer)
_, ok := exclude[p]
return !ok && lreq.CanSend(p)
},
request: func(dp distPeer) func() {
p := dp.(*peer)
exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout = make(chan struct{})
req.lock.Lock()
req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
cost := lreq.GetCost(p)
p.fcServer.QueueRequest(reqID, cost)
go self.requestPeer(req, p, delivered, timeout, reqWg)
return func() { lreq.Request(reqID, p) }
},
}
self.mlock.Lock()
self.sentReqs[reqID] = req
self.mlock.Unlock()
go func() { go func() {
reqWg.Wait() reqWg.Wait()
self.mlock.Lock() self.mlock.Lock()
@ -184,37 +213,20 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
self.mlock.Unlock() self.mlock.Unlock()
}() }()
exclude := make(map[*peer]struct{})
for { for {
var p *peer peerChn := self.reqDist.queue(rq)
if self.serverPool != nil {
p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) {
if _, ok := exclude[p]; ok || !lreq.CanSend(p) {
return false, 0
}
return true, p.fcServer.CanSend(lreq.GetCost(p))
}, ctx.Done())
}
if p == nil {
select { select {
case <-ctx.Done(): case <-ctx.Done():
self.reqDist.cancel(rq)
return ctx.Err() return ctx.Err()
case <-req.answered: case <-answered:
self.reqDist.cancel(rq)
return nil return nil
case <-time.After(retryPeers): case _, ok := <-peerChn:
if !ok {
return ErrNoPeers
}
} }
} else {
exclude[p] = struct{}{}
delivered := make(chan struct{})
timeout := make(chan struct{})
req.lock.Lock()
req.sentTo[p] = delivered
req.lock.Unlock()
reqWg.Add(1)
cost := lreq.GetCost(p)
p.fcServer.SendRequest(reqID, cost)
go self.requestPeer(req, p, delivered, timeout, reqWg)
lreq.Request(reqID, p)
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -225,9 +237,8 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
} }
} }
} }
}
// Retrieve tries to fetch an object from the local db, then from the LES network. // Retrieve tries to fetch an object from the LES network.
// If the network retrieval was successful, it stores the object in local db. // If the network retrieval was successful, it stores the object in local db.
func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { func (self *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
lreq := LesRequest(req) lreq := LesRequest(req)

View File

@ -49,7 +49,7 @@ type LesOdrRequest interface {
GetCost(*peer) uint64 GetCost(*peer) uint64
CanSend(*peer) bool CanSend(*peer) bool
Request(uint64, *peer) error Request(uint64, *peer) error
Valid(ethdb.Database, *Msg) error // if true, keeps the retrieved object Validate(ethdb.Database, *Msg) error
} }
func LesRequest(req light.OdrRequest) LesOdrRequest { func LesRequest(req light.OdrRequest) LesOdrRequest {
@ -92,7 +92,7 @@ func (r *BlockRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network // Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply // returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest) // to the request (implementation of LesOdrRequest)
func (r *BlockRequest) Valid(db ethdb.Database, msg *Msg) error { func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating block body", "hash", r.Hash) log.Debug("Validating block body", "hash", r.Hash)
// Ensure we have a correct message with a single block body // Ensure we have a correct message with a single block body
@ -148,7 +148,7 @@ func (r *ReceiptsRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network // Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply // returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest) // to the request (implementation of LesOdrRequest)
func (r *ReceiptsRequest) Valid(db ethdb.Database, msg *Msg) error { func (r *ReceiptsRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating block receipts", "hash", r.Hash) log.Debug("Validating block receipts", "hash", r.Hash)
// Ensure we have a correct message with a single block receipt // Ensure we have a correct message with a single block receipt
@ -208,7 +208,7 @@ func (r *TrieRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network // Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply // returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest) // to the request (implementation of LesOdrRequest)
func (r *TrieRequest) Valid(db ethdb.Database, msg *Msg) error { func (r *TrieRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating trie proof", "root", r.Id.Root, "key", r.Key) log.Debug("Validating trie proof", "root", r.Id.Root, "key", r.Key)
// Ensure we have a correct message with a single proof // Ensure we have a correct message with a single proof
@ -259,7 +259,7 @@ func (r *CodeRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network // Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply // returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest) // to the request (implementation of LesOdrRequest)
func (r *CodeRequest) Valid(db ethdb.Database, msg *Msg) error { func (r *CodeRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating code data", "hash", r.Hash) log.Debug("Validating code data", "hash", r.Hash)
// Ensure we have a correct message with a single code element // Ensure we have a correct message with a single code element
@ -319,7 +319,7 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error {
// Valid processes an ODR request reply message from the LES network // Valid processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply // returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest) // to the request (implementation of LesOdrRequest)
func (r *ChtRequest) Valid(db ethdb.Database, msg *Msg) error { func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating CHT", "cht", r.ChtNum, "block", r.BlockNum) log.Debug("Validating CHT", "cht", r.ChtNum, "block", r.BlockNum)
// Ensure we have a correct message with a single proof element // Ensure we have a correct message with a single proof element

View File

@ -162,8 +162,11 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
pool := &testServerPool{} pool := &testServerPool{}
lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
odr.reqDist = lpm.reqDist
pool.setPeer(lpeer) pool.setPeer(lpeer)
odr.serverPool = pool odr.serverPool = pool
lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
select { select {
case <-time.After(time.Millisecond * 100): case <-time.After(time.Millisecond * 100):
case err := <-err1: case err := <-err1:

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"sync" "sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -37,7 +38,10 @@ var (
errNotRegistered = errors.New("peer is not registered") errNotRegistered = errors.New("peer is not registered")
) )
const maxHeadInfoLen = 20 const (
maxHeadInfoLen = 20
maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
)
type peer struct { type peer struct {
*p2p.Peer *p2p.Peer
@ -53,9 +57,11 @@ type peer struct {
lock sync.RWMutex lock sync.RWMutex
announceChn chan announceData announceChn chan announceData
sendQueue *execQueue
poolEntry *poolEntry poolEntry *poolEntry
hasBlock func(common.Hash, uint64) bool hasBlock func(common.Hash, uint64) bool
responseErrors int
fcClient *flowcontrol.ClientNode // nil if the peer is server only fcClient *flowcontrol.ClientNode // nil if the peer is server only
fcServer *flowcontrol.ServerNode // nil if the peer is client only fcServer *flowcontrol.ServerNode // nil if the peer is client only
@ -76,6 +82,14 @@ func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
} }
} }
func (p *peer) canQueue() bool {
return p.sendQueue.canQueue()
}
func (p *peer) queueSend(f func()) {
p.sendQueue.queue(f)
}
// Info gathers and returns a collection of metadata known about a peer. // Info gathers and returns a collection of metadata known about a peer.
func (p *peer) Info() *eth.PeerInfo { func (p *peer) Info() *eth.PeerInfo {
return &eth.PeerInfo{ return &eth.PeerInfo{
@ -117,6 +131,11 @@ func (p *peer) Td() *big.Int {
return new(big.Int).Set(p.headInfo.Td) return new(big.Int).Set(p.headInfo.Td)
} }
// waitBefore implements distPeer interface
func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) {
return p.fcServer.CanSend(maxCost)
}
func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error { func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error {
type req struct { type req struct {
ReqID uint64 ReqID uint64
@ -237,11 +256,8 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs) return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
} }
func (p *peer) SendTxs(cost uint64, txs types.Transactions) error { func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error {
p.Log().Debug("Fetching batch of transactions", "count", len(txs)) p.Log().Debug("Fetching batch of transactions", "count", len(txs))
reqID := getNextReqID()
p.fcServer.MustAssignRequest(reqID)
p.fcServer.SendRequest(reqID, cost)
return p2p.Send(p.rw, SendTxMsg, txs) return p2p.Send(p.rw, SendTxMsg, txs)
} }
@ -444,6 +460,7 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered return errAlreadyRegistered
} }
ps.peers[p.id] = p ps.peers[p.id] = p
p.sendQueue = newExecQueue(100)
return nil return nil
} }
@ -453,8 +470,10 @@ func (ps *peerSet) Unregister(id string) error {
ps.lock.Lock() ps.lock.Lock()
defer ps.lock.Unlock() defer ps.lock.Unlock()
if _, ok := ps.peers[id]; !ok { if p, ok := ps.peers[id]; !ok {
return errNotRegistered return errNotRegistered
} else {
p.sendQueue.quit()
} }
delete(ps.peers, id) delete(ps.peers, id)
return nil return nil

View File

@ -72,8 +72,11 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
pool := &testServerPool{} pool := &testServerPool{}
lpm.reqDist = newRequestDistributor(pool.getAllPeers, lpm.quitSync)
odr.reqDist = lpm.reqDist
pool.setPeer(lpeer) pool.setPeer(lpeer)
odr.serverPool = pool odr.serverPool = pool
lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
select { select {
case <-time.After(time.Millisecond * 100): case <-time.After(time.Millisecond * 100):
case err := <-err1: case err := <-err1:

View File

@ -268,82 +268,6 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration,
} }
} }
type selectPeerItem struct {
peer *peer
weight int64
wait time.Duration
}
func (sp selectPeerItem) Weight() int64 {
return sp.weight
}
// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
pool.lock.Lock()
type selectPeer struct {
peer *peer
rstat, tstat float64
}
var list []selectPeer
sel := newWeightedRandomSelect()
for _, entry := range pool.entries {
if entry.state == psRegistered {
if !entry.peer.fcServer.IsAssigned() {
list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
}
}
}
pool.lock.Unlock()
for _, sp := range list {
ok, wait := canSend(sp.peer)
if ok {
w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
}
}
choice := sel.choose()
if choice == nil {
return nil, 0, false
}
peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
locked := false
if wait < time.Millisecond*100 {
if peer.fcServer.AssignRequest(reqID) {
ok, w := canSend(peer)
wait = time.Duration(w)
if ok && wait < time.Millisecond*100 {
locked = true
} else {
peer.fcServer.DeassignRequest(reqID)
wait = time.Millisecond * 100
}
}
} else {
wait = time.Millisecond * 100
}
return peer, wait, locked
}
// selectPeer selects a suitable peer for a request, waiting until an assignment to
// the request is guaranteed or the process is aborted.
func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
for {
peer, wait, locked := pool.selectPeer(reqID, canSend)
if locked {
return peer
}
select {
case <-abort:
return nil
case <-time.After(wait):
}
}
}
// eventLoop handles pool events and mutex locking for all internal functions // eventLoop handles pool events and mutex locking for all internal functions
func (pool *serverPool) eventLoop() { func (pool *serverPool) eventLoop() {
lookupCnt := 0 lookupCnt := 0

View File

@ -35,13 +35,14 @@ type LesTxRelay struct {
peerList []*peer peerList []*peer
peerStartPos int peerStartPos int
lock sync.RWMutex lock sync.RWMutex
reqDist *requestDistributor
} }
func NewLesTxRelay() *LesTxRelay { func NewLesTxRelay() *LesTxRelay {
return &LesTxRelay{ return &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo), txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}), txPending: make(map[common.Hash]struct{}),
ps: newPeerSet(),
} }
} }
@ -108,10 +109,26 @@ func (self *LesTxRelay) send(txs types.Transactions, count int) {
} }
for p, list := range sendTo { for p, list := range sendTo {
cost := p.GetRequestCost(SendTxMsg, len(list)) pp := p
go func(p *peer, list types.Transactions, cost uint64) { ll := list
p.SendTxs(cost, list)
}(p, list, cost) reqID := getNextReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(SendTxMsg, len(ll))
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pp
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(SendTxMsg, len(ll))
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.SendTxs(reqID, cost, ll) }
},
}
self.reqDist.queue(rq)
} }
} }

View File

@ -20,6 +20,7 @@ import (
"math/big" "math/big"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -369,9 +370,17 @@ func (self *LightChain) postChainEvents(events []interface{}) {
// In the case of a light chain, InsertHeaderChain also creates and posts light // In the case of a light chain, InsertHeaderChain also creates and posts light
// chain events when necessary. // chain events when necessary.
func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) { func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) {
start := time.Now()
if i, err := self.hc.ValidateHeaderChain(chain, checkFreq); err != nil {
return i, err
}
// Make sure only one thread manipulates the chain at once // Make sure only one thread manipulates the chain at once
self.chainmu.Lock() self.chainmu.Lock()
defer self.chainmu.Unlock() defer func() {
self.chainmu.Unlock()
time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
}()
self.wg.Add(1) self.wg.Add(1)
defer self.wg.Done() defer self.wg.Done()
@ -397,7 +406,7 @@ func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int)
} }
return err return err
} }
i, err := self.hc.InsertHeaderChain(chain, checkFreq, whFunc) i, err := self.hc.InsertHeaderChain(chain, whFunc, start)
go self.postChainEvents(events) go self.postChainEvents(events)
return i, err return i, err
} }

View File

@ -276,6 +276,7 @@ func (pool *TxPool) setNewHead(ctx context.Context, newHeader *types.Header) (tx
// clear old mined tx entries of old blocks // clear old mined tx entries of old blocks
if idx := newHeader.Number.Uint64(); idx > pool.clearIdx+txPermanent { if idx := newHeader.Number.Uint64(); idx > pool.clearIdx+txPermanent {
idx2 := idx - txPermanent idx2 := idx - txPermanent
if len(pool.mined) > 0 {
for i := pool.clearIdx; i < idx2; i++ { for i := pool.clearIdx; i < idx2; i++ {
hash := core.GetCanonicalHash(pool.chainDb, i) hash := core.GetCanonicalHash(pool.chainDb, i)
if list, ok := pool.mined[hash]; ok { if list, ok := pool.mined[hash]; ok {
@ -287,6 +288,7 @@ func (pool *TxPool) setNewHead(ctx context.Context, newHeader *types.Header) (tx
delete(pool.mined, hash) delete(pool.mined, hash)
} }
} }
}
pool.clearIdx = idx2 pool.clearIdx = idx2
} }
@ -303,15 +305,16 @@ func (pool *TxPool) eventLoop() {
for ev := range pool.events.Chan() { for ev := range pool.events.Chan() {
switch ev.Data.(type) { switch ev.Data.(type) {
case core.ChainHeadEvent: case core.ChainHeadEvent:
head := pool.chain.CurrentHeader()
pool.mu.Lock() pool.mu.Lock()
ctx, _ := context.WithTimeout(context.Background(), blockCheckTimeout) ctx, _ := context.WithTimeout(context.Background(), blockCheckTimeout)
head := pool.chain.CurrentHeader()
txc, _ := pool.setNewHead(ctx, head) txc, _ := pool.setNewHead(ctx, head)
m, r := txc.getLists() m, r := txc.getLists()
pool.relay.NewHead(pool.head, m, r) pool.relay.NewHead(pool.head, m, r)
pool.homestead = pool.config.IsHomestead(head.Number) pool.homestead = pool.config.IsHomestead(head.Number)
pool.signer = types.MakeSigner(pool.config, head.Number) pool.signer = types.MakeSigner(pool.config, head.Number)
pool.mu.Unlock() pool.mu.Unlock()
time.Sleep(time.Millisecond) // hack in order to avoid hogging the lock; this part will be replaced by a subsequent PR
} }
} }
} }