From 2b8a21db259a8d75576543dd96cedd657a6fe083 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Wed, 9 Oct 2024 09:16:09 +0200 Subject: [PATCH] core/txpool, eth/catalyst: api changes, test update, external triggering of recheck core/txpool/tracking: add ability to trigger recheck externally core/txpool: remove local param from api eth/catalyst: update tests for new API miner, eth: update tests for changed txpool api eth/catalyst: remove trace-log-output from tests core/txpool: fix nits, also store on insert --- core/txpool/legacypool/legacypool.go | 15 --------- core/txpool/tracking/tx_tracker.go | 16 ++++++++- core/txpool/txpool.go | 2 +- eth/api_backend.go | 2 +- eth/backend.go | 3 +- eth/catalyst/api_test.go | 18 +++++----- eth/catalyst/simulated_beacon_test.go | 48 ++++++++++++++------------- eth/handler.go | 4 +-- eth/handler_eth_test.go | 6 ++-- eth/handler_test.go | 2 +- miner/payload_building_test.go | 2 +- 11 files changed, 60 insertions(+), 58 deletions(-) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 1b0f70dab6..f9f7a7d7a6 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -1600,21 +1600,6 @@ func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet { return as } -// contains checks if a given address is contained within the set. -func (as *accountSet) contains(addr common.Address) bool { - _, exist := as.accounts[addr] - return exist -} - -// containsTx checks if the sender of a given tx is within the set. If the sender -// cannot be derived, this method returns false. -func (as *accountSet) containsTx(tx *types.Transaction) bool { - if addr, err := types.Sender(as.signer, tx); err == nil { - return as.contains(addr) - } - return false -} - // add inserts a new address into the set to track. func (as *accountSet) add(addr common.Address) { as.accounts[addr] = struct{}{} diff --git a/core/txpool/tracking/tx_tracker.go b/core/txpool/tracking/tx_tracker.go index c8d5ef6422..a4c2602a45 100644 --- a/core/txpool/tracking/tx_tracker.go +++ b/core/txpool/tracking/tx_tracker.go @@ -48,6 +48,7 @@ type TxTracker struct { signer types.Signer shutdownCh chan struct{} + triggerCh chan struct{} mu sync.Mutex wg sync.WaitGroup } @@ -59,6 +60,7 @@ func NewTxTracker(journalPath string, journalTime time.Duration, chainConfig *pa byAddr: make(map[common.Address]*legacypool.SortedMap), signer: signer, shutdownCh: make(chan struct{}), + triggerCh: make(chan struct{}), pool: next, } if journalPath != "" { @@ -153,6 +155,13 @@ func (tracker *TxTracker) Stop() error { return nil } +// TriggerRecheck triggers a recheck, whereby the tracker potentially resubmits +// transactions to the tx pool. This method is mainly useful for test purposes, +// in order to speed up the process. +func (tracker *TxTracker) TriggerRecheck() { + tracker.triggerCh <- struct{}{} +} + func (tracker *TxTracker) loop() { defer tracker.wg.Done() if tracker.journal != nil { @@ -169,11 +178,16 @@ func (tracker *TxTracker) loop() { select { case <-tracker.shutdownCh: return + case <-tracker.triggerCh: + resubmits, _ := tracker.recheck(false) + if len(resubmits) > 0 { + tracker.pool.Add(resubmits, false) + } case <-t.C: checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal resubmits, rejournal := tracker.recheck(checkJournal) if len(resubmits) > 0 { - tracker.pool.Add(resubmits, false, false) + tracker.pool.Add(resubmits, false) } if checkJournal { // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 93b2ac82bf..e5d9db8296 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -325,7 +325,7 @@ func (p *TxPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Pr // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. -func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error { // Split the input transactions between the subpools. It shouldn't really // happen that we receive merged batches, but better graceful than strange // errors. diff --git a/eth/api_backend.go b/eth/api_backend.go index bc5ce75c1d..8a77c5e599 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -282,7 +282,7 @@ func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) if locals := b.eth.localTxTracker; locals != nil { locals.Track(signedTx) } - return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] + return b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0] } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { diff --git a/eth/backend.go b/eth/backend.go index 0cf50001b5..077885b569 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -340,7 +340,8 @@ func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { s.blockchain.ResetWithGenesisBlock(gb) } -func (s *Ethereum) Miner() *miner.Miner { return s.miner } +func (s *Ethereum) Miner() *miner.Miner { return s.miner } +func (s *Ethereum) Tracker() *tracking.TxTracker { return s.localTxTracker } func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index e0a155f12b..0d89fd2377 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -116,7 +116,7 @@ func TestEth2AssembleBlock(t *testing.T) { if err != nil { t.Fatalf("error signing transaction, err=%v", err) } - ethservice.TxPool().Add([]*types.Transaction{tx}, true, true) + ethservice.TxPool().Add([]*types.Transaction{tx}, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[9].Time() + 5, } @@ -153,7 +153,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() - api.eth.TxPool().Add(txs, false, true) + api.eth.TxPool().Add(txs, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, } @@ -193,7 +193,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() - ethservice.TxPool().Add(txs, true, true) + ethservice.TxPool().Add(txs, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, } @@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) { statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) nonce := statedb.GetNonce(testAddr) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) - ethservice.TxPool().Add([]*types.Transaction{tx}, true, true) + ethservice.TxPool().Add([]*types.Transaction{tx}, true) execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{ Timestamp: parent.Time() + 5, @@ -482,7 +482,7 @@ func TestFullAPI(t *testing.T) { statedb, _ := ethservice.BlockChain().StateAt(parent.Root) nonce := statedb.GetNonce(testAddr) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) - ethservice.TxPool().Add([]*types.Transaction{tx}, true, false) + ethservice.TxPool().Add([]*types.Transaction{tx}, false) } setupBlocks(t, ethservice, 10, parent, callback, nil, nil) @@ -613,7 +613,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) { GasPrice: big.NewInt(2 * params.InitialBaseFee), Data: logCode, }) - ethservice.TxPool().Add([]*types.Transaction{tx}, false, true) + ethservice.TxPool().Add([]*types.Transaction{tx}, true) var ( params = engine.PayloadAttributes{ Timestamp: parent.Time + 1, @@ -1328,8 +1328,8 @@ func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) { // Create tx to trigger deposit generator. tx2, _ = types.SignTx(types.NewTransaction(statedb.GetNonce(testAddr)+1, ethservice.APIBackend.ChainConfig().DepositContractAddress, new(big.Int), 500000, big.NewInt(2*params.InitialBaseFee), nil), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ) - ethservice.TxPool().Add([]*types.Transaction{tx1}, false, false) - ethservice.TxPool().Add([]*types.Transaction{tx2}, false, false) + ethservice.TxPool().Add([]*types.Transaction{tx1}, false) + ethservice.TxPool().Add([]*types.Transaction{tx2}, false) } // Make some withdrawals to include. @@ -1717,7 +1717,7 @@ func TestWitnessCreationAndConsumption(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() - ethservice.TxPool().Add(txs, true, true) + ethservice.TxPool().Add(txs, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, Withdrawals: make([]*types.Withdrawal, 0), diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index 79d9ba738e..6b08730ec0 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -18,6 +18,7 @@ package catalyst import ( "context" + "fmt" "math/big" "testing" "time" @@ -145,7 +146,8 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { func TestOnDemandSpam(t *testing.T) { var ( withdrawals []types.Withdrawal - txs = make(map[common.Hash]*types.Transaction) + txCount = 20000 + wxCount = 20 testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") testAddr = crypto.PubkeyToAddress(testKey.PublicKey) gasLimit uint64 = 10_000_000 @@ -160,7 +162,7 @@ func TestOnDemandSpam(t *testing.T) { defer sub.Unsubscribe() // generate some withdrawals - for i := 0; i < 20; i++ { + for i := 0; i < wxCount; i++ { withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) if err := mock.withdrawals.add(&withdrawals[i]); err != nil { t.Fatal("addWithdrawal failed", err) @@ -168,37 +170,37 @@ func TestOnDemandSpam(t *testing.T) { } // generate a bunch of transactions - for i := 0; i < 20000; i++ { - tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) - if err != nil { - t.Fatal("error signing transaction", err) + go func() { + for i := 0; i < txCount; i++ { + tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) + if err != nil { + panic(fmt.Sprintf("error signing transaction: %v", err)) + } + if err := eth.TxPool().Add([]*types.Transaction{tx}, false)[0]; err != nil { + panic(fmt.Sprintf("error adding txs to pool: %v", err)) + } } - txs[tx.Hash()] = tx - if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil { - t.Fatal("error adding txs to pool", err) - } - } - + }() var ( - includedTxs = make(map[common.Hash]struct{}) - includedWxs []uint64 + includedTxs int + includedWxs int + abort = time.NewTimer(10 * time.Second) ) + defer abort.Stop() for { select { case ev := <-chainHeadCh: block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64()) - for _, itx := range block.Transactions() { - includedTxs[itx.Hash()] = struct{}{} - } - for _, iwx := range block.Withdrawals() { - includedWxs = append(includedWxs, iwx.Index) - } + includedTxs += len(block.Transactions()) + includedWxs += len(block.Withdrawals()) // ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10 - if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) { + if includedTxs == txCount && includedWxs == wxCount { return } - case <-time.After(10 * time.Second): - t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals)) + abort.Reset(5 * time.Second) + case <-abort.C: + t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", + includedTxs, txCount, includedWxs, wxCount) } } } diff --git a/eth/handler.go b/eth/handler.go index 9820118173..c1782c5259 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -68,7 +68,7 @@ type txPool interface { Get(hash common.Hash) *types.Transaction // Add should add the given transactions to the pool. - Add(txs []*types.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, sync bool) []error // Pending should return pending transactions. // The slice should be modifiable by the caller. @@ -189,7 +189,7 @@ func newHandler(config *handlerConfig) (*handler, error) { return p.RequestTxs(hashes) } addTxs := func(txs []*types.Transaction) []error { - return h.txpool.Add(txs, false, false) + return h.txpool.Add(txs, false) } h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) return h, nil diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index ce17345358..a3b708b7fa 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -300,8 +300,8 @@ func testSendTransactions(t *testing.T, protocol uint) { tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) insert[nonce] = tx } - go handler.txpool.Add(insert, false, false) // Need goroutine to not block on feed - time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join) + go handler.txpool.Add(insert, false) // Need goroutine to not block on feed + time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join) // Create a source handler to send messages through and a sink peer to receive them p2pSrc, p2pSink := p2p.MsgPipe() @@ -421,7 +421,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) txs[nonce] = tx } - source.txpool.Add(txs, false, false) + source.txpool.Add(txs, false) // Iterate through all the sinks and ensure they all got the transactions for i := range sinks { diff --git a/eth/handler_test.go b/eth/handler_test.go index b63d3e8592..d5d46a3c65 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -80,7 +80,7 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction { // Add appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { p.lock.Lock() defer p.lock.Unlock() diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index e5eb0297a1..307024c6bb 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -138,7 +138,7 @@ func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*Miner, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) - backend.txPool.Add(pendingTxs, true, true) + backend.txPool.Add(pendingTxs, true) w := New(backend, testConfig, engine) return w, backend }