From ca63de4eb966ae3dc646608c7482eab83993beb4 Mon Sep 17 00:00:00 2001 From: Martin Holst Swende Date: Tue, 8 Oct 2024 16:24:22 +0200 Subject: [PATCH] core/txpool: remove locals-tracking from subpools --- core/txpool/blobpool/blobpool.go | 2 +- core/txpool/legacypool/legacypool.go | 320 +++++---------------------- core/txpool/legacypool/list.go | 15 +- core/txpool/subpool.go | 5 +- core/txpool/txpool.go | 19 +- eth/backend.go | 3 + 6 files changed, 68 insertions(+), 296 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 02d339f99c..985e4d5567 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1268,7 +1268,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844. // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). -func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { var ( adds = make([]*types.Transaction, 0, len(txs)) errs = make([]error, len(txs)) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 0d5b242500..2e07e45521 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -99,7 +99,6 @@ var ( pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) - localGauge = metrics.NewRegisteredGauge("txpool/local", nil) slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) @@ -159,10 +158,6 @@ var DefaultConfig = Config{ // unreasonable or unworkable. func (config *Config) sanitize() Config { conf := *config - if conf.Rejournal < time.Second { - log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second) - conf.Rejournal = time.Second - } if conf.PriceLimit < 1 { log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit) conf.PriceLimit = DefaultConfig.PriceLimit @@ -214,9 +209,6 @@ type LegacyPool struct { currentState *state.StateDB // Current state in the blockchain head pendingNonces *noncer // Pending state tracking virtual nonces - locals *accountSet // Set of local transaction to exempt from eviction rules - journal *journal // Journal of local transaction to back up to disk - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions @@ -245,11 +237,6 @@ func New(config Config, chain BlockChain) *LegacyPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() - // Disable locals-handling in this pool. This is a precursor to fully - // deleting locals-related code - config.NoLocals = true - config.Locals = nil - // Create the transaction pool with its initial settings pool := &LegacyPool{ config: config, @@ -267,16 +254,8 @@ func New(config Config, chain BlockChain) *LegacyPool { reorgShutdownCh: make(chan struct{}), initDoneCh: make(chan struct{}), } - pool.locals = newAccountSet(pool.signer) - for _, addr := range config.Locals { - log.Info("Setting new local account", "address", addr) - pool.locals.add(addr) - } pool.priced = newPricedList(pool.all) - if !config.NoLocals && config.Journal != "" { - pool.journal = newTxJournal(config.Journal) - } return pool } @@ -292,8 +271,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { } // Init sets the gas price needed to keep a transaction in the pool and the chain -// head to allow balance / nonce checks. The transaction journal will be loaded -// from disk and filtered based on the provided starting settings. The internal +// head to allow balance / nonce checks. The internal // goroutines will be spun up and the pool deemed operational afterwards. func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { // Set the address reserver to request exclusive access to pooled accounts @@ -316,20 +294,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A pool.currentState = statedb pool.pendingNonces = newNoncer(statedb) - // Start the reorg loop early, so it can handle requests generated during - // journal loading. pool.wg.Add(1) go pool.scheduleReorgLoop() - // If local transactions and journaling is enabled, load from disk - if pool.journal != nil { - if err := pool.journal.load(pool.addLocals); err != nil { - log.Warn("Failed to load transaction journal", "err", err) - } - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate transaction journal", "err", err) - } - } pool.wg.Add(1) go pool.loop() return nil @@ -345,13 +312,11 @@ func (pool *LegacyPool) loop() { prevPending, prevQueued, prevStales int // Start the stats reporting and transaction eviction tickers - report = time.NewTicker(statsReportInterval) - evict = time.NewTicker(evictionInterval) - journal = time.NewTicker(pool.config.Rejournal) + report = time.NewTicker(statsReportInterval) + evict = time.NewTicker(evictionInterval) ) defer report.Stop() defer evict.Stop() - defer journal.Stop() // Notify tests that the init phase is done close(pool.initDoneCh) @@ -377,11 +342,7 @@ func (pool *LegacyPool) loop() { case <-evict.C: pool.mu.Lock() for addr := range pool.queue { - // Skip local transactions from the eviction mechanism - if pool.locals.contains(addr) { - continue - } - // Any non-locals old enough should be removed + // Any old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { @@ -391,16 +352,6 @@ func (pool *LegacyPool) loop() { } } pool.mu.Unlock() - - // Handle local transaction journal rotation - case <-journal.C: - if pool.journal != nil { - pool.mu.Lock() - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate local tx journal", "err", err) - } - pool.mu.Unlock() - } } } } @@ -411,9 +362,6 @@ func (pool *LegacyPool) Close() error { close(pool.reorgShutdownCh) pool.wg.Wait() - if pool.journal != nil { - pool.journal.close() - } log.Info("Transaction pool stopped") return nil } @@ -554,7 +502,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] txs := list.Flatten() // If the miner requests tip enforcement, cap the lists now - if minTipBig != nil && !pool.locals.contains(addr) { + if minTipBig != nil { for i, tx := range txs { if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { txs = txs[:i] @@ -582,35 +530,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] return pending } -// Locals retrieves the accounts currently considered local by the pool. -func (pool *LegacyPool) Locals() []common.Address { - pool.mu.Lock() - defer pool.mu.Unlock() - - return pool.locals.flatten() -} - -// local retrieves all currently known local transactions, grouped by origin -// account and sorted by nonce. The returned transaction set is a copy and can be -// freely modified by calling code. -func (pool *LegacyPool) local() map[common.Address]types.Transactions { - txs := make(map[common.Address]types.Transactions) - for addr := range pool.locals.accounts { - if pending := pool.pending[addr]; pending != nil { - txs[addr] = append(txs[addr], pending.Flatten()...) - } - if queued := pool.queue[addr]; queued != nil { - txs[addr] = append(txs[addr], queued.Flatten()...) - } - } - return txs -} - // validateTxBasics checks whether a transaction is valid according to the consensus // rules, but does not check state-dependent validation such as sufficient balance. // This check is meant as an early check which only needs to be performed once, // and does not require the pool mutex to be held. -func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error { +func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error { opts := &txpool.ValidationOptions{ Config: pool.chainconfig, Accept: 0 | @@ -620,9 +544,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro MaxSize: txMaxSize, MinTip: pool.gasTip.Load().ToBig(), } - if local { - opts.MinTip = new(big.Int) - } if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil { return err } @@ -670,11 +591,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error { // add validates a transaction and inserts it into the non-executable queue for later // pending promotion and execution. If the transaction is a replacement for an already // pending or queued one, it overwrites the previous transaction if its price is higher. -// -// If a newly added transaction is marked as local, its sending account will be -// added to the allowlist, preventing any associated transaction from being dropped -// out of the pool due to pricing constraints. -func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { +func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { @@ -682,9 +599,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e knownTxMeter.Mark(1) return false, txpool.ErrAlreadyKnown } - // Make the local flag. If it's from local source or it's from the network but - // the sender is marked as local previously, treat it as the local transaction. - isLocal := local || pool.locals.containsTx(tx) // If the transaction fails basic validation, discard it if err := pool.validateTx(tx); err != nil { @@ -720,7 +634,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it - if !isLocal && pool.priced.Underpriced(tx) { + if pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) return false, txpool.ErrUnderpriced @@ -736,19 +650,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } // New transaction is better than our worse ones, make room for it. - // If it's a local transaction, forcibly discard all available transactions. - // Otherwise if we can't make enough room for new one, abort the operation. - drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal) + // If we can't make enough room for new one, abort the operation. + drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx)) // Special case, we still can't make the room for the new remote one. - if !isLocal && !success { + if !success { log.Trace("Discarding overflown transaction", "hash", hash) overflowedTxMeter.Mark(1) return false, ErrTxPoolOverflow } // If the new transaction is a future transaction it should never churn pending transactions - if !isLocal && pool.isGapped(from, tx) { + if pool.isGapped(from, tx) { var replacesPending bool for _, dropTx := range drop { dropSender, _ := types.Sender(pool.signer, dropTx) @@ -760,7 +673,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e // Add all transactions back to the priced queue if replacesPending { for _, dropTx := range drop { - pool.priced.Put(dropTx, false) + pool.priced.Put(dropTx) } log.Trace("Discarding future transaction replacing pending tx", "hash", hash) return false, txpool.ErrFutureReplacePending @@ -793,9 +706,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } - pool.all.Add(tx, isLocal) - pool.priced.Put(tx, isLocal) - pool.journalTx(from, tx) + pool.all.Add(tx) + pool.priced.Put(tx) pool.queueTxEvent(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) @@ -804,20 +716,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e return old != nil, nil } // New transaction isn't replacing a pending one, push into queue - replaced, err = pool.enqueueTx(hash, tx, isLocal, true) + replaced, err = pool.enqueueTx(hash, tx, true) if err != nil { return false, err } - // Mark local addresses and journal local transactions - if local && !pool.locals.contains(from) { - log.Info("Setting new local account", "address", from) - pool.locals.add(from) - pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time. - } - if isLocal { - localGauge.Inc(1) - } - pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replaced, nil @@ -850,7 +752,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! -func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) { +func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) { // Try to insert the transaction into the future queue from, _ := types.Sender(pool.signer, tx) // already validated if pool.queue[from] == nil { @@ -877,8 +779,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local log.Error("Missing transaction in lookup set, please report the issue", "hash", hash) } if addAll { - pool.all.Add(tx, local) - pool.priced.Put(tx, local) + pool.all.Add(tx) + pool.priced.Put(tx) } // If we never record the heartbeat, do it right now. if _, exist := pool.beats[from]; !exist { @@ -887,18 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local return old != nil, nil } -// journalTx adds the specified transaction to the local disk journal if it is -// deemed to have been sent from a local account. -func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) { - // Only journal if it's enabled and the transaction is local - if pool.journal == nil || !pool.locals.contains(from) { - return - } - if err := pool.journal.insert(tx); err != nil { - log.Warn("Failed to journal local transaction", "err", err) - } -} - // promoteTx adds a transaction to the pending (processable) list of transactions // and returns whether it was inserted or an older was better. // @@ -935,28 +825,13 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ return true } -// addLocals enqueues a batch of transactions into the pool if they are valid, marking the -// senders as local ones, ensuring they go around the local pricing constraints. -// -// This method is used to add transactions from the RPC API and performs synchronous pool -// reorganization and event propagation. -func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error { - return pool.Add(txs, !pool.config.NoLocals, true) -} - -// addLocal enqueues a single local transaction into the pool if it is valid. This is -// a convenience wrapper around addLocals. -func (pool *LegacyPool) addLocal(tx *types.Transaction) error { - return pool.addLocals([]*types.Transaction{tx})[0] -} - -// addRemotes enqueues a batch of transactions into the pool if they are valid. If the -// senders are not among the locally tracked ones, full pricing constraints will apply. +// addRemotes enqueues a batch of transactions into the pool if they are valid. +// Full pricing constraints will apply. // // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *LegacyPool) addRemotes(txs []*types.Transaction) []error { - return pool.Add(txs, false, false) + return pool.Add(txs, false) } // addRemote enqueues a single transaction into the pool if it is valid. This is a convenience @@ -967,23 +842,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error { // addRemotesSync is like addRemotes, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error { - return pool.Add(txs, false, true) + return pool.Add(txs, true) } // This is like addRemotes with a single transaction, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { - return pool.Add([]*types.Transaction{tx}, false, true)[0] + return pool.Add([]*types.Transaction{tx}, true)[0] } -// Add enqueues a batch of transactions into the pool if they are valid. Depending -// on the local flag, full pricing constraints will or will not be applied. +// Add enqueues a batch of transactions into the pool if they are valid. // // If sync is set, the method will block until all internal maintenance related // to the add is finished. Only use this during tests for determinism! -func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { - // Do not treat as local if local transactions have been disabled - local = local && !pool.config.NoLocals - +func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( errs = make([]error, len(txs)) @@ -999,7 +870,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // Exclude transactions with basic errors, e.g invalid signatures and // insufficient intrinsic gas as soon as possible and cache senders // in transactions before obtaining lock - if err := pool.validateTxBasics(tx, local); err != nil { + if err := pool.validateTxBasics(tx); err != nil { errs[i] = err log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err) invalidTxMeter.Mark(1) @@ -1014,7 +885,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // Process all the new transaction and merge any errors into the original slice pool.mu.Lock() - newErrs, dirtyAddrs := pool.addTxsLocked(news, local) + newErrs, dirtyAddrs := pool.addTxsLocked(news) pool.mu.Unlock() var nilSlot = 0 @@ -1035,11 +906,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // addTxsLocked attempts to queue a batch of transactions if they are valid. // The transaction pool lock must be held. -func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { +func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) { dirty := newAccountSet(pool.signer) errs := make([]error, len(txs)) for i, tx := range txs { - replaced, err := pool.add(tx, local) + replaced, err := pool.add(tx) errs[i] = err if err == nil && !replaced { dirty.addTx(tx) @@ -1131,9 +1002,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo if outofbound { pool.priced.Removed(1) } - if pool.locals.contains(addr) { - localGauge.Dec(1) - } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { @@ -1144,7 +1012,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo // Postpone any invalidated transactions for _, tx := range invalids { // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(tx.Hash(), tx, false, false) + pool.enqueueTx(tx.Hash(), tx, false) } // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) @@ -1446,7 +1314,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) core.SenderCacher.Recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + pool.addTxsLocked(reinject) } // promoteExecutables moves transactions that have become processable from the @@ -1491,22 +1359,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit - var caps types.Transactions - if !pool.locals.contains(addr) { - caps = list.Cap(int(pool.config.AccountQueue)) - for _, tx := range caps { - hash := tx.Hash() - pool.all.Remove(hash) - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) - } - queuedRateLimitMeter.Mark(int64(len(caps))) + var caps = list.Cap(int(pool.config.AccountQueue)) + for _, tx := range caps { + hash := tx.Hash() + pool.all.Remove(hash) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } + queuedRateLimitMeter.Mark(int64(len(caps))) // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) - } + // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) @@ -1536,14 +1399,14 @@ func (pool *LegacyPool) truncatePending() { spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers - if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { + if uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, int64(list.Len())) } } // Gradually drop transactions from offenders offenders := []common.Address{} for pending > pool.config.GlobalSlots && !spammers.Empty() { - // Retrieve the next offender if not local address + // Retrieve the next offender offender, _ := spammers.Pop() offenders = append(offenders, offender) @@ -1569,9 +1432,7 @@ func (pool *LegacyPool) truncatePending() { } pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) - if pool.locals.contains(offenders[i]) { - localGauge.Dec(int64(len(caps))) - } + pending-- } } @@ -1596,9 +1457,6 @@ func (pool *LegacyPool) truncatePending() { } pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(caps))) - } pending-- } } @@ -1619,13 +1477,11 @@ func (pool *LegacyPool) truncateQueue() { // Sort all accounts with queued transactions by heartbeat addresses := make(addressesByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { - if !pool.locals.contains(addr) { // don't drop locals - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) - } + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } sort.Sort(sort.Reverse(addresses)) - // Drop transactions until the total is below the limit or only locals remain + // Drop transactions until the total is below the limit for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] @@ -1685,12 +1541,10 @@ func (pool *LegacyPool) demoteUnexecutables() { log.Trace("Demoting pending transaction", "hash", hash) // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(hash, tx, false, false) + pool.enqueueTx(hash, tx, false) } pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) - } + // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { gapped := list.Cap(0) @@ -1699,7 +1553,7 @@ func (pool *LegacyPool) demoteUnexecutables() { log.Error("Demoting invalidated transaction", "hash", hash) // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(hash, tx, false, false) + pool.enqueueTx(hash, tx, false) } pendingGauge.Dec(int64(len(gapped))) } @@ -1798,20 +1652,15 @@ func (as *accountSet) merge(other *accountSet) { // internal mechanisms. The sole purpose of the type is to permit out-of-bound // peeking into the pool in LegacyPool.Get without having to acquire the widely scoped // LegacyPool.mu mutex. -// -// This lookup set combines the notion of "local transactions", which is useful -// to build upper-level structure. type lookup struct { slots int lock sync.RWMutex - locals map[common.Hash]*types.Transaction remotes map[common.Hash]*types.Transaction } // newLookup returns a new lookup structure. func newLookup() *lookup { return &lookup{ - locals: make(map[common.Hash]*types.Transaction), remotes: make(map[common.Hash]*types.Transaction), } } @@ -1819,22 +1668,12 @@ func newLookup() *lookup { // Range calls f on each key and value present in the map. The callback passed // should return the indicator whether the iteration needs to be continued. // Callers need to specify which set (or both) to be iterated. -func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) { +func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { t.lock.RLock() defer t.lock.RUnlock() - - if local { - for key, value := range t.locals { - if !f(key, value, true) { - return - } - } - } - if remote { - for key, value := range t.remotes { - if !f(key, value, false) { - return - } + for key, value := range t.remotes { + if !f(key, value) { + return } } } @@ -1843,21 +1682,9 @@ func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local boo func (t *lookup) Get(hash common.Hash) *types.Transaction { t.lock.RLock() defer t.lock.RUnlock() - - if tx := t.locals[hash]; tx != nil { - return tx - } return t.remotes[hash] } -// GetLocal returns a transaction if it exists in the lookup, or nil if not found. -func (t *lookup) GetLocal(hash common.Hash) *types.Transaction { - t.lock.RLock() - defer t.lock.RUnlock() - - return t.locals[hash] -} - // GetRemote returns a transaction if it exists in the lookup, or nil if not found. func (t *lookup) GetRemote(hash common.Hash) *types.Transaction { t.lock.RLock() @@ -1871,15 +1698,7 @@ func (t *lookup) Count() int { t.lock.RLock() defer t.lock.RUnlock() - return len(t.locals) + len(t.remotes) -} - -// LocalCount returns the current number of local transactions in the lookup. -func (t *lookup) LocalCount() int { - t.lock.RLock() - defer t.lock.RUnlock() - - return len(t.locals) + return len(t.remotes) } // RemoteCount returns the current number of remote transactions in the lookup. @@ -1899,18 +1718,14 @@ func (t *lookup) Slots() int { } // Add adds a transaction to the lookup. -func (t *lookup) Add(tx *types.Transaction, local bool) { +func (t *lookup) Add(tx *types.Transaction) { t.lock.Lock() defer t.lock.Unlock() t.slots += numSlots(tx) slotsGauge.Update(int64(t.slots)) - if local { - t.locals[tx.Hash()] = tx - } else { - t.remotes[tx.Hash()] = tx - } + t.remotes[tx.Hash()] = tx } // Remove removes a transaction from the lookup. @@ -1918,10 +1733,7 @@ func (t *lookup) Remove(hash common.Hash) { t.lock.Lock() defer t.lock.Unlock() - tx, ok := t.locals[hash] - if !ok { - tx, ok = t.remotes[hash] - } + tx, ok := t.remotes[hash] if !ok { log.Error("No transaction found to be deleted", "hash", hash) return @@ -1929,36 +1741,18 @@ func (t *lookup) Remove(hash common.Hash) { t.slots -= numSlots(tx) slotsGauge.Update(int64(t.slots)) - delete(t.locals, hash) delete(t.remotes, hash) } -// RemoteToLocals migrates the transactions belongs to the given locals to locals -// set. The assumption is held the locals set is thread-safe to be used. -func (t *lookup) RemoteToLocals(locals *accountSet) int { - t.lock.Lock() - defer t.lock.Unlock() - - var migrated int - for hash, tx := range t.remotes { - if locals.containsTx(tx) { - t.locals[hash] = tx - delete(t.remotes, hash) - migrated += 1 - } - } - return migrated -} - // RemotesBelowTip finds all remote transactions below the given tip threshold. func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions { found := make(types.Transactions, 0, 128) - t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { + t.Range(func(hash common.Hash, tx *types.Transaction) bool { if tx.GasTipCapIntCmp(threshold) < 0 { found = append(found, tx) } return true - }, false, true) // Only iterate remotes + }) return found } diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index b749db44d4..ddaad0dd9a 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -556,10 +556,7 @@ func newPricedList(all *lookup) *pricedList { } // Put inserts a new transaction into the heap. -func (l *pricedList) Put(tx *types.Transaction, local bool) { - if local { - return - } +func (l *pricedList) Put(tx *types.Transaction) { // Insert every new transaction to the urgent heap first; Discard will balance the heaps heap.Push(&l.urgent, tx) } @@ -612,9 +609,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. // If noPending is set to true, we will only consider the floating list -// -// Note local transaction won't be considered for eviction. -func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { +func (l *pricedList) Discard(slots int) (types.Transactions, bool) { drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop for slots > 0 { if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio { @@ -643,7 +638,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { } } // If we still can't make enough room for the new transaction - if slots > 0 && !force { + if slots > 0 { for _, tx := range drop { heap.Push(&l.urgent, tx) } @@ -659,10 +654,10 @@ func (l *pricedList) Reheap() { start := time.Now() l.stales.Store(0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) - l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { + l.all.Range(func(hash common.Hash, tx *types.Transaction) bool { l.urgent.list = append(l.urgent.list, tx) return true - }, false, true) // Only iterate remotes + }) heap.Init(&l.urgent) // balance out the two heaps by moving the worse half of transactions into the diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 9ee0a69c0b..5ad0f5b0e0 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -132,7 +132,7 @@ type SubPool interface { // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. - Add(txs []*types.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, sync bool) []error // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. @@ -162,9 +162,6 @@ type SubPool interface { // pending as well as queued transactions of this address, grouped by nonce. ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) - // Locals retrieves the accounts currently considered local by the pool. - Locals() []common.Address - // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. Status(hash common.Hash) TxStatus diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index ac2c2ba412..93b2ac82bf 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -353,7 +353,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { errsets := make([][]error, len(p.subpools)) for i := 0; i < len(p.subpools); i++ { // Note: local is explicitly set to false here. - errsets[i] = p.subpools[i].Add(txsets[i], false, sync) + errsets[i] = p.subpools[i].Add(txsets[i], sync) } errs := make([]error, len(txs)) for i, split := range splits { @@ -454,23 +454,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type return []*types.Transaction{}, []*types.Transaction{} } -// Locals retrieves the accounts currently considered local by the pool. -func (p *TxPool) Locals() []common.Address { - // Retrieve the locals from each subpool and deduplicate them - locals := make(map[common.Address]struct{}) - for _, subpool := range p.subpools { - for _, local := range subpool.Locals() { - locals[local] = struct{}{} - } - } - // Flatten and return the deduplicated local set - flat := make([]common.Address, 0, len(locals)) - for local := range locals { - flat = append(flat, local) - } - return flat -} - // Status returns the known status (unknown/pending/queued) of a transaction // identified by its hash. func (p *TxPool) Status(hash common.Hash) TxStatus { diff --git a/eth/backend.go b/eth/backend.go index a6bbb9539f..1cbd062d57 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -237,6 +237,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) if !config.TxPool.NoLocals { + // TODO! + // We also need to handle config.Locals, the accounts that are + // to be treated as locals, regardless of how they arrive to geth. eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal, config.TxPool.Rejournal, eth.blockchain.Config(), eth.txPool)