core/txpool: split out tracker from the legacy pool

This commit is contained in:
Martin Holst Swende 2024-10-08 16:40:21 +02:00
parent ca63de4eb9
commit c2efd6fc19
No known key found for this signature in database
GPG Key ID: 683B438C05A5DDF0
5 changed files with 35 additions and 33 deletions

View File

@ -1077,7 +1077,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
launchNextRun bool launchNextRun bool
reset *txpoolResetRequest reset *txpoolResetRequest
dirtyAccounts *accountSet dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*sortedMap) queuedEvents = make(map[common.Address]*SortedMap)
) )
for { for {
// Launch next background reorg if needed // Launch next background reorg if needed
@ -1090,7 +1090,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
launchNextRun = false launchNextRun = false
reset, dirtyAccounts = nil, nil reset, dirtyAccounts = nil, nil
queuedEvents = make(map[common.Address]*sortedMap) queuedEvents = make(map[common.Address]*SortedMap)
} }
select { select {
@ -1119,7 +1119,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
// request one later if they want the events sent. // request one later if they want the events sent.
addr, _ := types.Sender(pool.signer, tx) addr, _ := types.Sender(pool.signer, tx)
if _, ok := queuedEvents[addr]; !ok { if _, ok := queuedEvents[addr]; !ok {
queuedEvents[addr] = newSortedMap() queuedEvents[addr] = NewSortedMap()
} }
queuedEvents[addr].Put(tx) queuedEvents[addr].Put(tx)
@ -1138,7 +1138,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
} }
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) { func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) {
defer func(t0 time.Time) { defer func(t0 time.Time) {
reorgDurationTimer.Update(time.Since(t0)) reorgDurationTimer.Update(time.Since(t0))
}(time.Now()) }(time.Now())
@ -1205,7 +1205,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
for _, tx := range promoted { for _, tx := range promoted {
addr, _ := types.Sender(pool.signer, tx) addr, _ := types.Sender(pool.signer, tx)
if _, ok := events[addr]; !ok { if _, ok := events[addr]; !ok {
events[addr] = newSortedMap() events[addr] = NewSortedMap()
} }
events[addr].Put(tx) events[addr].Put(tx)
} }

View File

@ -52,31 +52,31 @@ func (h *nonceHeap) Pop() interface{} {
return x return x
} }
// sortedMap is a nonce->transaction hash map with a heap based index to allow // SortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way. // iterating over the contents in a nonce-incrementing way.
type sortedMap struct { type SortedMap struct {
items map[uint64]*types.Transaction // Hash map storing the transaction data items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache cacheMu sync.Mutex // Mutex covering the cache
} }
// newSortedMap creates a new nonce-sorted transaction map. // NewSortedMap creates a new nonce-sorted transaction map.
func newSortedMap() *sortedMap { func NewSortedMap() *SortedMap {
return &sortedMap{ return &SortedMap{
items: make(map[uint64]*types.Transaction), items: make(map[uint64]*types.Transaction),
index: new(nonceHeap), index: new(nonceHeap),
} }
} }
// Get retrieves the current transactions associated with the given nonce. // Get retrieves the current transactions associated with the given nonce.
func (m *sortedMap) Get(nonce uint64) *types.Transaction { func (m *SortedMap) Get(nonce uint64) *types.Transaction {
return m.items[nonce] return m.items[nonce]
} }
// Put inserts a new transaction into the map, also updating the map's nonce // Put inserts a new transaction into the map, also updating the map's nonce
// index. If a transaction already exists with the same nonce, it's overwritten. // index. If a transaction already exists with the same nonce, it's overwritten.
func (m *sortedMap) Put(tx *types.Transaction) { func (m *SortedMap) Put(tx *types.Transaction) {
nonce := tx.Nonce() nonce := tx.Nonce()
if m.items[nonce] == nil { if m.items[nonce] == nil {
heap.Push(m.index, nonce) heap.Push(m.index, nonce)
@ -89,7 +89,7 @@ func (m *sortedMap) Put(tx *types.Transaction) {
// Forward removes all transactions from the map with a nonce lower than the // Forward removes all transactions from the map with a nonce lower than the
// provided threshold. Every removed transaction is returned for any post-removal // provided threshold. Every removed transaction is returned for any post-removal
// maintenance. // maintenance.
func (m *sortedMap) Forward(threshold uint64) types.Transactions { func (m *SortedMap) Forward(threshold uint64) types.Transactions {
var removed types.Transactions var removed types.Transactions
// Pop off heap items until the threshold is reached // Pop off heap items until the threshold is reached
@ -112,7 +112,7 @@ func (m *sortedMap) Forward(threshold uint64) types.Transactions {
// Filter, as opposed to 'filter', re-initialises the heap after the operation is done. // Filter, as opposed to 'filter', re-initialises the heap after the operation is done.
// If you want to do several consecutive filterings, it's therefore better to first // If you want to do several consecutive filterings, it's therefore better to first
// do a .filter(func1) followed by .Filter(func2) or reheap() // do a .filter(func1) followed by .Filter(func2) or reheap()
func (m *sortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { func (m *SortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
removed := m.filter(filter) removed := m.filter(filter)
// If transactions were removed, the heap and cache are ruined // If transactions were removed, the heap and cache are ruined
if len(removed) > 0 { if len(removed) > 0 {
@ -121,7 +121,7 @@ func (m *sortedMap) Filter(filter func(*types.Transaction) bool) types.Transacti
return removed return removed
} }
func (m *sortedMap) reheap() { func (m *SortedMap) reheap() {
*m.index = make([]uint64, 0, len(m.items)) *m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items { for nonce := range m.items {
*m.index = append(*m.index, nonce) *m.index = append(*m.index, nonce)
@ -134,7 +134,7 @@ func (m *sortedMap) reheap() {
// filter is identical to Filter, but **does not** regenerate the heap. This method // filter is identical to Filter, but **does not** regenerate the heap. This method
// should only be used if followed immediately by a call to Filter or reheap() // should only be used if followed immediately by a call to Filter or reheap()
func (m *sortedMap) filter(filter func(*types.Transaction) bool) types.Transactions { func (m *SortedMap) filter(filter func(*types.Transaction) bool) types.Transactions {
var removed types.Transactions var removed types.Transactions
// Collect all the transactions to filter out // Collect all the transactions to filter out
@ -154,7 +154,7 @@ func (m *sortedMap) filter(filter func(*types.Transaction) bool) types.Transacti
// Cap places a hard limit on the number of items, returning all transactions // Cap places a hard limit on the number of items, returning all transactions
// exceeding that limit. // exceeding that limit.
func (m *sortedMap) Cap(threshold int) types.Transactions { func (m *SortedMap) Cap(threshold int) types.Transactions {
// Short circuit if the number of items is under the limit // Short circuit if the number of items is under the limit
if len(m.items) <= threshold { if len(m.items) <= threshold {
return nil return nil
@ -181,7 +181,7 @@ func (m *sortedMap) Cap(threshold int) types.Transactions {
// Remove deletes a transaction from the maintained map, returning whether the // Remove deletes a transaction from the maintained map, returning whether the
// transaction was found. // transaction was found.
func (m *sortedMap) Remove(nonce uint64) bool { func (m *SortedMap) Remove(nonce uint64) bool {
// Short circuit if no transaction is present // Short circuit if no transaction is present
_, ok := m.items[nonce] _, ok := m.items[nonce]
if !ok { if !ok {
@ -209,7 +209,7 @@ func (m *sortedMap) Remove(nonce uint64) bool {
// Note, all transactions with nonces lower than start will also be returned to // Note, all transactions with nonces lower than start will also be returned to
// prevent getting into an invalid state. This is not something that should ever // prevent getting into an invalid state. This is not something that should ever
// happen but better to be self correcting than failing! // happen but better to be self correcting than failing!
func (m *sortedMap) Ready(start uint64) types.Transactions { func (m *SortedMap) Ready(start uint64) types.Transactions {
// Short circuit if no transactions are available // Short circuit if no transactions are available
if m.index.Len() == 0 || (*m.index)[0] > start { if m.index.Len() == 0 || (*m.index)[0] > start {
return nil return nil
@ -229,11 +229,11 @@ func (m *sortedMap) Ready(start uint64) types.Transactions {
} }
// Len returns the length of the transaction map. // Len returns the length of the transaction map.
func (m *sortedMap) Len() int { func (m *SortedMap) Len() int {
return len(m.items) return len(m.items)
} }
func (m *sortedMap) flatten() types.Transactions { func (m *SortedMap) flatten() types.Transactions {
m.cacheMu.Lock() m.cacheMu.Lock()
defer m.cacheMu.Unlock() defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it // If the sorting was not cached yet, create and cache it
@ -250,7 +250,7 @@ func (m *sortedMap) flatten() types.Transactions {
// Flatten creates a nonce-sorted slice of transactions based on the loosely // Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case // sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents. // it's requested again before any modifications are made to the contents.
func (m *sortedMap) Flatten() types.Transactions { func (m *SortedMap) Flatten() types.Transactions {
cache := m.flatten() cache := m.flatten()
// Copy the cache to prevent accidental modification // Copy the cache to prevent accidental modification
txs := make(types.Transactions, len(cache)) txs := make(types.Transactions, len(cache))
@ -260,7 +260,7 @@ func (m *sortedMap) Flatten() types.Transactions {
// LastElement returns the last element of a flattened list, thus, the // LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce // transaction with the highest nonce
func (m *sortedMap) LastElement() *types.Transaction { func (m *SortedMap) LastElement() *types.Transaction {
cache := m.flatten() cache := m.flatten()
return cache[len(cache)-1] return cache[len(cache)-1]
} }
@ -271,7 +271,7 @@ func (m *sortedMap) LastElement() *types.Transaction {
// executable/future queue, with minor behavioral changes. // executable/future queue, with minor behavioral changes.
type list struct { type list struct {
strict bool // Whether nonces are strictly continuous or not strict bool // Whether nonces are strictly continuous or not
txs *sortedMap // Heap indexed sorted hash map of the transactions txs *SortedMap // Heap indexed sorted hash map of the transactions
costcap *uint256.Int // Price of the highest costing transaction (reset only if exceeds balance) costcap *uint256.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
@ -283,7 +283,7 @@ type list struct {
func newList(strict bool) *list { func newList(strict bool) *list {
return &list{ return &list{
strict: strict, strict: strict,
txs: newSortedMap(), txs: NewSortedMap(),
costcap: new(uint256.Int), costcap: new(uint256.Int),
totalcost: new(uint256.Int), totalcost: new(uint256.Int),
} }

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License // You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package legacypool package tracking
import ( import (
"errors" "errors"

View File

@ -15,7 +15,7 @@
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package legacypool implements the normal EVM execution transaction pool. // Package legacypool implements the normal EVM execution transaction pool.
package legacypool package tracking
import ( import (
"sync" "sync"
@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
@ -38,8 +39,8 @@ var recheckInterval = time.Minute
// This struct does not care about transaction validity, price-bumps or account limits, // This struct does not care about transaction validity, price-bumps or account limits,
// but optimistically accepts transactions. // but optimistically accepts transactions.
type TxTracker struct { type TxTracker struct {
all map[common.Hash]*types.Transaction // All tracked transactions all map[common.Hash]*types.Transaction // All tracked transactions
byAddr map[common.Address]*sortedMap // Transactions by address byAddr map[common.Address]*legacypool.SortedMap // Transactions by address
journal *journal // Journal of local transaction to back up to disk journal *journal // Journal of local transaction to back up to disk
rejournal time.Duration // How often to rotate journal rejournal time.Duration // How often to rotate journal
@ -55,7 +56,7 @@ func NewTxTracker(journalPath string, journalTime time.Duration, chainConfig *pa
signer := types.LatestSigner(chainConfig) signer := types.LatestSigner(chainConfig)
pool := &TxTracker{ pool := &TxTracker{
all: make(map[common.Hash]*types.Transaction), all: make(map[common.Hash]*types.Transaction),
byAddr: make(map[common.Address]*sortedMap), byAddr: make(map[common.Address]*legacypool.SortedMap),
signer: signer, signer: signer,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
pool: next, pool: next,
@ -84,7 +85,7 @@ func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
tracker.all[tx.Hash()] = tx tracker.all[tx.Hash()] = tx
addr, _ := types.Sender(tracker.signer, tx) addr, _ := types.Sender(tracker.signer, tx)
if tracker.byAddr[addr] == nil { if tracker.byAddr[addr] == nil {
tracker.byAddr[addr] = newSortedMap() tracker.byAddr[addr] = legacypool.NewSortedMap()
} }
tracker.byAddr[addr].Put(tx) tracker.byAddr[addr].Put(tx)
_ = tracker.journal.insert(tx) _ = tracker.journal.insert(tx)

View File

@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/txpool/tracking"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/downloader"
@ -69,7 +70,7 @@ type Ethereum struct {
// core protocol objects // core protocol objects
config *ethconfig.Config config *ethconfig.Config
txPool *txpool.TxPool txPool *txpool.TxPool
localTxTracker *legacypool.TxTracker localTxTracker *tracking.TxTracker
blockchain *core.BlockChain blockchain *core.BlockChain
handler *handler handler *handler
@ -240,7 +241,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// TODO! // TODO!
// We also need to handle config.Locals, the accounts that are // We also need to handle config.Locals, the accounts that are
// to be treated as locals, regardless of how they arrive to geth. // to be treated as locals, regardless of how they arrive to geth.
eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal, eth.localTxTracker = tracking.NewTxTracker(config.TxPool.Journal,
config.TxPool.Rejournal, config.TxPool.Rejournal,
eth.blockchain.Config(), eth.txPool) eth.blockchain.Config(), eth.txPool)
stack.RegisterLifecycle(eth.localTxTracker) stack.RegisterLifecycle(eth.localTxTracker)