diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go index d7ca9000d4..008e63937d 100644 --- a/p2p/discover/udp.go +++ b/p2p/discover/udp.go @@ -18,6 +18,7 @@ package discover import ( "bytes" + "container/list" "crypto/ecdsa" "errors" "fmt" @@ -43,6 +44,7 @@ var ( errUnsolicitedReply = errors.New("unsolicited reply") errUnknownNode = errors.New("unknown node") errTimeout = errors.New("RPC timeout") + errClockWarp = errors.New("reply deadline too far in the future") errClosed = errors.New("socket closed") ) @@ -296,7 +298,7 @@ func (t *udp) pending(id NodeID, ptype byte, callback func(interface{}) bool) <- } func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { - matched := make(chan bool) + matched := make(chan bool, 1) select { case t.gotreply <- reply{from, ptype, req, matched}: // loop will handle it @@ -310,68 +312,82 @@ func (t *udp) handleReply(from NodeID, ptype byte, req packet) bool { // the refresh timer and the pending reply queue. func (t *udp) loop() { var ( - pending []*pending - nextDeadline time.Time - timeout = time.NewTimer(0) - refresh = time.NewTicker(refreshInterval) + plist = list.New() + timeout = time.NewTimer(0) + nextTimeout *pending // head of plist when timeout was last reset + refresh = time.NewTicker(refreshInterval) ) <-timeout.C // ignore first timeout defer refresh.Stop() defer timeout.Stop() - rearmTimeout := func() { - now := time.Now() - if len(pending) == 0 || now.Before(nextDeadline) { + resetTimeout := func() { + if plist.Front() == nil || nextTimeout == plist.Front().Value { return } - nextDeadline = pending[0].deadline - timeout.Reset(nextDeadline.Sub(now)) + // Start the timer so it fires when the next pending reply has expired. + now := time.Now() + for el := plist.Front(); el != nil; el = el.Next() { + nextTimeout = el.Value.(*pending) + if dist := nextTimeout.deadline.Sub(now); dist < 2*respTimeout { + timeout.Reset(dist) + return + } + // Remove pending replies whose deadline is too far in the + // future. These can occur if the system clock jumped + // backwards after the deadline was assigned. + nextTimeout.errc <- errClockWarp + plist.Remove(el) + } + nextTimeout = nil + timeout.Stop() } for { + resetTimeout() + select { case <-refresh.C: go t.refresh() case <-t.closing: - for _, p := range pending { - p.errc <- errClosed + for el := plist.Front(); el != nil; el = el.Next() { + el.Value.(*pending).errc <- errClosed } - pending = nil return case p := <-t.addpending: p.deadline = time.Now().Add(respTimeout) - pending = append(pending, p) - rearmTimeout() + plist.PushBack(p) case r := <-t.gotreply: var matched bool - for i := 0; i < len(pending); i++ { - if p := pending[i]; p.from == r.from && p.ptype == r.ptype { + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if p.from == r.from && p.ptype == r.ptype { matched = true + // Remove the matcher if its callback indicates + // that all replies have been received. This is + // required for packet types that expect multiple + // reply packets. if p.callback(r.data) { - // callback indicates the request is done, remove it. p.errc <- nil - copy(pending[i:], pending[i+1:]) - pending = pending[:len(pending)-1] - i-- + plist.Remove(el) } } } r.matched <- matched case now := <-timeout.C: - // notify and remove callbacks whose deadline is in the past. - i := 0 - for ; i < len(pending) && now.After(pending[i].deadline); i++ { - pending[i].errc <- errTimeout + nextTimeout = nil + // Notify and remove callbacks whose deadline is in the past. + for el := plist.Front(); el != nil; el = el.Next() { + p := el.Value.(*pending) + if now.After(p.deadline) || now.Equal(p.deadline) { + p.errc <- errTimeout + plist.Remove(el) + } } - if i > 0 { - copy(pending, pending[i:]) - pending = pending[:len(pending)-i] - } - rearmTimeout() } } } @@ -385,7 +401,7 @@ const ( var ( headSpace = make([]byte, headSize) - // Neighbors responses are sent across multiple packets to + // Neighbors replies are sent across multiple packets to // stay below the 1280 byte limit. We compute the maximum number // of entries by stuffing a packet until it grows too large. maxNeighbors int diff --git a/p2p/discover/udp_test.go b/p2p/discover/udp_test.go index b913424dd0..a86d3737bc 100644 --- a/p2p/discover/udp_test.go +++ b/p2p/discover/udp_test.go @@ -19,10 +19,12 @@ package discover import ( "bytes" "crypto/ecdsa" + "encoding/binary" "errors" "fmt" "io" logpkg "log" + "math/rand" "net" "os" "path/filepath" @@ -138,6 +140,77 @@ func TestUDP_pingTimeout(t *testing.T) { } } +func TestUDP_responseTimeouts(t *testing.T) { + t.Parallel() + test := newUDPTest(t) + defer test.table.Close() + + rand.Seed(time.Now().UnixNano()) + randomDuration := func(max time.Duration) time.Duration { + return time.Duration(rand.Int63n(int64(max))) + } + + var ( + nReqs = 200 + nTimeouts = 0 // number of requests with ptype > 128 + nilErr = make(chan error, nReqs) // for requests that get a reply + timeoutErr = make(chan error, nReqs) // for requests that time out + ) + for i := 0; i < nReqs; i++ { + // Create a matcher for a random request in udp.loop. Requests + // with ptype <= 128 will not get a reply and should time out. + // For all other requests, a reply is scheduled to arrive + // within the timeout window. + p := &pending{ + ptype: byte(rand.Intn(255)), + callback: func(interface{}) bool { return true }, + } + binary.BigEndian.PutUint64(p.from[:], uint64(i)) + if p.ptype <= 128 { + p.errc = timeoutErr + nTimeouts++ + } else { + p.errc = nilErr + time.AfterFunc(randomDuration(60*time.Millisecond), func() { + if !test.udp.handleReply(p.from, p.ptype, nil) { + t.Logf("not matched: %v", p) + } + }) + } + test.udp.addpending <- p + time.Sleep(randomDuration(30 * time.Millisecond)) + } + + // Check that all timeouts were delivered and that the rest got nil errors. + // The replies must be delivered. + var ( + recvDeadline = time.After(20 * time.Second) + nTimeoutsRecv, nNil = 0, 0 + ) + for i := 0; i < nReqs; i++ { + select { + case err := <-timeoutErr: + if err != errTimeout { + t.Fatalf("got non-timeout error on timeoutErr %d: %v", i, err) + } + nTimeoutsRecv++ + case err := <-nilErr: + if err != nil { + t.Fatalf("got non-nil error on nilErr %d: %v", i, err) + } + nNil++ + case <-recvDeadline: + t.Fatalf("exceeded recv deadline") + } + } + if nTimeoutsRecv != nTimeouts { + t.Errorf("wrong number of timeout errors received: got %d, want %d", nTimeoutsRecv, nTimeouts) + } + if nNil != nReqs-nTimeouts { + t.Errorf("wrong number of successful replies: got %d, want %d", nNil, nReqs-nTimeouts) + } +} + func TestUDP_findnodeTimeout(t *testing.T) { t.Parallel() test := newUDPTest(t)