diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 685c017cbd..864b92257b 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1013,6 +1013,56 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { return nil } +func (p *BlobPool) flushTransactionsBelowTip(tip *uint256.Int) { + for addr, txs := range p.index { + for i, tx := range txs { + if tx.execTipCap.Cmp(tip) < 0 { + // Drop the offending transaction + var ( + ids = []uint64{tx.id} + nonces = []uint64{tx.nonce} + ) + p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) + p.stored -= uint64(tx.size) + delete(p.lookup, tx.hash) + txs[i] = nil + + // Drop everything afterwards, no gaps allowed + for j, tx := range txs[i+1:] { + ids = append(ids, tx.id) + nonces = append(nonces, tx.nonce) + + p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap) + p.stored -= uint64(tx.size) + delete(p.lookup, tx.hash) + txs[i+1+j] = nil + } + // Clear out the dropped transactions from the index + if i > 0 { + p.index[addr] = txs[:i] + heap.Fix(p.evict, p.evict.index[addr]) + } else { + delete(p.index, addr) + delete(p.spent, addr) + + heap.Remove(p.evict, p.evict.index[addr]) + p.reserve(addr, false) + } + // Clear out the transactions from the data store + log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids) + dropUnderpricedMeter.Mark(int64(len(ids))) + + for _, id := range ids { + if err := p.store.Delete(id); err != nil { + log.Error("Failed to delete dropped transaction", "id", id, "err", err) + } + } + break + } + } + } +} + // SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements // to be kept in sync with the main transaction pool's gas requirements. func (p *BlobPool) SetGasTip(tip *big.Int) { @@ -1025,59 +1075,20 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { // If the min miner fee increased, remove transactions below the new threshold if old == nil || p.gasTip.Cmp(old) > 0 { - for addr, txs := range p.index { - for i, tx := range txs { - if tx.execTipCap.Cmp(p.gasTip) < 0 { - // Drop the offending transaction - var ( - ids = []uint64{tx.id} - nonces = []uint64{tx.nonce} - ) - p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) - p.stored -= uint64(tx.size) - delete(p.lookup, tx.hash) - txs[i] = nil - - // Drop everything afterwards, no gaps allowed - for j, tx := range txs[i+1:] { - ids = append(ids, tx.id) - nonces = append(nonces, tx.nonce) - - p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap) - p.stored -= uint64(tx.size) - delete(p.lookup, tx.hash) - txs[i+1+j] = nil - } - // Clear out the dropped transactions from the index - if i > 0 { - p.index[addr] = txs[:i] - heap.Fix(p.evict, p.evict.index[addr]) - } else { - delete(p.index, addr) - delete(p.spent, addr) - - heap.Remove(p.evict, p.evict.index[addr]) - p.reserve(addr, false) - } - // Clear out the transactions from the data store - log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids) - dropUnderpricedMeter.Mark(int64(len(ids))) - - for _, id := range ids { - if err := p.store.Delete(id); err != nil { - log.Error("Failed to delete dropped transaction", "id", id, "err", err) - } - } - break - } - } - } + p.flushTransactionsBelowTip(p.gasTip) } log.Debug("Blobpool tip threshold updated", "tip", tip) pooltipGauge.Update(tip.Int64()) p.updateStorageMetrics() } +func (p *BlobPool) FlushAllTransactions() { + maxUint256 := uint256.MustFromBig(new(big.Int).Sub(new(big.Int).Lsh(common.Big1, 256), common.Big1)) + p.lock.Lock() + defer p.lock.Unlock() + p.flushTransactionsBelowTip(maxUint256) +} + // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (p *BlobPool) validateTx(tx *types.Transaction) error { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 5506ecc31f..1ac8086522 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -429,6 +429,15 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs return pool.txFeed.Subscribe(ch) } +func (pool *LegacyPool) flushTransactionsBelowTip(tip *big.Int) { + // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead + drop := pool.all.RemotesBelowTip(tip) + for _, tx := range drop { + pool.removeTx(tx.Hash(), false, true) + } + pool.priced.Removed(len(drop)) +} + // SetGasTip updates the minimum gas tip required by the transaction pool for a // new transaction, and drops all transactions below this threshold. func (pool *LegacyPool) SetGasTip(tip *big.Int) { @@ -442,16 +451,18 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) { pool.gasTip.Store(newTip) // If the min miner fee increased, remove transactions below the new threshold if newTip.Cmp(old) > 0 { - // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead - drop := pool.all.RemotesBelowTip(tip) - for _, tx := range drop { - pool.removeTx(tx.Hash(), false, true) - } - pool.priced.Removed(len(drop)) + pool.flushTransactionsBelowTip(tip) } log.Info("Legacy pool tip threshold updated", "tip", newTip) } +func (pool *LegacyPool) FlushAllTransactions() { + maxUint256 := new(big.Int).Sub(new(big.Int).Lsh(common.Big1, 256), common.Big1) + pool.mu.Lock() + defer pool.mu.Unlock() + pool.flushTransactionsBelowTip(maxUint256) +} + // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (pool *LegacyPool) Nonce(addr common.Address) uint64 { diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 9881ed1b8f..e361eabd4e 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -116,6 +116,9 @@ type SubPool interface { // transaction, and drops all transactions below this threshold. SetGasTip(tip *big.Int) + // FlushAllTransactions drops all transactions in the pool. + FlushAllTransactions() + // Has returns an indicator whether subpool has a transaction cached with the // given hash. Has(hash common.Hash) bool diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index be7435247d..f18eb4c6a0 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -104,6 +104,13 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return pool, nil } +// FlushAllTransactions removes all transactions from all subpools +func (p *TxPool) FlushAllTransactions() { + for _, subpool := range p.subpools { + subpool.FlushAllTransactions() + } +} + // reserver is a method to create an address reservation callback to exclusively // assign/deassign addresses to/from subpools. This can ensure that at any point // in time, only a single subpool is able to manage an account, avoiding cross diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index dbf561ca41..247d906d05 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -21,7 +21,6 @@ import ( "crypto/sha256" "errors" "fmt" - "math/big" "sync" "time" @@ -34,7 +33,6 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -286,12 +284,7 @@ func (c *SimulatedBeacon) Commit() common.Hash { // Rollback un-sends previously added transactions. func (c *SimulatedBeacon) Rollback() { - // Flush all transactions from the transaction pools - maxUint256 := new(big.Int).Sub(new(big.Int).Lsh(common.Big1, 256), common.Big1) - c.eth.TxPool().SetGasTip(maxUint256) - // Set the gas tip back to accept new transactions - // TODO (Marius van der Wijden): set gas tip to parameter passed by config - c.eth.TxPool().SetGasTip(big.NewInt(params.GWei)) + c.eth.TxPool().FlushAllTransactions() } // Fork sets the head to the provided hash.