From b4a52513915d5a39ac055fc38cafed70098eb698 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 25 Aug 2016 19:04:40 +0300 Subject: [PATCH] core: abstract out a sorted transaction hash map --- core/tx_list.go | 395 ++++++++++++++++++++++--------------------- core/tx_list_test.go | 14 +- core/tx_pool.go | 16 +- core/tx_pool_test.go | 26 +-- 4 files changed, 229 insertions(+), 222 deletions(-) diff --git a/core/tx_list.go b/core/tx_list.go index 8c69331cc4..c3ddf3148e 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -45,20 +45,182 @@ func (h *nonceHeap) Pop() interface{} { return x } +// txSortedMap is a nonce->transaction hash map with a heap based index to allow +// iterating over the contents in a nonce-incrementing way. +type txSortedMap struct { + items map[uint64]*types.Transaction // Hash map storing the transaction data + index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) + cache types.Transactions // Cache of the transactions already sorted +} + +// newTxSortedMap creates a new sorted transaction map. +func newTxSortedMap() *txSortedMap { + return &txSortedMap{ + items: make(map[uint64]*types.Transaction), + index: &nonceHeap{}, + } +} + +// Get retrieves the current transactions associated with the given nonce. +func (m *txSortedMap) Get(nonce uint64) *types.Transaction { + return m.items[nonce] +} + +// Put inserts a new transaction into the map, also updating the map's nonce +// index. If a transaction already exists with the same nonce, it's overwritten. +func (m *txSortedMap) Put(tx *types.Transaction) { + nonce := tx.Nonce() + if m.items[nonce] == nil { + heap.Push(m.index, nonce) + } + m.items[nonce], m.cache = tx, nil +} + +// Forward removes all transactions from the map with a nonce lower than the +// provided threshold. Every removed transaction is returned for any post-removal +// maintenance. +func (m *txSortedMap) Forward(threshold uint64) types.Transactions { + var removed types.Transactions + + // Pop off heap items until the threshold is reached + for m.index.Len() > 0 && (*m.index)[0] < threshold { + nonce := heap.Pop(m.index).(uint64) + removed = append(removed, m.items[nonce]) + delete(m.items, nonce) + } + // If we had a cached order, shift the front + if m.cache != nil { + m.cache = m.cache[len(removed):] + } + return removed +} + +// Filter iterates over the list of transactions and removes all of them for which +// the specified function evaluates to true. +func (m *txSortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { + var removed types.Transactions + + // Collect all the transactions to filter out + for nonce, tx := range m.items { + if filter(tx) { + removed = append(removed, tx) + delete(m.items, nonce) + } + } + // If transactions were removed, the heap and cache are ruined + if len(removed) > 0 { + *m.index = make([]uint64, 0, len(m.items)) + for nonce, _ := range m.items { + *m.index = append(*m.index, nonce) + } + heap.Init(m.index) + + m.cache = nil + } + return removed +} + +// Cap places a hard limit on the number of items, returning all transactions +// exceeding that limit. +func (m *txSortedMap) Cap(threshold int) types.Transactions { + // Short circuit if the number of items is under the limit + if len(m.items) <= threshold { + return nil + } + // Otherwise gather and drop the highest nonce'd transactions + var drops types.Transactions + + sort.Sort(*m.index) + for size := len(m.items); size > threshold; size-- { + drops = append(drops, m.items[(*m.index)[size-1]]) + delete(m.items, (*m.index)[size-1]) + } + *m.index = (*m.index)[:threshold] + heap.Init(m.index) + + // If we had a cache, shift the back + if m.cache != nil { + m.cache = m.cache[:len(m.cache)-len(drops)] + } + return drops +} + +// Remove deletes a transaction from the maintained map, returning whether the +// transaction was found. +func (m *txSortedMap) Remove(nonce uint64) bool { + // Short circuit if no transaction is present + _, ok := m.items[nonce] + if !ok { + return false + } + // Otherwise delete the transaction and fix the heap index + for i := 0; i < m.index.Len(); i++ { + if (*m.index)[i] == nonce { + heap.Remove(m.index, i) + break + } + } + delete(m.items, nonce) + m.cache = nil + + return true +} + +// Ready retrieves a sequentially increasing list of transactions starting at the +// provided nonce that is ready for processing. The returned transactions will be +// removed from the list. +// +// Note, all transactions with nonces lower than start will also be returned to +// prevent getting into and invalid state. This is not something that should ever +// happen but better to be self correcting than failing! +func (m *txSortedMap) Ready(start uint64) types.Transactions { + // Short circuit if no transactions are available + if m.index.Len() == 0 || (*m.index)[0] > start { + return nil + } + // Otherwise start accumulating incremental transactions + var ready types.Transactions + for next := (*m.index)[0]; m.index.Len() > 0 && (*m.index)[0] == next; next++ { + ready = append(ready, m.items[next]) + delete(m.items, next) + heap.Pop(m.index) + } + m.cache = nil + + return ready +} + +// Len returns the length of the transaction map. +func (m *txSortedMap) Len() int { + return len(m.items) +} + +// Flatten creates a nonce-sorted slice of transactions based on the loosely +// sorted internal representation. The result of the sorting is cached in case +// it's requested again before any modifications are made to the contents. +func (m *txSortedMap) Flatten() types.Transactions { + // If the sorting was not cached yet, create and cache it + if m.cache == nil { + m.cache = make(types.Transactions, 0, len(m.items)) + for _, tx := range m.items { + m.cache = append(m.cache, tx) + } + sort.Sort(types.TxByNonce(m.cache)) + } + // Copy the cache to prevent accidental modifications + txs := make(types.Transactions, len(m.cache)) + copy(txs, m.cache) + return txs +} + // txList is a "list" of transactions belonging to an account, sorted by account // nonce. The same type can be used both for storing contiguous transactions for // the executable/pending queue; and for storing gapped transactions for the non- // executable/future queue, with minor behavoiral changes. type txList struct { - strict bool // Whether nonces are strictly continuous or not - items map[uint64]*types.Transaction // Hash map storing the transaction data - cache types.Transactions // Cache of the transactions already sorted - - first uint64 // Nonce of the lowest stored transaction (strict mode) - last uint64 // Nonce of the highest stored transaction (strict mode) - index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) - - costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) + strict bool // Whether nonces are strictly continuous or not + txs *txSortedMap // Heap indexed sorted hash map of the transactions + costcap *big.Int // Price of the highest costing transaction (reset only if exceeds balance) } // newTxList create a new transaction list for maintaining nonce-indexable fast, @@ -66,9 +228,7 @@ type txList struct { func newTxList(strict bool) *txList { return &txList{ strict: strict, - items: make(map[uint64]*types.Transaction), - first: math.MaxUint64, - index: &nonceHeap{}, + txs: newTxSortedMap(), costcap: new(big.Int), } } @@ -76,36 +236,19 @@ func newTxList(strict bool) *txList { // Add tries to insert a new transaction into the list, returning whether the // transaction was accepted, and if yes, any previous transaction it replaced. // -// In case of strict lists (contiguous nonces) the nonce boundaries are updated -// appropriately with the new transaction. Otherwise (gapped nonces) the heap of -// nonces is expanded with the new transaction. +// If the new transaction is accepted into the list, the lists' cost threshold +// is also potentially updated. func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) { - // If an existing transaction is better, discard new one - nonce := tx.Nonce() - - old, ok := l.items[nonce] - if ok && old.GasPrice().Cmp(tx.GasPrice()) >= 0 { + // If there's an older better transaction, abort + old := l.txs.Get(tx.Nonce()) + if old != nil && old.GasPrice().Cmp(tx.GasPrice()) >= 0 { return false, nil } - // Otherwise insert the transaction and replace any previous one - l.items[nonce] = tx + // Otherwise overwrite the old transaction with the current one + l.txs.Put(tx) if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 { l.costcap = cost } - if l.strict { - // In strict mode, maintain the nonce sequence boundaries - if nonce < l.first { - l.first = nonce - } - if nonce > l.last { - l.last = nonce - } - } else { - // In gapped mode, maintain the nonce heap - heap.Push(l.index, nonce) - } - l.cache = nil - return true, old } @@ -113,31 +256,7 @@ func (l *txList) Add(tx *types.Transaction) (bool, *types.Transaction) { // provided threshold. Every removed transaction is returned for any post-removal // maintenance. func (l *txList) Forward(threshold uint64) types.Transactions { - var removed types.Transactions - - if l.strict { - // In strict mode, push the lowest nonce forward to the threshold - for l.first < threshold { - if tx, ok := l.items[l.first]; ok { - removed = append(removed, tx) - } - delete(l.items, l.first) - l.first++ - } - if l.first > l.last { - l.last = l.first - } - } else { - // In gapped mode, pop off heap items until the threshold is reached - for l.index.Len() > 0 && (*l.index)[0] < threshold { - nonce := heap.Pop(l.index).(uint64) - removed = append(removed, l.items[nonce]) - delete(l.items, nonce) - } - } - l.cache = nil - - return removed + return l.txs.Forward(threshold) } // Filter removes all transactions from the list with a cost higher than the @@ -155,110 +274,43 @@ func (l *txList) Filter(threshold *big.Int) (types.Transactions, types.Transacti } l.costcap = new(big.Int).Set(threshold) // Lower the cap to the threshold - // Gather all the transactions needing deletion - var removed types.Transactions - for _, tx := range l.items { - if cost := tx.Cost(); cost.Cmp(threshold) > 0 { - removed = append(removed, tx) - delete(l.items, tx.Nonce()) - } - } - // Readjust the nonce boundaries/indexes and gather invalidate tranactions + // Filter out all the transactions above the account's funds + removed := l.txs.Filter(func(tx *types.Transaction) bool { return tx.Cost().Cmp(threshold) > 0 }) + + // If the list was strict, filter anything above the lowest nonce var invalids types.Transactions - if l.strict { - // In strict mode iterate find the first gap and invalidate everything after it - for i := l.first; i <= l.last; i++ { - if _, ok := l.items[i]; !ok { - // Gap found, invalidate all subsequent transactions - for j := i + 1; j <= l.last; j++ { - if tx, ok := l.items[j]; ok { - invalids = append(invalids, tx) - delete(l.items, j) - } - } - // Reduce the highest transaction nonce and return - l.last = i - 1 - break + if l.strict && len(removed) > 0 { + lowest := uint64(math.MaxUint64) + for _, tx := range removed { + if nonce := tx.Nonce(); lowest > nonce { + lowest = nonce } } - } else { - // In gapped mode no transactions are invalid, but the heap is ruined - l.index = &nonceHeap{} - for nonce, _ := range l.items { - *l.index = append(*l.index, nonce) - } - heap.Init(l.index) + invalids = l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > lowest }) } - l.cache = nil - return removed, invalids } // Cap places a hard limit on the number of items, returning all transactions // exceeding that limit. func (l *txList) Cap(threshold int) types.Transactions { - // Short circuit if the number of items is under the limit - if len(l.items) < threshold { - return nil - } - // Otherwise gather and drop the highest nonce'd transactions - var drops types.Transactions - - if l.strict { - // In strict mode, just gather top down from last to first - for len(l.items) > threshold { - if tx, ok := l.items[l.last]; ok { - drops = append(drops, tx) - delete(l.items, l.last) - l.last-- - } - } - } else { - // In gapped mode it's expensive: we need to sort and drop like that - sort.Sort(*l.index) - for size := len(l.items); size > threshold; size-- { - drops = append(drops, l.items[(*l.index)[size-1]]) - delete(l.items, (*l.index)[size-1]) - *l.index = (*l.index)[:size-1] - } - heap.Init(l.index) - } - l.cache = nil - - return drops + return l.txs.Cap(threshold) } // Remove deletes a transaction from the maintained list, returning whether the // transaction was found, and also returning any transaction invalidated due to // the deletion (strict mode only). func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { + // Remove the transaction from the set nonce := tx.Nonce() - if _, ok := l.items[nonce]; ok { - // Remove the item and invalidate the sorted cache - delete(l.items, nonce) - l.cache = nil - - // Remove all invalidated transactions (strict mode only!) - var invalids types.Transactions - if l.strict { - invalids = make(types.Transactions, 0, l.last-nonce) - for i := nonce + 1; i <= l.last; i++ { - invalids = append(invalids, l.items[i]) - delete(l.items, i) - } - l.last = nonce - 1 - } else { - // In gapped mode, remove the nonce from the index but honour the heap - for i := 0; i < l.index.Len(); i++ { - if (*l.index)[i] == nonce { - heap.Remove(l.index, i) - break - } - } - } - return true, invalids + if removed := l.txs.Remove(nonce); !removed { + return false, nil } - return false, nil + // In strict mode, filter out non-executable transactions + if l.strict { + return true, l.txs.Filter(func(tx *types.Transaction) bool { return tx.Nonce() > nonce }) + } + return true, nil } // Ready retrieves a sequentially increasing list of transactions starting at the @@ -269,63 +321,22 @@ func (l *txList) Remove(tx *types.Transaction) (bool, types.Transactions) { // prevent getting into and invalid state. This is not something that should ever // happen but better to be self correcting than failing! func (l *txList) Ready(start uint64) types.Transactions { - var txs types.Transactions - if l.strict { - // In strict mode make sure we have valid transaction, return all contiguous - if l.first > start { - return nil - } - for { - if tx, ok := l.items[l.first]; ok { - txs = append(txs, tx) - delete(l.items, l.first) - l.first++ - continue - } - break - } - } else { - // In gapped mode, check the heap start and return all contiguous - if l.index.Len() == 0 || (*l.index)[0] > start { - return nil - } - next := (*l.index)[0] - for l.index.Len() > 0 && (*l.index)[0] == next { - txs = append(txs, l.items[next]) - delete(l.items, next) - heap.Pop(l.index) - next++ - } - } - l.cache = nil - - return txs + return l.txs.Ready(start) } // Len returns the length of the transaction list. func (l *txList) Len() int { - return len(l.items) + return l.txs.Len() } // Empty returns whether the list of transactions is empty or not. func (l *txList) Empty() bool { - return len(l.items) == 0 + return l.Len() == 0 } // Flatten creates a nonce-sorted slice of transactions based on the loosely // sorted internal representation. The result of the sorting is cached in case // it's requested again before any modifications are made to the contents. func (l *txList) Flatten() types.Transactions { - // If the sorting was not cached yet, create and cache it - if l.cache == nil { - l.cache = make(types.Transactions, 0, len(l.items)) - for _, tx := range l.items { - l.cache = append(l.cache, tx) - } - sort.Sort(types.TxByNonce(l.cache)) - } - // Copy the cache to prevent accidental modifications - txs := make(types.Transactions, len(l.cache)) - copy(txs, l.cache) - return txs + return l.txs.Flatten() } diff --git a/core/tx_list_test.go b/core/tx_list_test.go index ea83ca479f..92b2119371 100644 --- a/core/tx_list_test.go +++ b/core/tx_list_test.go @@ -41,18 +41,12 @@ func TestStrictTxListAdd(t *testing.T) { list.Add(txs[v]) } // Verify internal state - if list.first != 0 { - t.Errorf("lowest nonce mismatch: have %d, want %d", list.first, 0) - } - if int(list.last) != len(txs)-1 { - t.Errorf("highest nonce mismatch: have %d, want %d", list.last, len(txs)-1) - } - if len(list.items) != len(txs) { - t.Errorf("transaction count mismatch: have %d, want %d", len(list.items), len(txs)) + if len(list.txs.items) != len(txs) { + t.Errorf("transaction count mismatch: have %d, want %d", len(list.txs.items), len(txs)) } for i, tx := range txs { - if list.items[tx.Nonce()] != tx { - t.Errorf("item %d: transaction mismatch: have %v, want %v", i, list.items[tx.Nonce()], tx) + if list.txs.items[tx.Nonce()] != tx { + t.Errorf("item %d: transaction mismatch: have %v, want %v", i, list.txs.items[tx.Nonce()], tx) } } } diff --git a/core/tx_pool.go b/core/tx_pool.go index 58d304f00b..f8b11a7ce7 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -154,7 +154,8 @@ func (pool *TxPool) resetState() { // Update all accounts to the latest known pending nonce for addr, list := range pool.pending { - pool.pendingState.SetNonce(addr, list.last+1) + txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway + pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1) } // Check the queue and move transactions over to the pending if possible // or remove those that have become invalid @@ -366,7 +367,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() - pool.pendingState.SetNonce(addr, list.last+1) + pool.pendingState.SetNonce(addr, tx.Nonce()+1) go pool.eventMux.Post(TxPreEvent{tx}) } @@ -439,19 +440,20 @@ func (pool *TxPool) removeTx(hash common.Hash) { // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { - // If no more transactions are left, remove the list and reset the nonce + // If no more transactions are left, remove the list if pending.Empty() { delete(pool.pending, addr) delete(pool.beats, addr) - - pool.pendingState.SetNonce(addr, tx.Nonce()) } else { - // Otherwise update the nonce and postpone any invalidated transactions - pool.pendingState.SetNonce(addr, pending.last) + // Otherwise postpone any invalidated transactions for _, tx := range invalids { pool.enqueueTx(tx.Hash(), tx) } } + // Update the account nonce if needed + if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { + pool.pendingState.SetNonce(addr, tx.Nonce()) + } } } // Transaction is in the future queue diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index f08334fa19..4bc5aed38d 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -105,7 +105,7 @@ func TestTransactionQueue(t *testing.T) { currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) pool.promoteExecutables() - if _, ok := pool.pending[from].items[tx.Nonce()]; ok { + if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } @@ -224,7 +224,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } - if tx := pool.pending[addr].items[0]; tx.Hash() != tx2.Hash() { + if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Add the thid transaction and ensure it's not saved (smaller price) @@ -235,7 +235,7 @@ func TestTransactionDoubleNonce(t *testing.T) { if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } - if tx := pool.pending[addr].items[0]; tx.Hash() != tx2.Hash() { + if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } // Ensure the total transaction count is correct @@ -346,16 +346,16 @@ func TestTransactionDropping(t *testing.T) { state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[account].items[tx0.Nonce()]; !ok { + if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) } - if _, ok := pool.pending[account].items[tx1.Nonce()]; ok { + if _, ok := pool.pending[account].txs.items[tx1.Nonce()]; ok { t.Errorf("out-of-fund pending transaction present: %v", tx1) } - if _, ok := pool.queue[account].items[tx10.Nonce()]; !ok { + if _, ok := pool.queue[account].txs.items[tx10.Nonce()]; !ok { t.Errorf("funded queued transaction missing: %v", tx10) } - if _, ok := pool.queue[account].items[tx11.Nonce()]; ok { + if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok { t.Errorf("out-of-fund queued transaction present: %v", tx11) } if len(pool.all) != 2 { @@ -410,25 +410,25 @@ func TestTransactionPostponing(t *testing.T) { state.AddBalance(account, big.NewInt(-750)) pool.resetState() - if _, ok := pool.pending[account].items[txns[0].Nonce()]; !ok { + if _, ok := pool.pending[account].txs.items[txns[0].Nonce()]; !ok { t.Errorf("tx %d: valid and funded transaction missing from pending pool: %v", 0, txns[0]) } - if _, ok := pool.queue[account].items[txns[0].Nonce()]; ok { + if _, ok := pool.queue[account].txs.items[txns[0].Nonce()]; ok { t.Errorf("tx %d: valid and funded transaction present in future queue: %v", 0, txns[0]) } for i, tx := range txns[1:] { if i%2 == 1 { - if _, ok := pool.pending[account].items[tx.Nonce()]; ok { + if _, ok := pool.pending[account].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: valid but future transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account].items[tx.Nonce()]; !ok { + if _, ok := pool.queue[account].txs.items[tx.Nonce()]; !ok { t.Errorf("tx %d: valid but future transaction missing from future queue: %v", i+1, tx) } } else { - if _, ok := pool.pending[account].items[tx.Nonce()]; ok { + if _, ok := pool.pending[account].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in pending pool: %v", i+1, tx) } - if _, ok := pool.queue[account].items[tx.Nonce()]; ok { + if _, ok := pool.queue[account].txs.items[tx.Nonce()]; ok { t.Errorf("tx %d: out-of-fund transaction present in future queue: %v", i+1, tx) } }