From 9cd8c96157728b8e2eb98a5c506abfc543b6e166 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 3 Apr 2015 02:16:53 +0200 Subject: [PATCH 1/9] p2p/discover: make packet processing less concurrent --- p2p/discover/udp.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index a638a8f35c..3275cf1289 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -379,11 +379,9 @@ func (t *udp) readLoop() { continue } glog.V(logger.Detail).Infof("<<< %v %T %v\n", from, packet, packet) - go func() { - if err := packet.handle(t, from, fromID, hash); err != nil { - glog.V(logger.Debug).Infof("error handling %T from %v: %v", packet, from, err) - } - }() + if err := packet.handle(t, from, fromID, hash); err != nil { + glog.V(logger.Debug).Infof("error handling %T from %v: %v", packet, from, err) + } } } @@ -430,7 +428,7 @@ func (req *ping) handle(t *udp, from *net.UDPAddr, fromID NodeID, mac []byte) er }) if !t.handleReply(fromID, pingPacket, req) { // Note: we're ignoring the provided IP address right now - t.bond(true, fromID, from, req.Port) + go t.bond(true, fromID, from, req.Port) } return nil } From 7be05b4b9d9f2f724cbdfa7b79c10f1c25cbdb4b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 3 Apr 2015 02:20:44 +0200 Subject: [PATCH 2/9] p2p/discover: don't log packet content --- p2p/discover/udp.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index 3275cf1289..d37260e7d7 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -335,7 +335,7 @@ func (t *udp) send(toaddr *net.UDPAddr, ptype byte, req interface{}) error { if err != nil { return err } - glog.V(logger.Detail).Infof(">>> %v %T %v\n", toaddr, req, req) + glog.V(logger.Detail).Infof(">>> %v %T\n", toaddr, req) if _, err = t.conn.WriteToUDP(packet, toaddr); err != nil { glog.V(logger.Detail).Infoln("UDP send failed:", err) } @@ -378,10 +378,11 @@ func (t *udp) readLoop() { glog.V(logger.Debug).Infof("Bad packet from %v: %v\n", from, err) continue } - glog.V(logger.Detail).Infof("<<< %v %T %v\n", from, packet, packet) + status := "ok" if err := packet.handle(t, from, fromID, hash); err != nil { - glog.V(logger.Debug).Infof("error handling %T from %v: %v", packet, from, err) + status = err.Error() } + glog.V(logger.Detail).Infof("<<< %v %T: %s\n", from, packet, status) } } From 22d1f0faf1fcb20063b719f8fc70534e268485b6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 3 Apr 2015 03:56:17 +0200 Subject: [PATCH 3/9] p2p: improve peer selection logic This commit introduces a new (temporary) peer selection strategy based on random lookups. While we're here, also implement the TODOs in dialLoop. --- p2p/server.go | 117 ++++++++++++++++++++++++++------------------------ 1 file changed, 61 insertions(+), 56 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index 0a2621aa81..b21b83252e 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -3,6 +3,7 @@ package p2p import ( "bytes" "crypto/ecdsa" + "crypto/rand" "errors" "fmt" "net" @@ -276,46 +277,58 @@ func (srv *Server) listenLoop() { } func (srv *Server) dialLoop() { + var ( + dialed = make(chan *discover.Node) + dialing = make(map[discover.NodeID]bool) + findresults = make(chan []*discover.Node) + refresh = time.NewTimer(0) + ) defer srv.loopWG.Done() - refresh := time.NewTicker(refreshPeersInterval) defer refresh.Stop() + // TODO: maybe limit number of active dials + dial := func(dest *discover.Node) { + srv.lock.Lock() + ok, _ := srv.checkPeer(dest.ID) + srv.lock.Unlock() + // Don't dial nodes that would fail the checks in addPeer. + // This is important because the connection handshake is a lot + // of work and we'd rather avoid doing that work for peers + // that can't be added. + if !ok || dialing[dest.ID] { + return + } + dialing[dest.ID] = true + srv.peerWG.Add(1) + go func() { + srv.dialNode(dest) + dialed <- dest + }() + } + srv.ntab.Bootstrap(srv.BootstrapNodes) - go srv.findPeers() - - dialed := make(chan *discover.Node) - dialing := make(map[discover.NodeID]bool) - - // TODO: limit number of active dials - // TODO: ensure only one findPeers goroutine is running - // TODO: pause findPeers when we're at capacity - for { select { case <-refresh.C: - - go srv.findPeers() - - case dest := <-srv.peerConnect: - // avoid dialing nodes that are already connected. - // there is another check for this in addPeer, - // which runs after the handshake. srv.lock.Lock() - _, isconnected := srv.peers[dest.ID] + needpeers := len(srv.peers) < srv.MaxPeers srv.lock.Unlock() - if isconnected || dialing[dest.ID] || dest.ID == srv.Self().ID { - continue + if needpeers { + go func() { + var target discover.NodeID + rand.Read(target[:]) + findresults <- srv.ntab.Lookup(target) + }() + refresh.Stop() } - dialing[dest.ID] = true - srv.peerWG.Add(1) - go func() { - srv.dialNode(dest) - // at this point, the peer has been added - // or discarded. either way, we're not dialing it anymore. - dialed <- dest - }() - + case dest := <-srv.peerConnect: + dial(dest) + case dests := <-findresults: + for _, dest := range dests { + dial(dest) + } + refresh.Reset(refreshPeersInterval) case dest := <-dialed: delete(dialing, dest.ID) @@ -341,24 +354,6 @@ func (srv *Server) Self() *discover.Node { return srv.ntab.Self() } -func (srv *Server) findPeers() { - far := srv.Self().ID - for i := range far { - far[i] = ^far[i] - } - closeToSelf := srv.ntab.Lookup(srv.Self().ID) - farFromSelf := srv.ntab.Lookup(far) - - for i := 0; i < len(closeToSelf) || i < len(farFromSelf); i++ { - if i < len(closeToSelf) { - srv.peerConnect <- closeToSelf[i] - } - if i < len(farFromSelf) { - srv.peerConnect <- farFromSelf[i] - } - } -} - func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token fd.SetDeadline(time.Now().Add(handshakeTimeout)) @@ -368,7 +363,6 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) return } - conn.MsgReadWriter = &netWrapper{ wrapped: conn.MsgReadWriter, conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout, @@ -379,24 +373,27 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { p.politeDisconnect(reason) return } + // The handshakes are done and it passed all checks. + // Spawn the Peer loops. + go srv.runPeer(p) +} +func (srv *Server) runPeer(p *Peer) { glog.V(logger.Debug).Infof("Added %v\n", p) srvjslog.LogJson(&logger.P2PConnected{ - RemoteId: fmt.Sprintf("%x", conn.ID[:]), - RemoteAddress: fd.RemoteAddr().String(), - RemoteVersionString: conn.Name, + RemoteId: p.ID().String(), + RemoteAddress: p.RemoteAddr().String(), + RemoteVersionString: p.Name(), NumConnections: srv.PeerCount(), }) - if srv.newPeerHook != nil { srv.newPeerHook(p) } discreason := p.run() srv.removePeer(p) - glog.V(logger.Debug).Infof("Removed %v (%v)\n", p, discreason) srvjslog.LogJson(&logger.P2PDisconnected{ - RemoteId: fmt.Sprintf("%x", conn.ID[:]), + RemoteId: p.ID().String(), NumConnections: srv.PeerCount(), }) } @@ -404,6 +401,14 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { srv.lock.Lock() defer srv.lock.Unlock() + if ok, reason := srv.checkPeer(id); !ok { + return false, reason + } + srv.peers[id] = p + return true, 0 +} + +func (srv *Server) checkPeer(id discover.NodeID) (bool, DiscReason) { switch { case !srv.running: return false, DiscQuitting @@ -413,9 +418,9 @@ func (srv *Server) addPeer(id discover.NodeID, p *Peer) (bool, DiscReason) { return false, DiscAlreadyConnected case id == srv.Self().ID: return false, DiscSelf + default: + return true, 0 } - srv.peers[id] = p - return true, 0 } func (srv *Server) removePeer(p *Peer) { From f1d710af006d7e9ed6046ab410976bd20c1e3c97 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 8 Apr 2015 17:37:11 +0200 Subject: [PATCH 4/9] p2p: fix Peer shutdown deadlocks There were multiple synchronization issues in the disconnect handling, all caused by the odd special-casing of Peer.readLoop errors. Remove the special handling of read errors and make readLoop part of the Peer WaitGroup. Thanks to @Gustav-Simonsson for pointing at arrows in a diagram and playing rubber-duck. --- p2p/handshake.go | 2 +- p2p/peer.go | 131 ++++++++++++++++++++++++---------------------- p2p/peer_error.go | 10 ++-- p2p/peer_test.go | 74 ++++++++++++++++++++++---- 4 files changed, 139 insertions(+), 78 deletions(-) diff --git a/p2p/handshake.go b/p2p/handshake.go index 031064407f..5a259cd769 100644 --- a/p2p/handshake.go +++ b/p2p/handshake.go @@ -409,7 +409,7 @@ func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, e // spec and we send it ourself if Server.addPeer fails. var reason DiscReason rlp.Decode(msg.Payload, &reason) - return nil, discRequestedError(reason) + return nil, reason } if msg.Code != handshakeMsg { return nil, fmt.Errorf("expected handshake, got %x", msg.Code) diff --git a/p2p/peer.go b/p2p/peer.go index 6b97ea58d9..a82ee4bcad 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -44,7 +44,7 @@ type Peer struct { rw *conn running map[string]*protoRW - protoWG sync.WaitGroup + wg sync.WaitGroup protoErr chan error closed chan struct{} disc chan DiscReason @@ -102,58 +102,50 @@ func (p *Peer) String() string { func newPeer(fd net.Conn, conn *conn, protocols []Protocol) *Peer { logtag := fmt.Sprintf("Peer %.8x %v", conn.ID[:], fd.RemoteAddr()) + protomap := matchProtocols(protocols, conn.Caps, conn) p := &Peer{ Logger: logger.NewLogger(logtag), conn: fd, rw: conn, - running: matchProtocols(protocols, conn.Caps, conn), + running: protomap, disc: make(chan DiscReason), - protoErr: make(chan error), + protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), } return p } func (p *Peer) run() DiscReason { - var readErr = make(chan error, 1) - defer p.closeProtocols() - defer close(p.closed) + readErr := make(chan error, 1) + p.wg.Add(2) + go p.readLoop(readErr) + go p.pingLoop() p.startProtocols() - go func() { readErr <- p.readLoop() }() - - ping := time.NewTicker(pingInterval) - defer ping.Stop() // Wait for an error or disconnect. var reason DiscReason -loop: - for { - select { - case <-ping.C: - go func() { - if err := SendItems(p.rw, pingMsg); err != nil { - p.protoErr <- err - return - } - }() - case err := <-readErr: - // We rely on protocols to abort if there is a write error. It - // might be more robust to handle them here as well. - p.DebugDetailf("Read error: %v\n", err) - p.conn.Close() - return DiscNetworkError - case err := <-p.protoErr: - reason = discReasonForError(err) - break loop - case reason = <-p.disc: - break loop + select { + case err := <-readErr: + if r, ok := err.(DiscReason); ok { + reason = r + break } + // Note: We rely on protocols to abort if there is a write + // error. It might be more robust to handle them here as well. + p.DebugDetailf("Read error: %v\n", err) + p.conn.Close() + reason = DiscNetworkError + case err := <-p.protoErr: + reason = discReasonForError(err) + case reason = <-p.disc: } - p.politeDisconnect(reason) - // Wait for readLoop. It will end because conn is now closed. - <-readErr + close(p.closed) + p.wg.Wait() + if reason != DiscNetworkError { + p.politeDisconnect(reason) + } p.Debugf("Disconnected: %v\n", reason) return reason } @@ -174,18 +166,37 @@ func (p *Peer) politeDisconnect(reason DiscReason) { p.conn.Close() } -func (p *Peer) readLoop() error { +func (p *Peer) pingLoop() { + ping := time.NewTicker(pingInterval) + defer p.wg.Done() + defer ping.Stop() + for { + select { + case <-ping.C: + if err := SendItems(p.rw, pingMsg); err != nil { + p.protoErr <- err + return + } + case <-p.closed: + return + } + } +} + +func (p *Peer) readLoop(errc chan<- error) { + defer p.wg.Done() for { p.conn.SetDeadline(time.Now().Add(frameReadTimeout)) msg, err := p.rw.ReadMsg() if err != nil { - return err + errc <- err + return } if err = p.handle(msg); err != nil { - return err + errc <- err + return } } - return nil } func (p *Peer) handle(msg Msg) error { @@ -195,12 +206,11 @@ func (p *Peer) handle(msg Msg) error { go SendItems(p.rw, pongMsg) case msg.Code == discMsg: var reason [1]DiscReason - // no need to discard or for error checking, we'll close the - // connection after this. + // This is the last message. We don't need to discard or + // check errors because, the connection will be closed after it. rlp.Decode(msg.Payload, &reason) p.Debugf("Disconnect requested: %v\n", reason[0]) - p.Disconnect(DiscRequested) - return discRequestedError(reason[0]) + return DiscRequested case msg.Code < baseProtocolLength: // ignore other base protocol messages return msg.Discard() @@ -210,7 +220,12 @@ func (p *Peer) handle(msg Msg) error { if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } - proto.in <- msg + select { + case proto.in <- msg: + return nil + case <-p.closed: + return io.EOF + } } return nil } @@ -234,10 +249,11 @@ outer: } func (p *Peer) startProtocols() { + p.wg.Add(len(p.running)) for _, proto := range p.running { proto := proto + proto.closed = p.closed p.DebugDetailf("Starting protocol %s/%d\n", proto.Name, proto.Version) - p.protoWG.Add(1) go func() { err := proto.Run(p, proto) if err == nil { @@ -246,11 +262,8 @@ func (p *Peer) startProtocols() { } else { p.DebugDetailf("Protocol %s/%d error: %v\n", proto.Name, proto.Version, err) } - select { - case p.protoErr <- err: - case <-p.closed: - } - p.protoWG.Done() + p.protoErr <- err + p.wg.Done() }() } } @@ -266,13 +279,6 @@ func (p *Peer) getProto(code uint64) (*protoRW, error) { return nil, newPeerError(errInvalidMsgCode, "%d", code) } -func (p *Peer) closeProtocols() { - for _, p := range p.running { - close(p.in) - } - p.protoWG.Wait() -} - // writeProtoMsg sends the given message on behalf of the given named protocol. // this exists because of Server.Broadcast. func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { @@ -289,8 +295,8 @@ func (p *Peer) writeProtoMsg(protoName string, msg Msg) error { type protoRW struct { Protocol - in chan Msg + closed <-chan struct{} offset uint64 w MsgWriter } @@ -304,10 +310,11 @@ func (rw *protoRW) WriteMsg(msg Msg) error { } func (rw *protoRW) ReadMsg() (Msg, error) { - msg, ok := <-rw.in - if !ok { - return msg, io.EOF + select { + case msg := <-rw.in: + msg.Code -= rw.offset + return msg, nil + case <-rw.closed: + return Msg{}, io.EOF } - msg.Code -= rw.offset - return msg, nil } diff --git a/p2p/peer_error.go b/p2p/peer_error.go index 0ff4f4b43e..4021316306 100644 --- a/p2p/peer_error.go +++ b/p2p/peer_error.go @@ -98,15 +98,13 @@ func (d DiscReason) String() string { return discReasonToString[d] } -type discRequestedError DiscReason - -func (err discRequestedError) Error() string { - return fmt.Sprintf("disconnect requested: %v", DiscReason(err)) +func (d DiscReason) Error() string { + return d.String() } func discReasonForError(err error) DiscReason { - if reason, ok := err.(discRequestedError); ok { - return DiscReason(reason) + if reason, ok := err.(DiscReason); ok { + return reason } peerError, ok := err.(*peerError) if !ok { diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 3c4c71c0c0..fb76818a06 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -2,8 +2,9 @@ package p2p import ( "bytes" + "errors" "fmt" - "io" + "math/rand" "net" "reflect" "testing" @@ -27,7 +28,7 @@ var discard = Protocol{ }, } -func testPeer(protos []Protocol) (io.Closer, *conn, *Peer, <-chan DiscReason) { +func testPeer(protos []Protocol) (func(), *conn, *Peer, <-chan DiscReason) { fd1, _ := net.Pipe() hs1 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion} hs2 := &protoHandshake{ID: randomID(), Version: baseProtocolVersion} @@ -41,7 +42,11 @@ func testPeer(protos []Protocol) (io.Closer, *conn, *Peer, <-chan DiscReason) { errc := make(chan DiscReason, 1) go func() { errc <- peer.run() }() - return p1, &conn{p2, hs2}, peer, errc + closer := func() { + p1.Close() + fd1.Close() + } + return closer, &conn{p2, hs2}, peer, errc } func TestPeerProtoReadMsg(t *testing.T) { @@ -67,7 +72,7 @@ func TestPeerProtoReadMsg(t *testing.T) { } closer, rw, _, errc := testPeer([]Protocol{proto}) - defer closer.Close() + defer closer() Send(rw, baseProtocolLength+2, []uint{1}) Send(rw, baseProtocolLength+3, []uint{2}) @@ -99,7 +104,7 @@ func TestPeerProtoEncodeMsg(t *testing.T) { }, } closer, rw, _, _ := testPeer([]Protocol{proto}) - defer closer.Close() + defer closer() if err := ExpectMsg(rw, 17, []string{"foo", "bar"}); err != nil { t.Error(err) @@ -110,7 +115,7 @@ func TestPeerWriteForBroadcast(t *testing.T) { defer testlog(t).detach() closer, rw, peer, peerErr := testPeer([]Protocol{discard}) - defer closer.Close() + defer closer() emptymsg := func(code uint64) Msg { return Msg{Code: code, Size: 0, Payload: bytes.NewReader(nil)} @@ -150,7 +155,7 @@ func TestPeerPing(t *testing.T) { defer testlog(t).detach() closer, rw, _, _ := testPeer(nil) - defer closer.Close() + defer closer() if err := SendItems(rw, pingMsg); err != nil { t.Fatal(err) } @@ -163,19 +168,70 @@ func TestPeerDisconnect(t *testing.T) { defer testlog(t).detach() closer, rw, _, disc := testPeer(nil) - defer closer.Close() + defer closer() if err := SendItems(rw, discMsg, DiscQuitting); err != nil { t.Fatal(err) } if err := ExpectMsg(rw, discMsg, []interface{}{DiscRequested}); err != nil { t.Error(err) } - closer.Close() // make test end faster + closer() if reason := <-disc; reason != DiscRequested { t.Errorf("run returned wrong reason: got %v, want %v", reason, DiscRequested) } } +// This test is supposed to verify that Peer can reliably handle +// multiple causes of disconnection occurring at the same time. +func TestPeerDisconnectRace(t *testing.T) { + defer testlog(t).detach() + maybe := func() bool { return rand.Intn(1) == 1 } + + for i := 0; i < 1000; i++ { + protoclose := make(chan error) + protodisc := make(chan DiscReason) + closer, rw, p, disc := testPeer([]Protocol{ + { + Name: "closereq", + Run: func(p *Peer, rw MsgReadWriter) error { return <-protoclose }, + Length: 1, + }, + { + Name: "disconnect", + Run: func(p *Peer, rw MsgReadWriter) error { p.Disconnect(<-protodisc); return nil }, + Length: 1, + }, + }) + + // Simulate incoming messages. + go SendItems(rw, baseProtocolLength+1) + go SendItems(rw, baseProtocolLength+2) + // Close the network connection. + go closer() + // Make protocol "closereq" return. + protoclose <- errors.New("protocol closed") + // Make protocol "disconnect" call peer.Disconnect + protodisc <- DiscAlreadyConnected + // In some cases, simulate something else calling peer.Disconnect. + if maybe() { + go p.Disconnect(DiscInvalidIdentity) + } + // In some cases, simulate remote requesting a disconnect. + if maybe() { + go SendItems(rw, discMsg, DiscQuitting) + } + + select { + case <-disc: + case <-time.After(2 * time.Second): + // Peer.run should return quickly. If it doesn't the Peer + // goroutines are probably deadlocked. Call panic in order to + // show the stacks. + panic("Peer.run took to long to return.") + } + } +} + func TestNewPeer(t *testing.T) { name := "nodename" caps := []Cap{{"foo", 2}, {"bar", 3}} From 145330fdf28b6d6bf3558dec0b512a5379d446f7 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 13:20:26 +0200 Subject: [PATCH 5/9] p2p: properly decrement peer wait group counter for setup errors --- p2p/server.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/p2p/server.go b/p2p/server.go index b21b83252e..d227d477a3 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -344,6 +344,10 @@ func (srv *Server) dialNode(dest *discover.Node) { glog.V(logger.Debug).Infof("Dialing %v\n", dest) conn, err := srv.Dialer.Dial("tcp", addr.String()) if err != nil { + // dialLoop adds to the wait group counter when launching + // dialNode, so we need to count it down again. startPeer also + // does that when an error occurs. + srv.peerWG.Done() glog.V(logger.Detail).Infof("dial error: %v", err) return } @@ -356,11 +360,17 @@ func (srv *Server) Self() *discover.Node { func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token + + // Run setupFunc, which should create an authenticated connection + // and run the capability exchange. Note that any early error + // returns during that exchange need to call peerWG.Done because + // the callers of startPeer added the peer to the wait group already. fd.SetDeadline(time.Now().Add(handshakeTimeout)) conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) if err != nil { fd.Close() glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) + srv.peerWG.Done() return } conn.MsgReadWriter = &netWrapper{ @@ -371,6 +381,7 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { if ok, reason := srv.addPeer(conn.ID, p); !ok { glog.V(logger.Detail).Infof("Not adding %v (%v)\n", p, reason) p.politeDisconnect(reason) + srv.peerWG.Done() return } // The handshakes are done and it passed all checks. From 99a1db2d4076861d19e2bd704732ff91d509f34b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 13:24:10 +0200 Subject: [PATCH 6/9] p2p: don't mess with the socket deadline in Peer.readLoop netWrapper already sets a read deadline in ReadMsg. --- p2p/peer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/p2p/peer.go b/p2p/peer.go index a82ee4bcad..7bc4f9cf6b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -186,7 +186,6 @@ func (p *Peer) pingLoop() { func (p *Peer) readLoop(errc chan<- error) { defer p.wg.Done() for { - p.conn.SetDeadline(time.Now().Add(frameReadTimeout)) msg, err := p.rw.ReadMsg() if err != nil { errc <- err From b3c058a9e4e9296583ba516c537768b96a2fb8a0 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 13:25:35 +0200 Subject: [PATCH 7/9] p2p: improve disconnect signaling at handshake time As of this commit, p2p will disconnect nodes directly after the encryption handshake if too many peer connections are active. Errors in the protocol handshake packet are now handled more politely by sending a disconnect packet before closing the connection. --- p2p/handshake.go | 68 ++++++++++++++++++++++++++----------------- p2p/handshake_test.go | 4 +-- p2p/server.go | 16 ++++++---- p2p/server_test.go | 58 +++++++++++++++++++++++++++++++++++- 4 files changed, 111 insertions(+), 35 deletions(-) diff --git a/p2p/handshake.go b/p2p/handshake.go index 5a259cd769..43361364fa 100644 --- a/p2p/handshake.go +++ b/p2p/handshake.go @@ -68,50 +68,61 @@ type protoHandshake struct { // setupConn starts a protocol session on the given connection. // It runs the encryption handshake and the protocol handshake. // If dial is non-nil, the connection the local node is the initiator. -func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { +// If atcap is true, the connection will be disconnected with DiscTooManyPeers +// after the key exchange. +func setupConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool) (*conn, error) { if dial == nil { - return setupInboundConn(fd, prv, our) + return setupInboundConn(fd, prv, our, atcap) } else { - return setupOutboundConn(fd, prv, our, dial) + return setupOutboundConn(fd, prv, our, dial, atcap) } } -func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) (*conn, error) { +func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, atcap bool) (*conn, error) { secrets, err := receiverEncHandshake(fd, prv, nil) if err != nil { return nil, fmt.Errorf("encryption handshake failed: %v", err) } - - // Run the protocol handshake using authenticated messages. rw := newRlpxFrameRW(fd, secrets) - rhs, err := readProtocolHandshake(rw, our) + if atcap { + SendItems(rw, discMsg, DiscTooManyPeers) + return nil, errors.New("we have too many peers") + } + // Run the protocol handshake using authenticated messages. + rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our) if err != nil { return nil, err } - if rhs.ID != secrets.RemoteID { - return nil, errors.New("node ID in protocol handshake does not match encryption handshake") - } - // TODO: validate that handshake node ID matches if err := Send(rw, handshakeMsg, our); err != nil { - return nil, fmt.Errorf("protocol write error: %v", err) + return nil, fmt.Errorf("protocol handshake write error: %v", err) } return &conn{rw, rhs}, nil } -func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { +func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool) (*conn, error) { secrets, err := initiatorEncHandshake(fd, prv, dial.ID, nil) if err != nil { return nil, fmt.Errorf("encryption handshake failed: %v", err) } - - // Run the protocol handshake using authenticated messages. rw := newRlpxFrameRW(fd, secrets) - if err := Send(rw, handshakeMsg, our); err != nil { - return nil, fmt.Errorf("protocol write error: %v", err) + if atcap { + SendItems(rw, discMsg, DiscTooManyPeers) + return nil, errors.New("we have too many peers") } - rhs, err := readProtocolHandshake(rw, our) + // Run the protocol handshake using authenticated messages. + // + // Note that even though writing the handshake is first, we prefer + // returning the handshake read error. If the remote side + // disconnects us early with a valid reason, we should return it + // as the error so it can be tracked elsewhere. + werr := make(chan error) + go func() { werr <- Send(rw, handshakeMsg, our) }() + rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our) if err != nil { - return nil, fmt.Errorf("protocol handshake read error: %v", err) + return nil, err + } + if err := <-werr; err != nil { + return nil, fmt.Errorf("protocol handshake write error: %v", err) } if rhs.ID != dial.ID { return nil, errors.New("dialed node id mismatch") @@ -398,18 +409,17 @@ func xor(one, other []byte) (xor []byte) { return xor } -func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, error) { - // read and handle remote handshake - msg, err := r.ReadMsg() +func readProtocolHandshake(rw MsgReadWriter, wantID discover.NodeID, our *protoHandshake) (*protoHandshake, error) { + msg, err := rw.ReadMsg() if err != nil { return nil, err } if msg.Code == discMsg { // disconnect before protocol handshake is valid according to the // spec and we send it ourself if Server.addPeer fails. - var reason DiscReason + var reason [1]DiscReason rlp.Decode(msg.Payload, &reason) - return nil, reason + return nil, reason[0] } if msg.Code != handshakeMsg { return nil, fmt.Errorf("expected handshake, got %x", msg.Code) @@ -423,10 +433,16 @@ func readProtocolHandshake(r MsgReader, our *protoHandshake) (*protoHandshake, e } // validate handshake info if hs.Version != our.Version { - return nil, newPeerError(errP2PVersionMismatch, "required version %d, received %d\n", baseProtocolVersion, hs.Version) + SendItems(rw, discMsg, DiscIncompatibleVersion) + return nil, fmt.Errorf("required version %d, received %d\n", baseProtocolVersion, hs.Version) } if (hs.ID == discover.NodeID{}) { - return nil, newPeerError(errPubkeyInvalid, "missing") + SendItems(rw, discMsg, DiscInvalidIdentity) + return nil, errors.New("invalid public key in handshake") + } + if hs.ID != wantID { + SendItems(rw, discMsg, DiscUnexpectedIdentity) + return nil, errors.New("handshake node ID does not match encryption handshake") } return &hs, nil } diff --git a/p2p/handshake_test.go b/p2p/handshake_test.go index 19423bb824..c22af7a9c7 100644 --- a/p2p/handshake_test.go +++ b/p2p/handshake_test.go @@ -143,7 +143,7 @@ func TestSetupConn(t *testing.T) { done := make(chan struct{}) go func() { defer close(done) - conn0, err := setupConn(fd0, prv0, hs0, node1) + conn0, err := setupConn(fd0, prv0, hs0, node1, false) if err != nil { t.Errorf("outbound side error: %v", err) return @@ -156,7 +156,7 @@ func TestSetupConn(t *testing.T) { } }() - conn1, err := setupConn(fd1, prv1, hs1, nil) + conn1, err := setupConn(fd1, prv1, hs1, nil, false) if err != nil { t.Fatalf("inbound side error: %v", err) } diff --git a/p2p/server.go b/p2p/server.go index d227d477a3..88f7ba2ecd 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -99,7 +99,7 @@ type Server struct { peerConnect chan *discover.Node } -type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node) (*conn, error) +type setupFunc func(net.Conn, *ecdsa.PrivateKey, *protoHandshake, *discover.Node, bool) (*conn, error) type newPeerHook func(*Peer) // Peers returns all connected peers. @@ -261,6 +261,11 @@ func (srv *Server) Stop() { srv.peerWG.Wait() } +// Self returns the local node's endpoint information. +func (srv *Server) Self() *discover.Node { + return srv.ntab.Self() +} + // main loop for adding connections via listening func (srv *Server) listenLoop() { defer srv.loopWG.Done() @@ -354,10 +359,6 @@ func (srv *Server) dialNode(dest *discover.Node) { srv.startPeer(conn, dest) } -func (srv *Server) Self() *discover.Node { - return srv.ntab.Self() -} - func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token @@ -366,7 +367,10 @@ func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // returns during that exchange need to call peerWG.Done because // the callers of startPeer added the peer to the wait group already. fd.SetDeadline(time.Now().Add(handshakeTimeout)) - conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) + srv.lock.RLock() + atcap := len(srv.peers) == srv.MaxPeers + srv.lock.RUnlock() + conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest, atcap) if err != nil { fd.Close() glog.V(logger.Debug).Infof("Handshake with %v failed: %v", fd.RemoteAddr(), err) diff --git a/p2p/server_test.go b/p2p/server_test.go index 14e7c7de2d..53cc3c2582 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -22,7 +22,7 @@ func startTestServer(t *testing.T, pf newPeerHook) *Server { ListenAddr: "127.0.0.1:0", PrivateKey: newkey(), newPeerHook: pf, - setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { + setupFunc: func(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node, atcap bool) (*conn, error) { id := randomID() rw := newRlpxFrameRW(fd, secrets{ MAC: zero16, @@ -163,6 +163,62 @@ func TestServerBroadcast(t *testing.T) { } } +// This test checks that connections are disconnected +// just after the encryption handshake when the server is +// at capacity. +// +// It also serves as a light-weight integration test. +func TestServerDisconnectAtCap(t *testing.T) { + defer testlog(t).detach() + + started := make(chan *Peer) + srv := &Server{ + ListenAddr: "127.0.0.1:0", + PrivateKey: newkey(), + MaxPeers: 10, + NoDial: true, + // This hook signals that the peer was actually started. We + // need to wait for the peer to be started before dialing the + // next connection to get a deterministic peer count. + newPeerHook: func(p *Peer) { started <- p }, + } + if err := srv.Start(); err != nil { + t.Fatal(err) + } + defer srv.Stop() + + nconns := srv.MaxPeers + 1 + dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)} + for i := 0; i < nconns; i++ { + conn, err := dialer.Dial("tcp", srv.ListenAddr) + if err != nil { + t.Fatalf("conn %d: dial error: %v", i, err) + } + // Close the connection when the test ends, before + // shutting down the server. + defer conn.Close() + // Run the handshakes just like a real peer would. + key := newkey() + hs := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)} + _, err = setupConn(conn, key, hs, srv.Self(), false) + if i == nconns-1 { + // When handling the last connection, the server should + // disconnect immediately instead of running the protocol + // handshake. + if err != DiscTooManyPeers { + t.Errorf("conn %d: got error %q, expected %q", i, err, DiscTooManyPeers) + } + } else { + // For all earlier connections, the handshake should go through. + if err != nil { + t.Fatalf("conn %d: unexpected error: %v", i, err) + } + // Wait for runPeer to be started. + <-started + } + } +} + func newkey() *ecdsa.PrivateKey { key, err := crypto.GenerateKey() if err != nil { From 56977c225ecbf7834e380c03714c3e8d70e0b184 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 17:23:09 +0200 Subject: [PATCH 8/9] p2p: use RLock instead of Lock for pre-dial checks --- p2p/server.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/p2p/server.go b/p2p/server.go index 88f7ba2ecd..7b880f77b4 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -86,12 +86,12 @@ type Server struct { ourHandshake *protoHandshake - lock sync.RWMutex - running bool - listener net.Listener - peers map[discover.NodeID]*Peer + lock sync.RWMutex // protects running and peers + running bool + peers map[discover.NodeID]*Peer - ntab *discover.Table + ntab *discover.Table + listener net.Listener quit chan struct{} loopWG sync.WaitGroup // {dial,listen,nat}Loop @@ -293,16 +293,17 @@ func (srv *Server) dialLoop() { // TODO: maybe limit number of active dials dial := func(dest *discover.Node) { - srv.lock.Lock() - ok, _ := srv.checkPeer(dest.ID) - srv.lock.Unlock() // Don't dial nodes that would fail the checks in addPeer. // This is important because the connection handshake is a lot // of work and we'd rather avoid doing that work for peers // that can't be added. + srv.lock.RLock() + ok, _ := srv.checkPeer(dest.ID) + srv.lock.RUnlock() if !ok || dialing[dest.ID] { return } + dialing[dest.ID] = true srv.peerWG.Add(1) go func() { @@ -315,9 +316,10 @@ func (srv *Server) dialLoop() { for { select { case <-refresh.C: - srv.lock.Lock() + // Grab some nodes to connect to if we're not at capacity. + srv.lock.RLock() needpeers := len(srv.peers) < srv.MaxPeers - srv.lock.Unlock() + srv.lock.RUnlock() if needpeers { go func() { var target discover.NodeID From c5332537f5726610c3c1606ead8cbaa83144b537 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 10 Apr 2015 17:24:41 +0200 Subject: [PATCH 9/9] p2p: limit number of lingering inbound pre-handshake connections This is supposed to apply some back pressure so Server is not accepting more connections than it can actually handle. The current limit is 50. This doesn't really need to be configurable, but we'll see how it behaves in our test nodes and adjust accordingly. --- p2p/server.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/p2p/server.go b/p2p/server.go index 7b880f77b4..5cd3dc2adf 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -21,6 +21,11 @@ const ( defaultDialTimeout = 10 * time.Second refreshPeersInterval = 30 * time.Second + // This is the maximum number of inbound connection + // that are allowed to linger between 'accepted' and + // 'added as peer'. + maxAcceptConns = 50 + // total timeout for encryption handshake and protocol // handshake in both directions. handshakeTimeout = 5 * time.Second @@ -269,15 +274,28 @@ func (srv *Server) Self() *discover.Node { // main loop for adding connections via listening func (srv *Server) listenLoop() { defer srv.loopWG.Done() + + // This channel acts as a semaphore limiting + // active inbound connections that are lingering pre-handshake. + // If all slots are taken, no further connections are accepted. + slots := make(chan struct{}, maxAcceptConns) + for i := 0; i < maxAcceptConns; i++ { + slots <- struct{}{} + } + glog.V(logger.Info).Infoln("Listening on", srv.listener.Addr()) for { + <-slots conn, err := srv.listener.Accept() if err != nil { return } glog.V(logger.Debug).Infof("Accepted conn %v\n", conn.RemoteAddr()) srv.peerWG.Add(1) - go srv.startPeer(conn, nil) + go func() { + srv.startPeer(conn, nil) + slots <- struct{}{} + }() } }