p2p, p2p/discover: add dial metrics (#27621)
This PR adds metrics for p2p dialing, which gives us visibility into the quality of the dial candidates returned by our discovery methods.
This commit is contained in:
parent
ea782809f7
commit
cbf2579691
|
@ -38,6 +38,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -424,6 +425,13 @@ func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error
|
||||||
defer h.peerWG.Done()
|
defer h.peerWG.Done()
|
||||||
|
|
||||||
if err := h.peers.registerSnapExtension(peer); err != nil {
|
if err := h.peers.registerSnapExtension(peer); err != nil {
|
||||||
|
if metrics.Enabled {
|
||||||
|
if peer.Inbound() {
|
||||||
|
snap.IngressRegistrationErrorMeter.Mark(1)
|
||||||
|
} else {
|
||||||
|
snap.EgressRegistrationErrorMeter.Mark(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
peer.Log().Warn("Snapshot extension registration failed", "err", err)
|
peer.Log().Warn("Snapshot extension registration failed", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,12 +17,14 @@
|
||||||
package eth
|
package eth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/forkid"
|
"github.com/ethereum/go-ethereum/core/forkid"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -59,9 +61,11 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
|
||||||
select {
|
select {
|
||||||
case err := <-errc:
|
case err := <-errc:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
markError(p, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
case <-timeout.C:
|
case <-timeout.C:
|
||||||
|
markError(p, p2p.DiscReadTimeout)
|
||||||
return p2p.DiscReadTimeout
|
return p2p.DiscReadTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -105,3 +109,25 @@ func (p *Peer) readStatus(network uint64, status *StatusPacket, genesis common.H
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// markError registers the error with the corresponding metric.
|
||||||
|
func markError(p *Peer, err error) {
|
||||||
|
if !metrics.Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m := meters.get(p.Inbound())
|
||||||
|
switch errors.Unwrap(err) {
|
||||||
|
case errNetworkIDMismatch:
|
||||||
|
m.networkIDMismatch.Mark(1)
|
||||||
|
case errProtocolVersionMismatch:
|
||||||
|
m.protocolVersionMismatch.Mark(1)
|
||||||
|
case errGenesisMismatch:
|
||||||
|
m.genesisMismatch.Mark(1)
|
||||||
|
case errForkIDRejected:
|
||||||
|
m.forkidRejected.Mark(1)
|
||||||
|
case p2p.DiscReadTimeout:
|
||||||
|
m.timeoutError.Mark(1)
|
||||||
|
default:
|
||||||
|
m.peerError.Mark(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
// Copyright 2023 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package eth
|
||||||
|
|
||||||
|
import "github.com/ethereum/go-ethereum/metrics"
|
||||||
|
|
||||||
|
// meters stores ingress and egress handshake meters.
|
||||||
|
var meters bidirectionalMeters
|
||||||
|
|
||||||
|
// bidirectionalMeters stores ingress and egress handshake meters.
|
||||||
|
type bidirectionalMeters struct {
|
||||||
|
ingress *hsMeters
|
||||||
|
egress *hsMeters
|
||||||
|
}
|
||||||
|
|
||||||
|
// get returns the corresponding meter depending if ingress or egress is
|
||||||
|
// desired.
|
||||||
|
func (h *bidirectionalMeters) get(ingress bool) *hsMeters {
|
||||||
|
if ingress {
|
||||||
|
return h.ingress
|
||||||
|
}
|
||||||
|
return h.egress
|
||||||
|
}
|
||||||
|
|
||||||
|
// hsMeters is a collection of meters which track metrics related to the
|
||||||
|
// eth subprotocol handshake.
|
||||||
|
type hsMeters struct {
|
||||||
|
// peerError measures the number of errors related to incorrect peer
|
||||||
|
// behaviour, such as invalid message code, size, encoding, etc.
|
||||||
|
peerError metrics.Meter
|
||||||
|
|
||||||
|
// timeoutError measures the number of timeouts.
|
||||||
|
timeoutError metrics.Meter
|
||||||
|
|
||||||
|
// networkIDMismatch measures the number of network id mismatch errors.
|
||||||
|
networkIDMismatch metrics.Meter
|
||||||
|
|
||||||
|
// protocolVersionMismatch measures the number of differing protocol
|
||||||
|
// versions.
|
||||||
|
protocolVersionMismatch metrics.Meter
|
||||||
|
|
||||||
|
// genesisMismatch measures the number of differing genesises.
|
||||||
|
genesisMismatch metrics.Meter
|
||||||
|
|
||||||
|
// forkidRejected measures the number of differing forkids.
|
||||||
|
forkidRejected metrics.Meter
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHandshakeMeters registers and returns handshake meters for the given
|
||||||
|
// base.
|
||||||
|
func newHandshakeMeters(base string) *hsMeters {
|
||||||
|
return &hsMeters{
|
||||||
|
peerError: metrics.NewRegisteredMeter(base+"error/peer", nil),
|
||||||
|
timeoutError: metrics.NewRegisteredMeter(base+"error/timeout", nil),
|
||||||
|
networkIDMismatch: metrics.NewRegisteredMeter(base+"error/network", nil),
|
||||||
|
protocolVersionMismatch: metrics.NewRegisteredMeter(base+"error/version", nil),
|
||||||
|
genesisMismatch: metrics.NewRegisteredMeter(base+"error/genesis", nil),
|
||||||
|
forkidRejected: metrics.NewRegisteredMeter(base+"error/forkid", nil),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
meters = bidirectionalMeters{
|
||||||
|
ingress: newHandshakeMeters("eth/protocols/eth/ingress/handshake/"),
|
||||||
|
egress: newHandshakeMeters("eth/protocols/eth/egress/handshake/"),
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
// Copyright 2023 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package snap
|
||||||
|
|
||||||
|
import (
|
||||||
|
metrics "github.com/ethereum/go-ethereum/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ingressRegistrationErrorName = "eth/protocols/snap/ingress/registration/error"
|
||||||
|
egressRegistrationErrorName = "eth/protocols/snap/egress/registration/error"
|
||||||
|
|
||||||
|
IngressRegistrationErrorMeter = metrics.NewRegisteredMeter(ingressRegistrationErrorName, nil)
|
||||||
|
EgressRegistrationErrorMeter = metrics.NewRegisteredMeter(egressRegistrationErrorName, nil)
|
||||||
|
)
|
|
@ -521,13 +521,14 @@ func (t *dialTask) resolve(d *dialScheduler) bool {
|
||||||
|
|
||||||
// dial performs the actual connection attempt.
|
// dial performs the actual connection attempt.
|
||||||
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
|
func (t *dialTask) dial(d *dialScheduler, dest *enode.Node) error {
|
||||||
|
dialMeter.Mark(1)
|
||||||
fd, err := d.dialer.Dial(d.ctx, t.dest)
|
fd, err := d.dialer.Dial(d.ctx, t.dest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
|
d.log.Trace("Dial error", "id", t.dest.ID(), "addr", nodeAddr(t.dest), "conn", t.flags, "err", cleanupDialErr(err))
|
||||||
|
dialConnectionError.Mark(1)
|
||||||
return &dialError{err}
|
return &dialError{err}
|
||||||
}
|
}
|
||||||
mfd := newMeteredConn(fd, false, &net.TCPAddr{IP: dest.IP(), Port: dest.TCP()})
|
return d.setupFunc(newMeteredConn(fd), t.flags, dest)
|
||||||
return d.setupFunc(mfd, t.flags, dest)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *dialTask) String() string {
|
func (t *dialTask) String() string {
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package discover
|
package discover
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
|
@ -32,10 +33,17 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
bucketsCounter []metrics.Counter
|
||||||
ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
|
ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
|
||||||
egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil)
|
egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
for i := 0; i < nBuckets; i++ {
|
||||||
|
bucketsCounter = append(bucketsCounter, metrics.NewRegisteredCounter(fmt.Sprintf("%s/bucket/%d/count", moduleName, i), nil))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// meteredConn is a wrapper around a net.UDPConn that meters both the
|
// meteredConn is a wrapper around a net.UDPConn that meters both the
|
||||||
// inbound and outbound network traffic.
|
// inbound and outbound network traffic.
|
||||||
type meteredUdpConn struct {
|
type meteredUdpConn struct {
|
||||||
|
|
|
@ -34,6 +34,7 @@ import (
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/p2p/netutil"
|
"github.com/ethereum/go-ethereum/p2p/netutil"
|
||||||
)
|
)
|
||||||
|
@ -80,7 +81,8 @@ type Table struct {
|
||||||
closeReq chan struct{}
|
closeReq chan struct{}
|
||||||
closed chan struct{}
|
closed chan struct{}
|
||||||
|
|
||||||
nodeAddedHook func(*node) // for testing
|
nodeAddedHook func(*bucket, *node)
|
||||||
|
nodeRemovedHook func(*bucket, *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// transport is implemented by the UDP transports.
|
// transport is implemented by the UDP transports.
|
||||||
|
@ -98,6 +100,7 @@ type bucket struct {
|
||||||
entries []*node // live entries, sorted by time of last contact
|
entries []*node // live entries, sorted by time of last contact
|
||||||
replacements []*node // recently seen nodes to be used if revalidation fails
|
replacements []*node // recently seen nodes to be used if revalidation fails
|
||||||
ips netutil.DistinctNetSet
|
ips netutil.DistinctNetSet
|
||||||
|
index int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
|
func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
|
||||||
|
@ -119,7 +122,8 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
|
||||||
}
|
}
|
||||||
for i := range tab.buckets {
|
for i := range tab.buckets {
|
||||||
tab.buckets[i] = &bucket{
|
tab.buckets[i] = &bucket{
|
||||||
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
|
index: i,
|
||||||
|
ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tab.seedRand()
|
tab.seedRand()
|
||||||
|
@ -128,6 +132,22 @@ func newTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
|
||||||
return tab, nil
|
return tab, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newMeteredTable(t transport, db *enode.DB, cfg Config) (*Table, error) {
|
||||||
|
tab, err := newTable(t, db, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if metrics.Enabled {
|
||||||
|
tab.nodeAddedHook = func(b *bucket, n *node) {
|
||||||
|
bucketsCounter[b.index].Inc(1)
|
||||||
|
}
|
||||||
|
tab.nodeRemovedHook = func(b *bucket, n *node) {
|
||||||
|
bucketsCounter[b.index].Dec(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tab, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Nodes returns all nodes contained in the table.
|
// Nodes returns all nodes contained in the table.
|
||||||
func (tab *Table) Nodes() []*enode.Node {
|
func (tab *Table) Nodes() []*enode.Node {
|
||||||
if !tab.isInitDone() {
|
if !tab.isInitDone() {
|
||||||
|
@ -495,7 +515,7 @@ func (tab *Table) addSeenNode(n *node) {
|
||||||
n.addedAt = time.Now()
|
n.addedAt = time.Now()
|
||||||
|
|
||||||
if tab.nodeAddedHook != nil {
|
if tab.nodeAddedHook != nil {
|
||||||
tab.nodeAddedHook(n)
|
tab.nodeAddedHook(b, n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -539,7 +559,7 @@ func (tab *Table) addVerifiedNode(n *node) {
|
||||||
n.addedAt = time.Now()
|
n.addedAt = time.Now()
|
||||||
|
|
||||||
if tab.nodeAddedHook != nil {
|
if tab.nodeAddedHook != nil {
|
||||||
tab.nodeAddedHook(n)
|
tab.nodeAddedHook(b, n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -638,8 +658,16 @@ func (tab *Table) bumpInBucket(b *bucket, n *node) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tab *Table) deleteInBucket(b *bucket, n *node) {
|
func (tab *Table) deleteInBucket(b *bucket, n *node) {
|
||||||
|
// Check if the node is actually in the bucket so the removed hook
|
||||||
|
// isn't called multiple times for the same node.
|
||||||
|
if !contains(b.entries, n.ID()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
b.entries = deleteNode(b.entries, n)
|
b.entries = deleteNode(b.entries, n)
|
||||||
tab.removeIP(b, n.IP())
|
tab.removeIP(b, n.IP())
|
||||||
|
if tab.nodeRemovedHook != nil {
|
||||||
|
tab.nodeRemovedHook(b, n)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func contains(ns []*node, id enode.ID) bool {
|
func contains(ns []*node, id enode.ID) bool {
|
||||||
|
|
|
@ -142,7 +142,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
|
||||||
log: cfg.Log,
|
log: cfg.Log,
|
||||||
}
|
}
|
||||||
|
|
||||||
tab, err := newTable(t, ln.Database(), cfg)
|
tab, err := newMeteredTable(t, ln.Database(), cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -394,7 +394,7 @@ func TestUDPv4_pingMatchIP(t *testing.T) {
|
||||||
func TestUDPv4_successfulPing(t *testing.T) {
|
func TestUDPv4_successfulPing(t *testing.T) {
|
||||||
test := newUDPTest(t)
|
test := newUDPTest(t)
|
||||||
added := make(chan *node, 1)
|
added := make(chan *node, 1)
|
||||||
test.table.nodeAddedHook = func(n *node) { added <- n }
|
test.table.nodeAddedHook = func(b *bucket, n *node) { added <- n }
|
||||||
defer test.close()
|
defer test.close()
|
||||||
|
|
||||||
// The remote side sends a ping packet to initiate the exchange.
|
// The remote side sends a ping packet to initiate the exchange.
|
||||||
|
|
|
@ -174,7 +174,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
|
||||||
cancelCloseCtx: cancelCloseCtx,
|
cancelCloseCtx: cancelCloseCtx,
|
||||||
}
|
}
|
||||||
t.talk = newTalkSystem(t)
|
t.talk = newTalkSystem(t)
|
||||||
tab, err := newTable(t, t.db, cfg)
|
tab, err := newMeteredTable(t, t.db, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,30 +19,86 @@
|
||||||
package p2p
|
package p2p
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
// HandleHistName is the prefix of the per-packet serving time histograms.
|
||||||
|
HandleHistName = "p2p/handle"
|
||||||
|
|
||||||
// ingressMeterName is the prefix of the per-packet inbound metrics.
|
// ingressMeterName is the prefix of the per-packet inbound metrics.
|
||||||
ingressMeterName = "p2p/ingress"
|
ingressMeterName = "p2p/ingress"
|
||||||
|
|
||||||
// egressMeterName is the prefix of the per-packet outbound metrics.
|
// egressMeterName is the prefix of the per-packet outbound metrics.
|
||||||
egressMeterName = "p2p/egress"
|
egressMeterName = "p2p/egress"
|
||||||
|
|
||||||
// HandleHistName is the prefix of the per-packet serving time histograms.
|
|
||||||
HandleHistName = "p2p/handle"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ingressConnectMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
|
activePeerGauge metrics.Gauge = metrics.NilGauge{}
|
||||||
ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
|
|
||||||
egressConnectMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
|
ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/ingress", nil)
|
||||||
egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil)
|
egressTrafficMeter = metrics.NewRegisteredMeter("p2p/egress", nil)
|
||||||
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)
|
|
||||||
|
// general ingress/egress connection meters
|
||||||
|
serveMeter metrics.Meter = metrics.NilMeter{}
|
||||||
|
serveSuccessMeter metrics.Meter = metrics.NilMeter{}
|
||||||
|
dialMeter metrics.Meter = metrics.NilMeter{}
|
||||||
|
dialSuccessMeter metrics.Meter = metrics.NilMeter{}
|
||||||
|
dialConnectionError metrics.Meter = metrics.NilMeter{}
|
||||||
|
|
||||||
|
// handshake error meters
|
||||||
|
dialTooManyPeers = metrics.NewRegisteredMeter("p2p/dials/error/saturated", nil)
|
||||||
|
dialAlreadyConnected = metrics.NewRegisteredMeter("p2p/dials/error/known", nil)
|
||||||
|
dialSelf = metrics.NewRegisteredMeter("p2p/dials/error/self", nil)
|
||||||
|
dialUselessPeer = metrics.NewRegisteredMeter("p2p/dials/error/useless", nil)
|
||||||
|
dialUnexpectedIdentity = metrics.NewRegisteredMeter("p2p/dials/error/id/unexpected", nil)
|
||||||
|
dialEncHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/enc", nil)
|
||||||
|
dialProtoHandshakeError = metrics.NewRegisteredMeter("p2p/dials/error/rlpx/proto", nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
if !metrics.Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)
|
||||||
|
serveMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
|
||||||
|
serveSuccessMeter = metrics.NewRegisteredMeter("p2p/serves/success", nil)
|
||||||
|
dialMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
|
||||||
|
dialSuccessMeter = metrics.NewRegisteredMeter("p2p/dials/success", nil)
|
||||||
|
dialConnectionError = metrics.NewRegisteredMeter("p2p/dials/error/connection", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// markDialError matches errors that occur while setting up a dial connection
|
||||||
|
// to the corresponding meter.
|
||||||
|
func markDialError(err error) {
|
||||||
|
if !metrics.Enabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err2 := errors.Unwrap(err); err2 != nil {
|
||||||
|
err = err2
|
||||||
|
}
|
||||||
|
switch err {
|
||||||
|
case DiscTooManyPeers:
|
||||||
|
dialTooManyPeers.Mark(1)
|
||||||
|
case DiscAlreadyConnected:
|
||||||
|
dialAlreadyConnected.Mark(1)
|
||||||
|
case DiscSelf:
|
||||||
|
dialSelf.Mark(1)
|
||||||
|
case DiscUselessPeer:
|
||||||
|
dialUselessPeer.Mark(1)
|
||||||
|
case DiscUnexpectedIdentity:
|
||||||
|
dialUnexpectedIdentity.Mark(1)
|
||||||
|
case errEncHandshakeError:
|
||||||
|
dialEncHandshakeError.Mark(1)
|
||||||
|
case errProtoHandshakeError:
|
||||||
|
dialProtoHandshakeError.Mark(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// meteredConn is a wrapper around a net.Conn that meters both the
|
// meteredConn is a wrapper around a net.Conn that meters both the
|
||||||
// inbound and outbound network traffic.
|
// inbound and outbound network traffic.
|
||||||
type meteredConn struct {
|
type meteredConn struct {
|
||||||
|
@ -52,18 +108,10 @@ type meteredConn struct {
|
||||||
// newMeteredConn creates a new metered connection, bumps the ingress or egress
|
// newMeteredConn creates a new metered connection, bumps the ingress or egress
|
||||||
// connection meter and also increases the metered peer count. If the metrics
|
// connection meter and also increases the metered peer count. If the metrics
|
||||||
// system is disabled, function returns the original connection.
|
// system is disabled, function returns the original connection.
|
||||||
func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
|
func newMeteredConn(conn net.Conn) net.Conn {
|
||||||
// Short circuit if metrics are disabled
|
|
||||||
if !metrics.Enabled {
|
if !metrics.Enabled {
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
// Bump the connection counters and wrap the connection
|
|
||||||
if ingress {
|
|
||||||
ingressConnectMeter.Mark(1)
|
|
||||||
} else {
|
|
||||||
egressConnectMeter.Mark(1)
|
|
||||||
}
|
|
||||||
activePeerGauge.Inc(1)
|
|
||||||
return &meteredConn{Conn: conn}
|
return &meteredConn{Conn: conn}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,13 +130,3 @@ func (c *meteredConn) Write(b []byte) (n int, err error) {
|
||||||
egressTrafficMeter.Mark(int64(n))
|
egressTrafficMeter.Mark(int64(n))
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close delegates a close operation to the underlying connection, unregisters
|
|
||||||
// the peer from the traffic registries and emits close event.
|
|
||||||
func (c *meteredConn) Close() error {
|
|
||||||
err := c.Conn.Close()
|
|
||||||
if err == nil {
|
|
||||||
activePeerGauge.Dec(1)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
|
@ -64,7 +64,11 @@ const (
|
||||||
frameWriteTimeout = 20 * time.Second
|
frameWriteTimeout = 20 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
var errServerStopped = errors.New("server stopped")
|
var (
|
||||||
|
errServerStopped = errors.New("server stopped")
|
||||||
|
errEncHandshakeError = errors.New("rlpx enc error")
|
||||||
|
errProtoHandshakeError = errors.New("rlpx proto error")
|
||||||
|
)
|
||||||
|
|
||||||
// Config holds Server options.
|
// Config holds Server options.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -772,7 +776,11 @@ running:
|
||||||
srv.dialsched.peerAdded(c)
|
srv.dialsched.peerAdded(c)
|
||||||
if p.Inbound() {
|
if p.Inbound() {
|
||||||
inboundCount++
|
inboundCount++
|
||||||
|
serveSuccessMeter.Mark(1)
|
||||||
|
} else {
|
||||||
|
dialSuccessMeter.Mark(1)
|
||||||
}
|
}
|
||||||
|
activePeerGauge.Inc(1)
|
||||||
}
|
}
|
||||||
c.cont <- err
|
c.cont <- err
|
||||||
|
|
||||||
|
@ -785,6 +793,7 @@ running:
|
||||||
if pd.Inbound() {
|
if pd.Inbound() {
|
||||||
inboundCount--
|
inboundCount--
|
||||||
}
|
}
|
||||||
|
activePeerGauge.Dec(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -894,11 +903,8 @@ func (srv *Server) listenLoop() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if remoteIP != nil {
|
if remoteIP != nil {
|
||||||
var addr *net.TCPAddr
|
fd = newMeteredConn(fd)
|
||||||
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
|
serveMeter.Mark(1)
|
||||||
addr = tcp
|
|
||||||
}
|
|
||||||
fd = newMeteredConn(fd, true, addr)
|
|
||||||
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
|
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -939,6 +945,9 @@ func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *enode.Node)
|
||||||
|
|
||||||
err := srv.setupConn(c, flags, dialDest)
|
err := srv.setupConn(c, flags, dialDest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if !c.is(inboundConn) {
|
||||||
|
markDialError(err)
|
||||||
|
}
|
||||||
c.close(err)
|
c.close(err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -957,7 +966,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
|
||||||
if dialDest != nil {
|
if dialDest != nil {
|
||||||
dialPubkey := new(ecdsa.PublicKey)
|
dialPubkey := new(ecdsa.PublicKey)
|
||||||
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
|
if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
|
||||||
err = errors.New("dial destination doesn't have a secp256k1 public key")
|
err = fmt.Errorf("%w: dial destination doesn't have a secp256k1 public key", errEncHandshakeError)
|
||||||
srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
|
srv.log.Trace("Setting up connection failed", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -967,7 +976,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
|
||||||
remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
|
remotePubkey, err := c.doEncHandshake(srv.PrivateKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
|
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
|
||||||
return err
|
return fmt.Errorf("%w: %v", errEncHandshakeError, err)
|
||||||
}
|
}
|
||||||
if dialDest != nil {
|
if dialDest != nil {
|
||||||
c.node = dialDest
|
c.node = dialDest
|
||||||
|
@ -985,7 +994,7 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro
|
||||||
phs, err := c.doProtoHandshake(srv.ourHandshake)
|
phs, err := c.doProtoHandshake(srv.ourHandshake)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Trace("Failed p2p handshake", "err", err)
|
clog.Trace("Failed p2p handshake", "err", err)
|
||||||
return err
|
return fmt.Errorf("%w: %v", errProtoHandshakeError, err)
|
||||||
}
|
}
|
||||||
if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
|
if id := c.node.ID(); !bytes.Equal(crypto.Keccak256(phs.ID), id[:]) {
|
||||||
clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
|
clog.Trace("Wrong devp2p handshake identity", "phsid", hex.EncodeToString(phs.ID))
|
||||||
|
|
|
@ -370,8 +370,6 @@ func TestServerSetupConn(t *testing.T) {
|
||||||
clientkey, srvkey = newkey(), newkey()
|
clientkey, srvkey = newkey(), newkey()
|
||||||
clientpub = &clientkey.PublicKey
|
clientpub = &clientkey.PublicKey
|
||||||
srvpub = &srvkey.PublicKey
|
srvpub = &srvkey.PublicKey
|
||||||
fooErr = errors.New("foo")
|
|
||||||
readErr = errors.New("read error")
|
|
||||||
)
|
)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dontstart bool
|
dontstart bool
|
||||||
|
@ -389,10 +387,10 @@ func TestServerSetupConn(t *testing.T) {
|
||||||
wantCloseErr: errServerStopped,
|
wantCloseErr: errServerStopped,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
tt: &setupTransport{pubkey: clientpub, encHandshakeErr: readErr},
|
tt: &setupTransport{pubkey: clientpub, encHandshakeErr: errEncHandshakeError},
|
||||||
flags: inboundConn,
|
flags: inboundConn,
|
||||||
wantCalls: "doEncHandshake,close,",
|
wantCalls: "doEncHandshake,close,",
|
||||||
wantCloseErr: readErr,
|
wantCloseErr: errEncHandshakeError,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}},
|
tt: &setupTransport{pubkey: clientpub, phs: protoHandshake{ID: randomID().Bytes()}},
|
||||||
|
@ -402,11 +400,11 @@ func TestServerSetupConn(t *testing.T) {
|
||||||
wantCloseErr: DiscUnexpectedIdentity,
|
wantCloseErr: DiscUnexpectedIdentity,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
tt: &setupTransport{pubkey: clientpub, protoHandshakeErr: fooErr},
|
tt: &setupTransport{pubkey: clientpub, protoHandshakeErr: errProtoHandshakeError},
|
||||||
dialDest: enode.NewV4(clientpub, nil, 0, 0),
|
dialDest: enode.NewV4(clientpub, nil, 0, 0),
|
||||||
flags: dynDialedConn,
|
flags: dynDialedConn,
|
||||||
wantCalls: "doEncHandshake,doProtoHandshake,close,",
|
wantCalls: "doEncHandshake,doProtoHandshake,close,",
|
||||||
wantCloseErr: fooErr,
|
wantCloseErr: errProtoHandshakeError,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{ID: crypto.FromECDSAPub(srvpub)[1:]}},
|
tt: &setupTransport{pubkey: srvpub, phs: protoHandshake{ID: crypto.FromECDSAPub(srvpub)[1:]}},
|
||||||
|
|
Loading…
Reference in New Issue