From 6c9f040ebeafcc680b0c457e6f4886e2bca32527 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 14 Jul 2020 21:42:32 +0200 Subject: [PATCH] core: transaction pool optimizations (#21328) * core: added local tx pool test case * core, crypto: various allocation savings regarding tx handling * core/txlist, txpool: save a reheap operation, avoid some bigint allocs Co-authored-by: Marius van der Wijden --- core/tx_list.go | 106 ++++++++++++++++++++++++++++++++++--------- core/tx_pool.go | 8 +++- core/tx_pool_test.go | 18 ++++++-- 3 files changed, 104 insertions(+), 28 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 164c73006b..bf304eedcf 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -99,7 +99,30 @@ func (m *txSortedMap) Forward(threshold uint64) types.Transactions { // Filter iterates over the list of transactions and removes all of them for which // the specified function evaluates to true. +// Filter, as opposed to 'filter', re-initialises the heap after the operation is done. +// If you want to do several consecutive filterings, it's therefore better to first +// do a .filter(func1) followed by .Filter(func2) or reheap() func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { + removed := m.filter(filter) + // If transactions were removed, the heap and cache are ruined + if len(removed) > 0 { + m.reheap() + } + return removed +} + +func (m *txSortedMap) reheap() { + *m.index = make([]uint64, 0, len(m.items)) + for nonce := range m.items { + *m.index = append(*m.index, nonce) + } + heap.Init(m.index) + m.cache = nil +} + +// filter is identical to Filter, but **does not** regenerate the heap. This method +// should only be used if followed immediately by a call to Filter or reheap() +func (m *txSortedMap) filter(filter func(*types.Transaction) bool) types.Transactions { var removed types.Transactions // Collect all the transactions to filter out @@ -109,14 +132,7 @@ func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transac delete(m.items, nonce) } } - // If transactions were removed, the heap and cache are ruined if len(removed) > 0 { - *m.index = make([]uint64, 0, len(m.items)) - for nonce := range m.items { - *m.index = append(*m.index, nonce) - } - heap.Init(m.index) - m.cache = nil } return removed @@ -197,10 +213,7 @@ func (m *txSortedMap) Len() int { return len(m.items) } -// Flatten creates a nonce-sorted slice of transactions based on the loosely -// sorted internal representation. The result of the sorting is cached in case -// it's requested again before any modifications are made to the contents. -func (m *txSortedMap) Flatten() types.Transactions { +func (m *txSortedMap) flatten() types.Transactions { // If the sorting was not cached yet, create and cache it if m.cache == nil { m.cache = make(types.Transactions, 0, len(m.items)) @@ -209,12 +222,27 @@ func (m *txSortedMap) Flatten() types.Transactions { } sort.Sort(types.TxByNonce(m.cache)) } + return m.cache +} + +// Flatten creates a nonce-sorted slice of transactions based on the loosely +// sorted internal representation. The result of the sorting is cached in case +// it's requested again before any modifications are made to the contents. +func (m *txSortedMap) Flatten() types.Transactions { // Copy the cache to prevent accidental modifications - txs := make(types.Transactions, len(m.cache)) - copy(txs, m.cache) + cache := m.flatten() + txs := make(types.Transactions, len(cache)) + copy(txs, cache) return txs } +// LastElement returns the last element of a flattened list, thus, the +// transaction with the highest nonce +func (m *txSortedMap) LastElement() *types.Transaction { + cache := m.flatten() + return cache[len(cache)-1] +} + // txList is a "list" of transactions belonging to an account, sorted by account // nonce. The same type can be used both for storing contiguous transactions for // the executable/pending queue; and for storing gapped transactions for the non- @@ -252,7 +280,11 @@ func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Tran // If there's an older better transaction, abort old := l.txs.Get(tx.Nonce()) if old != nil { - threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100)) + // threshold = oldGP * (100 + priceBump) / 100 + a := big.NewInt(100 + int64(priceBump)) + a = a.Mul(a, old.GasPrice()) + b := big.NewInt(100) + threshold := a.Div(a, b) // Have to ensure that the new gas price is higher than the old gas // price as well as checking the percentage threshold to ensure that // this is accurate for low (Wei-level) gas price replacements @@ -296,20 +328,25 @@ func (l *txList) Filter(costLimit *big.Int, gasLimit uint64) (types.Transactions l.gascap = gasLimit // Filter out all the transactions above the account's funds - removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(costLimit) > 0 || tx.Gas() > gasLimit }) + removed := l.txs.Filter(func(tx *types.Transaction) bool { + return tx.Gas() > gasLimit || tx.Cost().Cmp(costLimit) > 0 + }) - // If the list was strict, filter anything above the lowest nonce + if len(removed) == 0 { + return nil, nil + } var invalids types.Transactions - - if l.strict && len(removed) > 0 { + // If the list was strict, filter anything above the lowest nonce + if l.strict { lowest := uint64(math.MaxUint64) for _, tx := range removed { if nonce := tx.Nonce(); lowest > nonce { lowest = nonce } } - invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) + invalids = l.txs.filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) } + l.txs.reheap() return removed, invalids } @@ -363,6 +400,12 @@ func (l *txList) Flatten() types.Transactions { return l.txs.Flatten() } +// LastElement returns the last element of a flattened list, thus, the +// transaction with the highest nonce +func (l *txList) LastElement() *types.Transaction { + return l.txs.LastElement() +} + // priceHeap is a heap.Interface implementation over transactions for retrieving // price-sorted transactions to discard when the pool fills up. type priceHeap []*types.Transaction @@ -495,8 +538,29 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. func (l *txPricedList) Discard(slots int, local *accountSet) types.Transactions { - drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop - save := make(types.Transactions, 0, 64) // Local underpriced transactions to keep + // If we have some local accountset, those will not be discarded + if !local.empty() { + // In case the list is filled to the brim with 'local' txs, we do this + // little check to avoid unpacking / repacking the heap later on, which + // is very expensive + discardable := 0 + for _, tx := range *l.items { + if !local.containsTx(tx) { + discardable++ + } + if discardable >= slots { + break + } + } + if slots > discardable { + slots = discardable + } + } + if slots == 0 { + return nil + } + drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop + save := make(types.Transactions, 0, len(*l.items)-slots) // Local underpriced transactions to keep for len(*l.items) > 0 && slots > 0 { // Discard stale transactions if found during cleanup diff --git a/core/tx_pool.go b/core/tx_pool.go index 350acc81b4..1636cc5066 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -1059,8 +1059,8 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { - txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway - pool.pendingNonces.set(addr, txs[len(txs)-1].Nonce()+1) + highestPending := list.LastElement() + pool.pendingNonces.set(addr, highestPending.Nonce()+1) } pool.mu.Unlock() @@ -1457,6 +1457,10 @@ func (as *accountSet) contains(addr common.Address) bool { return exist } +func (as *accountSet) empty() bool { + return len(as.accounts) == 0 +} + // containsTx checks if the sender of a given tx is within the set. If the sender // cannot be derived, this method returns false. func (as *accountSet) containsTx(tx *types.Transaction) bool { diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 305b3666e6..c436a309f3 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -1890,11 +1890,15 @@ func benchmarkFuturePromotion(b *testing.B, size int) { } // Benchmarks the speed of batched transaction insertion. -func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) } -func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) } -func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000) } +func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, false) } +func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, false) } +func BenchmarkPoolBatchInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, false) } -func benchmarkPoolBatchInsert(b *testing.B, size int) { +func BenchmarkPoolBatchLocalInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100, true) } +func BenchmarkPoolBatchLocalInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000, true) } +func BenchmarkPoolBatchLocalInsert10000(b *testing.B) { benchmarkPoolBatchInsert(b, 10000, true) } + +func benchmarkPoolBatchInsert(b *testing.B, size int, local bool) { // Generate a batch of transactions to enqueue into the pool pool, key := setupTxPool() defer pool.Stop() @@ -1912,6 +1916,10 @@ func benchmarkPoolBatchInsert(b *testing.B, size int) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, batch := range batches { - pool.AddRemotes(batch) + if local { + pool.AddLocals(batch) + } else { + pool.AddRemotes(batch) + } } }