core/txpool, eth/catalyst: api changes, test update, external triggering of recheck

core/txpool/tracking: add ability to trigger recheck externally
core/txpool: remove local param from api
eth/catalyst: update tests for new API
miner, eth: update tests for changed txpool api
eth/catalyst: remove trace-log-output from tests
core/txpool: fix nits, also store on insert
This commit is contained in:
Martin Holst Swende 2024-10-09 09:16:09 +02:00
parent 183fc1085e
commit 2b8a21db25
No known key found for this signature in database
GPG Key ID: 683B438C05A5DDF0
11 changed files with 60 additions and 58 deletions

View File

@ -1600,21 +1600,6 @@ func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
return as return as
} }
// contains checks if a given address is contained within the set.
func (as *accountSet) contains(addr common.Address) bool {
_, exist := as.accounts[addr]
return exist
}
// containsTx checks if the sender of a given tx is within the set. If the sender
// cannot be derived, this method returns false.
func (as *accountSet) containsTx(tx *types.Transaction) bool {
if addr, err := types.Sender(as.signer, tx); err == nil {
return as.contains(addr)
}
return false
}
// add inserts a new address into the set to track. // add inserts a new address into the set to track.
func (as *accountSet) add(addr common.Address) { func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{} as.accounts[addr] = struct{}{}

View File

@ -48,6 +48,7 @@ type TxTracker struct {
signer types.Signer signer types.Signer
shutdownCh chan struct{} shutdownCh chan struct{}
triggerCh chan struct{}
mu sync.Mutex mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -59,6 +60,7 @@ func NewTxTracker(journalPath string, journalTime time.Duration, chainConfig *pa
byAddr: make(map[common.Address]*legacypool.SortedMap), byAddr: make(map[common.Address]*legacypool.SortedMap),
signer: signer, signer: signer,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
triggerCh: make(chan struct{}),
pool: next, pool: next,
} }
if journalPath != "" { if journalPath != "" {
@ -153,6 +155,13 @@ func (tracker *TxTracker) Stop() error {
return nil return nil
} }
// TriggerRecheck triggers a recheck, whereby the tracker potentially resubmits
// transactions to the tx pool. This method is mainly useful for test purposes,
// in order to speed up the process.
func (tracker *TxTracker) TriggerRecheck() {
tracker.triggerCh <- struct{}{}
}
func (tracker *TxTracker) loop() { func (tracker *TxTracker) loop() {
defer tracker.wg.Done() defer tracker.wg.Done()
if tracker.journal != nil { if tracker.journal != nil {
@ -169,11 +178,16 @@ func (tracker *TxTracker) loop() {
select { select {
case <-tracker.shutdownCh: case <-tracker.shutdownCh:
return return
case <-tracker.triggerCh:
resubmits, _ := tracker.recheck(false)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
case <-t.C: case <-t.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal) resubmits, rejournal := tracker.recheck(checkJournal)
if len(resubmits) > 0 { if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false, false) tracker.pool.Add(resubmits, false)
} }
if checkJournal { if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts

View File

@ -325,7 +325,7 @@ func (p *TxPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Pr
// Add enqueues a batch of transactions into the pool if they are valid. Due // Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx // to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together. // to a later point to batch multiple ones together.
func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
// Split the input transactions between the subpools. It shouldn't really // Split the input transactions between the subpools. It shouldn't really
// happen that we receive merged batches, but better graceful than strange // happen that we receive merged batches, but better graceful than strange
// errors. // errors.

View File

@ -282,7 +282,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction)
if locals := b.eth.localTxTracker; locals != nil { if locals := b.eth.localTxTracker; locals != nil {
locals.Track(signedTx) locals.Track(signedTx)
} }
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] return b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0]
} }
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {

View File

@ -340,7 +340,8 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
s.blockchain.ResetWithGenesisBlock(gb) s.blockchain.ResetWithGenesisBlock(gb)
} }
func (s *Ethereum) Miner() *miner.Miner { return s.miner } func (s *Ethereum) Miner() *miner.Miner { return s.miner }
func (s *Ethereum) Tracker() *tracking.TxTracker { return s.localTxTracker }
func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }

View File

@ -116,7 +116,7 @@ func TestEth2AssembleBlock(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("error signing transaction, err=%v", err) t.Fatalf("error signing transaction, err=%v", err)
} }
ethservice.TxPool().Add([]*types.Transaction{tx}, true, true) ethservice.TxPool().Add([]*types.Transaction{tx}, true)
blockParams := engine.PayloadAttributes{ blockParams := engine.PayloadAttributes{
Timestamp: blocks[9].Time() + 5, Timestamp: blocks[9].Time() + 5,
} }
@ -153,7 +153,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
// Put the 10th block's tx in the pool and produce a new block // Put the 10th block's tx in the pool and produce a new block
txs := blocks[9].Transactions() txs := blocks[9].Transactions()
api.eth.TxPool().Add(txs, false, true) api.eth.TxPool().Add(txs, true)
blockParams := engine.PayloadAttributes{ blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5, Timestamp: blocks[8].Time() + 5,
} }
@ -193,7 +193,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
// Put the 10th block's tx in the pool and produce a new block // Put the 10th block's tx in the pool and produce a new block
txs := blocks[9].Transactions() txs := blocks[9].Transactions()
ethservice.TxPool().Add(txs, true, true) ethservice.TxPool().Add(txs, true)
blockParams := engine.PayloadAttributes{ blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5, Timestamp: blocks[8].Time() + 5,
} }
@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr) nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().Add([]*types.Transaction{tx}, true, true) ethservice.TxPool().Add([]*types.Transaction{tx}, true)
execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{ execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{
Timestamp: parent.Time() + 5, Timestamp: parent.Time() + 5,
@ -482,7 +482,7 @@ func TestFullAPI(t *testing.T) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root) statedb, _ := ethservice.BlockChain().StateAt(parent.Root)
nonce := statedb.GetNonce(testAddr) nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().Add([]*types.Transaction{tx}, true, false) ethservice.TxPool().Add([]*types.Transaction{tx}, false)
} }
setupBlocks(t, ethservice, 10, parent, callback, nil, nil) setupBlocks(t, ethservice, 10, parent, callback, nil, nil)
@ -613,7 +613,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
GasPrice: big.NewInt(2 * params.InitialBaseFee), GasPrice: big.NewInt(2 * params.InitialBaseFee),
Data: logCode, Data: logCode,
}) })
ethservice.TxPool().Add([]*types.Transaction{tx}, false, true) ethservice.TxPool().Add([]*types.Transaction{tx}, true)
var ( var (
params = engine.PayloadAttributes{ params = engine.PayloadAttributes{
Timestamp: parent.Time + 1, Timestamp: parent.Time + 1,
@ -1328,8 +1328,8 @@ func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) {
// Create tx to trigger deposit generator. // Create tx to trigger deposit generator.
tx2, _ = types.SignTx(types.NewTransaction(statedb.GetNonce(testAddr)+1, ethservice.APIBackend.ChainConfig().DepositContractAddress, new(big.Int), 500000, big.NewInt(2*params.InitialBaseFee), nil), types.LatestSigner(ethservice.BlockChain().Config()), testKey) tx2, _ = types.SignTx(types.NewTransaction(statedb.GetNonce(testAddr)+1, ethservice.APIBackend.ChainConfig().DepositContractAddress, new(big.Int), 500000, big.NewInt(2*params.InitialBaseFee), nil), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
) )
ethservice.TxPool().Add([]*types.Transaction{tx1}, false, false) ethservice.TxPool().Add([]*types.Transaction{tx1}, false)
ethservice.TxPool().Add([]*types.Transaction{tx2}, false, false) ethservice.TxPool().Add([]*types.Transaction{tx2}, false)
} }
// Make some withdrawals to include. // Make some withdrawals to include.
@ -1717,7 +1717,7 @@ func TestWitnessCreationAndConsumption(t *testing.T) {
// Put the 10th block's tx in the pool and produce a new block // Put the 10th block's tx in the pool and produce a new block
txs := blocks[9].Transactions() txs := blocks[9].Transactions()
ethservice.TxPool().Add(txs, true, true) ethservice.TxPool().Add(txs, true)
blockParams := engine.PayloadAttributes{ blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5, Timestamp: blocks[8].Time() + 5,
Withdrawals: make([]*types.Withdrawal, 0), Withdrawals: make([]*types.Withdrawal, 0),

View File

@ -18,6 +18,7 @@ package catalyst
import ( import (
"context" "context"
"fmt"
"math/big" "math/big"
"testing" "testing"
"time" "time"
@ -145,7 +146,8 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
func TestOnDemandSpam(t *testing.T) { func TestOnDemandSpam(t *testing.T) {
var ( var (
withdrawals []types.Withdrawal withdrawals []types.Withdrawal
txs = make(map[common.Hash]*types.Transaction) txCount = 20000
wxCount = 20
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddr = crypto.PubkeyToAddress(testKey.PublicKey) testAddr = crypto.PubkeyToAddress(testKey.PublicKey)
gasLimit uint64 = 10_000_000 gasLimit uint64 = 10_000_000
@ -160,7 +162,7 @@ func TestOnDemandSpam(t *testing.T) {
defer sub.Unsubscribe() defer sub.Unsubscribe()
// generate some withdrawals // generate some withdrawals
for i := 0; i < 20; i++ { for i := 0; i < wxCount; i++ {
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
if err := mock.withdrawals.add(&withdrawals[i]); err != nil { if err := mock.withdrawals.add(&withdrawals[i]); err != nil {
t.Fatal("addWithdrawal failed", err) t.Fatal("addWithdrawal failed", err)
@ -168,37 +170,37 @@ func TestOnDemandSpam(t *testing.T) {
} }
// generate a bunch of transactions // generate a bunch of transactions
for i := 0; i < 20000; i++ { go func() {
tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) for i := 0; i < txCount; i++ {
if err != nil { tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey)
t.Fatal("error signing transaction", err) if err != nil {
panic(fmt.Sprintf("error signing transaction: %v", err))
}
if err := eth.TxPool().Add([]*types.Transaction{tx}, false)[0]; err != nil {
panic(fmt.Sprintf("error adding txs to pool: %v", err))
}
} }
txs[tx.Hash()] = tx }()
if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil {
t.Fatal("error adding txs to pool", err)
}
}
var ( var (
includedTxs = make(map[common.Hash]struct{}) includedTxs int
includedWxs []uint64 includedWxs int
abort = time.NewTimer(10 * time.Second)
) )
defer abort.Stop()
for { for {
select { select {
case ev := <-chainHeadCh: case ev := <-chainHeadCh:
block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64()) block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, itx := range block.Transactions() { includedTxs += len(block.Transactions())
includedTxs[itx.Hash()] = struct{}{} includedWxs += len(block.Withdrawals())
}
for _, iwx := range block.Withdrawals() {
includedWxs = append(includedWxs, iwx.Index)
}
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10 // ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) { if includedTxs == txCount && includedWxs == wxCount {
return return
} }
case <-time.After(10 * time.Second): abort.Reset(5 * time.Second)
t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals)) case <-abort.C:
t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d",
includedTxs, txCount, includedWxs, wxCount)
} }
} }
} }

View File

@ -68,7 +68,7 @@ type txPool interface {
Get(hash common.Hash) *types.Transaction Get(hash common.Hash) *types.Transaction
// Add should add the given transactions to the pool. // Add should add the given transactions to the pool.
Add(txs []*types.Transaction, local bool, sync bool) []error Add(txs []*types.Transaction, sync bool) []error
// Pending should return pending transactions. // Pending should return pending transactions.
// The slice should be modifiable by the caller. // The slice should be modifiable by the caller.
@ -189,7 +189,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return p.RequestTxs(hashes) return p.RequestTxs(hashes)
} }
addTxs := func(txs []*types.Transaction) []error { addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false, false) return h.txpool.Add(txs, false)
} }
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer)
return h, nil return h, nil

View File

@ -300,8 +300,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
insert[nonce] = tx insert[nonce] = tx
} }
go handler.txpool.Add(insert, false, false) // Need goroutine to not block on feed go handler.txpool.Add(insert, false) // Need goroutine to not block on feed
time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join) time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join)
// Create a source handler to send messages through and a sink peer to receive them // Create a source handler to send messages through and a sink peer to receive them
p2pSrc, p2pSink := p2p.MsgPipe() p2pSrc, p2pSink := p2p.MsgPipe()
@ -421,7 +421,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
txs[nonce] = tx txs[nonce] = tx
} }
source.txpool.Add(txs, false, false) source.txpool.Add(txs, false)
// Iterate through all the sinks and ensure they all got the transactions // Iterate through all the sinks and ensure they all got the transactions
for i := range sinks { for i := range sinks {

View File

@ -80,7 +80,7 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
// Add appends a batch of transactions to the pool, and notifies any // Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil // listeners if the addition channel is non nil
func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
p.lock.Lock() p.lock.Lock()
defer p.lock.Unlock() defer p.lock.Unlock()

View File

@ -138,7 +138,7 @@ func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool }
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*Miner, *testWorkerBackend) { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*Miner, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.Add(pendingTxs, true, true) backend.txPool.Add(pendingTxs, true)
w := New(backend, testConfig, engine) w := New(backend, testConfig, engine)
return w, backend return w, backend
} }