send a call without waiting for a response

Signed-off-by: thinkAfCod <q315xia@163.com>
This commit is contained in:
thinkAfCod 2025-02-10 17:18:30 +08:00
parent 4cda8f06ea
commit 2e8beef782
1 changed files with 48 additions and 5 deletions

View File

@ -101,9 +101,11 @@ type UDPv5 struct {
}
type sendRequest struct {
destID enode.ID
destAddr netip.AddrPort
msg v5wire.Packet
destID enode.ID
destAddr netip.AddrPort
msg v5wire.Packet
destNode *enode.Node
asCallType bool
}
// callV5 represents a remote procedure call against another node.
@ -123,6 +125,9 @@ type callV5 struct {
handshakeCount int // # times we attempted handshake for this call
challenge *v5wire.Whoareyou // last sent handshake challenge
timeout mclock.Timer
// marked known as utp packet
withoutResponse bool
}
// callTimeout is the response timeout event of a call.
@ -530,6 +535,10 @@ func (t *UDPv5) dispatch() {
if ct.c == active && ct.timer == active.timeout {
ct.c.err <- errTimeout
}
if ct.c.withoutResponse {
delete(t.activeCallByAuth, ct.c.nonce)
ct.c.timeout.Stop()
}
case c := <-t.callDoneCh:
active := t.activeCallByNode[c.id]
@ -542,7 +551,12 @@ func (t *UDPv5) dispatch() {
t.sendNextCall(c.id)
case r := <-t.sendCh:
t.send(r.destID, r.destAddr, r.msg, nil)
if r.asCallType {
// send a TalkResponse out
t.send(r.destID, r.destAddr, r.msg, nil)
} else {
t.sendCallWithoutResponse(&r)
}
case p := <-t.packetInCh:
t.handlePacket(p.Data, p.Addr)
@ -587,6 +601,27 @@ func (t *UDPv5) startResponseTimeout(c *callV5) {
close(done)
}
// sendCallWithoutResponse send a talk request contains utp packet by call
func (t *UDPv5) sendCallWithoutResponse(r *sendRequest) {
// send out a TalkRequest that is a UTP packet for portal network
// request should be cached to handle WHOAREYOU
c := &callV5{id: r.destID, addr: r.destAddr}
c.node = r.destNode
c.packet = r.msg
c.reqid = make([]byte, 8)
c.ch = make(chan v5wire.Packet, 1)
c.err = make(chan error, 1)
c.withoutResponse = true
// Assign request ID.
crand.Read(c.reqid)
c.packet.SetRequestID(c.reqid)
nonce, _ := t.send(c.id, c.addr, c.packet, nil)
c.nonce = nonce
t.activeCallByAuth[nonce] = c
t.startResponseTimeout(c)
}
// sendNextCall sends the next call in the call queue if there is no active call.
func (t *UDPv5) sendNextCall(id enode.ID) {
queue := t.callQueue[id]
@ -627,7 +662,15 @@ func (t *UDPv5) sendResponse(toID enode.ID, toAddr netip.AddrPort, packet v5wire
func (t *UDPv5) sendFromAnotherThread(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet) {
select {
case t.sendCh <- sendRequest{toID, toAddr, packet}:
case t.sendCh <- sendRequest{destID: toID, destAddr: toAddr, msg: packet}:
case <-t.closeCtx.Done():
}
}
// SendAsCallWithoutResponse send a request packet from utp protocol
func (t *UDPv5) SendAsCallWithoutResponse(n *enode.Node, toAddr netip.AddrPort, packet v5wire.Packet) {
select {
case t.sendCh <- sendRequest{n.ID(), toAddr, packet, n, true}:
case <-t.closeCtx.Done():
}
}