diff --git a/p2p/discover/v5_udp.go b/p2p/discover/v5_udp.go index 48256ea4de..853c73aa03 100644 --- a/p2p/discover/v5_udp.go +++ b/p2p/discover/v5_udp.go @@ -100,9 +100,11 @@ type UDPv5 struct { } type sendRequest struct { - destID enode.ID - destAddr netip.AddrPort - msg v5wire.Packet + destID enode.ID + destAddr netip.AddrPort + msg v5wire.Packet + destNode *enode.Node + asCallType bool } // callV5 represents a remote procedure call against another node. @@ -122,6 +124,9 @@ type callV5 struct { handshakeCount int // # times we attempted handshake for this call challenge *v5wire.Whoareyou // last sent handshake challenge timeout mclock.Timer + + // marked known as utp packet + withoutResponse bool } // callTimeout is the response timeout event of a call. @@ -530,6 +535,10 @@ func (t *UDPv5) dispatch() { if ct.c == active && ct.timer == active.timeout { ct.c.err <- errTimeout } + if ct.c.withoutResponse { + delete(t.activeCallByAuth, ct.c.nonce) + ct.c.timeout.Stop() + } case c := <-t.callDoneCh: active := t.activeCallByNode[c.id] @@ -542,7 +551,12 @@ func (t *UDPv5) dispatch() { t.sendNextCall(c.id) case r := <-t.sendCh: - t.send(r.destID, r.destAddr, r.msg, nil) + if r.asCallType { + // send a TalkResponse out + t.send(r.destID, r.destAddr, r.msg, nil) + } else { + t.sendCallWithoutResponse(&r) + } case p := <-t.packetInCh: t.handlePacket(p.Data, p.Addr) @@ -587,6 +601,27 @@ func (t *UDPv5) startResponseTimeout(c *callV5) { close(done) } +// sendCallWithoutResponse send a talk request contains utp packet by call +func (t *UDPv5) sendCallWithoutResponse(r *sendRequest) { + // send out a TalkRequest that is a UTP packet for portal network + // request should be cached to handle WHOAREYOU + c := &callV5{id: r.destID, addr: r.destAddr} + c.node = r.destNode + c.packet = r.msg + c.reqid = make([]byte, 8) + c.ch = make(chan v5wire.Packet, 1) + c.err = make(chan error, 1) + c.withoutResponse = true + // Assign request ID. + crand.Read(c.reqid) + c.packet.SetRequestID(c.reqid) + + nonce, _ := t.send(c.id, c.addr, c.packet, nil) + c.nonce = nonce + t.activeCallByAuth[nonce] = c + t.startResponseTimeout(c) +} + // sendNextCall sends the next call in the call queue if there is no active call. func (t *UDPv5) sendNextCall(id enode.ID) { queue := t.callQueue[id] @@ -627,7 +662,15 @@ func (t *UDPv5) sendResponse(toID enode.ID, toAddr netip.AddrPort, packet v5wire func (t *UDPv5) sendFromAnotherThread(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet) { select { - case t.sendCh <- sendRequest{toID, toAddr, packet}: + case t.sendCh <- sendRequest{destID: toID, destAddr: toAddr, msg: packet}: + case <-t.closeCtx.Done(): + } +} + +// SendAsCallWithoutResponse send a request packet from utp protocol +func (t *UDPv5) SendAsCallWithoutResponse(n *enode.Node, toAddr netip.AddrPort, packet v5wire.Packet) { + select { + case t.sendCh <- sendRequest{n.ID(), toAddr, packet, n, true}: case <-t.closeCtx.Done(): } }