2020-05-22 06:46:34 -05:00
|
|
|
// Copyright 2020 The go-ethereum Authors
|
2016-11-17 08:54:24 -06:00
|
|
|
// This file is part of the go-ethereum library.
|
|
|
|
//
|
|
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
|
|
// (at your option) any later version.
|
|
|
|
//
|
|
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
// GNU Lesser General Public License for more details.
|
|
|
|
//
|
|
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
|
|
package les
|
|
|
|
|
|
|
|
import (
|
2020-05-22 06:46:34 -05:00
|
|
|
"errors"
|
2016-11-17 08:54:24 -06:00
|
|
|
"math/rand"
|
2020-05-22 06:46:34 -05:00
|
|
|
"reflect"
|
2016-11-17 08:54:24 -06:00
|
|
|
"sync"
|
2020-05-22 06:46:34 -05:00
|
|
|
"sync/atomic"
|
2016-11-17 08:54:24 -06:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/ethereum/go-ethereum/common/mclock"
|
|
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
2020-05-22 06:46:34 -05:00
|
|
|
lpc "github.com/ethereum/go-ethereum/les/lespay/client"
|
2020-03-31 10:17:24 -05:00
|
|
|
"github.com/ethereum/go-ethereum/les/utils"
|
2017-02-22 06:10:07 -06:00
|
|
|
"github.com/ethereum/go-ethereum/log"
|
all: new p2p node representation (#17643)
Package p2p/enode provides a generalized representation of p2p nodes
which can contain arbitrary information in key/value pairs. It is also
the new home for the node database. The "v4" identity scheme is also
moved here from p2p/enr to remove the dependency on Ethereum crypto from
that package.
Record signature handling is changed significantly. The identity scheme
registry is removed and acceptable schemes must be passed to any method
that needs identity. This means records must now be validated explicitly
after decoding.
The enode API is designed to make signature handling easy and safe: most
APIs around the codebase work with enode.Node, which is a wrapper around
a valid record. Going from enr.Record to enode.Node requires a valid
signature.
* p2p/discover: port to p2p/enode
This ports the discovery code to the new node representation in
p2p/enode. The wire protocol is unchanged, this can be considered a
refactoring change. The Kademlia table can now deal with nodes using an
arbitrary identity scheme. This requires a few incompatible API changes:
- Table.Lookup is not available anymore. It used to take a public key
as argument because v4 protocol requires one. Its replacement is
LookupRandom.
- Table.Resolve takes *enode.Node instead of NodeID. This is also for
v4 protocol compatibility because nodes cannot be looked up by ID
alone.
- Types Node and NodeID are gone. Further commits in the series will be
fixes all over the the codebase to deal with those removals.
* p2p: port to p2p/enode and discovery changes
This adapts package p2p to the changes in p2p/discover. All uses of
discover.Node and discover.NodeID are replaced by their equivalents from
p2p/enode.
New API is added to retrieve the enode.Node instance of a peer. The
behavior of Server.Self with discovery disabled is improved. It now
tries much harder to report a working IP address, falling back to
127.0.0.1 if no suitable address can be determined through other means.
These changes were needed for tests of other packages later in the
series.
* p2p/simulations, p2p/testing: port to p2p/enode
No surprises here, mostly replacements of discover.Node, discover.NodeID
with their new equivalents. The 'interesting' API changes are:
- testing.ProtocolSession tracks complete nodes, not just their IDs.
- adapters.NodeConfig has a new method to create a complete node.
These changes were needed to make swarm tests work.
Note that the NodeID change makes the code incompatible with old
simulation snapshots.
* whisper/whisperv5, whisper/whisperv6: port to p2p/enode
This port was easy because whisper uses []byte for node IDs and
URL strings in the API.
* eth: port to p2p/enode
Again, easy to port because eth uses strings for node IDs and doesn't
care about node information in any way.
* les: port to p2p/enode
Apart from replacing discover.NodeID with enode.ID, most changes are in
the server pool code. It now deals with complete nodes instead
of (Pubkey, IP, Port) triples. The database format is unchanged for now,
but we should probably change it to use the node database later.
* node: port to p2p/enode
This change simply replaces discover.Node and discover.NodeID with their
new equivalents.
* swarm/network: port to p2p/enode
Swarm has its own node address representation, BzzAddr, containing both
an overlay address (the hash of a secp256k1 public key) and an underlay
address (enode:// URL).
There are no changes to the BzzAddr format in this commit, but certain
operations such as creating a BzzAddr from a node ID are now impossible
because node IDs aren't public keys anymore.
Most swarm-related changes in the series remove uses of
NewAddrFromNodeID, replacing it with NewAddr which takes a complete node
as argument. ToOverlayAddr is removed because we can just use the node
ID directly.
2018-09-24 17:59:00 -05:00
|
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
2020-05-22 06:46:34 -05:00
|
|
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
|
|
|
"github.com/ethereum/go-ethereum/p2p/nodestate"
|
2016-11-17 08:54:24 -06:00
|
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
	// minTimeout is the lower bound of the request timeout suggested by the server pool.
	minTimeout = time.Millisecond * 500
	// timeoutRefresh is the maximum age of the cached timeout; an older value is recalculated.
	timeoutRefresh = time.Second * 5
	// dialCost is the cost of a TCP dial (used for known node selection weight calculation).
	dialCost = 10000
	// dialWaitStep is the exponential multiplier applied to the redial wait time
	// after a dial attempt or connection.
	dialWaitStep = 1.5
	// queryCost is the cost of a UDP pre-negotiation query.
	queryCost = 500
	// queryWaitStep is the exponential multiplier applied to the redial wait time
	// after a pre-negotiation query that did not allow dialing.
	queryWaitStep = 1.02
	// waitThreshold is the redial wait time at or above which the node's
	// statistics are dropped.
	waitThreshold = time.Hour * 2000
	// nodeWeightMul is the multiplier constant for node weight calculation.
	nodeWeightMul = 1000000
	// nodeWeightThreshold is the minimum weight for keeping a node in the known (valuable) set.
	nodeWeightThreshold = 100
	// minRedialWait is the minimum redial wait time in seconds.
	minRedialWait = 10
	// preNegLimit is the maximum number of simultaneous pre-negotiation queries.
	preNegLimit = 5
	// maxQueryFails is the number of consecutive UDP query failures before a warning is printed.
	maxQueryFails = 100
)
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// serverPool provides a node iterator for dial candidates. The output is a mix of newly discovered
|
|
|
|
// nodes, a weighted random selection of known (previously valuable) nodes and trusted/paid nodes.
|
2016-11-17 08:54:24 -06:00
|
|
|
type serverPool struct {
|
2020-05-22 06:46:34 -05:00
|
|
|
clock mclock.Clock
|
|
|
|
unixTime func() int64
|
|
|
|
db ethdb.KeyValueStore
|
|
|
|
|
|
|
|
ns *nodestate.NodeStateMachine
|
|
|
|
vt *lpc.ValueTracker
|
|
|
|
mixer *enode.FairMix
|
|
|
|
mixSources []enode.Iterator
|
|
|
|
dialIterator enode.Iterator
|
|
|
|
validSchemes enr.IdentityScheme
|
|
|
|
trustedURLs []string
|
|
|
|
fillSet *lpc.FillSet
|
|
|
|
queryFails uint32
|
|
|
|
|
|
|
|
timeoutLock sync.RWMutex
|
|
|
|
timeout time.Duration
|
|
|
|
timeWeights lpc.ResponseTimeWeights
|
|
|
|
timeoutRefreshed mclock.AbsTime
|
|
|
|
}
|
|
|
|
|
|
|
|
// nodeHistory keeps track of dial costs which determine node weight together with the
|
|
|
|
// service value calculated by lpc.ValueTracker.
|
|
|
|
type nodeHistory struct {
|
|
|
|
dialCost utils.ExpiredValue
|
|
|
|
redialWaitStart, redialWaitEnd int64 // unix time (seconds)
|
|
|
|
}
|
|
|
|
|
|
|
|
type nodeHistoryEnc struct {
|
|
|
|
DialCost utils.ExpiredValue
|
|
|
|
RedialWaitStart, RedialWaitEnd uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
// queryFunc sends a pre-negotiation query and blocks until a response arrives or timeout occurs.
|
|
|
|
// It returns 1 if the remote node has confirmed that connection is possible, 0 if not
|
|
|
|
// possible and -1 if no response arrived (timeout).
|
|
|
|
type queryFunc func(*enode.Node) int
|
|
|
|
|
|
|
|
var (
|
|
|
|
serverPoolSetup = &nodestate.Setup{Version: 1}
|
|
|
|
sfHasValue = serverPoolSetup.NewPersistentFlag("hasValue")
|
|
|
|
sfQueried = serverPoolSetup.NewFlag("queried")
|
|
|
|
sfCanDial = serverPoolSetup.NewFlag("canDial")
|
|
|
|
sfDialing = serverPoolSetup.NewFlag("dialed")
|
|
|
|
sfWaitDialTimeout = serverPoolSetup.NewFlag("dialTimeout")
|
|
|
|
sfConnected = serverPoolSetup.NewFlag("connected")
|
|
|
|
sfRedialWait = serverPoolSetup.NewFlag("redialWait")
|
|
|
|
sfAlwaysConnect = serverPoolSetup.NewFlag("alwaysConnect")
|
|
|
|
sfDisableSelection = nodestate.MergeFlags(sfQueried, sfCanDial, sfDialing, sfConnected, sfRedialWait)
|
|
|
|
|
|
|
|
sfiNodeHistory = serverPoolSetup.NewPersistentField("nodeHistory", reflect.TypeOf(nodeHistory{}),
|
|
|
|
func(field interface{}) ([]byte, error) {
|
|
|
|
if n, ok := field.(nodeHistory); ok {
|
|
|
|
ne := nodeHistoryEnc{
|
|
|
|
DialCost: n.dialCost,
|
|
|
|
RedialWaitStart: uint64(n.redialWaitStart),
|
|
|
|
RedialWaitEnd: uint64(n.redialWaitEnd),
|
|
|
|
}
|
|
|
|
enc, err := rlp.EncodeToBytes(&ne)
|
|
|
|
return enc, err
|
|
|
|
} else {
|
|
|
|
return nil, errors.New("invalid field type")
|
2020-02-18 05:24:05 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
},
|
|
|
|
func(enc []byte) (interface{}, error) {
|
|
|
|
var ne nodeHistoryEnc
|
|
|
|
err := rlp.DecodeBytes(enc, &ne)
|
|
|
|
n := nodeHistory{
|
|
|
|
dialCost: ne.DialCost,
|
|
|
|
redialWaitStart: int64(ne.RedialWaitStart),
|
|
|
|
redialWaitEnd: int64(ne.RedialWaitEnd),
|
|
|
|
}
|
|
|
|
return n, err
|
|
|
|
},
|
|
|
|
)
|
|
|
|
sfiNodeWeight = serverPoolSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
|
|
|
|
sfiConnectedStats = serverPoolSetup.NewField("connectedStats", reflect.TypeOf(lpc.ResponseTimeStats{}))
|
2016-11-29 23:02:08 -06:00
|
|
|
)
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// newServerPool creates a new server pool
|
|
|
|
func newServerPool(db ethdb.KeyValueStore, dbKey []byte, vt *lpc.ValueTracker, discovery enode.Iterator, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string) *serverPool {
|
|
|
|
s := &serverPool{
|
|
|
|
db: db,
|
|
|
|
clock: clock,
|
|
|
|
unixTime: func() int64 { return time.Now().Unix() },
|
|
|
|
validSchemes: enode.ValidSchemes,
|
|
|
|
trustedURLs: trustedURLs,
|
|
|
|
vt: vt,
|
|
|
|
ns: nodestate.NewNodeStateMachine(db, []byte(string(dbKey)+"ns:"), clock, serverPoolSetup),
|
|
|
|
}
|
|
|
|
s.recalTimeout()
|
|
|
|
s.mixer = enode.NewFairMix(mixTimeout)
|
|
|
|
knownSelector := lpc.NewWrsIterator(s.ns, sfHasValue, sfDisableSelection, sfiNodeWeight)
|
|
|
|
alwaysConnect := lpc.NewQueueIterator(s.ns, sfAlwaysConnect, sfDisableSelection, true, nil)
|
|
|
|
s.mixSources = append(s.mixSources, knownSelector)
|
|
|
|
s.mixSources = append(s.mixSources, alwaysConnect)
|
|
|
|
if discovery != nil {
|
|
|
|
s.mixSources = append(s.mixSources, discovery)
|
|
|
|
}
|
|
|
|
|
|
|
|
iter := enode.Iterator(s.mixer)
|
|
|
|
if query != nil {
|
|
|
|
iter = s.addPreNegFilter(iter, query)
|
|
|
|
}
|
|
|
|
s.dialIterator = enode.Filter(iter, func(node *enode.Node) bool {
|
|
|
|
s.ns.SetState(node, sfDialing, sfCanDial, 0)
|
|
|
|
s.ns.SetState(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
|
|
|
|
return true
|
|
|
|
})
|
2018-06-25 03:52:25 -05:00
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
|
|
|
|
if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
|
|
|
|
// dial timeout, no connection
|
|
|
|
s.setRedialWait(n, dialCost, dialWaitStep)
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetStateSub(n, nodestate.Flags{}, sfDialing, 0)
|
2018-06-25 03:52:25 -05:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
})
|
2016-11-17 08:54:24 -06:00
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
s.ns.AddLogMetrics(sfHasValue, sfDisableSelection, "selectable", nil, nil, serverSelectableGauge)
|
|
|
|
s.ns.AddLogMetrics(sfDialing, nodestate.Flags{}, "dialed", serverDialedMeter, nil, nil)
|
|
|
|
s.ns.AddLogMetrics(sfConnected, nodestate.Flags{}, "connected", nil, nil, serverConnectedGauge)
|
|
|
|
return s
|
|
|
|
}
|
|
|
|
|
|
|
|
// addPreNegFilter installs a node filter mechanism that performs a pre-negotiation query.
|
|
|
|
// Nodes that are filtered out and does not appear on the output iterator are put back
|
|
|
|
// into redialWait state.
|
|
|
|
func (s *serverPool) addPreNegFilter(input enode.Iterator, query queryFunc) enode.Iterator {
|
|
|
|
s.fillSet = lpc.NewFillSet(s.ns, input, sfQueried)
|
|
|
|
s.ns.SubscribeState(sfQueried, func(n *enode.Node, oldState, newState nodestate.Flags) {
|
|
|
|
if newState.Equals(sfQueried) {
|
|
|
|
fails := atomic.LoadUint32(&s.queryFails)
|
|
|
|
if fails == maxQueryFails {
|
|
|
|
log.Warn("UDP pre-negotiation query does not seem to work")
|
2016-11-29 23:02:08 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
if fails > maxQueryFails {
|
|
|
|
fails = maxQueryFails
|
2019-01-24 05:18:26 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
if rand.Intn(maxQueryFails*2) < int(fails) {
|
|
|
|
// skip pre-negotiation with increasing chance, max 50%
|
|
|
|
// this ensures that the client can operate even if UDP is not working at all
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
|
2020-05-22 06:46:34 -05:00
|
|
|
// set canDial before resetting queried so that FillSet will not read more
|
|
|
|
// candidates unnecessarily
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
|
2020-05-22 06:46:34 -05:00
|
|
|
return
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
go func() {
|
|
|
|
q := query(n)
|
|
|
|
if q == -1 {
|
|
|
|
atomic.AddUint32(&s.queryFails, 1)
|
|
|
|
} else {
|
|
|
|
atomic.StoreUint32(&s.queryFails, 0)
|
2019-01-24 05:18:26 -06:00
|
|
|
}
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.Operation(func() {
|
|
|
|
// we are no longer running in the operation that the callback belongs to, start a new one because of setRedialWait
|
|
|
|
if q == 1 {
|
|
|
|
s.ns.SetStateSub(n, sfCanDial, nodestate.Flags{}, time.Second*10)
|
|
|
|
} else {
|
|
|
|
s.setRedialWait(n, queryCost, queryWaitStep)
|
|
|
|
}
|
|
|
|
s.ns.SetStateSub(n, nodestate.Flags{}, sfQueried, 0)
|
|
|
|
})
|
2018-06-25 03:52:25 -05:00
|
|
|
}()
|
2019-01-24 05:18:26 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
})
|
|
|
|
return lpc.NewQueueIterator(s.ns, sfCanDial, nodestate.Flags{}, false, func(waiting bool) {
|
|
|
|
if waiting {
|
|
|
|
s.fillSet.SetTarget(preNegLimit)
|
|
|
|
} else {
|
|
|
|
s.fillSet.SetTarget(0)
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
})
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// start starts the server pool. Note that NodeStateMachine should be started first.
|
|
|
|
func (s *serverPool) start() {
|
|
|
|
s.ns.Start()
|
|
|
|
for _, iter := range s.mixSources {
|
|
|
|
// add sources to mixer at startup because the mixer instantly tries to read them
|
|
|
|
// which should only happen after NodeStateMachine has been started
|
|
|
|
s.mixer.AddSource(iter)
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
for _, url := range s.trustedURLs {
|
|
|
|
if node, err := enode.Parse(s.validSchemes, url); err == nil {
|
|
|
|
s.ns.SetState(node, sfAlwaysConnect, nodestate.Flags{}, 0)
|
|
|
|
} else {
|
|
|
|
log.Error("Invalid trusted server URL", "url", url, "error", err)
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
unixTime := s.unixTime()
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.Operation(func() {
|
|
|
|
s.ns.ForEach(sfHasValue, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
|
|
|
|
s.calculateWeight(node)
|
|
|
|
if n, ok := s.ns.GetField(node, sfiNodeHistory).(nodeHistory); ok && n.redialWaitEnd > unixTime {
|
|
|
|
wait := n.redialWaitEnd - unixTime
|
|
|
|
lastWait := n.redialWaitEnd - n.redialWaitStart
|
|
|
|
if wait > lastWait {
|
|
|
|
// if the time until expiration is larger than the last suggested
|
|
|
|
// waiting time then the system clock was probably adjusted
|
|
|
|
wait = lastWait
|
|
|
|
}
|
|
|
|
s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, time.Duration(wait)*time.Second)
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-09-14 07:01:18 -05:00
|
|
|
})
|
all: new p2p node representation (#17643)
Package p2p/enode provides a generalized representation of p2p nodes
which can contain arbitrary information in key/value pairs. It is also
the new home for the node database. The "v4" identity scheme is also
moved here from p2p/enr to remove the dependency on Ethereum crypto from
that package.
Record signature handling is changed significantly. The identity scheme
registry is removed and acceptable schemes must be passed to any method
that needs identity. This means records must now be validated explicitly
after decoding.
The enode API is designed to make signature handling easy and safe: most
APIs around the codebase work with enode.Node, which is a wrapper around
a valid record. Going from enr.Record to enode.Node requires a valid
signature.
* p2p/discover: port to p2p/enode
This ports the discovery code to the new node representation in
p2p/enode. The wire protocol is unchanged, this can be considered a
refactoring change. The Kademlia table can now deal with nodes using an
arbitrary identity scheme. This requires a few incompatible API changes:
- Table.Lookup is not available anymore. It used to take a public key
as argument because v4 protocol requires one. Its replacement is
LookupRandom.
- Table.Resolve takes *enode.Node instead of NodeID. This is also for
v4 protocol compatibility because nodes cannot be looked up by ID
alone.
- Types Node and NodeID are gone. Further commits in the series will be
fixes all over the the codebase to deal with those removals.
* p2p: port to p2p/enode and discovery changes
This adapts package p2p to the changes in p2p/discover. All uses of
discover.Node and discover.NodeID are replaced by their equivalents from
p2p/enode.
New API is added to retrieve the enode.Node instance of a peer. The
behavior of Server.Self with discovery disabled is improved. It now
tries much harder to report a working IP address, falling back to
127.0.0.1 if no suitable address can be determined through other means.
These changes were needed for tests of other packages later in the
series.
* p2p/simulations, p2p/testing: port to p2p/enode
No surprises here, mostly replacements of discover.Node, discover.NodeID
with their new equivalents. The 'interesting' API changes are:
- testing.ProtocolSession tracks complete nodes, not just their IDs.
- adapters.NodeConfig has a new method to create a complete node.
These changes were needed to make swarm tests work.
Note that the NodeID change makes the code incompatible with old
simulation snapshots.
* whisper/whisperv5, whisper/whisperv6: port to p2p/enode
This port was easy because whisper uses []byte for node IDs and
URL strings in the API.
* eth: port to p2p/enode
Again, easy to port because eth uses strings for node IDs and doesn't
care about node information in any way.
* les: port to p2p/enode
Apart from replacing discover.NodeID with enode.ID, most changes are in
the server pool code. It now deals with complete nodes instead
of (Pubkey, IP, Port) triples. The database format is unchanged for now,
but we should probably change it to use the node database later.
* node: port to p2p/enode
This change simply replaces discover.Node and discover.NodeID with their
new equivalents.
* swarm/network: port to p2p/enode
Swarm has its own node address representation, BzzAddr, containing both
an overlay address (the hash of a secp256k1 public key) and an underlay
address (enode:// URL).
There are no changes to the BzzAddr format in this commit, but certain
operations such as creating a BzzAddr from a node ID are now impossible
because node IDs aren't public keys anymore.
Most swarm-related changes in the series remove uses of
NewAddrFromNodeID, replacing it with NewAddr which takes a complete node
as argument. ToOverlayAddr is removed because we can just use the node
ID directly.
2018-09-24 17:59:00 -05:00
|
|
|
})
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// stop stops the server pool
|
|
|
|
func (s *serverPool) stop() {
|
|
|
|
s.dialIterator.Close()
|
|
|
|
if s.fillSet != nil {
|
|
|
|
s.fillSet.Close()
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.Operation(func() {
|
|
|
|
s.ns.ForEach(sfConnected, nodestate.Flags{}, func(n *enode.Node, state nodestate.Flags) {
|
|
|
|
// recalculate weight of connected nodes in order to update hasValue flag if necessary
|
|
|
|
s.calculateWeight(n)
|
|
|
|
})
|
2020-05-22 06:46:34 -05:00
|
|
|
})
|
|
|
|
s.ns.Stop()
|
|
|
|
}
|
|
|
|
|
|
|
|
// registerPeer implements serverPeerSubscriber
|
|
|
|
func (s *serverPool) registerPeer(p *serverPeer) {
|
|
|
|
s.ns.SetState(p.Node(), sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
|
|
|
|
nvt := s.vt.Register(p.ID())
|
|
|
|
s.ns.SetField(p.Node(), sfiConnectedStats, nvt.RtStats())
|
|
|
|
p.setValueTracker(s.vt, nvt)
|
|
|
|
p.updateVtParams()
|
|
|
|
}
|
|
|
|
|
|
|
|
// unregisterPeer implements serverPeerSubscriber
|
|
|
|
func (s *serverPool) unregisterPeer(p *serverPeer) {
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.Operation(func() {
|
|
|
|
s.setRedialWait(p.Node(), dialCost, dialWaitStep)
|
|
|
|
s.ns.SetStateSub(p.Node(), nodestate.Flags{}, sfConnected, 0)
|
|
|
|
s.ns.SetFieldSub(p.Node(), sfiConnectedStats, nil)
|
|
|
|
})
|
2020-05-22 06:46:34 -05:00
|
|
|
s.vt.Unregister(p.ID())
|
|
|
|
p.setValueTracker(nil, nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
// recalTimeout calculates the current recommended timeout. This value is used by
|
|
|
|
// the client as a "soft timeout" value. It also affects the service value calculation
|
|
|
|
// of individual nodes.
|
|
|
|
func (s *serverPool) recalTimeout() {
|
|
|
|
// Use cached result if possible, avoid recalculating too frequently.
|
|
|
|
s.timeoutLock.RLock()
|
|
|
|
refreshed := s.timeoutRefreshed
|
|
|
|
s.timeoutLock.RUnlock()
|
|
|
|
now := s.clock.Now()
|
|
|
|
if refreshed != 0 && time.Duration(now-refreshed) < timeoutRefresh {
|
|
|
|
return
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
// Cached result is stale, recalculate a new one.
|
|
|
|
rts := s.vt.RtStats()
|
2016-11-17 08:54:24 -06:00
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// Add a fake statistic here. It is an easy way to initialize with some
|
|
|
|
// conservative values when the database is new. As soon as we have a
|
|
|
|
// considerable amount of real stats this small value won't matter.
|
|
|
|
rts.Add(time.Second*2, 10, s.vt.StatsExpFactor())
|
2016-11-17 08:54:24 -06:00
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// Use either 10% failure rate timeout or twice the median response time
|
|
|
|
// as the recommended timeout.
|
|
|
|
timeout := minTimeout
|
|
|
|
if t := rts.Timeout(0.1); t > timeout {
|
|
|
|
timeout = t
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
if t := rts.Timeout(0.5) * 2; t > timeout {
|
|
|
|
timeout = t
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
s.timeoutLock.Lock()
|
|
|
|
if s.timeout != timeout {
|
|
|
|
s.timeout = timeout
|
|
|
|
s.timeWeights = lpc.TimeoutWeights(s.timeout)
|
2016-11-17 08:54:24 -06:00
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
suggestedTimeoutGauge.Update(int64(s.timeout / time.Millisecond))
|
|
|
|
totalValueGauge.Update(int64(rts.Value(s.timeWeights, s.vt.StatsExpFactor())))
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
s.timeoutRefreshed = now
|
|
|
|
s.timeoutLock.Unlock()
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// getTimeout returns the recommended request timeout.
|
|
|
|
func (s *serverPool) getTimeout() time.Duration {
|
|
|
|
s.recalTimeout()
|
|
|
|
s.timeoutLock.RLock()
|
|
|
|
defer s.timeoutLock.RUnlock()
|
|
|
|
return s.timeout
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// getTimeoutAndWeight returns the recommended request timeout as well as the
|
|
|
|
// response time weight which is necessary to calculate service value.
|
|
|
|
func (s *serverPool) getTimeoutAndWeight() (time.Duration, lpc.ResponseTimeWeights) {
|
|
|
|
s.recalTimeout()
|
|
|
|
s.timeoutLock.RLock()
|
|
|
|
defer s.timeoutLock.RUnlock()
|
|
|
|
return s.timeout, s.timeWeights
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// addDialCost adds the given amount of dial cost to the node history and returns the current
|
|
|
|
// amount of total dial cost
|
|
|
|
func (s *serverPool) addDialCost(n *nodeHistory, amount int64) uint64 {
|
|
|
|
logOffset := s.vt.StatsExpirer().LogOffset(s.clock.Now())
|
|
|
|
if amount > 0 {
|
|
|
|
n.dialCost.Add(amount, logOffset)
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
totalDialCost := n.dialCost.Value(logOffset)
|
|
|
|
if totalDialCost < dialCost {
|
|
|
|
totalDialCost = dialCost
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
return totalDialCost
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// serviceValue returns the service value accumulated in this session and in total
|
|
|
|
func (s *serverPool) serviceValue(node *enode.Node) (sessionValue, totalValue float64) {
|
|
|
|
nvt := s.vt.GetNode(node.ID())
|
|
|
|
if nvt == nil {
|
|
|
|
return 0, 0
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
currentStats := nvt.RtStats()
|
|
|
|
_, timeWeights := s.getTimeoutAndWeight()
|
|
|
|
expFactor := s.vt.StatsExpFactor()
|
2016-11-17 08:54:24 -06:00
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
totalValue = currentStats.Value(timeWeights, expFactor)
|
|
|
|
if connStats, ok := s.ns.GetField(node, sfiConnectedStats).(lpc.ResponseTimeStats); ok {
|
|
|
|
diff := currentStats
|
|
|
|
diff.SubStats(&connStats)
|
|
|
|
sessionValue = diff.Value(timeWeights, expFactor)
|
|
|
|
sessionValueMeter.Mark(int64(sessionValue))
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
2020-05-22 06:46:34 -05:00
|
|
|
return
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|
|
|
|
|
2020-05-22 06:46:34 -05:00
|
|
|
// updateWeight calculates the node weight and updates the nodeWeight field and the
|
|
|
|
// hasValue flag. It also saves the node state if necessary.
|
2020-09-14 07:01:18 -05:00
|
|
|
// Note: this function should run inside a NodeStateMachine operation
|
2020-05-22 06:46:34 -05:00
|
|
|
func (s *serverPool) updateWeight(node *enode.Node, totalValue float64, totalDialCost uint64) {
|
|
|
|
weight := uint64(totalValue * nodeWeightMul / float64(totalDialCost))
|
|
|
|
if weight >= nodeWeightThreshold {
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetStateSub(node, sfHasValue, nodestate.Flags{}, 0)
|
|
|
|
s.ns.SetFieldSub(node, sfiNodeWeight, weight)
|
2016-11-17 08:54:24 -06:00
|
|
|
} else {
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
|
|
|
|
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
|
|
|
|
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
|
2020-05-22 06:46:34 -05:00
|
|
|
}
|
|
|
|
s.ns.Persist(node) // saved if node history or hasValue changed
|
|
|
|
}
|
|
|
|
|
|
|
|
// setRedialWait calculates and sets the redialWait timeout based on the service value
|
|
|
|
// and dial cost accumulated during the last session/attempt and in total.
|
|
|
|
// The waiting time is raised exponentially if no service value has been received in order
|
|
|
|
// to prevent dialing an unresponsive node frequently for a very long time just because it
|
|
|
|
// was useful in the past. It can still be occasionally dialed though and once it provides
|
|
|
|
// a significant amount of service value again its waiting time is quickly reduced or reset
|
|
|
|
// to the minimum.
|
|
|
|
// Note: node weight is also recalculated and updated by this function.
|
2020-09-14 07:01:18 -05:00
|
|
|
// Note 2: this function should run inside a NodeStateMachine operation
|
2020-05-22 06:46:34 -05:00
|
|
|
func (s *serverPool) setRedialWait(node *enode.Node, addDialCost int64, waitStep float64) {
|
|
|
|
n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
|
|
|
|
sessionValue, totalValue := s.serviceValue(node)
|
|
|
|
totalDialCost := s.addDialCost(&n, addDialCost)
|
|
|
|
|
|
|
|
// if the current dial session has yielded at least the average value/dial cost ratio
|
|
|
|
// then the waiting time should be reset to the minimum. If the session value
|
|
|
|
// is below average but still positive then timeout is limited to the ratio of
|
|
|
|
// average / current service value multiplied by the minimum timeout. If the attempt
|
|
|
|
// was unsuccessful then timeout is raised exponentially without limitation.
|
|
|
|
// Note: dialCost is used in the formula below even if dial was not attempted at all
|
|
|
|
// because the pre-negotiation query did not return a positive result. In this case
|
|
|
|
// the ratio has no meaning anyway and waitFactor is always raised, though in smaller
|
|
|
|
// steps because queries are cheaper and therefore we can allow more failed attempts.
|
|
|
|
unixTime := s.unixTime()
|
|
|
|
plannedTimeout := float64(n.redialWaitEnd - n.redialWaitStart) // last planned redialWait timeout
|
|
|
|
var actualWait float64 // actual waiting time elapsed
|
|
|
|
if unixTime > n.redialWaitEnd {
|
|
|
|
// the planned timeout has elapsed
|
|
|
|
actualWait = plannedTimeout
|
|
|
|
} else {
|
|
|
|
// if the node was redialed earlier then we do not raise the planned timeout
|
|
|
|
// exponentially because that could lead to the timeout rising very high in
|
|
|
|
// a short amount of time
|
|
|
|
// Note that in case of an early redial actualWait also includes the dial
|
|
|
|
// timeout or connection time of the last attempt but it still serves its
|
|
|
|
// purpose of preventing the timeout rising quicker than linearly as a function
|
|
|
|
// of total time elapsed without a successful connection.
|
|
|
|
actualWait = float64(unixTime - n.redialWaitStart)
|
|
|
|
}
|
|
|
|
// raise timeout exponentially if the last planned timeout has elapsed
|
|
|
|
// (use at least the last planned timeout otherwise)
|
|
|
|
nextTimeout := actualWait * waitStep
|
|
|
|
if plannedTimeout > nextTimeout {
|
|
|
|
nextTimeout = plannedTimeout
|
|
|
|
}
|
|
|
|
// we reduce the waiting time if the server has provided service value during the
|
|
|
|
// connection (but never under the minimum)
|
|
|
|
a := totalValue * dialCost * float64(minRedialWait)
|
|
|
|
b := float64(totalDialCost) * sessionValue
|
|
|
|
if a < b*nextTimeout {
|
|
|
|
nextTimeout = a / b
|
|
|
|
}
|
|
|
|
if nextTimeout < minRedialWait {
|
|
|
|
nextTimeout = minRedialWait
|
|
|
|
}
|
|
|
|
wait := time.Duration(float64(time.Second) * nextTimeout)
|
|
|
|
if wait < waitThreshold {
|
|
|
|
n.redialWaitStart = unixTime
|
|
|
|
n.redialWaitEnd = unixTime + int64(nextTimeout)
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetFieldSub(node, sfiNodeHistory, n)
|
|
|
|
s.ns.SetStateSub(node, sfRedialWait, nodestate.Flags{}, wait)
|
2020-05-22 06:46:34 -05:00
|
|
|
s.updateWeight(node, totalValue, totalDialCost)
|
|
|
|
} else {
|
|
|
|
// discard known node statistics if waiting time is very long because the node
|
|
|
|
// hasn't been responsive for a very long time
|
2020-09-14 07:01:18 -05:00
|
|
|
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
|
|
|
|
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
|
|
|
|
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
|
2020-05-22 06:46:34 -05:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// calculateWeight calculates and sets the node weight without altering the node history.
|
|
|
|
// This function should be called during startup and shutdown only, otherwise setRedialWait
|
|
|
|
// will keep the weights updated as the underlying statistics are adjusted.
|
2020-09-14 07:01:18 -05:00
|
|
|
// Note: this function should run inside a NodeStateMachine operation
|
2020-05-22 06:46:34 -05:00
|
|
|
func (s *serverPool) calculateWeight(node *enode.Node) {
|
|
|
|
n, _ := s.ns.GetField(node, sfiNodeHistory).(nodeHistory)
|
|
|
|
_, totalValue := s.serviceValue(node)
|
|
|
|
totalDialCost := s.addDialCost(&n, 0)
|
|
|
|
s.updateWeight(node, totalValue, totalDialCost)
|
2016-11-17 08:54:24 -06:00
|
|
|
}
|