eth: implement eth/68 (#25980)
* eth: implement eth/68 * eth/protocols/eth: added tx size to announcement * eth/protocols/eth: check equal lengths on receiving announcement * eth/protocols/eth: add +1 to tx size because of the type byte * eth: happy lint, add eth68 tests, enable eth68 * eth: various nitpick fixes on eth/68 * eth/protocols/eth: fix announced tx size wrt type byte Co-authored-by: MariusVanDerWijden <m.vanderwijden@live.de> Co-authored-by: Péter Szilágyi <peterke@gmail.com>
This commit is contained in:
parent
5329aa3786
commit
b0d44338bb
|
@ -127,7 +127,7 @@ func (msg NewBlock) Code() int { return 23 }
|
||||||
func (msg NewBlock) ReqID() uint64 { return 0 }
|
func (msg NewBlock) ReqID() uint64 { return 0 }
|
||||||
|
|
||||||
// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
|
// NewPooledTransactionHashes is the network packet for the tx hash propagation message.
|
||||||
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket
|
type NewPooledTransactionHashes eth.NewPooledTransactionHashesPacket66
|
||||||
|
|
||||||
func (msg NewPooledTransactionHashes) Code() int { return 24 }
|
func (msg NewPooledTransactionHashes) Code() int { return 24 }
|
||||||
func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 }
|
func (msg NewPooledTransactionHashes) ReqID() uint64 { return 0 }
|
||||||
|
|
|
@ -67,9 +67,12 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
|
||||||
case *eth.NewBlockPacket:
|
case *eth.NewBlockPacket:
|
||||||
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
|
return h.handleBlockBroadcast(peer, packet.Block, packet.TD)
|
||||||
|
|
||||||
case *eth.NewPooledTransactionHashesPacket:
|
case *eth.NewPooledTransactionHashesPacket66:
|
||||||
return h.txFetcher.Notify(peer.ID(), *packet)
|
return h.txFetcher.Notify(peer.ID(), *packet)
|
||||||
|
|
||||||
|
case *eth.NewPooledTransactionHashesPacket68:
|
||||||
|
return h.txFetcher.Notify(peer.ID(), packet.Hashes)
|
||||||
|
|
||||||
case *eth.TransactionsPacket:
|
case *eth.TransactionsPacket:
|
||||||
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
|
return h.txFetcher.Enqueue(peer.ID(), *packet, false)
|
||||||
|
|
||||||
|
|
|
@ -61,10 +61,14 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
|
||||||
h.blockBroadcasts.Send(packet.Block)
|
h.blockBroadcasts.Send(packet.Block)
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
case *eth.NewPooledTransactionHashesPacket:
|
case *eth.NewPooledTransactionHashesPacket66:
|
||||||
h.txAnnounces.Send(([]common.Hash)(*packet))
|
h.txAnnounces.Send(([]common.Hash)(*packet))
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
|
case *eth.NewPooledTransactionHashesPacket68:
|
||||||
|
h.txAnnounces.Send(packet.Hashes)
|
||||||
|
return nil
|
||||||
|
|
||||||
case *eth.TransactionsPacket:
|
case *eth.TransactionsPacket:
|
||||||
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
|
h.txBroadcasts.Send(([]*types.Transaction)(*packet))
|
||||||
return nil
|
return nil
|
||||||
|
@ -81,6 +85,8 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
|
||||||
// Tests that peers are correctly accepted (or rejected) based on the advertised
|
// Tests that peers are correctly accepted (or rejected) based on the advertised
|
||||||
// fork IDs in the protocol handshake.
|
// fork IDs in the protocol handshake.
|
||||||
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
|
func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) }
|
||||||
|
func TestForkIDSplit67(t *testing.T) { testForkIDSplit(t, eth.ETH67) }
|
||||||
|
func TestForkIDSplit68(t *testing.T) { testForkIDSplit(t, eth.ETH68) }
|
||||||
|
|
||||||
func testForkIDSplit(t *testing.T, protocol uint) {
|
func testForkIDSplit(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
@ -235,6 +241,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
|
||||||
|
|
||||||
// Tests that received transactions are added to the local pool.
|
// Tests that received transactions are added to the local pool.
|
||||||
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
|
func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) }
|
||||||
|
func TestRecvTransactions67(t *testing.T) { testRecvTransactions(t, eth.ETH67) }
|
||||||
|
func TestRecvTransactions68(t *testing.T) { testRecvTransactions(t, eth.ETH68) }
|
||||||
|
|
||||||
func testRecvTransactions(t *testing.T, protocol uint) {
|
func testRecvTransactions(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
@ -292,6 +300,8 @@ func testRecvTransactions(t *testing.T, protocol uint) {
|
||||||
|
|
||||||
// This test checks that pending transactions are sent.
|
// This test checks that pending transactions are sent.
|
||||||
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
|
func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) }
|
||||||
|
func TestSendTransactions67(t *testing.T) { testSendTransactions(t, eth.ETH67) }
|
||||||
|
func TestSendTransactions68(t *testing.T) { testSendTransactions(t, eth.ETH68) }
|
||||||
|
|
||||||
func testSendTransactions(t *testing.T, protocol uint) {
|
func testSendTransactions(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
@ -350,7 +360,7 @@ func testSendTransactions(t *testing.T, protocol uint) {
|
||||||
seen := make(map[common.Hash]struct{})
|
seen := make(map[common.Hash]struct{})
|
||||||
for len(seen) < len(insert) {
|
for len(seen) < len(insert) {
|
||||||
switch protocol {
|
switch protocol {
|
||||||
case 66:
|
case 66, 67, 68:
|
||||||
select {
|
select {
|
||||||
case hashes := <-anns:
|
case hashes := <-anns:
|
||||||
for _, hash := range hashes {
|
for _, hash := range hashes {
|
||||||
|
@ -377,6 +387,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
|
||||||
// Tests that transactions get propagated to all attached peers, either via direct
|
// Tests that transactions get propagated to all attached peers, either via direct
|
||||||
// broadcasts or via announcements/retrievals.
|
// broadcasts or via announcements/retrievals.
|
||||||
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
|
func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) }
|
||||||
|
func TestTransactionPropagation67(t *testing.T) { testTransactionPropagation(t, eth.ETH67) }
|
||||||
|
func TestTransactionPropagation68(t *testing.T) { testTransactionPropagation(t, eth.ETH68) }
|
||||||
|
|
||||||
func testTransactionPropagation(t *testing.T, protocol uint) {
|
func testTransactionPropagation(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
@ -678,6 +690,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) {
|
||||||
// Tests that a propagated malformed block (uncles or transactions don't match
|
// Tests that a propagated malformed block (uncles or transactions don't match
|
||||||
// with the hashes in the header) gets discarded and not broadcast forward.
|
// with the hashes in the header) gets discarded and not broadcast forward.
|
||||||
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
|
func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) }
|
||||||
|
func TestBroadcastMalformedBlock67(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH67) }
|
||||||
|
func TestBroadcastMalformedBlock68(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH68) }
|
||||||
|
|
||||||
func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
|
func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
|
@ -142,13 +142,17 @@ func (p *Peer) announceTransactions() {
|
||||||
if done == nil && len(queue) > 0 {
|
if done == nil && len(queue) > 0 {
|
||||||
// Pile transaction hashes until we reach our allowed network limit
|
// Pile transaction hashes until we reach our allowed network limit
|
||||||
var (
|
var (
|
||||||
count int
|
count int
|
||||||
pending []common.Hash
|
pending []common.Hash
|
||||||
size common.StorageSize
|
pendingTypes []byte
|
||||||
|
pendingSizes []uint32
|
||||||
|
size common.StorageSize
|
||||||
)
|
)
|
||||||
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
|
for count = 0; count < len(queue) && size < maxTxPacketSize; count++ {
|
||||||
if p.txpool.Get(queue[count]) != nil {
|
if tx := p.txpool.Get(queue[count]); tx != nil {
|
||||||
pending = append(pending, queue[count])
|
pending = append(pending, queue[count])
|
||||||
|
pendingTypes = append(pendingTypes, tx.Type())
|
||||||
|
pendingSizes = append(pendingSizes, uint32(tx.Size()))
|
||||||
size += common.HashLength
|
size += common.HashLength
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,9 +163,16 @@ func (p *Peer) announceTransactions() {
|
||||||
if len(pending) > 0 {
|
if len(pending) > 0 {
|
||||||
done = make(chan struct{})
|
done = make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
if err := p.sendPooledTransactionHashes(pending); err != nil {
|
if p.version >= ETH68 {
|
||||||
fail <- err
|
if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil {
|
||||||
return
|
fail <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err := p.sendPooledTransactionHashes66(pending); err != nil {
|
||||||
|
fail <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
close(done)
|
close(done)
|
||||||
p.Log().Trace("Sent transaction announcements", "count", len(pending))
|
p.Log().Trace("Sent transaction announcements", "count", len(pending))
|
||||||
|
|
|
@ -168,7 +168,7 @@ var eth66 = map[uint64]msgHandler{
|
||||||
NewBlockHashesMsg: handleNewBlockhashes,
|
NewBlockHashesMsg: handleNewBlockhashes,
|
||||||
NewBlockMsg: handleNewBlock,
|
NewBlockMsg: handleNewBlock,
|
||||||
TransactionsMsg: handleTransactions,
|
TransactionsMsg: handleTransactions,
|
||||||
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
|
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
|
||||||
GetBlockHeadersMsg: handleGetBlockHeaders66,
|
GetBlockHeadersMsg: handleGetBlockHeaders66,
|
||||||
BlockHeadersMsg: handleBlockHeaders66,
|
BlockHeadersMsg: handleBlockHeaders66,
|
||||||
GetBlockBodiesMsg: handleGetBlockBodies66,
|
GetBlockBodiesMsg: handleGetBlockBodies66,
|
||||||
|
@ -185,7 +185,22 @@ var eth67 = map[uint64]msgHandler{
|
||||||
NewBlockHashesMsg: handleNewBlockhashes,
|
NewBlockHashesMsg: handleNewBlockhashes,
|
||||||
NewBlockMsg: handleNewBlock,
|
NewBlockMsg: handleNewBlock,
|
||||||
TransactionsMsg: handleTransactions,
|
TransactionsMsg: handleTransactions,
|
||||||
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
|
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes66,
|
||||||
|
GetBlockHeadersMsg: handleGetBlockHeaders66,
|
||||||
|
BlockHeadersMsg: handleBlockHeaders66,
|
||||||
|
GetBlockBodiesMsg: handleGetBlockBodies66,
|
||||||
|
BlockBodiesMsg: handleBlockBodies66,
|
||||||
|
GetReceiptsMsg: handleGetReceipts66,
|
||||||
|
ReceiptsMsg: handleReceipts66,
|
||||||
|
GetPooledTransactionsMsg: handleGetPooledTransactions66,
|
||||||
|
PooledTransactionsMsg: handlePooledTransactions66,
|
||||||
|
}
|
||||||
|
|
||||||
|
var eth68 = map[uint64]msgHandler{
|
||||||
|
NewBlockHashesMsg: handleNewBlockhashes,
|
||||||
|
NewBlockMsg: handleNewBlock,
|
||||||
|
TransactionsMsg: handleTransactions,
|
||||||
|
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes68,
|
||||||
GetBlockHeadersMsg: handleGetBlockHeaders66,
|
GetBlockHeadersMsg: handleGetBlockHeaders66,
|
||||||
BlockHeadersMsg: handleBlockHeaders66,
|
BlockHeadersMsg: handleBlockHeaders66,
|
||||||
GetBlockBodiesMsg: handleGetBlockBodies66,
|
GetBlockBodiesMsg: handleGetBlockBodies66,
|
||||||
|
@ -210,9 +225,12 @@ func handleMessage(backend Backend, peer *Peer) error {
|
||||||
defer msg.Discard()
|
defer msg.Discard()
|
||||||
|
|
||||||
var handlers = eth66
|
var handlers = eth66
|
||||||
if peer.Version() >= ETH67 {
|
if peer.Version() == ETH67 {
|
||||||
handlers = eth67
|
handlers = eth67
|
||||||
}
|
}
|
||||||
|
if peer.Version() >= ETH68 {
|
||||||
|
handlers = eth68
|
||||||
|
}
|
||||||
|
|
||||||
// Track the amount of time it takes to serve the request and run the handler
|
// Track the amount of time it takes to serve the request and run the handler
|
||||||
if metrics.Enabled {
|
if metrics.Enabled {
|
||||||
|
|
|
@ -112,6 +112,8 @@ func (b *testBackend) Handle(*Peer, Packet) error {
|
||||||
|
|
||||||
// Tests that block headers can be retrieved from a remote chain based on user queries.
|
// Tests that block headers can be retrieved from a remote chain based on user queries.
|
||||||
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
|
func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) }
|
||||||
|
func TestGetBlockHeaders67(t *testing.T) { testGetBlockHeaders(t, ETH67) }
|
||||||
|
func TestGetBlockHeaders68(t *testing.T) { testGetBlockHeaders(t, ETH68) }
|
||||||
|
|
||||||
func testGetBlockHeaders(t *testing.T, protocol uint) {
|
func testGetBlockHeaders(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
@ -297,6 +299,8 @@ func testGetBlockHeaders(t *testing.T, protocol uint) {
|
||||||
|
|
||||||
// Tests that block contents can be retrieved from a remote chain based on their hashes.
|
// Tests that block contents can be retrieved from a remote chain based on their hashes.
|
||||||
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
|
func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) }
|
||||||
|
func TestGetBlockBodies67(t *testing.T) { testGetBlockBodies(t, ETH67) }
|
||||||
|
func TestGetBlockBodies68(t *testing.T) { testGetBlockBodies(t, ETH68) }
|
||||||
|
|
||||||
func testGetBlockBodies(t *testing.T, protocol uint) {
|
func testGetBlockBodies(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
@ -379,9 +383,11 @@ func testGetBlockBodies(t *testing.T, protocol uint) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that the state trie nodes can be retrieved based on hashes.
|
// Tests that the state trie nodes can be retrieved based on hashes.
|
||||||
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) }
|
func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66, false) }
|
||||||
|
func TestGetNodeData67(t *testing.T) { testGetNodeData(t, ETH67, true) }
|
||||||
|
func TestGetNodeData68(t *testing.T) { testGetNodeData(t, ETH68, true) }
|
||||||
|
|
||||||
func testGetNodeData(t *testing.T, protocol uint) {
|
func testGetNodeData(t *testing.T, protocol uint, drop bool) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
// Define three accounts to simulate transactions with
|
// Define three accounts to simulate transactions with
|
||||||
|
@ -442,8 +448,15 @@ func testGetNodeData(t *testing.T, protocol uint) {
|
||||||
GetNodeDataPacket: hashes,
|
GetNodeDataPacket: hashes,
|
||||||
})
|
})
|
||||||
msg, err := peer.app.ReadMsg()
|
msg, err := peer.app.ReadMsg()
|
||||||
if err != nil {
|
if !drop {
|
||||||
t.Fatalf("failed to read node data response: %v", err)
|
if err != nil {
|
||||||
|
t.Fatalf("failed to read node data response: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Fatalf("succeeded to read node data response on non-supporting protocol: %v", msg)
|
||||||
}
|
}
|
||||||
if msg.Code != NodeDataMsg {
|
if msg.Code != NodeDataMsg {
|
||||||
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
|
t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg)
|
||||||
|
@ -489,6 +502,8 @@ func testGetNodeData(t *testing.T, protocol uint) {
|
||||||
|
|
||||||
// Tests that the transaction receipts can be retrieved based on hashes.
|
// Tests that the transaction receipts can be retrieved based on hashes.
|
||||||
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
|
func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) }
|
||||||
|
func TestGetBlockReceipts67(t *testing.T) { testGetBlockReceipts(t, ETH67) }
|
||||||
|
func TestGetBlockReceipts68(t *testing.T) { testGetBlockReceipts(t, ETH68) }
|
||||||
|
|
||||||
func testGetBlockReceipts(t *testing.T, protocol uint) {
|
func testGetBlockReceipts(t *testing.T, protocol uint) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
|
@ -430,13 +430,13 @@ func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error {
|
||||||
}, metadata)
|
}, metadata)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
|
func handleNewPooledTransactionHashes66(backend Backend, msg Decoder, peer *Peer) error {
|
||||||
// New transaction announcement arrived, make sure we have
|
// New transaction announcement arrived, make sure we have
|
||||||
// a valid and fresh chain to handle them
|
// a valid and fresh chain to handle them
|
||||||
if !backend.AcceptTxs() {
|
if !backend.AcceptTxs() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ann := new(NewPooledTransactionHashesPacket)
|
ann := new(NewPooledTransactionHashesPacket66)
|
||||||
if err := msg.Decode(ann); err != nil {
|
if err := msg.Decode(ann); err != nil {
|
||||||
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
|
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
|
||||||
}
|
}
|
||||||
|
@ -447,6 +447,26 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer)
|
||||||
return backend.Handle(peer, ann)
|
return backend.Handle(peer, ann)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleNewPooledTransactionHashes68(backend Backend, msg Decoder, peer *Peer) error {
|
||||||
|
// New transaction announcement arrived, make sure we have
|
||||||
|
// a valid and fresh chain to handle them
|
||||||
|
if !backend.AcceptTxs() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ann := new(NewPooledTransactionHashesPacket68)
|
||||||
|
if err := msg.Decode(ann); err != nil {
|
||||||
|
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
|
||||||
|
}
|
||||||
|
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
|
||||||
|
return fmt.Errorf("%w: message %v: invalid len of fields: %v %v %v", errDecode, msg, len(ann.Hashes), len(ann.Types), len(ann.Sizes))
|
||||||
|
}
|
||||||
|
// Schedule all the unknown hashes for retrieval
|
||||||
|
for _, hash := range ann.Hashes {
|
||||||
|
peer.markTransaction(hash)
|
||||||
|
}
|
||||||
|
return backend.Handle(peer, ann)
|
||||||
|
}
|
||||||
|
|
||||||
func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
|
func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error {
|
||||||
// Decode the pooled transactions retrieval message
|
// Decode the pooled transactions retrieval message
|
||||||
var query GetPooledTransactionsPacket66
|
var query GetPooledTransactionsPacket66
|
||||||
|
|
|
@ -210,16 +210,29 @@ func (p *Peer) AsyncSendTransactions(hashes []common.Hash) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendPooledTransactionHashes sends transaction hashes to the peer and includes
|
// sendPooledTransactionHashes66 sends transaction hashes to the peer and includes
|
||||||
// them in its transaction hash set for future reference.
|
// them in its transaction hash set for future reference.
|
||||||
//
|
//
|
||||||
// This method is a helper used by the async transaction announcer. Don't call it
|
// This method is a helper used by the async transaction announcer. Don't call it
|
||||||
// directly as the queueing (memory) and transmission (bandwidth) costs should
|
// directly as the queueing (memory) and transmission (bandwidth) costs should
|
||||||
// not be managed directly.
|
// not be managed directly.
|
||||||
func (p *Peer) sendPooledTransactionHashes(hashes []common.Hash) error {
|
func (p *Peer) sendPooledTransactionHashes66(hashes []common.Hash) error {
|
||||||
// Mark all the transactions as known, but ensure we don't overflow our limits
|
// Mark all the transactions as known, but ensure we don't overflow our limits
|
||||||
p.knownTxs.Add(hashes...)
|
p.knownTxs.Add(hashes...)
|
||||||
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket(hashes))
|
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket66(hashes))
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendPooledTransactionHashes68 sends transaction hashes (tagged with their type
|
||||||
|
// and size) to the peer and includes them in its transaction hash set for future
|
||||||
|
// reference.
|
||||||
|
//
|
||||||
|
// This method is a helper used by the async transaction announcer. Don't call it
|
||||||
|
// directly as the queueing (memory) and transmission (bandwidth) costs should
|
||||||
|
// not be managed directly.
|
||||||
|
func (p *Peer) sendPooledTransactionHashes68(hashes []common.Hash, types []byte, sizes []uint32) error {
|
||||||
|
// Mark all the transactions as known, but ensure we don't overflow our limits
|
||||||
|
p.knownTxs.Add(hashes...)
|
||||||
|
return p2p.Send(p.rw, NewPooledTransactionHashesMsg, NewPooledTransactionHashesPacket68{Types: types, Sizes: sizes, Hashes: hashes})
|
||||||
}
|
}
|
||||||
|
|
||||||
// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
|
// AsyncSendPooledTransactionHashes queues a list of transactions hashes to eventually
|
||||||
|
|
|
@ -48,6 +48,8 @@ func newTestPeer(name string, version uint, backend Backend) (*testPeer, <-chan
|
||||||
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
|
peer := NewPeer(version, p2p.NewPeer(id, name, nil), net, backend.TxPool())
|
||||||
errc := make(chan error, 1)
|
errc := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer app.Close()
|
||||||
|
|
||||||
errc <- backend.RunPeer(peer, func(peer *Peer) error {
|
errc <- backend.RunPeer(peer, func(peer *Peer) error {
|
||||||
return Handle(backend, peer)
|
return Handle(backend, peer)
|
||||||
})
|
})
|
||||||
|
|
|
@ -32,6 +32,7 @@ import (
|
||||||
const (
|
const (
|
||||||
ETH66 = 66
|
ETH66 = 66
|
||||||
ETH67 = 67
|
ETH67 = 67
|
||||||
|
ETH68 = 68
|
||||||
)
|
)
|
||||||
|
|
||||||
// ProtocolName is the official short name of the `eth` protocol used during
|
// ProtocolName is the official short name of the `eth` protocol used during
|
||||||
|
@ -40,11 +41,11 @@ const ProtocolName = "eth"
|
||||||
|
|
||||||
// ProtocolVersions are the supported versions of the `eth` protocol (first
|
// ProtocolVersions are the supported versions of the `eth` protocol (first
|
||||||
// is primary).
|
// is primary).
|
||||||
var ProtocolVersions = []uint{ETH67, ETH66}
|
var ProtocolVersions = []uint{ETH68, ETH67, ETH66}
|
||||||
|
|
||||||
// protocolLengths are the number of implemented message corresponding to
|
// protocolLengths are the number of implemented message corresponding to
|
||||||
// different protocol versions.
|
// different protocol versions.
|
||||||
var protocolLengths = map[uint]uint64{ETH67: 17, ETH66: 17}
|
var protocolLengths = map[uint]uint64{ETH68: 17, ETH67: 17, ETH66: 17}
|
||||||
|
|
||||||
// maxMessageSize is the maximum cap on the size of a protocol message.
|
// maxMessageSize is the maximum cap on the size of a protocol message.
|
||||||
const maxMessageSize = 10 * 1024 * 1024
|
const maxMessageSize = 10 * 1024 * 1024
|
||||||
|
@ -298,8 +299,15 @@ type ReceiptsRLPPacket66 struct {
|
||||||
ReceiptsRLPPacket
|
ReceiptsRLPPacket
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPooledTransactionHashesPacket represents a transaction announcement packet.
|
// NewPooledTransactionHashesPacket66 represents a transaction announcement packet on eth/66 and eth/67.
|
||||||
type NewPooledTransactionHashesPacket []common.Hash
|
type NewPooledTransactionHashesPacket66 []common.Hash
|
||||||
|
|
||||||
|
// NewPooledTransactionHashesPacket68 represents a transaction announcement packet on eth/68 and newer.
|
||||||
|
type NewPooledTransactionHashesPacket68 struct {
|
||||||
|
Types []byte
|
||||||
|
Sizes []uint32
|
||||||
|
Hashes []common.Hash
|
||||||
|
}
|
||||||
|
|
||||||
// GetPooledTransactionsPacket represents a transaction query.
|
// GetPooledTransactionsPacket represents a transaction query.
|
||||||
type GetPooledTransactionsPacket []common.Hash
|
type GetPooledTransactionsPacket []common.Hash
|
||||||
|
@ -364,8 +372,11 @@ func (*GetReceiptsPacket) Kind() byte { return GetReceiptsMsg }
|
||||||
func (*ReceiptsPacket) Name() string { return "Receipts" }
|
func (*ReceiptsPacket) Name() string { return "Receipts" }
|
||||||
func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg }
|
func (*ReceiptsPacket) Kind() byte { return ReceiptsMsg }
|
||||||
|
|
||||||
func (*NewPooledTransactionHashesPacket) Name() string { return "NewPooledTransactionHashes" }
|
func (*NewPooledTransactionHashesPacket66) Name() string { return "NewPooledTransactionHashes" }
|
||||||
func (*NewPooledTransactionHashesPacket) Kind() byte { return NewPooledTransactionHashesMsg }
|
func (*NewPooledTransactionHashesPacket66) Kind() byte { return NewPooledTransactionHashesMsg }
|
||||||
|
|
||||||
|
func (*NewPooledTransactionHashesPacket68) Name() string { return "NewPooledTransactionHashes" }
|
||||||
|
func (*NewPooledTransactionHashesPacket68) Kind() byte { return NewPooledTransactionHashesMsg }
|
||||||
|
|
||||||
func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" }
|
func (*GetPooledTransactionsPacket) Name() string { return "GetPooledTransactions" }
|
||||||
func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }
|
func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactionsMsg }
|
||||||
|
|
Loading…
Reference in New Issue