core/txpool: remove locals-tracking from subpools

This commit is contained in:
Martin Holst Swende 2024-10-08 16:24:22 +02:00
parent 02bfc7b065
commit ca63de4eb9
No known key found for this signature in database
GPG Key ID: 683B438C05A5DDF0
6 changed files with 68 additions and 296 deletions

View File

@ -1268,7 +1268,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.
// Add inserts a set of blob transactions into the pool if they pass validation (both // Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions). // consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
var ( var (
adds = make([]*types.Transaction, 0, len(txs)) adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs)) errs = make([]error, len(txs))

View File

@ -99,7 +99,6 @@ var (
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
@ -159,10 +158,6 @@ var DefaultConfig = Config{
// unreasonable or unworkable. // unreasonable or unworkable.
func (config *Config) sanitize() Config { func (config *Config) sanitize() Config {
conf := *config conf := *config
if conf.Rejournal < time.Second {
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
conf.Rejournal = time.Second
}
if conf.PriceLimit < 1 { if conf.PriceLimit < 1 {
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit) log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit)
conf.PriceLimit = DefaultConfig.PriceLimit conf.PriceLimit = DefaultConfig.PriceLimit
@ -214,9 +209,6 @@ type LegacyPool struct {
currentState *state.StateDB // Current state in the blockchain head currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces pendingNonces *noncer // Pending state tracking virtual nonces
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions queue map[common.Address]*list // Queued but non-processable transactions
@ -245,11 +237,6 @@ func New(config Config, chain BlockChain) *LegacyPool {
// Sanitize the input to ensure no vulnerable gas prices are set // Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize() config = (&config).sanitize()
// Disable locals-handling in this pool. This is a precursor to fully
// deleting locals-related code
config.NoLocals = true
config.Locals = nil
// Create the transaction pool with its initial settings // Create the transaction pool with its initial settings
pool := &LegacyPool{ pool := &LegacyPool{
config: config, config: config,
@ -267,16 +254,8 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgShutdownCh: make(chan struct{}), reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}), initDoneCh: make(chan struct{}),
} }
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
pool.priced = newPricedList(pool.all) pool.priced = newPricedList(pool.all)
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
}
return pool return pool
} }
@ -292,8 +271,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
} }
// Init sets the gas price needed to keep a transaction in the pool and the chain // Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded // head to allow balance / nonce checks. The internal
// from disk and filtered based on the provided starting settings. The internal
// goroutines will be spun up and the pool deemed operational afterwards. // goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts // Set the address reserver to request exclusive access to pooled accounts
@ -316,20 +294,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
pool.currentState = statedb pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb) pool.pendingNonces = newNoncer(statedb)
// Start the reorg loop early, so it can handle requests generated during
// journal loading.
pool.wg.Add(1) pool.wg.Add(1)
go pool.scheduleReorgLoop() go pool.scheduleReorgLoop()
// If local transactions and journaling is enabled, load from disk
if pool.journal != nil {
if err := pool.journal.load(pool.addLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
pool.wg.Add(1) pool.wg.Add(1)
go pool.loop() go pool.loop()
return nil return nil
@ -345,13 +312,11 @@ func (pool *LegacyPool) loop() {
prevPending, prevQueued, prevStales int prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers // Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval) report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval) evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
) )
defer report.Stop() defer report.Stop()
defer evict.Stop() defer evict.Stop()
defer journal.Stop()
// Notify tests that the init phase is done // Notify tests that the init phase is done
close(pool.initDoneCh) close(pool.initDoneCh)
@ -377,11 +342,7 @@ func (pool *LegacyPool) loop() {
case <-evict.C: case <-evict.C:
pool.mu.Lock() pool.mu.Lock()
for addr := range pool.queue { for addr := range pool.queue {
// Skip local transactions from the eviction mechanism // Any old enough should be removed
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime { if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten() list := pool.queue[addr].Flatten()
for _, tx := range list { for _, tx := range list {
@ -391,16 +352,6 @@ func (pool *LegacyPool) loop() {
} }
} }
pool.mu.Unlock() pool.mu.Unlock()
// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
} }
} }
} }
@ -411,9 +362,6 @@ func (pool *LegacyPool) Close() error {
close(pool.reorgShutdownCh) close(pool.reorgShutdownCh)
pool.wg.Wait() pool.wg.Wait()
if pool.journal != nil {
pool.journal.close()
}
log.Info("Transaction pool stopped") log.Info("Transaction pool stopped")
return nil return nil
} }
@ -554,7 +502,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
txs := list.Flatten() txs := list.Flatten()
// If the miner requests tip enforcement, cap the lists now // If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) { if minTipBig != nil {
for i, tx := range txs { for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i] txs = txs[:i]
@ -582,35 +530,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
return pending return pending
} }
// Locals retrieves the accounts currently considered local by the pool.
func (pool *LegacyPool) Locals() []common.Address {
pool.mu.Lock()
defer pool.mu.Unlock()
return pool.locals.flatten()
}
// local retrieves all currently known local transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
func (pool *LegacyPool) local() map[common.Address]types.Transactions {
txs := make(map[common.Address]types.Transactions)
for addr := range pool.locals.accounts {
if pending := pool.pending[addr]; pending != nil {
txs[addr] = append(txs[addr], pending.Flatten()...)
}
if queued := pool.queue[addr]; queued != nil {
txs[addr] = append(txs[addr], queued.Flatten()...)
}
}
return txs
}
// validateTxBasics checks whether a transaction is valid according to the consensus // validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance. // rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once, // This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held. // and does not require the pool mutex to be held.
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error { func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error {
opts := &txpool.ValidationOptions{ opts := &txpool.ValidationOptions{
Config: pool.chainconfig, Config: pool.chainconfig,
Accept: 0 | Accept: 0 |
@ -620,9 +544,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
MaxSize: txMaxSize, MaxSize: txMaxSize,
MinTip: pool.gasTip.Load().ToBig(), MinTip: pool.gasTip.Load().ToBig(),
} }
if local {
opts.MinTip = new(big.Int)
}
if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil { if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil {
return err return err
} }
@ -670,11 +591,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
// add validates a transaction and inserts it into the non-executable queue for later // add validates a transaction and inserts it into the non-executable queue for later
// pending promotion and execution. If the transaction is a replacement for an already // pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher. // pending or queued one, it overwrites the previous transaction if its price is higher.
// func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
// If a newly added transaction is marked as local, its sending account will be
// added to the allowlist, preventing any associated transaction from being dropped
// out of the pool due to pricing constraints.
func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
// If the transaction is already known, discard it // If the transaction is already known, discard it
hash := tx.Hash() hash := tx.Hash()
if pool.all.Get(hash) != nil { if pool.all.Get(hash) != nil {
@ -682,9 +599,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
knownTxMeter.Mark(1) knownTxMeter.Mark(1)
return false, txpool.ErrAlreadyKnown return false, txpool.ErrAlreadyKnown
} }
// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
// If the transaction fails basic validation, discard it // If the transaction fails basic validation, discard it
if err := pool.validateTx(tx); err != nil { if err := pool.validateTx(tx); err != nil {
@ -720,7 +634,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// If the transaction pool is full, discard underpriced transactions // If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it // If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) { if pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1) underpricedTxMeter.Mark(1)
return false, txpool.ErrUnderpriced return false, txpool.ErrUnderpriced
@ -736,19 +650,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
} }
// New transaction is better than our worse ones, make room for it. // New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions. // If we can't make enough room for new one, abort the operation.
// Otherwise if we can't make enough room for new one, abort the operation. drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx))
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
// Special case, we still can't make the room for the new remote one. // Special case, we still can't make the room for the new remote one.
if !isLocal && !success { if !success {
log.Trace("Discarding overflown transaction", "hash", hash) log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1) overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow return false, ErrTxPoolOverflow
} }
// If the new transaction is a future transaction it should never churn pending transactions // If the new transaction is a future transaction it should never churn pending transactions
if !isLocal && pool.isGapped(from, tx) { if pool.isGapped(from, tx) {
var replacesPending bool var replacesPending bool
for _, dropTx := range drop { for _, dropTx := range drop {
dropSender, _ := types.Sender(pool.signer, dropTx) dropSender, _ := types.Sender(pool.signer, dropTx)
@ -760,7 +673,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// Add all transactions back to the priced queue // Add all transactions back to the priced queue
if replacesPending { if replacesPending {
for _, dropTx := range drop { for _, dropTx := range drop {
pool.priced.Put(dropTx, false) pool.priced.Put(dropTx)
} }
log.Trace("Discarding future transaction replacing pending tx", "hash", hash) log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
return false, txpool.ErrFutureReplacePending return false, txpool.ErrFutureReplacePending
@ -793,9 +706,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.priced.Removed(1) pool.priced.Removed(1)
pendingReplaceMeter.Mark(1) pendingReplaceMeter.Mark(1)
} }
pool.all.Add(tx, isLocal) pool.all.Add(tx)
pool.priced.Put(tx, isLocal) pool.priced.Put(tx)
pool.journalTx(from, tx)
pool.queueTxEvent(tx) pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
@ -804,20 +716,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return old != nil, nil return old != nil, nil
} }
// New transaction isn't replacing a pending one, push into queue // New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true) replaced, err = pool.enqueueTx(hash, tx, true)
if err != nil { if err != nil {
return false, err return false, err
} }
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
localGauge.Inc(1)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil return replaced, nil
@ -850,7 +752,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
// enqueueTx inserts a new transaction into the non-executable transaction queue. // enqueueTx inserts a new transaction into the non-executable transaction queue.
// //
// Note, this method assumes the pool lock is held! // Note, this method assumes the pool lock is held!
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) { func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue // Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil { if pool.queue[from] == nil {
@ -877,8 +779,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash) log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
} }
if addAll { if addAll {
pool.all.Add(tx, local) pool.all.Add(tx)
pool.priced.Put(tx, local) pool.priced.Put(tx)
} }
// If we never record the heartbeat, do it right now. // If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist { if _, exist := pool.beats[from]; !exist {
@ -887,18 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
return old != nil, nil return old != nil, nil
} }
// journalTx adds the specified transaction to the local disk journal if it is
// deemed to have been sent from a local account.
func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {
return
}
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}
// promoteTx adds a transaction to the pending (processable) list of transactions // promoteTx adds a transaction to the pending (processable) list of transactions
// and returns whether it was inserted or an older was better. // and returns whether it was inserted or an older was better.
// //
@ -935,28 +825,13 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
return true return true
} }
// addLocals enqueues a batch of transactions into the pool if they are valid, marking the // addRemotes enqueues a batch of transactions into the pool if they are valid.
// senders as local ones, ensuring they go around the local pricing constraints. // Full pricing constraints will apply.
//
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error {
return pool.Add(txs, !pool.config.NoLocals, true)
}
// addLocal enqueues a single local transaction into the pool if it is valid. This is
// a convenience wrapper around addLocals.
func (pool *LegacyPool) addLocal(tx *types.Transaction) error {
return pool.addLocals([]*types.Transaction{tx})[0]
}
// addRemotes enqueues a batch of transactions into the pool if they are valid. If the
// senders are not among the locally tracked ones, full pricing constraints will apply.
// //
// This method is used to add transactions from the p2p network and does not wait for pool // This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation. // reorganization and internal event propagation.
func (pool *LegacyPool) addRemotes(txs []*types.Transaction) []error { func (pool *LegacyPool) addRemotes(txs []*types.Transaction) []error {
return pool.Add(txs, false, false) return pool.Add(txs, false)
} }
// addRemote enqueues a single transaction into the pool if it is valid. This is a convenience // addRemote enqueues a single transaction into the pool if it is valid. This is a convenience
@ -967,23 +842,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error {
// addRemotesSync is like addRemotes, but waits for pool reorganization. Tests use this method. // addRemotesSync is like addRemotes, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error { func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error {
return pool.Add(txs, false, true) return pool.Add(txs, true)
} }
// This is like addRemotes with a single transaction, but waits for pool reorganization. Tests use this method. // This is like addRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
return pool.Add([]*types.Transaction{tx}, false, true)[0] return pool.Add([]*types.Transaction{tx}, true)[0]
} }
// Add enqueues a batch of transactions into the pool if they are valid. Depending // Add enqueues a batch of transactions into the pool if they are valid.
// on the local flag, full pricing constraints will or will not be applied.
// //
// If sync is set, the method will block until all internal maintenance related // If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism! // to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error {
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals
// Filter out known ones without obtaining the pool lock or recovering signatures // Filter out known ones without obtaining the pool lock or recovering signatures
var ( var (
errs = make([]error, len(txs)) errs = make([]error, len(txs))
@ -999,7 +870,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// Exclude transactions with basic errors, e.g invalid signatures and // Exclude transactions with basic errors, e.g invalid signatures and
// insufficient intrinsic gas as soon as possible and cache senders // insufficient intrinsic gas as soon as possible and cache senders
// in transactions before obtaining lock // in transactions before obtaining lock
if err := pool.validateTxBasics(tx, local); err != nil { if err := pool.validateTxBasics(tx); err != nil {
errs[i] = err errs[i] = err
log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err) log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1) invalidTxMeter.Mark(1)
@ -1014,7 +885,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// Process all the new transaction and merge any errors into the original slice // Process all the new transaction and merge any errors into the original slice
pool.mu.Lock() pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local) newErrs, dirtyAddrs := pool.addTxsLocked(news)
pool.mu.Unlock() pool.mu.Unlock()
var nilSlot = 0 var nilSlot = 0
@ -1035,11 +906,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// addTxsLocked attempts to queue a batch of transactions if they are valid. // addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held. // The transaction pool lock must be held.
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer) dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs)) errs := make([]error, len(txs))
for i, tx := range txs { for i, tx := range txs {
replaced, err := pool.add(tx, local) replaced, err := pool.add(tx)
errs[i] = err errs[i] = err
if err == nil && !replaced { if err == nil && !replaced {
dirty.addTx(tx) dirty.addTx(tx)
@ -1131,9 +1002,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if outofbound { if outofbound {
pool.priced.Removed(1) pool.priced.Removed(1)
} }
if pool.locals.contains(addr) {
localGauge.Dec(1)
}
// Remove the transaction from the pending lists and reset the account nonce // Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil { if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed { if removed, invalids := pending.Remove(tx); removed {
@ -1144,7 +1012,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Postpone any invalidated transactions // Postpone any invalidated transactions
for _, tx := range invalids { for _, tx := range invalids {
// Internal shuffle shouldn't touch the lookup set. // Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(tx.Hash(), tx, false, false) pool.enqueueTx(tx.Hash(), tx, false)
} }
// Update the account nonce if needed // Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce()) pool.pendingNonces.setIfLower(addr, tx.Nonce())
@ -1446,7 +1314,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// Inject any transactions discarded due to reorgs // Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject)) log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher.Recover(pool.signer, reinject) core.SenderCacher.Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false) pool.addTxsLocked(reinject)
} }
// promoteExecutables moves transactions that have become processable from the // promoteExecutables moves transactions that have become processable from the
@ -1491,22 +1359,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedGauge.Dec(int64(len(readies))) queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit // Drop all transactions over the allowed limit
var caps types.Transactions var caps = list.Cap(int(pool.config.AccountQueue))
if !pool.locals.contains(addr) { for _, tx := range caps {
caps = list.Cap(int(pool.config.AccountQueue)) hash := tx.Hash()
for _, tx := range caps { pool.all.Remove(hash)
hash := tx.Hash() log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
} }
queuedRateLimitMeter.Mark(int64(len(caps)))
// Mark all the items dropped as removed // Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps)) pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty. // Delete the entire queue entry if it became empty.
if list.Empty() { if list.Empty() {
delete(pool.queue, addr) delete(pool.queue, addr)
@ -1536,14 +1399,14 @@ func (pool *LegacyPool) truncatePending() {
spammers := prque.New[int64, common.Address](nil) spammers := prque.New[int64, common.Address](nil)
for addr, list := range pool.pending { for addr, list := range pool.pending {
// Only evict transactions from high rollers // Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { if uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len())) spammers.Push(addr, int64(list.Len()))
} }
} }
// Gradually drop transactions from offenders // Gradually drop transactions from offenders
offenders := []common.Address{} offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() { for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address // Retrieve the next offender
offender, _ := spammers.Pop() offender, _ := spammers.Pop()
offenders = append(offenders, offender) offenders = append(offenders, offender)
@ -1569,9 +1432,7 @@ func (pool *LegacyPool) truncatePending() {
} }
pool.priced.Removed(len(caps)) pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps))) pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
}
pending-- pending--
} }
} }
@ -1596,9 +1457,6 @@ func (pool *LegacyPool) truncatePending() {
} }
pool.priced.Removed(len(caps)) pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps))) pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
pending-- pending--
} }
} }
@ -1619,13 +1477,11 @@ func (pool *LegacyPool) truncateQueue() {
// Sort all accounts with queued transactions by heartbeat // Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue)) addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue { for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
} }
sort.Sort(sort.Reverse(addresses)) sort.Sort(sort.Reverse(addresses))
// Drop transactions until the total is below the limit or only locals remain // Drop transactions until the total is below the limit
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1] addr := addresses[len(addresses)-1]
list := pool.queue[addr.address] list := pool.queue[addr.address]
@ -1685,12 +1541,10 @@ func (pool *LegacyPool) demoteUnexecutables() {
log.Trace("Demoting pending transaction", "hash", hash) log.Trace("Demoting pending transaction", "hash", hash)
// Internal shuffle shouldn't touch the lookup set. // Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false) pool.enqueueTx(hash, tx, false)
} }
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
// If there's a gap in front, alert (should never happen) and postpone all transactions // If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil { if list.Len() > 0 && list.txs.Get(nonce) == nil {
gapped := list.Cap(0) gapped := list.Cap(0)
@ -1699,7 +1553,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
log.Error("Demoting invalidated transaction", "hash", hash) log.Error("Demoting invalidated transaction", "hash", hash)
// Internal shuffle shouldn't touch the lookup set. // Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false) pool.enqueueTx(hash, tx, false)
} }
pendingGauge.Dec(int64(len(gapped))) pendingGauge.Dec(int64(len(gapped)))
} }
@ -1798,20 +1652,15 @@ func (as *accountSet) merge(other *accountSet) {
// internal mechanisms. The sole purpose of the type is to permit out-of-bound // internal mechanisms. The sole purpose of the type is to permit out-of-bound
// peeking into the pool in LegacyPool.Get without having to acquire the widely scoped // peeking into the pool in LegacyPool.Get without having to acquire the widely scoped
// LegacyPool.mu mutex. // LegacyPool.mu mutex.
//
// This lookup set combines the notion of "local transactions", which is useful
// to build upper-level structure.
type lookup struct { type lookup struct {
slots int slots int
lock sync.RWMutex lock sync.RWMutex
locals map[common.Hash]*types.Transaction
remotes map[common.Hash]*types.Transaction remotes map[common.Hash]*types.Transaction
} }
// newLookup returns a new lookup structure. // newLookup returns a new lookup structure.
func newLookup() *lookup { func newLookup() *lookup {
return &lookup{ return &lookup{
locals: make(map[common.Hash]*types.Transaction),
remotes: make(map[common.Hash]*types.Transaction), remotes: make(map[common.Hash]*types.Transaction),
} }
} }
@ -1819,22 +1668,12 @@ func newLookup() *lookup {
// Range calls f on each key and value present in the map. The callback passed // Range calls f on each key and value present in the map. The callback passed
// should return the indicator whether the iteration needs to be continued. // should return the indicator whether the iteration needs to be continued.
// Callers need to specify which set (or both) to be iterated. // Callers need to specify which set (or both) to be iterated.
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) { func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
for key, value := range t.remotes {
if local { if !f(key, value) {
for key, value := range t.locals { return
if !f(key, value, true) {
return
}
}
}
if remote {
for key, value := range t.remotes {
if !f(key, value, false) {
return
}
} }
} }
} }
@ -1843,21 +1682,9 @@ func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local boo
func (t *lookup) Get(hash common.Hash) *types.Transaction { func (t *lookup) Get(hash common.Hash) *types.Transaction {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
if tx := t.locals[hash]; tx != nil {
return tx
}
return t.remotes[hash] return t.remotes[hash]
} }
// GetLocal returns a transaction if it exists in the lookup, or nil if not found.
func (t *lookup) GetLocal(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
return t.locals[hash]
}
// GetRemote returns a transaction if it exists in the lookup, or nil if not found. // GetRemote returns a transaction if it exists in the lookup, or nil if not found.
func (t *lookup) GetRemote(hash common.Hash) *types.Transaction { func (t *lookup) GetRemote(hash common.Hash) *types.Transaction {
t.lock.RLock() t.lock.RLock()
@ -1871,15 +1698,7 @@ func (t *lookup) Count() int {
t.lock.RLock() t.lock.RLock()
defer t.lock.RUnlock() defer t.lock.RUnlock()
return len(t.locals) + len(t.remotes) return len(t.remotes)
}
// LocalCount returns the current number of local transactions in the lookup.
func (t *lookup) LocalCount() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.locals)
} }
// RemoteCount returns the current number of remote transactions in the lookup. // RemoteCount returns the current number of remote transactions in the lookup.
@ -1899,18 +1718,14 @@ func (t *lookup) Slots() int {
} }
// Add adds a transaction to the lookup. // Add adds a transaction to the lookup.
func (t *lookup) Add(tx *types.Transaction, local bool) { func (t *lookup) Add(tx *types.Transaction) {
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
t.slots += numSlots(tx) t.slots += numSlots(tx)
slotsGauge.Update(int64(t.slots)) slotsGauge.Update(int64(t.slots))
if local { t.remotes[tx.Hash()] = tx
t.locals[tx.Hash()] = tx
} else {
t.remotes[tx.Hash()] = tx
}
} }
// Remove removes a transaction from the lookup. // Remove removes a transaction from the lookup.
@ -1918,10 +1733,7 @@ func (t *lookup) Remove(hash common.Hash) {
t.lock.Lock() t.lock.Lock()
defer t.lock.Unlock() defer t.lock.Unlock()
tx, ok := t.locals[hash] tx, ok := t.remotes[hash]
if !ok {
tx, ok = t.remotes[hash]
}
if !ok { if !ok {
log.Error("No transaction found to be deleted", "hash", hash) log.Error("No transaction found to be deleted", "hash", hash)
return return
@ -1929,36 +1741,18 @@ func (t *lookup) Remove(hash common.Hash) {
t.slots -= numSlots(tx) t.slots -= numSlots(tx)
slotsGauge.Update(int64(t.slots)) slotsGauge.Update(int64(t.slots))
delete(t.locals, hash)
delete(t.remotes, hash) delete(t.remotes, hash)
} }
// RemoteToLocals migrates the transactions belongs to the given locals to locals
// set. The assumption is held the locals set is thread-safe to be used.
func (t *lookup) RemoteToLocals(locals *accountSet) int {
t.lock.Lock()
defer t.lock.Unlock()
var migrated int
for hash, tx := range t.remotes {
if locals.containsTx(tx) {
t.locals[hash] = tx
delete(t.remotes, hash)
migrated += 1
}
}
return migrated
}
// RemotesBelowTip finds all remote transactions below the given tip threshold. // RemotesBelowTip finds all remote transactions below the given tip threshold.
func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions { func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
found := make(types.Transactions, 0, 128) found := make(types.Transactions, 0, 128)
t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { t.Range(func(hash common.Hash, tx *types.Transaction) bool {
if tx.GasTipCapIntCmp(threshold) < 0 { if tx.GasTipCapIntCmp(threshold) < 0 {
found = append(found, tx) found = append(found, tx)
} }
return true return true
}, false, true) // Only iterate remotes })
return found return found
} }

View File

@ -556,10 +556,7 @@ func newPricedList(all *lookup) *pricedList {
} }
// Put inserts a new transaction into the heap. // Put inserts a new transaction into the heap.
func (l *pricedList) Put(tx *types.Transaction, local bool) { func (l *pricedList) Put(tx *types.Transaction) {
if local {
return
}
// Insert every new transaction to the urgent heap first; Discard will balance the heaps // Insert every new transaction to the urgent heap first; Discard will balance the heaps
heap.Push(&l.urgent, tx) heap.Push(&l.urgent, tx)
} }
@ -612,9 +609,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
// Discard finds a number of most underpriced transactions, removes them from the // Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool. // priced list and returns them for further removal from the entire pool.
// If noPending is set to true, we will only consider the floating list // If noPending is set to true, we will only consider the floating list
// func (l *pricedList) Discard(slots int) (types.Transactions, bool) {
// Note local transaction won't be considered for eviction.
func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
for slots > 0 { for slots > 0 {
if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio { if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio {
@ -643,7 +638,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
} }
} }
// If we still can't make enough room for the new transaction // If we still can't make enough room for the new transaction
if slots > 0 && !force { if slots > 0 {
for _, tx := range drop { for _, tx := range drop {
heap.Push(&l.urgent, tx) heap.Push(&l.urgent, tx)
} }
@ -659,10 +654,10 @@ func (l *pricedList) Reheap() {
start := time.Now() start := time.Now()
l.stales.Store(0) l.stales.Store(0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
l.urgent.list = append(l.urgent.list, tx) l.urgent.list = append(l.urgent.list, tx)
return true return true
}, false, true) // Only iterate remotes })
heap.Init(&l.urgent) heap.Init(&l.urgent)
// balance out the two heaps by moving the worse half of transactions into the // balance out the two heaps by moving the worse half of transactions into the

View File

@ -132,7 +132,7 @@ type SubPool interface {
// Add enqueues a batch of transactions into the pool if they are valid. Due // Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx // to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together. // to a later point to batch multiple ones together.
Add(txs []*types.Transaction, local bool, sync bool) []error Add(txs []*types.Transaction, sync bool) []error
// Pending retrieves all currently processable transactions, grouped by origin // Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. // account and sorted by nonce.
@ -162,9 +162,6 @@ type SubPool interface {
// pending as well as queued transactions of this address, grouped by nonce. // pending as well as queued transactions of this address, grouped by nonce.
ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
// Locals retrieves the accounts currently considered local by the pool.
Locals() []common.Address
// Status returns the known status (unknown/pending/queued) of a transaction // Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes. // identified by their hashes.
Status(hash common.Hash) TxStatus Status(hash common.Hash) TxStatus

View File

@ -353,7 +353,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
errsets := make([][]error, len(p.subpools)) errsets := make([][]error, len(p.subpools))
for i := 0; i < len(p.subpools); i++ { for i := 0; i < len(p.subpools); i++ {
// Note: local is explicitly set to false here. // Note: local is explicitly set to false here.
errsets[i] = p.subpools[i].Add(txsets[i], false, sync) errsets[i] = p.subpools[i].Add(txsets[i], sync)
} }
errs := make([]error, len(txs)) errs := make([]error, len(txs))
for i, split := range splits { for i, split := range splits {
@ -454,23 +454,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type
return []*types.Transaction{}, []*types.Transaction{} return []*types.Transaction{}, []*types.Transaction{}
} }
// Locals retrieves the accounts currently considered local by the pool.
func (p *TxPool) Locals() []common.Address {
// Retrieve the locals from each subpool and deduplicate them
locals := make(map[common.Address]struct{})
for _, subpool := range p.subpools {
for _, local := range subpool.Locals() {
locals[local] = struct{}{}
}
}
// Flatten and return the deduplicated local set
flat := make([]common.Address, 0, len(locals))
for local := range locals {
flat = append(flat, local)
}
return flat
}
// Status returns the known status (unknown/pending/queued) of a transaction // Status returns the known status (unknown/pending/queued) of a transaction
// identified by its hash. // identified by its hash.
func (p *TxPool) Status(hash common.Hash) TxStatus { func (p *TxPool) Status(hash common.Hash) TxStatus {

View File

@ -237,6 +237,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool})
if !config.TxPool.NoLocals { if !config.TxPool.NoLocals {
// TODO!
// We also need to handle config.Locals, the accounts that are
// to be treated as locals, regardless of how they arrive to geth.
eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal, eth.localTxTracker = legacypool.NewTxTracker(config.TxPool.Journal,
config.TxPool.Rejournal, config.TxPool.Rejournal,
eth.blockchain.Config(), eth.txPool) eth.blockchain.Config(), eth.txPool)