From af5c97aebe1d37486635521ef553cb8bd4bada13 Mon Sep 17 00:00:00 2001 From: Marius van der Wijden Date: Wed, 1 Jul 2020 19:35:26 +0200 Subject: [PATCH] core, txpool: less allocations when handling transactions (#21232) * core: use uint64 for total tx costs instead of big.Int * core: added local tx pool test case * core, crypto: various allocation savings regarding tx handling * Update core/tx_list.go * core: added tx.GasPriceIntCmp for comparison without allocation adds a method to remove unneeded allocation in comparison to tx.gasPrice * core: handle pools full of locals better * core/tests: benchmark for tx_list * core/txlist, txpool: save a reheap operation, avoid some bigint allocs Co-authored-by: Martin Holst Swende --- common/math/integer.go | 10 +-- core/tx_list.go | 129 +++++++++++++++++++++++++++++--------- core/tx_list_test.go | 19 +++--- core/tx_pool.go | 18 ++++-- core/tx_pool_test.go | 18 ++++-- core/types/transaction.go | 11 ++++ 6 files changed, 151 insertions(+), 54 deletions(-) diff --git a/common/math/integer.go b/common/math/integer.go index 93b1d036dd..46d91ab2a4 100644 --- a/common/math/integer.go +++ b/common/math/integer.go @@ -18,6 +18,7 @@ package math import ( "fmt" + "math/bits" "strconv" ) @@ -87,13 +88,12 @@ func SafeSub(x, y uint64) (uint64, bool) { // SafeAdd returns the result and whether overflow occurred. func SafeAdd(x, y uint64) (uint64, bool) { - return x + y, y > MaxUint64-x + sum, carry := bits.Add64(x, y, 0) + return sum, carry != 0 } // SafeMul returns multiplication result and whether overflow occurred. func SafeMul(x, y uint64) (uint64, bool) { - if x == 0 || y == 0 { - return 0, false - } - return x * y, y > MaxUint64/x + hi, lo := bits.Mul64(x, y) + return lo, hi != 0 } diff --git a/core/tx_list.go b/core/tx_list.go index 164c73006b..8beb28bba9 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -99,7 +99,30 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions { // Filter iterates over the list of transactions and removes all of them for which // the specified function evaluates to true. +// Filter, as opposed to 'filter', re-initialises the heap after the operation is done. +// If you want to do several consecutive filterings, it's therefore better to first +// do a .filter(func1) followed by .Filter(func2) or reheap() func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { + removed := m.filter(filter) + // If transactions were removed, the heap and cache are ruined + if len(removed) > 0 { + m.reheap() + } + return removed +} + +func (m *txSortedMap) reheap() { + *m.index = make([]uint64, 0, len(m.items)) + for nonce := range m.items { + *m.index = append(*m.index, nonce) + } + heap.Init(m.index) + m.cache = nil +} + +// filter is identical to Filter, but **does not** regenerate the heap. This method +// should only be used if followed immediately by a call to Filter or reheap() +func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transactions { var removed types.Transactions // Collect all the transactions to filter out @@ -109,14 +132,7 @@ func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transac delete(m.items, nonce) } } - // If transactions were removed, the heap and cache are ruined if len(removed) > 0 { - *m.index = make([]uint64, 0, len(m.items)) - for nonce := range m.items { - *m.index = append(*m.index, nonce) - } - heap.Init(m.index) - m.cache = nil } return removed @@ -197,10 +213,7 @@ func (m *txSortedMap) Len() int { return len(m.items) } -// Flatten creates a nonce-sorted slice of transactions based on the loosely -// sorted internal representation. The result of the sorting is cached in case -// it's requested again before any modifications are made to the contents. -func (m *txSortedMap) Flatten() types.Transactions { +func (m *txSortedMap) flatten() types.Transactions { // If the sorting was not cached yet, create and cache it if m.cache == nil { m.cache = make(types.Transactions, 0, len(m.items)) @@ -209,12 +222,27 @@ func (m *txSortedMap) Flatten() types.Transactions { } sort.Sort(types.TxByNonce(m.cache)) } + return m.cache +} + +// Flatten creates a nonce-sorted slice of transactions based on the loosely +// sorted internal representation. The result of the sorting is cached in case +// it's requested again before any modifications are made to the contents. +func (m *txSortedMap) Flatten() types.Transactions { // Copy the cache to prevent accidental modifications - txs := make(types.Transactions, len(m.cache)) - copy(txs, m.cache) + cache := m.flatten() + txs := make(types.Transactions, len(cache)) + copy(txs, cache) return txs } +// LastElement returns the last element of a flattened list, thus, the +// transaction with the highest nonce +func (m *txSortedMap) LastElement() *types.Transaction { + cache := m.flatten() + return cache[len(cache)-1] +} + // txList is a "list" of transactions belonging to an account, sorted by account // nonce. The same type can be used both for storing contiguous transactions for // the executable/pending queue; and for storing gapped transactions for the non- @@ -223,17 +251,16 @@ type txList struct { strict bool // Whether nonces are strictly continuous or not txs *txSortedMap // Heap indexed sorted hash map of the transactions - costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) - gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) + costcap uint64 // Price of the highest costing transaction (reset only if exceeds balance) + gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) } // newTxList create a new transaction list for maintaining nonce-indexable fast, // gapped, sortable transaction lists. func newTxList(strict bool) *txList { return &txList{ - strict: strict, - txs: newTxSortedMap(), - costcap: new(big.Int), + strict: strict, + txs: newTxSortedMap(), } } @@ -252,7 +279,11 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran // If there's an older better transaction, abort old := l.txs.Get(tx.Nonce()) if old != nil { - threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100)) + // threshold = oldGP * (100 + priceBump) / 100 + a := big.NewInt(100 + int64(priceBump)) + a = a.Mul(a, old.GasPrice()) + b := big.NewInt(100) + threshold := a.Div(a, b) // Have to ensure that the new gas price is higher than the old gas // price as well as checking the percentage threshold to ensure that // this is accurate for low (Wei-level) gas price replacements @@ -260,9 +291,14 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran return false, nil } } + cost, overflow := tx.CostU64() + if overflow { + log.Warn("transaction cost overflown, txHash: %v txCost: %v", tx.Hash(), cost) + return false, nil + } // Otherwise overwrite the old transaction with the current one l.txs.Put(tx) - if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 { + if l.costcap < cost { l.costcap = cost } if gas := tx.Gas(); l.gascap < gas { @@ -287,29 +323,35 @@ func (l *txList) Forward(threshold uint64) types.Transactions { // a point in calculating all the costs or if the balance covers all. If the threshold // is lower than the costgas cap, the caps will be reset to a new high after removing // the newly invalidated transactions. -func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions, types.Transactions) { +func (l *txList) Filter(costLimit uint64, gasLimit uint64) (types.Transactions, types.Transactions) { // If all transactions are below the threshold, short circuit - if l.costcap.Cmp(costLimit) <= 0 && l.gascap <= gasLimit { + if l.costcap <= costLimit && l.gascap <= gasLimit { return nil, nil } - l.costcap = new(big.Int).Set(costLimit) // Lower the caps to the thresholds + l.costcap = costLimit // Lower the caps to the thresholds l.gascap = gasLimit // Filter out all the transactions above the account's funds - removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit }) + removed := l.txs.filter(func(tx *types.Transaction) bool { + cost, _ := tx.CostU64() + return cost > costLimit || tx.Gas() > gasLimit + }) - // If the list was strict, filter anything above the lowest nonce + if len(removed) == 0 { + return nil, nil + } var invalids types.Transactions - - if l.strict && len(removed) > 0 { + // If the list was strict, filter anything above the lowest nonce + if l.strict { lowest := uint64(math.MaxUint64) for _, tx := range removed { if nonce := tx.Nonce(); lowest > nonce { lowest = nonce } } - invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) + invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) } + l.txs.reheap() return removed, invalids } @@ -363,6 +405,12 @@ func (l *txList) Flatten() types.Transactions { return l.txs.Flatten() } +// LastElement returns the last element of a flattened list, thus, the +// transaction with the highest nonce +func (l *txList) LastElement() *types.Transaction { + return l.txs.LastElement() +} + // priceHeap is a heap.Interface implementation over transactions for retrieving // price-sorted transactions to discard when the pool fills up. type priceHeap []*types.Transaction @@ -495,8 +543,29 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions { - drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop - save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep + // If we have some local accountset, those will not be discarded + if !local.empty() { + // In case the list is filled to the brim with 'local' txs, we do this + // little check to avoid unpacking / repacking the heap later on, which + // is very expensive + discardable := 0 + for _, tx := range *l.items { + if !local.containsTx(tx) { + discardable++ + } + if discardable >= slots { + break + } + } + if slots > discardable { + slots = discardable + } + } + if slots == 0 { + return nil + } + drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop + save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep for len(*l.items) > 0 && slots > 0 { // Discard stale transactions if found during cleanup diff --git a/core/tx_list_test.go b/core/tx_list_test.go index 3a5842d2e8..d9f4eba267 100644 --- a/core/tx_list_test.go +++ b/core/tx_list_test.go @@ -17,7 +17,6 @@ package core import ( - "math/big" "math/rand" "testing" @@ -51,20 +50,22 @@ func TestStrictTxListAdd(t *testing.T) { } } -func BenchmarkTxListAdd(t *testing.B) { +func BenchmarkTxListAdd(b *testing.B) { // Generate a list of transactions to insert key, _ := crypto.GenerateKey() - txs := make(types.Transactions, 100000) + txs := make(types.Transactions, 2000) for i := 0; i < len(txs); i++ { txs[i] = transaction(uint64(i), 0, key) } // Insert the transactions in a random order - list := newTxList(true) - priceLimit := big.NewInt(int64(DefaultTxPoolConfig.PriceLimit)) - t.ResetTimer() - for _, v := range rand.Perm(len(txs)) { - list.Add(txs[v], DefaultTxPoolConfig.PriceBump) - list.Filter(priceLimit, DefaultTxPoolConfig.PriceBump) + b.ResetTimer() + priceLimit := DefaultTxPoolConfig.PriceLimit + for i := 0; i < b.N; i++ { + list := newTxList(true) + for _, v := range rand.Perm(len(txs)) { + list.Add(txs[v], DefaultTxPoolConfig.PriceBump) + list.Filter(priceLimit, DefaultTxPoolConfig.PriceBump) + } } } diff --git a/core/tx_pool.go b/core/tx_pool.go index 350acc81b4..2a4a994d47 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -543,7 +543,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } // Transactor should have enough funds to cover the costs // cost == V + GP * GL - if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { + cost, overflow := tx.CostU64() + if overflow { + return ErrInsufficientFunds + } + if pool.currentState.GetBalance(from).Uint64() < cost { return ErrInsufficientFunds } // Ensure the transaction has more gas than the basic tx fee. @@ -1059,8 +1063,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { - txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway - pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1) + highestPending := list.LastElement() + pool.pendingNonces.set(addr, highestPending.Nonce()+1) } pool.mu.Unlock() @@ -1190,7 +1194,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans } log.Trace("Removed old queued transactions", "count", len(forwards)) // Drop all transactions that are too costly (low balance or out of gas) - drops, _ := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) + drops, _ := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() pool.all.Remove(hash) @@ -1382,7 +1386,7 @@ func (pool *TxPool) demoteUnexecutables() { log.Trace("Removed old pending transaction", "hash", hash) } // Drop all transactions that are too costly (low balance or out of gas), and queue any invalids back for later - drops, invalids := list.Filter(pool.currentState.GetBalance(addr), pool.currentMaxGas) + drops, invalids := list.Filter(pool.currentState.GetBalance(addr).Uint64(), pool.currentMaxGas) for _, tx := range drops { hash := tx.Hash() log.Trace("Removed unpayable pending transaction", "hash", hash) @@ -1457,6 +1461,10 @@ func (as *accountSet) contains(addr common.Address) bool { return exist } +func (as *accountSet) empty() bool { + return len(as.accounts) == 0 +} + // containsTx checks if the sender of a given tx is within the set. If the sender // cannot be derived, this method returns false. func (as *accountSet) containsTx(tx *types.Transaction) bool { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 78290862e0..be06577e37 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -1889,11 +1889,15 @@ func benchmarkFuturePromotion(b *testing.B, size int) { } // Benchmarks the speed of batched transaction insertion. -func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) } -func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) } -func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) } +func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, false) } +func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, false) } +func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, false) } -func benchmarkPoolBatchInsert(b *testing.B, size int) { +func BenchmarkPoolBatchLocalInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, true) } +func BenchmarkPoolBatchLocalInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, true) } +func BenchmarkPoolBatchLocalInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, true) } + +func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) { // Generate a batch of transactions to enqueue into the pool pool, key := setupTxPool() defer pool.Stop() @@ -1911,6 +1915,10 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, batch := range batches { - pool.AddRemotes(batch) + if local { + pool.AddLocals(batch) + } else { + pool.AddRemotes(batch) + } } } diff --git a/core/types/transaction.go b/core/types/transaction.go index da691bb03f..347db2da0b 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rlp" ) @@ -41,6 +42,7 @@ type Transaction struct { hash atomic.Value size atomic.Value from atomic.Value + cost atomic.Value } type txdata struct { @@ -258,6 +260,15 @@ func (tx *Transaction) Cost() *big.Int { return total } +func (tx *Transaction) CostU64() (uint64, bool) { + if tx.data.Price.BitLen() > 63 || tx.data.Amount.BitLen() > 63 { + return 0, false + } + cost, overflowMul := math.SafeMul(tx.data.Price.Uint64(), tx.data.GasLimit) + total, overflowAdd := math.SafeAdd(cost, tx.data.Amount.Uint64()) + return total, overflowMul || overflowAdd +} + // RawSignatureValues returns the V, R, S signature values of the transaction. // The return values should not be modified by the caller. func (tx *Transaction) RawSignatureValues() (v, r, s *big.Int) {