This commit is contained in:
Marius van der Wijden 2024-11-25 14:13:51 +01:00 committed by GitHub
commit 4c10212e02
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 312 additions and 23 deletions

View File

@ -66,10 +66,10 @@ func (s *Suite) dialAs(key *ecdsa.PrivateKey) (*Conn, error) {
return nil, err
}
conn.caps = []p2p.Cap{
{Name: "eth", Version: 67},
{Name: "eth", Version: 68},
{Name: "eth", Version: 69},
}
conn.ourHighestProtoVersion = 68
conn.ourHighestProtoVersion = 69
return &conn, nil
}
@ -316,8 +316,10 @@ loop:
return fmt.Errorf("wrong head block in status, want: %#x (block %d) have %#x",
want, chain.blocks[chain.Len()-1].NumberU64(), have)
}
if have, want := msg.TD.Cmp(chain.TD()), 0; have != want {
return fmt.Errorf("wrong TD in status: have %v want %v", have, want)
if c.negotiatedProtoVersion < 69 {
if have, want := msg.TD.Cmp(chain.TD()), 0; have != want {
return fmt.Errorf("wrong TD in status: have %v want %v", have, want)
}
}
if have, want := msg.ForkID, chain.ForkID(); !reflect.DeepEqual(have, want) {
return fmt.Errorf("wrong fork ID in status: have %v, want %v", have, want)

View File

@ -230,6 +230,20 @@ func (bc *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return receipts
}
// GetRawReceiptsByHash retrieves the receipts for all transactions in a given block
// without deriving the internal fields and the Bloom.
func (bc *BlockChain) GetRawReceiptsByHash(hash common.Hash) rlp.RawValue {
number := rawdb.ReadHeaderNumber(bc.db, hash)
if number == nil {
return nil
}
receipts := rawdb.ReadReceiptsRLP(bc.db, hash, *number)
if receipts == nil {
return nil
}
return receipts
}
// GetUnclesInChain retrieves all the uncles from a given block backwards until
// a specific distance is reached.
func (bc *BlockChain) GetUnclesInChain(block *types.Block, length int) []*types.Header {

View File

@ -585,8 +585,8 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa
}
// ReadRawReceipts retrieves all the transaction receipts belonging to a block.
// The receipt metadata fields are not guaranteed to be populated, so they
// should not be used. Use ReadReceipts instead if the metadata is needed.
// The receipt metadata fields and the Bloom are not guaranteed to be populated,
// so they should not be used. Use ReadReceipts instead if the metadata is needed.
func ReadRawReceipts(db ethdb.Reader, hash common.Hash, number uint64) types.Receipts {
// Retrieve the flattened receipt slice
data := ReadReceiptsRLP(db, hash, number)

View File

@ -400,7 +400,11 @@ func TestBlockReceiptStorage(t *testing.T) {
t.Fatalf("receipts returned when body was deleted: %v", rs)
}
// Ensure that receipts without metadata can be returned without the block body too
if err := checkReceiptsRLP(ReadRawReceipts(db, hash, 0), receipts); err != nil {
raw := ReadRawReceipts(db, hash, 0)
for _, r := range raw {
r.Bloom = types.CreateBloom(types.Receipts{r})
}
if err := checkReceiptsRLP(raw, receipts); err != nil {
t.Fatal(err)
}
// Sanity check that body alone without the receipt is a full purge

View File

@ -258,7 +258,7 @@ func (r *Receipt) Size() common.StorageSize {
}
// ReceiptForStorage is a wrapper around a Receipt with RLP serialization
// that omits the Bloom field and deserialization that re-computes it.
// that omits the Bloom field. The Bloom field is recomputed by DeriveFields.
type ReceiptForStorage Receipt
// EncodeRLP implements rlp.Encoder, and flattens all content fields of a receipt
@ -291,7 +291,6 @@ func (r *ReceiptForStorage) DecodeRLP(s *rlp.Stream) error {
}
r.CumulativeGasUsed = stored.CumulativeGasUsed
r.Logs = stored.Logs
r.Bloom = CreateBloom(Receipts{(*Receipt)(r)})
return nil
}
@ -372,6 +371,9 @@ func (rs Receipts) DeriveFields(config *params.ChainConfig, hash common.Hash, nu
rs[i].Logs[j].Index = logIndex
logIndex++
}
// also derive the Bloom if not derived yet
rs[i].Bloom = CreateBloom(Receipts{rs[i]})
}
return nil
}

View File

@ -296,6 +296,13 @@ var (
}
)
func init() {
// Correctly compute the bloom filters
for _, receipt := range receipts {
receipt.Bloom = CreateBloom(Receipts{receipt})
}
}
func TestDecodeEmptyTypedReceipt(t *testing.T) {
input := []byte{0x80}
var r Receipt
@ -511,6 +518,7 @@ func clearComputedFieldsOnReceipt(receipt *Receipt) *Receipt {
cpy.EffectiveGasPrice = big.NewInt(0)
cpy.BlobGasUsed = 0
cpy.BlobGasPrice = nil
cpy.Bloom = CreateBloom(Receipts{&cpy})
return &cpy
}

View File

@ -262,7 +262,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery(dlp.chain, hashes)
blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes)
receipts := make([][]*types.Receipt, len(blobs))
for i, blob := range blobs {

View File

@ -168,8 +168,21 @@ var eth68 = map[uint64]msgHandler{
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts,
ReceiptsMsg: handleReceipts,
GetReceiptsMsg: handleGetReceipts68,
ReceiptsMsg: handleReceipts68,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
}
var eth69 = map[uint64]msgHandler{
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts69,
ReceiptsMsg: handleReceipts69,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
}
@ -187,7 +200,14 @@ func handleMessage(backend Backend, peer *Peer) error {
}
defer msg.Discard()
var handlers = eth68
var handlers map[uint64]msgHandler
if peer.version == ETH68 {
handlers = eth68
} else if peer.version == ETH69 {
handlers = eth69
} else {
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
}
// Track the amount of time it takes to serve the request and run the handler
if metrics.Enabled {

View File

@ -576,3 +576,41 @@ func FuzzEthProtocolHandlers(f *testing.F) {
handler(backend, decoder{msg: msg}, peer.Peer)
})
}
func TestTransformReceipts(t *testing.T) {
tests := []struct {
input []types.ReceiptForStorage
txs []*types.Transaction
output []receiptForNetwork
}{
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: nil}},
txs: []*types.Transaction{types.NewTx(&types.LegacyTx{})},
output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: nil}},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: nil}},
txs: []*types.Transaction{types.NewTx(&types.DynamicFeeTx{})},
output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: nil, Type: 2}},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: nil}},
txs: []*types.Transaction{types.NewTx(&types.AccessListTx{})},
output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: nil, Type: 1}},
},
{
input: []types.ReceiptForStorage{{CumulativeGasUsed: 123, Status: 1, Logs: []*types.Log{{Address: common.Address{1}, Topics: []common.Hash{{1}}}}}},
txs: []*types.Transaction{types.NewTx(&types.AccessListTx{})},
output: []receiptForNetwork{{GasUsed: 123, Status: 1, Logs: []*types.Log{{Address: common.Address{1}, Topics: []common.Hash{{1}}}}, Type: 1}},
},
}
for i, test := range tests {
in, _ := rlp.EncodeToBytes(test.input)
have := transformReceipts(in, test.txs)
out, _ := rlp.EncodeToBytes(test.output)
if !bytes.Equal(have, out) {
t.Fatalf("transforming receipt mismatch, test %v: want %v have %v", i, out, have)
}
}
}

View File

@ -17,6 +17,7 @@
package eth
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -236,19 +237,29 @@ func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequ
return bodies
}
func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error {
func handleGetReceipts68(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := ServiceGetReceiptsQuery(backend.Chain(), query.GetReceiptsRequest)
response := ServiceGetReceiptsQuery68(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
}
// ServiceGetReceiptsQuery assembles the response to a receipt query. It is
func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
response := serviceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
}
// ServiceGetReceiptsQuery68 assembles the response to a receipt query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
func ServiceGetReceiptsQuery68(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
// Gather state data until the fetch or network limits is reached
var (
bytes int
@ -277,6 +288,67 @@ func ServiceGetReceiptsQuery(chain *core.BlockChain, query GetReceiptsRequest) [
return receipts
}
// serviceGetReceiptsQuery69 assembles the response to a receipt query.
// It does not send the bloom filters for the receipts
func serviceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)
for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
}
// Retrieve the requested block's receipts
results := chain.GetRawReceiptsByHash(hash)
if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
} else {
header := chain.GetHeaderByHash(hash)
if header.ReceiptHash != types.EmptyReceiptsHash {
body := new(types.Body)
if err := rlp.DecodeBytes(chain.GetBodyRLP(hash), &body); err == nil {
results = transformReceipts(results, body.Transactions)
}
}
}
receipts = append(receipts, results)
bytes += len(results)
}
return receipts
}
// transformReceipts takes a slice of rlp-encoded receipts, and transactions,
// and applies the type-encoding on the receipts (for non-legacy receipts).
// e.g. for non-legacy receipts: receipt-data -> {tx-type || receipt-data}
func transformReceipts(blockReceipts []byte, txs []*types.Transaction) []byte {
var (
out bytes.Buffer
enc = rlp.NewEncoderBuffer(&out)
it, _ = rlp.NewListIterator(blockReceipts)
)
outer := enc.List()
for i := 0; it.Next(); i++ {
if txs[i].Type() == types.LegacyTxType {
enc.Write(it.Value())
continue
}
content, _, _ := rlp.SplitList(it.Value())
receiptList := enc.List()
enc.Write([]byte{txs[i].Type()})
enc.Write(content)
enc.ListEnd(receiptList)
}
enc.ListEnd(outer)
enc.Flush()
return out.Bytes()
}
func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
return errors.New("block announcements disallowed") // We dropped support for non-merge networks
}
@ -334,7 +406,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
}, metadata)
}
func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
func handleReceipts68(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket)
if err := msg.Decode(res); err != nil {
@ -355,6 +427,41 @@ func handleReceipts(backend Backend, msg Decoder, peer *Peer) error {
}, metadata)
}
func handleReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket69)
if err := msg.Decode(res); err != nil {
return fmt.Errorf("%w: message %v: %v", errDecode, msg, err)
}
var response ReceiptsResponse
// calculate the bloom filter before dispatching
for _, receipts := range res.ReceiptsResponse69 {
var rec []*types.Receipt
for _, receipt := range receipts {
receipt.Bloom = types.CreateBloom(types.Receipts{(*types.Receipt)(receipt)})
rec = append(rec, (*types.Receipt)(receipt))
}
response = append(response, rec)
}
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(response))
for i, receipts := range response {
var r []*types.Receipt
for _, receipt := range receipts {
r = append(r, (*types.Receipt)(receipt))
}
hashes[i] = types.DeriveSha(types.Receipts(r), hasher)
}
return hashes
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,
Res: &response,
}, metadata)
}
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them

View File

@ -43,14 +43,17 @@ func (p *Peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
var status StatusPacket // safe to read after two values have been received from errc
go func() {
errc <- p2p.Send(p.rw, StatusMsg, &StatusPacket{
pkt := &StatusPacket{
ProtocolVersion: uint32(p.version),
NetworkID: network,
TD: td,
Head: head,
Genesis: genesis,
ForkID: forkID,
})
}
if p.version == ETH68 {
pkt.TD = td
}
errc <- p2p.Send(p.rw, StatusMsg, pkt)
}()
go func() {
errc <- p.readStatus(network, &status, genesis, forkFilter)

View File

@ -31,6 +31,7 @@ import (
// Constants to match up protocol versions and messages
const (
ETH68 = 68
ETH69 = 69
)
// ProtocolName is the official short name of the `eth` protocol used during
@ -39,11 +40,11 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH68}
var ProtocolVersions = []uint{ETH69, ETH68}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH68: 17}
var protocolLengths = map[uint]uint64{ETH68: 17, ETH69: 17}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
@ -259,6 +260,16 @@ type ReceiptsPacket struct {
ReceiptsResponse
}
// ReceiptsResponse is the network packet for block receipts distribution.
type ReceiptsResponse69 [][]*receiptForNetwork
// ReceiptsPacket is the network packet for block receipts distribution with
// request ID wrapping.
type ReceiptsPacket69 struct {
RequestId uint64
ReceiptsResponse69
}
// ReceiptsRLPResponse is used for receipts, when we already have it encoded
type ReceiptsRLPResponse []rlp.RawValue

View File

@ -0,0 +1,80 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"errors"
"io"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
type receiptForNetwork types.Receipt
func (r *receiptForNetwork) DecodeRLP(s *rlp.Stream) error {
kind, size, err := s.Kind()
if err != nil {
return err
}
if kind != rlp.List {
return errors.New("invalid receipt")
}
if size == 4 {
var rec = new(struct {
Type byte
Status uint64
GasUsed uint64
Logs []*types.Log
})
if err := s.Decode(rec); err != nil {
return err
}
r.Type = rec.Type
r.Status = rec.Status
r.GasUsed = rec.GasUsed
r.Logs = rec.Logs
} else {
s.Decode(&r)
}
return nil
}
func (r *receiptForNetwork) EncodeRLP(_w io.Writer) error {
data := &types.ReceiptForStorage{Status: r.Status, CumulativeGasUsed: r.GasUsed, Logs: r.Logs}
if r.Type == types.LegacyTxType {
return rlp.Encode(_w, data)
}
w := rlp.NewEncoderBuffer(_w)
outerList := w.List()
w.Write([]byte{r.Type})
if r.Status == types.ReceiptStatusSuccessful {
w.Write([]byte{0x01})
} else {
w.Write([]byte{0x00})
}
w.WriteUint64(r.GasUsed)
logList := w.List()
for _, log := range r.Logs {
if err := log.EncodeRLP(w); err != nil {
return err
}
}
w.ListEnd(logList)
w.ListEnd(outerList)
return w.Flush()
}