core/txpool: remove locals-tracking from txpools (#30559)

Replaces  #29297, descendant from #27535

---------

This PR removes `locals` as a concept from transaction pools. Therefore,
the pool now acts as very a good simulation/approximation of how our
peers' pools behave. What this PR does instead, is implement a
locals-tracker, which basically is a little thing which, from time to
time, asks the pool "did you forget this transaction?". If it did, the
tracker resubmits it.

If the txpool _had_ forgotten it, chances are that the peers had also
forgotten it. It will be propagated again.

Doing this change means that we can simplify the pool internals, quite a
lot.

### The semantics of `local` 

Historically, there has been two features, or usecases, that has been
combined into the concept of `locals`.

1. "I want my local node to remember this transaction indefinitely, and
resubmit to the network occasionally"
2. "I want this (valid) transaction included to be top-prio for my
miner"


This PR splits these features up, let's call it `1: local` and `2:
prio`. The `prio` is not actually individual transaction, but rather a
set of `address`es to prioritize.
The attribute `local` means it will be tracked, and `prio` means it will
be prioritized by miner.

For `local`: anything transaction received via the RPC is marked as
`local`, and tracked by the tracker.
For `prio`: any transactions from this sender is included first, when
building a block. The existing commandline-flag `--txpool.locals` sets
the set of `prio` addresses.

---------

Co-authored-by: Gary Rong <garyrong0905@gmail.com>
This commit is contained in:
Martin HS 2025-02-04 17:23:01 +01:00 committed by GitHub
parent e332431cb2
commit 7c7b7f6ab1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 493 additions and 927 deletions

View File

@ -1269,7 +1269,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
var (
adds = make([]*types.Transaction, 0, len(txs))
errs = make([]error, len(txs))
@ -1701,13 +1701,6 @@ func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*ty
return []*types.Transaction{}, []*types.Transaction{}
}
// Locals retrieves the accounts currently considered local by the pool.
//
// There is no notion of local accounts in the blob pool.
func (p *BlobPool) Locals() []common.Address {
return []common.Address{}
}
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {

View File

@ -99,7 +99,6 @@ var (
pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil)
queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil)
localGauge = metrics.NewRegisteredGauge("txpool/local", nil)
slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil)
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
@ -159,10 +158,6 @@ var DefaultConfig = Config{
// unreasonable or unworkable.
func (config *Config) sanitize() Config {
conf := *config
if conf.Rejournal < time.Second {
log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second)
conf.Rejournal = time.Second
}
if conf.PriceLimit < 1 {
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit)
conf.PriceLimit = DefaultConfig.PriceLimit
@ -214,9 +209,6 @@ type LegacyPool struct {
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *journal // Journal of local transaction to back up to disk
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
@ -262,16 +254,8 @@ func New(config Config, chain BlockChain) *LegacyPool {
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
pool.priced = newPricedList(pool.all)
if !config.NoLocals && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)
}
return pool
}
@ -287,8 +271,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
}
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings. The internal
// head to allow balance / nonce checks. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
// Set the address reserver to request exclusive access to pooled accounts
@ -311,20 +294,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A
pool.currentState = statedb
pool.pendingNonces = newNoncer(statedb)
// Start the reorg loop early, so it can handle requests generated during
// journal loading.
pool.wg.Add(1)
go pool.scheduleReorgLoop()
// If local transactions and journaling is enabled, load from disk
if pool.journal != nil {
if err := pool.journal.load(pool.addLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
pool.wg.Add(1)
go pool.loop()
return nil
@ -340,13 +312,11 @@ func (pool *LegacyPool) loop() {
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
)
defer report.Stop()
defer evict.Stop()
defer journal.Stop()
// Notify tests that the init phase is done
close(pool.initDoneCh)
@ -372,11 +342,7 @@ func (pool *LegacyPool) loop() {
case <-evict.C:
pool.mu.Lock()
for addr := range pool.queue {
// Skip local transactions from the eviction mechanism
if pool.locals.contains(addr) {
continue
}
// Any non-locals old enough should be removed
// Any old enough should be removed
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
list := pool.queue[addr].Flatten()
for _, tx := range list {
@ -386,16 +352,6 @@ func (pool *LegacyPool) loop() {
}
}
pool.mu.Unlock()
// Handle local transaction journal rotation
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}
@ -406,9 +362,6 @@ func (pool *LegacyPool) Close() error {
close(pool.reorgShutdownCh)
pool.wg.Wait()
if pool.journal != nil {
pool.journal.close()
}
log.Info("Transaction pool stopped")
return nil
}
@ -444,7 +397,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) {
// If the min miner fee increased, remove transactions below the new threshold
if newTip.Cmp(old) > 0 {
// pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead
drop := pool.all.RemotesBelowTip(tip)
drop := pool.all.TxsBelowTip(tip)
for _, tx := range drop {
pool.removeTx(tx.Hash(), false, true)
}
@ -549,7 +502,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
txs := list.Flatten()
// If the miner requests tip enforcement, cap the lists now
if minTipBig != nil && !pool.locals.contains(addr) {
if minTipBig != nil {
for i, tx := range txs {
if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 {
txs = txs[:i]
@ -577,35 +530,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address]
return pending
}
// Locals retrieves the accounts currently considered local by the pool.
func (pool *LegacyPool) Locals() []common.Address {
pool.mu.Lock()
defer pool.mu.Unlock()
return pool.locals.flatten()
}
// local retrieves all currently known local transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
func (pool *LegacyPool) local() map[common.Address]types.Transactions {
txs := make(map[common.Address]types.Transactions)
for addr := range pool.locals.accounts {
if pending := pool.pending[addr]; pending != nil {
txs[addr] = append(txs[addr], pending.Flatten()...)
}
if queued := pool.queue[addr]; queued != nil {
txs[addr] = append(txs[addr], queued.Flatten()...)
}
}
return txs
}
// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
// and does not require the pool mutex to be held.
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error {
func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error {
opts := &txpool.ValidationOptions{
Config: pool.chainconfig,
Accept: 0 |
@ -615,9 +544,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro
MaxSize: txMaxSize,
MinTip: pool.gasTip.Load().ToBig(),
}
if local {
opts.MinTip = new(big.Int)
}
if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil {
return err
}
@ -665,11 +591,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error {
// add validates a transaction and inserts it into the non-executable queue for later
// pending promotion and execution. If the transaction is a replacement for an already
// pending or queued one, it overwrites the previous transaction if its price is higher.
//
// If a newly added transaction is marked as local, its sending account will be
// added to the allowlist, preventing any associated transaction from being dropped
// out of the pool due to pricing constraints.
func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
// If the transaction is already known, discard it
hash := tx.Hash()
if pool.all.Get(hash) != nil {
@ -677,9 +599,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
knownTxMeter.Mark(1)
return false, txpool.ErrAlreadyKnown
}
// Make the local flag. If it's from local source or it's from the network but
// the sender is marked as local previously, treat it as the local transaction.
isLocal := local || pool.locals.containsTx(tx)
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx); err != nil {
@ -715,7 +634,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// If the transaction pool is full, discard underpriced transactions
if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
if !isLocal && pool.priced.Underpriced(tx) {
if pool.priced.Underpriced(tx) {
log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap())
underpricedTxMeter.Mark(1)
return false, txpool.ErrUnderpriced
@ -731,19 +650,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
}
// New transaction is better than our worse ones, make room for it.
// If it's a local transaction, forcibly discard all available transactions.
// Otherwise if we can't make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal)
// If we can't make enough room for new one, abort the operation.
drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx))
// Special case, we still can't make the room for the new remote one.
if !isLocal && !success {
if !success {
log.Trace("Discarding overflown transaction", "hash", hash)
overflowedTxMeter.Mark(1)
return false, ErrTxPoolOverflow
}
// If the new transaction is a future transaction it should never churn pending transactions
if !isLocal && pool.isGapped(from, tx) {
if pool.isGapped(from, tx) {
var replacesPending bool
for _, dropTx := range drop {
dropSender, _ := types.Sender(pool.signer, dropTx)
@ -755,7 +673,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
// Add all transactions back to the priced queue
if replacesPending {
for _, dropTx := range drop {
pool.priced.Put(dropTx, false)
pool.priced.Put(dropTx)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
return false, txpool.ErrFutureReplacePending
@ -788,9 +706,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.priced.Removed(1)
pendingReplaceMeter.Mark(1)
}
pool.all.Add(tx, isLocal)
pool.priced.Put(tx, isLocal)
pool.journalTx(from, tx)
pool.all.Add(tx)
pool.priced.Put(tx)
pool.queueTxEvent(tx)
log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To())
@ -799,20 +716,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
return old != nil, nil
}
// New transaction isn't replacing a pending one, push into queue
replaced, err = pool.enqueueTx(hash, tx, isLocal, true)
replaced, err = pool.enqueueTx(hash, tx, true)
if err != nil {
return false, err
}
// Mark local addresses and journal local transactions
if local && !pool.locals.contains(from) {
log.Info("Setting new local account", "address", from)
pool.locals.add(from)
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
localGauge.Inc(1)
}
pool.journalTx(from, tx)
log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To())
return replaced, nil
@ -845,7 +752,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo
// enqueueTx inserts a new transaction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) {
func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) {
// Try to insert the transaction into the future queue
from, _ := types.Sender(pool.signer, tx) // already validated
if pool.queue[from] == nil {
@ -872,8 +779,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
pool.all.Add(tx, local)
pool.priced.Put(tx, local)
pool.all.Add(tx)
pool.priced.Put(tx)
}
// If we never record the heartbeat, do it right now.
if _, exist := pool.beats[from]; !exist {
@ -882,18 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
return old != nil, nil
}
// journalTx adds the specified transaction to the local disk journal if it is
// deemed to have been sent from a local account.
func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {
return
}
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}
// promoteTx adds a transaction to the pending (processable) list of transactions
// and returns whether it was inserted or an older was better.
//
@ -930,28 +825,13 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
return true
}
// addLocals enqueues a batch of transactions into the pool if they are valid, marking the
// senders as local ones, ensuring they go around the local pricing constraints.
//
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error {
return pool.Add(txs, !pool.config.NoLocals, true)
}
// addLocal enqueues a single local transaction into the pool if it is valid. This is
// a convenience wrapper around addLocals.
func (pool *LegacyPool) addLocal(tx *types.Transaction) error {
return pool.addLocals([]*types.Transaction{tx})[0]
}
// addRemotes enqueues a batch of transactions into the pool if they are valid. If the
// senders are not among the locally tracked ones, full pricing constraints will apply.
// addRemotes enqueues a batch of transactions into the pool if they are valid.
// Full pricing constraints will apply.
//
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *LegacyPool) addRemotes(txs []*types.Transaction) []error {
return pool.Add(txs, false, false)
return pool.Add(txs, false)
}
// addRemote enqueues a single transaction into the pool if it is valid. This is a convenience
@ -962,23 +842,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error {
// addRemotesSync is like addRemotes, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error {
return pool.Add(txs, false, true)
return pool.Add(txs, true)
}
// This is like addRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error {
return pool.Add([]*types.Transaction{tx}, false, true)[0]
return pool.Add([]*types.Transaction{tx}, true)[0]
}
// Add enqueues a batch of transactions into the pool if they are valid. Depending
// on the local flag, full pricing constraints will or will not be applied.
// Add enqueues a batch of transactions into the pool if they are valid.
//
// If sync is set, the method will block until all internal maintenance related
// to the add is finished. Only use this during tests for determinism!
func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error {
// Do not treat as local if local transactions have been disabled
local = local && !pool.config.NoLocals
func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
@ -994,7 +870,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// Exclude transactions with basic errors, e.g invalid signatures and
// insufficient intrinsic gas as soon as possible and cache senders
// in transactions before obtaining lock
if err := pool.validateTxBasics(tx, local); err != nil {
if err := pool.validateTxBasics(tx); err != nil {
errs[i] = err
log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err)
invalidTxMeter.Mark(1)
@ -1009,7 +885,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// Process all the new transaction and merge any errors into the original slice
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
newErrs, dirtyAddrs := pool.addTxsLocked(news)
pool.mu.Unlock()
var nilSlot = 0
@ -1030,11 +906,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error
// addTxsLocked attempts to queue a batch of transactions if they are valid.
// The transaction pool lock must be held.
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) {
func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) {
dirty := newAccountSet(pool.signer)
errs := make([]error, len(txs))
for i, tx := range txs {
replaced, err := pool.add(tx, local)
replaced, err := pool.add(tx)
errs[i] = err
if err == nil && !replaced {
dirty.addTx(tx)
@ -1126,9 +1002,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
if outofbound {
pool.priced.Removed(1)
}
if pool.locals.contains(addr) {
localGauge.Dec(1)
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
@ -1139,7 +1012,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
// Postpone any invalidated transactions
for _, tx := range invalids {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(tx.Hash(), tx, false, false)
pool.enqueueTx(tx.Hash(), tx, false)
}
// Update the account nonce if needed
pool.pendingNonces.setIfLower(addr, tx.Nonce())
@ -1204,7 +1077,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
launchNextRun bool
reset *txpoolResetRequest
dirtyAccounts *accountSet
queuedEvents = make(map[common.Address]*sortedMap)
queuedEvents = make(map[common.Address]*SortedMap)
)
for {
// Launch next background reorg if needed
@ -1217,7 +1090,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
launchNextRun = false
reset, dirtyAccounts = nil, nil
queuedEvents = make(map[common.Address]*sortedMap)
queuedEvents = make(map[common.Address]*SortedMap)
}
select {
@ -1246,7 +1119,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
// request one later if they want the events sent.
addr, _ := types.Sender(pool.signer, tx)
if _, ok := queuedEvents[addr]; !ok {
queuedEvents[addr] = newSortedMap()
queuedEvents[addr] = NewSortedMap()
}
queuedEvents[addr].Put(tx)
@ -1265,7 +1138,7 @@ func (pool *LegacyPool) scheduleReorgLoop() {
}
// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop.
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) {
func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) {
defer func(t0 time.Time) {
reorgDurationTimer.Update(time.Since(t0))
}(time.Now())
@ -1332,7 +1205,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest,
for _, tx := range promoted {
addr, _ := types.Sender(pool.signer, tx)
if _, ok := events[addr]; !ok {
events[addr] = newSortedMap()
events[addr] = NewSortedMap()
}
events[addr].Put(tx)
}
@ -1441,7 +1314,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) {
// Inject any transactions discarded due to reorgs
log.Debug("Reinjecting stale transactions", "count", len(reinject))
core.SenderCacher().Recover(pool.signer, reinject)
pool.addTxsLocked(reinject, false)
pool.addTxsLocked(reinject)
}
// promoteExecutables moves transactions that have become processable from the
@ -1486,22 +1359,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
queuedGauge.Dec(int64(len(readies)))
// Drop all transactions over the allowed limit
var caps types.Transactions
if !pool.locals.contains(addr) {
caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
var caps = list.Cap(int(pool.config.AccountQueue))
for _, tx := range caps {
hash := tx.Hash()
pool.all.Remove(hash)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
queuedRateLimitMeter.Mark(int64(len(caps)))
// Mark all the items dropped as removed
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.queue, addr)
@ -1531,14 +1399,14 @@ func (pool *LegacyPool) truncatePending() {
spammers := prque.New[int64, common.Address](nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
if uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
// Retrieve the next offender
offender, _ := spammers.Pop()
offenders = append(offenders, offender)
@ -1564,9 +1432,7 @@ func (pool *LegacyPool) truncatePending() {
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(offenders[i]) {
localGauge.Dec(int64(len(caps)))
}
pending--
}
}
@ -1591,9 +1457,6 @@ func (pool *LegacyPool) truncatePending() {
}
pool.priced.Removed(len(caps))
pendingGauge.Dec(int64(len(caps)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(caps)))
}
pending--
}
}
@ -1614,13 +1477,11 @@ func (pool *LegacyPool) truncateQueue() {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
}
sort.Sort(sort.Reverse(addresses))
// Drop transactions until the total is below the limit or only locals remain
// Drop transactions until the total is below the limit
for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; {
addr := addresses[len(addresses)-1]
list := pool.queue[addr.address]
@ -1680,12 +1541,10 @@ func (pool *LegacyPool) demoteUnexecutables() {
log.Trace("Demoting pending transaction", "hash", hash)
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
pool.enqueueTx(hash, tx, false)
}
pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
if pool.locals.contains(addr) {
localGauge.Dec(int64(len(olds) + len(drops) + len(invalids)))
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
gapped := list.Cap(0)
@ -1694,7 +1553,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
log.Error("Demoting invalidated transaction", "hash", hash)
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
pool.enqueueTx(hash, tx, false)
}
pendingGauge.Dec(int64(len(gapped)))
}
@ -1741,21 +1600,6 @@ func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet {
return as
}
// contains checks if a given address is contained within the set.
func (as *accountSet) contains(addr common.Address) bool {
_, exist := as.accounts[addr]
return exist
}
// containsTx checks if the sender of a given tx is within the set. If the sender
// cannot be derived, this method returns false.
func (as *accountSet) containsTx(tx *types.Transaction) bool {
if addr, err := types.Sender(as.signer, tx); err == nil {
return as.contains(addr)
}
return false
}
// add inserts a new address into the set to track.
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
@ -1793,43 +1637,29 @@ func (as *accountSet) merge(other *accountSet) {
// internal mechanisms. The sole purpose of the type is to permit out-of-bound
// peeking into the pool in LegacyPool.Get without having to acquire the widely scoped
// LegacyPool.mu mutex.
//
// This lookup set combines the notion of "local transactions", which is useful
// to build upper-level structure.
type lookup struct {
slots int
lock sync.RWMutex
locals map[common.Hash]*types.Transaction
remotes map[common.Hash]*types.Transaction
slots int
lock sync.RWMutex
txs map[common.Hash]*types.Transaction
}
// newLookup returns a new lookup structure.
func newLookup() *lookup {
return &lookup{
locals: make(map[common.Hash]*types.Transaction),
remotes: make(map[common.Hash]*types.Transaction),
txs: make(map[common.Hash]*types.Transaction),
}
}
// Range calls f on each key and value present in the map. The callback passed
// should return the indicator whether the iteration needs to be continued.
// Callers need to specify which set (or both) to be iterated.
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) {
func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) {
t.lock.RLock()
defer t.lock.RUnlock()
if local {
for key, value := range t.locals {
if !f(key, value, true) {
return
}
}
}
if remote {
for key, value := range t.remotes {
if !f(key, value, false) {
return
}
for key, value := range t.txs {
if !f(key, value) {
return
}
}
}
@ -1839,26 +1669,7 @@ func (t *lookup) Get(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
if tx := t.locals[hash]; tx != nil {
return tx
}
return t.remotes[hash]
}
// GetLocal returns a transaction if it exists in the lookup, or nil if not found.
func (t *lookup) GetLocal(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
return t.locals[hash]
}
// GetRemote returns a transaction if it exists in the lookup, or nil if not found.
func (t *lookup) GetRemote(hash common.Hash) *types.Transaction {
t.lock.RLock()
defer t.lock.RUnlock()
return t.remotes[hash]
return t.txs[hash]
}
// Count returns the current number of transactions in the lookup.
@ -1866,23 +1677,7 @@ func (t *lookup) Count() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.locals) + len(t.remotes)
}
// LocalCount returns the current number of local transactions in the lookup.
func (t *lookup) LocalCount() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.locals)
}
// RemoteCount returns the current number of remote transactions in the lookup.
func (t *lookup) RemoteCount() int {
t.lock.RLock()
defer t.lock.RUnlock()
return len(t.remotes)
return len(t.txs)
}
// Slots returns the current number of slots used in the lookup.
@ -1894,18 +1689,14 @@ func (t *lookup) Slots() int {
}
// Add adds a transaction to the lookup.
func (t *lookup) Add(tx *types.Transaction, local bool) {
func (t *lookup) Add(tx *types.Transaction) {
t.lock.Lock()
defer t.lock.Unlock()
t.slots += numSlots(tx)
slotsGauge.Update(int64(t.slots))
if local {
t.locals[tx.Hash()] = tx
} else {
t.remotes[tx.Hash()] = tx
}
t.txs[tx.Hash()] = tx
}
// Remove removes a transaction from the lookup.
@ -1913,10 +1704,7 @@ func (t *lookup) Remove(hash common.Hash) {
t.lock.Lock()
defer t.lock.Unlock()
tx, ok := t.locals[hash]
if !ok {
tx, ok = t.remotes[hash]
}
tx, ok := t.txs[hash]
if !ok {
log.Error("No transaction found to be deleted", "hash", hash)
return
@ -1924,36 +1712,18 @@ func (t *lookup) Remove(hash common.Hash) {
t.slots -= numSlots(tx)
slotsGauge.Update(int64(t.slots))
delete(t.locals, hash)
delete(t.remotes, hash)
delete(t.txs, hash)
}
// RemoteToLocals migrates the transactions belongs to the given locals to locals
// set. The assumption is held the locals set is thread-safe to be used.
func (t *lookup) RemoteToLocals(locals *accountSet) int {
t.lock.Lock()
defer t.lock.Unlock()
var migrated int
for hash, tx := range t.remotes {
if locals.containsTx(tx) {
t.locals[hash] = tx
delete(t.remotes, hash)
migrated += 1
}
}
return migrated
}
// RemotesBelowTip finds all remote transactions below the given tip threshold.
func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
// TxsBelowTip finds all remote transactions below the given tip threshold.
func (t *lookup) TxsBelowTip(threshold *big.Int) types.Transactions {
found := make(types.Transactions, 0, 128)
t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
t.Range(func(hash common.Hash, tx *types.Transaction) bool {
if tx.GasTipCapIntCmp(threshold) < 0 {
found = append(found, tx)
}
return true
}, false, true) // Only iterate remotes
})
return found
}
@ -1982,24 +1752,13 @@ func (pool *LegacyPool) Clear() {
// The transaction addition may attempt to reserve the sender addr which
// can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed.
for _, tx := range pool.all.remotes {
for _, tx := range pool.all.txs {
senderAddr, _ := types.Sender(pool.signer, tx)
pool.reserve(senderAddr, false)
}
for localSender := range pool.locals.accounts {
pool.reserve(localSender, false)
}
pool.all = newLookup()
pool.priced = newPricedList(pool.all)
pool.pending = make(map[common.Address]*list)
pool.queue = make(map[common.Address]*list)
pool.pendingNonces = newNoncer(pool.currentState)
if !pool.config.NoLocals && pool.config.Journal != "" {
pool.journal = newTxJournal(pool.config.Journal)
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
}

View File

@ -23,7 +23,6 @@ import (
"fmt"
"math/big"
"math/rand"
"os"
"sync"
"sync/atomic"
"testing"
@ -183,7 +182,7 @@ func validatePoolInternals(pool *LegacyPool) error {
return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
}
pool.priced.Reheap()
priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.RemoteCount()
priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.Count()
if priced != remote {
return fmt.Errorf("total priced transaction count %d != %d", priced, remote)
}
@ -350,9 +349,6 @@ func TestInvalidTransactions(t *testing.T) {
if err, want := pool.addRemote(tx), txpool.ErrUnderpriced; !errors.Is(err, want) {
t.Errorf("want %v have %v", want, err)
}
if err := pool.addLocal(tx); err != nil {
t.Error("expected", nil, "got", err)
}
}
func TestQueue(t *testing.T) {
@ -366,7 +362,7 @@ func TestQueue(t *testing.T) {
testAddBalance(pool, from, big.NewInt(1000))
<-pool.requestReset(nil, nil)
pool.enqueueTx(tx.Hash(), tx, false, true)
pool.enqueueTx(tx.Hash(), tx, true)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
@ -375,7 +371,7 @@ func TestQueue(t *testing.T) {
tx = transaction(1, 100, key)
from, _ = deriveSender(tx)
testSetNonce(pool, from, 2)
pool.enqueueTx(tx.Hash(), tx, false, true)
pool.enqueueTx(tx.Hash(), tx, true)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
@ -399,9 +395,9 @@ func TestQueue2(t *testing.T) {
testAddBalance(pool, from, big.NewInt(1000))
pool.reset(nil, nil)
pool.enqueueTx(tx1.Hash(), tx1, false, true)
pool.enqueueTx(tx2.Hash(), tx2, false, true)
pool.enqueueTx(tx3.Hash(), tx3, false, true)
pool.enqueueTx(tx1.Hash(), tx1, true)
pool.enqueueTx(tx2.Hash(), tx2, true)
pool.enqueueTx(tx3.Hash(), tx3, true)
pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 {
@ -476,14 +472,14 @@ func TestChainFork(t *testing.T) {
resetState()
tx := transaction(0, 100000, key)
if _, err := pool.add(tx, false); err != nil {
if _, err := pool.add(tx); err != nil {
t.Error("didn't expect error", err)
}
pool.removeTx(tx.Hash(), true, true)
// reset the pool's internal state
resetState()
if _, err := pool.add(tx, false); err != nil {
if _, err := pool.add(tx); err != nil {
t.Error("didn't expect error", err)
}
}
@ -510,10 +506,10 @@ func TestDoubleNonce(t *testing.T) {
tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 1000000, big.NewInt(1), nil), signer, key)
// Add the first two transaction, ensure higher priced stays only
if replace, err := pool.add(tx1, false); err != nil || replace {
if replace, err := pool.add(tx1); err != nil || replace {
t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace)
}
if replace, err := pool.add(tx2, false); err != nil || !replace {
if replace, err := pool.add(tx2); err != nil || !replace {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
}
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
@ -525,7 +521,7 @@ func TestDoubleNonce(t *testing.T) {
}
// Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3, false)
pool.add(tx3)
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
@ -548,7 +544,7 @@ func TestMissingNonce(t *testing.T) {
addr := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, addr, big.NewInt(100000000000000))
tx := transaction(1, 100000, key)
if _, err := pool.add(tx, false); err != nil {
if _, err := pool.add(tx); err != nil {
t.Error("didn't expect error", err)
}
if len(pool.pending) != 0 {
@ -607,21 +603,21 @@ func TestDropping(t *testing.T) {
tx11 = transaction(11, 200, key)
tx12 = transaction(12, 300, key)
)
pool.all.Add(tx0, false)
pool.priced.Put(tx0, false)
pool.all.Add(tx0)
pool.priced.Put(tx0)
pool.promoteTx(account, tx0.Hash(), tx0)
pool.all.Add(tx1, false)
pool.priced.Put(tx1, false)
pool.all.Add(tx1)
pool.priced.Put(tx1)
pool.promoteTx(account, tx1.Hash(), tx1)
pool.all.Add(tx2, false)
pool.priced.Put(tx2, false)
pool.all.Add(tx2)
pool.priced.Put(tx2)
pool.promoteTx(account, tx2.Hash(), tx2)
pool.enqueueTx(tx10.Hash(), tx10, false, true)
pool.enqueueTx(tx11.Hash(), tx11, false, true)
pool.enqueueTx(tx12.Hash(), tx12, false, true)
pool.enqueueTx(tx10.Hash(), tx10, true)
pool.enqueueTx(tx11.Hash(), tx11, true)
pool.enqueueTx(tx12.Hash(), tx12, true)
// Check that pre and post validations leave the pool as is
if pool.pending[account].Len() != 3 {
@ -899,13 +895,6 @@ func TestQueueAccountLimiting(t *testing.T) {
// This logic should not hold for local transactions, unless the local tracking
// mechanism is disabled.
func TestQueueGlobalLimiting(t *testing.T) {
testQueueGlobalLimiting(t, false)
}
func TestQueueGlobalLimitingNoLocals(t *testing.T) {
testQueueGlobalLimiting(t, true)
}
func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
t.Parallel()
// Create the pool to test the limit enforcement with
@ -913,7 +902,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.NoLocals = nolocals
config.NoLocals = true
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
pool := New(config, blockchain)
@ -926,7 +915,6 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
}
local := keys[len(keys)-1]
// Generate and queue a batch of transactions
nonces := make(map[common.Address]uint64)
@ -952,51 +940,12 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) {
if queued > int(config.GlobalQueue) {
t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue)
}
// Generate a batch of transactions from the local account and import them
txs = txs[:0]
for i := uint64(0); i < 3*config.GlobalQueue; i++ {
txs = append(txs, transaction(i+1, 100000, local))
}
pool.addLocals(txs)
// If locals are disabled, the previous eviction algorithm should apply here too
if nolocals {
queued := 0
for addr, list := range pool.queue {
if list.Len() > int(config.AccountQueue) {
t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue)
}
queued += list.Len()
}
if queued > int(config.GlobalQueue) {
t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue)
}
} else {
// Local exemptions are enabled, make sure the local account owned the queue
if len(pool.queue) != 1 {
t.Errorf("multiple accounts in queue: have %v, want %v", len(pool.queue), 1)
}
// Also ensure no local transactions are ever dropped, even if above global limits
if queued := pool.queue[crypto.PubkeyToAddress(local.PublicKey)].Len(); uint64(queued) != 3*config.GlobalQueue {
t.Fatalf("local account queued transaction count mismatch: have %v, want %v", queued, 3*config.GlobalQueue)
}
}
}
// Tests that if an account remains idle for a prolonged amount of time, any
// non-executable transactions queued up are dropped to prevent wasting resources
// on shuffling them around.
//
// This logic should not hold for local transactions, unless the local tracking
// mechanism is disabled.
func TestQueueTimeLimiting(t *testing.T) {
testQueueTimeLimiting(t, false)
}
func TestQueueTimeLimitingNoLocals(t *testing.T) {
testQueueTimeLimiting(t, true)
}
func testQueueTimeLimiting(t *testing.T, nolocals bool) {
// Reduce the eviction interval to a testable amount
defer func(old time.Duration) { evictionInterval = old }(evictionInterval)
evictionInterval = time.Millisecond * 100
@ -1007,23 +956,17 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
config := testTxPoolConfig
config.Lifetime = time.Second
config.NoLocals = nolocals
pool := New(config, blockchain)
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
// Create a test account to ensure remotes expire
remote, _ := crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
// Add the two transactions and ensure they both are queued up
if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
// Add the transaction and ensure it is queued up
if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
@ -1031,7 +974,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queued != 2 {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validatePoolInternals(pool); err != nil {
@ -1046,7 +989,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if queued != 2 {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validatePoolInternals(pool); err != nil {
@ -1060,14 +1003,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if nolocals {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
} else {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
@ -1075,7 +1012,6 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
// remove current transactions and increase nonce to prepare for a reset and cleanup
statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2)
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
<-pool.requestReset(nil, nil)
// make sure queue, pending are cleared
@ -1091,18 +1027,12 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
}
// Queue gapped transactions
if err := pool.addLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
time.Sleep(5 * evictionInterval) // A half lifetime pass
// Queue executable transactions, the life cycle should be restarted.
if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
@ -1110,11 +1040,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
// All gapped transactions shouldn't be kicked out
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
@ -1123,17 +1053,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) {
// The whole life time pass after last promotion, kick out stale transactions
time.Sleep(2 * config.Lifetime)
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
if nolocals {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
} else {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
@ -1363,8 +1287,6 @@ func TestPendingMinimumAllowance(t *testing.T) {
// Tests that setting the transaction pool gas price to a higher value correctly
// discards everything cheaper than that and moves any gapped transactions back
// from the pending pool to the queue.
//
// Note, local transactions are never allowed to be dropped.
func TestRepricing(t *testing.T) {
t.Parallel()
@ -1382,7 +1304,7 @@ func TestRepricing(t *testing.T) {
defer sub.Unsubscribe()
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 4)
keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
@ -1402,20 +1324,17 @@ func TestRepricing(t *testing.T) {
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2]))
txs = append(txs, pricedTransaction(3, 100000, big.NewInt(2), keys[2]))
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])
// Import the batch and that both pending and queued transactions match up
pool.addRemotesSync(txs)
pool.addLocal(ltx)
pending, queued := pool.Stats()
if pending != 7 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7)
if pending != 6 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6)
}
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
if err := validateEvents(events, 7); err != nil {
if err := validateEvents(events, 6); err != nil {
t.Fatalf("original event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
@ -1425,8 +1344,8 @@ func TestRepricing(t *testing.T) {
pool.SetGasTip(big.NewInt(2))
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
if queued != 5 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5)
@ -1453,21 +1372,7 @@ func TestRepricing(t *testing.T) {
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// However we can add local underpriced transactions
tx := pricedTransaction(1, 100000, big.NewInt(1), keys[3])
if err := pool.addLocal(tx); err != nil {
t.Fatalf("failed to add underpriced local transaction: %v", err)
}
if pending, _ = pool.Stats(); pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if err := validateEvents(events, 1); err != nil {
t.Fatalf("post-reprice local event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// And we can fill gaps with properly priced transactions
// we can fill gaps with properly priced transactions
if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(2), keys[0])); err != nil {
t.Fatalf("failed to add pending transaction: %v", err)
}
@ -1504,29 +1409,16 @@ func TestMinGasPriceEnforced(t *testing.T) {
tx := pricedTransaction(0, 100000, big.NewInt(2), key)
pool.SetGasTip(big.NewInt(tx.GasPrice().Int64() + 1))
if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) {
t.Fatalf("Min tip not enforced")
}
if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
if err := pool.Add([]*types.Transaction{tx}, true)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
t.Fatalf("Min tip not enforced")
}
tx = dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), key)
pool.SetGasTip(big.NewInt(tx.GasTipCap().Int64() + 1))
if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) {
if err := pool.Add([]*types.Transaction{tx}, true)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
t.Fatalf("Min tip not enforced")
}
if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) {
t.Fatalf("Min tip not enforced")
}
// Make sure the tx is accepted if locals are enabled
pool.config.NoLocals = false
if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; err != nil {
t.Fatalf("Min tip enforced with locals enabled, error: %v", err)
}
}
// Tests that setting the transaction pool gas price to a higher value correctly
@ -1567,20 +1459,17 @@ func TestRepricingDynamicFee(t *testing.T) {
txs = append(txs, dynamicFeeTx(2, 100000, big.NewInt(1), big.NewInt(1), keys[2]))
txs = append(txs, dynamicFeeTx(3, 100000, big.NewInt(2), big.NewInt(2), keys[2]))
ltx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[3])
// Import the batch and that both pending and queued transactions match up
pool.addRemotesSync(txs)
pool.addLocal(ltx)
pending, queued := pool.Stats()
if pending != 7 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7)
if pending != 6 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6)
}
if queued != 3 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3)
}
if err := validateEvents(events, 7); err != nil {
if err := validateEvents(events, 6); err != nil {
t.Fatalf("original event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
@ -1590,8 +1479,8 @@ func TestRepricingDynamicFee(t *testing.T) {
pool.SetGasTip(big.NewInt(2))
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
if queued != 5 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5)
@ -1621,20 +1510,7 @@ func TestRepricingDynamicFee(t *testing.T) {
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// However we can add local underpriced transactions
tx = dynamicFeeTx(1, 100000, big.NewInt(1), big.NewInt(1), keys[3])
if err := pool.addLocal(tx); err != nil {
t.Fatalf("failed to add underpriced local transaction: %v", err)
}
if pending, _ = pool.Stats(); pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if err := validateEvents(events, 1); err != nil {
t.Fatalf("post-reprice local event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// And we can fill gaps with properly priced transactions
tx = pricedTransaction(1, 100000, big.NewInt(2), keys[0])
if err := pool.addRemote(tx); err != nil {
@ -1656,77 +1532,6 @@ func TestRepricingDynamicFee(t *testing.T) {
}
}
// Tests that setting the transaction pool gas price to a higher value does not
// remove local transactions (legacy & dynamic fee).
func TestRepricingKeepsLocals(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
defer pool.Close()
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 3)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000*1000000))
}
// Create transaction (both pending and queued) with a linearly growing gasprice
for i := uint64(0); i < 500; i++ {
// Add pending transaction.
pendingTx := pricedTransaction(i, 100000, big.NewInt(int64(i)), keys[2])
if err := pool.addLocal(pendingTx); err != nil {
t.Fatal(err)
}
// Add queued transaction.
queuedTx := pricedTransaction(i+501, 100000, big.NewInt(int64(i)), keys[2])
if err := pool.addLocal(queuedTx); err != nil {
t.Fatal(err)
}
// Add pending dynamic fee transaction.
pendingTx = dynamicFeeTx(i, 100000, big.NewInt(int64(i)+1), big.NewInt(int64(i)), keys[1])
if err := pool.addLocal(pendingTx); err != nil {
t.Fatal(err)
}
// Add queued dynamic fee transaction.
queuedTx = dynamicFeeTx(i+501, 100000, big.NewInt(int64(i)+1), big.NewInt(int64(i)), keys[1])
if err := pool.addLocal(queuedTx); err != nil {
t.Fatal(err)
}
}
pending, queued := pool.Stats()
expPending, expQueued := 1000, 1000
validate := func() {
pending, queued = pool.Stats()
if pending != expPending {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, expPending)
}
if queued != expQueued {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, expQueued)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}
validate()
// Reprice the pool and check that nothing is dropped
pool.SetGasTip(big.NewInt(2))
validate()
pool.SetGasTip(big.NewInt(2))
pool.SetGasTip(big.NewInt(4))
pool.SetGasTip(big.NewInt(8))
pool.SetGasTip(big.NewInt(100))
validate()
}
// Tests that when the pool reaches its global transaction limit, underpriced
// transactions are gradually shifted out for more expensive ones and any gapped
// pending transactions are moved into the queue.
@ -1756,21 +1561,18 @@ func TestUnderpricing(t *testing.T) {
keys := make([]*ecdsa.PrivateKey, 5)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(10000000))
}
// Generate and queue a batch of transactions, both pending and queued
txs := types.Transactions{}
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0]))
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0]))
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[1]))
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[2])
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0])) // pending
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) // pending
txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[2])) // pending
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[1])) // queued
// Import the batch and that both pending and queued transactions match up
pool.addRemotes(txs)
pool.addLocal(ltx)
pool.addRemotesSync(txs)
pending, queued := pool.Stats()
if pending != 3 {
@ -1790,7 +1592,7 @@ func TestUnderpricing(t *testing.T) {
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced)
}
// Replace a future transaction with a future transaction
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(2), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1
t.Fatalf("failed to add well priced transaction: %v", err)
}
// Ensure that adding high priced transactions drops cheap ones, but not own
@ -1800,48 +1602,26 @@ func TestUnderpricing(t *testing.T) {
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2
t.Fatalf("failed to add well priced transaction: %v", err)
}
if err := pool.addRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
if err := pool.addRemoteSync(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
}
// Ensure that replacing a pending transaction with a future transaction fails
if err := pool.addRemote(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != txpool.ErrFutureReplacePending {
if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != txpool.ErrFutureReplacePending {
t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, txpool.ErrFutureReplacePending)
}
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if pending != 4 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validateEvents(events, 2); err != nil {
if err := validateEvents(events, 4); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding local transactions can push out even higher priced ones
ltx = pricedTransaction(1, 100000, big.NewInt(0), keys[2])
if err := pool.addLocal(ltx); err != nil {
t.Fatalf("failed to append underpriced local transaction: %v", err)
}
ltx = pricedTransaction(0, 100000, big.NewInt(0), keys[3])
if err := pool.addLocal(ltx); err != nil {
t.Fatalf("failed to add new underpriced local transaction: %v", err)
}
pending, queued = pool.Stats()
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if err := validateEvents(events, 2); err != nil {
t.Fatalf("local event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}
// Tests that more expensive transactions push out cheap ones from the pool, but
@ -1915,8 +1695,6 @@ func TestStableUnderpricing(t *testing.T) {
// Tests that when the pool reaches its global transaction limit, underpriced
// transactions (legacy & dynamic fee) are gradually shifted out for more
// expensive ones and any gapped pending transactions are moved into the queue.
//
// Note, local transactions are never allowed to be dropped.
func TestUnderpricingDynamicFee(t *testing.T) {
t.Parallel()
@ -1941,15 +1719,13 @@ func TestUnderpricingDynamicFee(t *testing.T) {
// Generate and queue a batch of transactions, both pending and queued
txs := types.Transactions{}
txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0]))
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0]))
txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(2), big.NewInt(1), keys[1]))
txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])) // pending
txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) // pending
txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(2), big.NewInt(1), keys[1])) // queued
txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[2])) // pending
ltx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[2])
// Import the batch and that both pending and queued transactions match up
pool.addRemotes(txs) // Pend K0:0, K0:1; Que K1:1
pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1
// Import the batch and check that both pending and queued transactions match up
pool.addRemotesSync(txs) // Pend K0:0, K0:1; Que K1:1
pending, queued := pool.Stats()
if pending != 3 {
@ -1967,13 +1743,13 @@ func TestUnderpricingDynamicFee(t *testing.T) {
// Ensure that adding an underpriced transaction fails
tx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[1])
if err := pool.addRemote(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1
if err := pool.addRemoteSync(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced)
}
// Ensure that adding high priced transactions drops cheap ones, but not own
tx = pricedTransaction(0, 100000, big.NewInt(2), keys[1])
if err := pool.addRemote(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
if err := pool.addRemoteSync(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
t.Fatalf("failed to add well priced transaction: %v", err)
}
@ -1986,40 +1762,18 @@ func TestUnderpricingDynamicFee(t *testing.T) {
t.Fatalf("failed to add well priced transaction: %v", err)
}
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
if pending != 4 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validateEvents(events, 2); err != nil {
if err := validateEvents(events, 3); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding local transactions can push out even higher priced ones
ltx = dynamicFeeTx(1, 100000, big.NewInt(0), big.NewInt(0), keys[2])
if err := pool.addLocal(ltx); err != nil {
t.Fatalf("failed to append underpriced local transaction: %v", err)
}
ltx = dynamicFeeTx(0, 100000, big.NewInt(0), big.NewInt(0), keys[3])
if err := pool.addLocal(ltx); err != nil {
t.Fatalf("failed to add new underpriced local transaction: %v", err)
}
pending, queued = pool.Stats()
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if err := validateEvents(events, 2); err != nil {
t.Fatalf("local event firing failed: %v", err)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}
// Tests whether highest fee cap transaction is retained after a batch of high effective
@ -2039,7 +1793,7 @@ func TestDualHeapEviction(t *testing.T) {
)
check := func(tx *types.Transaction, name string) {
if pool.all.GetRemote(tx.Hash()) == nil {
if pool.all.Get(tx.Hash()) == nil {
t.Fatalf("highest %s transaction evicted from the pool", name)
}
}
@ -2336,122 +2090,6 @@ func TestReplacementDynamicFee(t *testing.T) {
}
}
// Tests that local transactions are journaled to disk, but remote transactions
// get discarded between restarts.
func TestJournaling(t *testing.T) { testJournaling(t, false) }
func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) }
func testJournaling(t *testing.T, nolocals bool) {
t.Parallel()
// Create a temporary file for the journal
file, err := os.CreateTemp("", "")
if err != nil {
t.Fatalf("failed to create temporary journal: %v", err)
}
journal := file.Name()
defer os.Remove(journal)
// Clean up the temporary file, we only need the path for now
file.Close()
os.Remove(journal)
// Create the original pool to inject transaction into the journal
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
config := testTxPoolConfig
config.NoLocals = nolocals
config.Journal = journal
config.Rejournal = time.Second
pool := New(config, blockchain)
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
// Create two test accounts to ensure remotes expire but locals do not
local, _ := crypto.GenerateKey()
remote, _ := crypto.GenerateKey()
testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000))
testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000))
// Add three local and a remote transactions and ensure they are queued up
if err := pool.addLocal(pricedTransaction(0, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
pending, queued := pool.Stats()
if pending != 4 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive
pool.Close()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool = New(config, blockchain)
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
pending, queued = pool.Stats()
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if nolocals {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
} else {
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
<-pool.requestReset(nil, nil)
time.Sleep(2 * config.Rejournal)
pool.Close()
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1)
blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
pool = New(config, blockchain)
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
if nolocals {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
} else {
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
}
if err := validatePoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
pool.Close()
}
// TestStatusCheck tests that the pool can correctly retrieve the
// pending status of individual transactions.
func TestStatusCheck(t *testing.T) {
@ -2566,7 +2204,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
for i := 0; i < size; i++ {
tx := transaction(uint64(1+i), 100000, key)
pool.enqueueTx(tx.Hash(), tx, false, true)
pool.enqueueTx(tx.Hash(), tx, true)
}
// Benchmark the speed of pool validation
b.ResetTimer()
@ -2576,15 +2214,11 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
}
// Benchmarks the speed of batched transaction insertion.
func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, false) }
func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, false) }
func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, false) }
func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100) }
func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000) }
func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000) }
func BenchmarkBatchLocalInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, true) }
func BenchmarkBatchLocalInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, true) }
func BenchmarkBatchLocalInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, true) }
func benchmarkBatchInsert(b *testing.B, size int, local bool) {
func benchmarkBatchInsert(b *testing.B, size int) {
// Generate a batch of transactions to enqueue into the pool
pool, key := setupPool()
defer pool.Close()
@ -2602,46 +2236,7 @@ func benchmarkBatchInsert(b *testing.B, size int, local bool) {
// Benchmark importing the transactions into the queue
b.ResetTimer()
for _, batch := range batches {
if local {
pool.addLocals(batch)
} else {
pool.addRemotes(batch)
}
}
}
func BenchmarkInsertRemoteWithAllLocals(b *testing.B) {
// Allocate keys for testing
key, _ := crypto.GenerateKey()
account := crypto.PubkeyToAddress(key.PublicKey)
remoteKey, _ := crypto.GenerateKey()
remoteAddr := crypto.PubkeyToAddress(remoteKey.PublicKey)
locals := make([]*types.Transaction, 4096+1024) // Occupy all slots
for i := 0; i < len(locals); i++ {
locals[i] = transaction(uint64(i), 100000, key)
}
remotes := make([]*types.Transaction, 1000)
for i := 0; i < len(remotes); i++ {
remotes[i] = pricedTransaction(uint64(i), 100000, big.NewInt(2), remoteKey) // Higher gasprice
}
// Benchmark importing the transactions into the queue
b.ResetTimer()
for i := 0; i < b.N; i++ {
b.StopTimer()
pool, _ := setupPool()
testAddBalance(pool, account, big.NewInt(100000000))
for _, local := range locals {
pool.addLocal(local)
}
b.StartTimer()
// Assign a high enough balance for testing
testAddBalance(pool, remoteAddr, big.NewInt(100000000))
for i := 0; i < len(remotes); i++ {
pool.addRemotes([]*types.Transaction{remotes[i]})
}
pool.Close()
pool.addRemotes(batch)
}
}

View File

@ -52,31 +52,31 @@ func (h *nonceHeap) Pop() interface{} {
return x
}
// sortedMap is a nonce->transaction hash map with a heap based index to allow
// SortedMap is a nonce->transaction hash map with a heap based index to allow
// iterating over the contents in a nonce-incrementing way.
type sortedMap struct {
type SortedMap struct {
items map[uint64]*types.Transaction // Hash map storing the transaction data
index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode)
cache types.Transactions // Cache of the transactions already sorted
cacheMu sync.Mutex // Mutex covering the cache
}
// newSortedMap creates a new nonce-sorted transaction map.
func newSortedMap() *sortedMap {
return &sortedMap{
// NewSortedMap creates a new nonce-sorted transaction map.
func NewSortedMap() *SortedMap {
return &SortedMap{
items: make(map[uint64]*types.Transaction),
index: new(nonceHeap),
}
}
// Get retrieves the current transactions associated with the given nonce.
func (m *sortedMap) Get(nonce uint64) *types.Transaction {
func (m *SortedMap) Get(nonce uint64) *types.Transaction {
return m.items[nonce]
}
// Put inserts a new transaction into the map, also updating the map's nonce
// index. If a transaction already exists with the same nonce, it's overwritten.
func (m *sortedMap) Put(tx *types.Transaction) {
func (m *SortedMap) Put(tx *types.Transaction) {
nonce := tx.Nonce()
if m.items[nonce] == nil {
heap.Push(m.index, nonce)
@ -89,7 +89,7 @@ func (m *sortedMap) Put(tx *types.Transaction) {
// Forward removes all transactions from the map with a nonce lower than the
// provided threshold. Every removed transaction is returned for any post-removal
// maintenance.
func (m *sortedMap) Forward(threshold uint64) types.Transactions {
func (m *SortedMap) Forward(threshold uint64) types.Transactions {
var removed types.Transactions
// Pop off heap items until the threshold is reached
@ -112,7 +112,7 @@ func (m *sortedMap) Forward(threshold uint64) types.Transactions {
// Filter, as opposed to 'filter', re-initialises the heap after the operation is done.
// If you want to do several consecutive filterings, it's therefore better to first
// do a .filter(func1) followed by .Filter(func2) or reheap()
func (m *sortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
func (m *SortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions {
removed := m.filter(filter)
// If transactions were removed, the heap and cache are ruined
if len(removed) > 0 {
@ -121,7 +121,7 @@ func (m *sortedMap) Filter(filter func(*types.Transaction) bool) types.Transacti
return removed
}
func (m *sortedMap) reheap() {
func (m *SortedMap) reheap() {
*m.index = make([]uint64, 0, len(m.items))
for nonce := range m.items {
*m.index = append(*m.index, nonce)
@ -134,7 +134,7 @@ func (m *sortedMap) reheap() {
// filter is identical to Filter, but **does not** regenerate the heap. This method
// should only be used if followed immediately by a call to Filter or reheap()
func (m *sortedMap) filter(filter func(*types.Transaction) bool) types.Transactions {
func (m *SortedMap) filter(filter func(*types.Transaction) bool) types.Transactions {
var removed types.Transactions
// Collect all the transactions to filter out
@ -154,7 +154,7 @@ func (m *sortedMap) filter(filter func(*types.Transaction) bool) types.Transacti
// Cap places a hard limit on the number of items, returning all transactions
// exceeding that limit.
func (m *sortedMap) Cap(threshold int) types.Transactions {
func (m *SortedMap) Cap(threshold int) types.Transactions {
// Short circuit if the number of items is under the limit
if len(m.items) <= threshold {
return nil
@ -181,7 +181,7 @@ func (m *sortedMap) Cap(threshold int) types.Transactions {
// Remove deletes a transaction from the maintained map, returning whether the
// transaction was found.
func (m *sortedMap) Remove(nonce uint64) bool {
func (m *SortedMap) Remove(nonce uint64) bool {
// Short circuit if no transaction is present
_, ok := m.items[nonce]
if !ok {
@ -209,7 +209,7 @@ func (m *sortedMap) Remove(nonce uint64) bool {
// Note, all transactions with nonces lower than start will also be returned to
// prevent getting into an invalid state. This is not something that should ever
// happen but better to be self correcting than failing!
func (m *sortedMap) Ready(start uint64) types.Transactions {
func (m *SortedMap) Ready(start uint64) types.Transactions {
// Short circuit if no transactions are available
if m.index.Len() == 0 || (*m.index)[0] > start {
return nil
@ -229,11 +229,11 @@ func (m *sortedMap) Ready(start uint64) types.Transactions {
}
// Len returns the length of the transaction map.
func (m *sortedMap) Len() int {
func (m *SortedMap) Len() int {
return len(m.items)
}
func (m *sortedMap) flatten() types.Transactions {
func (m *SortedMap) flatten() types.Transactions {
m.cacheMu.Lock()
defer m.cacheMu.Unlock()
// If the sorting was not cached yet, create and cache it
@ -250,7 +250,7 @@ func (m *sortedMap) flatten() types.Transactions {
// Flatten creates a nonce-sorted slice of transactions based on the loosely
// sorted internal representation. The result of the sorting is cached in case
// it's requested again before any modifications are made to the contents.
func (m *sortedMap) Flatten() types.Transactions {
func (m *SortedMap) Flatten() types.Transactions {
cache := m.flatten()
// Copy the cache to prevent accidental modification
txs := make(types.Transactions, len(cache))
@ -260,7 +260,7 @@ func (m *sortedMap) Flatten() types.Transactions {
// LastElement returns the last element of a flattened list, thus, the
// transaction with the highest nonce
func (m *sortedMap) LastElement() *types.Transaction {
func (m *SortedMap) LastElement() *types.Transaction {
cache := m.flatten()
return cache[len(cache)-1]
}
@ -271,7 +271,7 @@ func (m *sortedMap) LastElement() *types.Transaction {
// executable/future queue, with minor behavioral changes.
type list struct {
strict bool // Whether nonces are strictly continuous or not
txs *sortedMap // Heap indexed sorted hash map of the transactions
txs *SortedMap // Heap indexed sorted hash map of the transactions
costcap *uint256.Int // Price of the highest costing transaction (reset only if exceeds balance)
gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit)
@ -283,7 +283,7 @@ type list struct {
func newList(strict bool) *list {
return &list{
strict: strict,
txs: newSortedMap(),
txs: NewSortedMap(),
costcap: new(uint256.Int),
totalcost: new(uint256.Int),
}
@ -556,10 +556,7 @@ func newPricedList(all *lookup) *pricedList {
}
// Put inserts a new transaction into the heap.
func (l *pricedList) Put(tx *types.Transaction, local bool) {
if local {
return
}
func (l *pricedList) Put(tx *types.Transaction) {
// Insert every new transaction to the urgent heap first; Discard will balance the heaps
heap.Push(&l.urgent, tx)
}
@ -593,7 +590,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
// Discard stale price points if found at the heap start
for len(h.list) > 0 {
head := h.list[0]
if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated
if l.all.Get(head.Hash()) == nil { // Removed or migrated
l.stales.Add(-1)
heap.Pop(h)
continue
@ -612,15 +609,13 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool {
// Discard finds a number of most underpriced transactions, removes them from the
// priced list and returns them for further removal from the entire pool.
// If noPending is set to true, we will only consider the floating list
//
// Note local transaction won't be considered for eviction.
func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
func (l *pricedList) Discard(slots int) (types.Transactions, bool) {
drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop
for slots > 0 {
if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio {
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.urgent).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
if l.all.Get(tx.Hash()) == nil { // Removed or migrated
l.stales.Add(-1)
continue
}
@ -633,7 +628,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
}
// Discard stale transactions if found during cleanup
tx := heap.Pop(&l.floating).(*types.Transaction)
if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated
if l.all.Get(tx.Hash()) == nil { // Removed or migrated
l.stales.Add(-1)
continue
}
@ -643,7 +638,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) {
}
}
// If we still can't make enough room for the new transaction
if slots > 0 && !force {
if slots > 0 {
for _, tx := range drop {
heap.Push(&l.urgent, tx)
}
@ -658,11 +653,11 @@ func (l *pricedList) Reheap() {
defer l.reheapMu.Unlock()
start := time.Now()
l.stales.Store(0)
l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount())
l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool {
l.urgent.list = make([]*types.Transaction, 0, l.all.Count())
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
l.urgent.list = append(l.urgent.list, tx)
return true
}, false, true) // Only iterate remotes
})
heap.Init(&l.urgent)
// balance out the two heaps by moving the worse half of transactions into the

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package legacypool
package locals
import (
"errors"

View File

@ -0,0 +1,212 @@
// Copyright 2023 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package locals implements tracking for "local" transactions
package locals
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"golang.org/x/exp/slices"
)
var (
recheckInterval = time.Minute
localGauge = metrics.GetOrRegisterGauge("txpool/local", nil)
)
// TxTracker is a struct used to track priority transactions; it will check from
// time to time if the main pool has forgotten about any of the transaction
// it is tracking, and if so, submit it again.
// This is used to track 'locals'.
// This struct does not care about transaction validity, price-bumps or account limits,
// but optimistically accepts transactions.
type TxTracker struct {
all map[common.Hash]*types.Transaction // All tracked transactions
byAddr map[common.Address]*legacypool.SortedMap // Transactions by address
journal *journal // Journal of local transaction to back up to disk
rejournal time.Duration // How often to rotate journal
pool *txpool.TxPool // The tx pool to interact with
signer types.Signer
shutdownCh chan struct{}
mu sync.Mutex
wg sync.WaitGroup
}
// New creates a new TxTracker
func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker {
pool := &TxTracker{
all: make(map[common.Hash]*types.Transaction),
byAddr: make(map[common.Address]*legacypool.SortedMap),
signer: types.LatestSigner(chainConfig),
shutdownCh: make(chan struct{}),
pool: next,
}
if journalPath != "" {
pool.journal = newTxJournal(journalPath)
pool.rejournal = journalTime
}
return pool
}
// Track adds a transaction to the tracked set.
// Note: blob-type transactions are ignored.
func (tracker *TxTracker) Track(tx *types.Transaction) {
tracker.TrackAll([]*types.Transaction{tx})
}
// TrackAll adds a list of transactions to the tracked set.
// Note: blob-type transactions are ignored.
func (tracker *TxTracker) TrackAll(txs []*types.Transaction) {
tracker.mu.Lock()
defer tracker.mu.Unlock()
for _, tx := range txs {
if tx.Type() == types.BlobTxType {
continue
}
// If we're already tracking it, it's a no-op
if _, ok := tracker.all[tx.Hash()]; ok {
continue
}
addr, err := types.Sender(tracker.signer, tx)
if err != nil { // Ignore this tx
continue
}
tracker.all[tx.Hash()] = tx
if tracker.byAddr[addr] == nil {
tracker.byAddr[addr] = legacypool.NewSortedMap()
}
tracker.byAddr[addr].Put(tx)
if tracker.journal != nil {
_ = tracker.journal.insert(tx)
}
}
localGauge.Update(int64(len(tracker.all)))
}
// recheck checks and returns any transactions that needs to be resubmitted.
func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) {
tracker.mu.Lock()
defer tracker.mu.Unlock()
var (
numStales = 0
numOk = 0
)
for sender, txs := range tracker.byAddr {
// Wipe the stales
stales := txs.Forward(tracker.pool.Nonce(sender))
for _, tx := range stales {
delete(tracker.all, tx.Hash())
}
numStales += len(stales)
// Check the non-stale
for _, tx := range txs.Flatten() {
if tracker.pool.Has(tx.Hash()) {
numOk++
continue
}
resubmits = append(resubmits, tx)
}
}
if journalCheck { // rejournal
rejournal = make(map[common.Address]types.Transactions)
for _, tx := range tracker.all {
addr, _ := types.Sender(tracker.signer, tx)
rejournal[addr] = append(rejournal[addr], tx)
}
// Sort them
for _, list := range rejournal {
// cmp(a, b) should return a negative number when a < b,
slices.SortFunc(list, func(a, b *types.Transaction) int {
return int(a.Nonce() - b.Nonce())
})
}
}
localGauge.Update(int64(len(tracker.all)))
log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk)
return resubmits, rejournal
}
// Start implements node.Lifecycle interface
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
func (tracker *TxTracker) Start() error {
tracker.wg.Add(1)
go tracker.loop()
return nil
}
// Stop implements node.Lifecycle interface
// Stop terminates all goroutines belonging to the service, blocking until they
// are all terminated.
func (tracker *TxTracker) Stop() error {
close(tracker.shutdownCh)
tracker.wg.Wait()
return nil
}
func (tracker *TxTracker) loop() {
defer tracker.wg.Done()
if tracker.journal != nil {
tracker.journal.load(func(transactions []*types.Transaction) []error {
tracker.TrackAll(transactions)
return nil
})
defer tracker.journal.close()
}
var (
lastJournal = time.Now()
timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom.
)
for {
select {
case <-tracker.shutdownCh:
return
case <-timer.C:
checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal
resubmits, rejournal := tracker.recheck(checkJournal)
if len(resubmits) > 0 {
tracker.pool.Add(resubmits, false)
}
if checkJournal {
// Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts
tracker.mu.Lock()
lastJournal = time.Now()
if err := tracker.journal.rotate(rejournal); err != nil {
log.Warn("Transaction journal rotation failed", "err", err)
}
tracker.mu.Unlock()
}
timer.Reset(recheckInterval)
}
}
}

View File

@ -132,7 +132,7 @@ type SubPool interface {
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
Add(txs []*types.Transaction, local bool, sync bool) []error
Add(txs []*types.Transaction, sync bool) []error
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
@ -162,9 +162,6 @@ type SubPool interface {
// pending as well as queued transactions of this address, grouped by nonce.
ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction)
// Locals retrieves the accounts currently considered local by the pool.
Locals() []common.Address
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by their hashes.
Status(hash common.Hash) TxStatus

View File

@ -328,7 +328,7 @@ func (p *TxPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Pr
// Add enqueues a batch of transactions into the pool if they are valid. Due
// to the large transaction churn, add may postpone fully integrating the tx
// to a later point to batch multiple ones together.
func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error {
// Split the input transactions between the subpools. It shouldn't really
// happen that we receive merged batches, but better graceful than strange
// errors.
@ -355,7 +355,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
// back the errors into the original sort order.
errsets := make([][]error, len(p.subpools))
for i := 0; i < len(p.subpools); i++ {
errsets[i] = p.subpools[i].Add(txsets[i], local, sync)
errsets[i] = p.subpools[i].Add(txsets[i], sync)
}
errs := make([]error, len(txs))
for i, split := range splits {
@ -456,23 +456,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type
return []*types.Transaction{}, []*types.Transaction{}
}
// Locals retrieves the accounts currently considered local by the pool.
func (p *TxPool) Locals() []common.Address {
// Retrieve the locals from each subpool and deduplicate them
locals := make(map[common.Address]struct{})
for _, subpool := range p.subpools {
for _, local := range subpool.Locals() {
locals[local] = struct{}{}
}
}
// Flatten and return the deduplicated local set
flat := make([]common.Address, 0, len(locals))
for local := range locals {
flat = append(flat, local)
}
return flat
}
// Status returns the known status (unknown/pending/queued) of a transaction
// identified by its hash.
func (p *TxPool) Status(hash common.Hash) TxStatus {

View File

@ -272,7 +272,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0]
if locals := b.eth.localTxTracker; locals != nil {
locals.Track(signedTx)
}
return b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0]
}
func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) {

View File

@ -23,6 +23,7 @@ import (
"math/big"
"runtime"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
@ -35,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/txpool/locals"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
@ -67,9 +69,10 @@ type Config = ethconfig.Config
// Ethereum implements the Ethereum full node service.
type Ethereum struct {
// core protocol objects
config *ethconfig.Config
txPool *txpool.TxPool
blockchain *core.BlockChain
config *ethconfig.Config
txPool *txpool.TxPool
localTxTracker *locals.TxTracker
blockchain *core.BlockChain
handler *handler
discmix *enode.FairMix
@ -237,6 +240,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
if !config.TxPool.NoLocals {
rejournal := config.TxPool.Rejournal
if rejournal < time.Second {
log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
rejournal = time.Second
}
eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
stack.RegisterLifecycle(eth.localTxTracker)
}
// Permit the downloader to use the trie cache allowance during fast sync
cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit
if eth.handler, err = newHandler(&handlerConfig{
@ -255,6 +268,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.miner = miner.New(eth, config.Miner, eth.engine)
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.miner.SetPrioAddresses(config.TxPool.Locals)
eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
if eth.APIBackend.allowUnprotectedTxs {

View File

@ -115,7 +115,7 @@ func TestEth2AssembleBlock(t *testing.T) {
if err != nil {
t.Fatalf("error signing transaction, err=%v", err)
}
ethservice.TxPool().Add([]*types.Transaction{tx}, true, true)
ethservice.TxPool().Add([]*types.Transaction{tx}, true)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[9].Time() + 5,
}
@ -152,7 +152,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
// Put the 10th block's tx in the pool and produce a new block
txs := blocks[9].Transactions()
api.eth.TxPool().Add(txs, false, true)
api.eth.TxPool().Add(txs, true)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5,
}
@ -174,7 +174,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
// Put the 10th block's tx in the pool and produce a new block
txs := blocks[9].Transactions()
ethservice.TxPool().Add(txs, true, true)
ethservice.TxPool().Add(txs, true)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5,
}
@ -294,7 +294,7 @@ func TestEth2NewBlock(t *testing.T) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().Add([]*types.Transaction{tx}, true, true)
ethservice.TxPool().Add([]*types.Transaction{tx}, true)
execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{
Timestamp: parent.Time() + 5,
@ -463,7 +463,7 @@ func TestFullAPI(t *testing.T) {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root)
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().Add([]*types.Transaction{tx}, true, false)
ethservice.TxPool().Add([]*types.Transaction{tx}, false)
}
setupBlocks(t, ethservice, 10, parent, callback, nil, nil)
@ -594,7 +594,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) {
GasPrice: big.NewInt(2 * params.InitialBaseFee),
Data: logCode,
})
ethservice.TxPool().Add([]*types.Transaction{tx}, false, true)
ethservice.TxPool().Add([]*types.Transaction{tx}, true)
var (
params = engine.PayloadAttributes{
Timestamp: parent.Time + 1,
@ -1246,8 +1246,8 @@ func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) {
// Create tx to trigger deposit generator.
tx2, _ = types.SignTx(types.NewTransaction(statedb.GetNonce(testAddr)+1, ethservice.APIBackend.ChainConfig().DepositContractAddress, new(big.Int), 500000, big.NewInt(2*params.InitialBaseFee), nil), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
)
ethservice.TxPool().Add([]*types.Transaction{tx1}, false, false)
ethservice.TxPool().Add([]*types.Transaction{tx2}, false, false)
ethservice.TxPool().Add([]*types.Transaction{tx1}, false)
ethservice.TxPool().Add([]*types.Transaction{tx2}, false)
}
// Make some withdrawals to include.
@ -1637,7 +1637,7 @@ func TestWitnessCreationAndConsumption(t *testing.T) {
// Put the 10th block's tx in the pool and produce a new block
txs := blocks[9].Transactions()
ethservice.TxPool().Add(txs, true, true)
ethservice.TxPool().Add(txs, true)
blockParams := engine.PayloadAttributes{
Timestamp: blocks[8].Time() + 5,
Withdrawals: make([]*types.Withdrawal, 0),

View File

@ -18,6 +18,7 @@ package catalyst
import (
"context"
"fmt"
"math/big"
"testing"
"time"
@ -143,9 +144,14 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) {
// Tests that zero-period dev mode can handle a lot of simultaneous
// transactions/withdrawals
func TestOnDemandSpam(t *testing.T) {
// This test is flaky, due to various causes, and the root cause is synchronicity.
// We have optimistic timeouts here and there in the simulated becaon and the worker.
// This test typically fails on 32-bit windows appveyor.
t.Skip("flaky test")
var (
withdrawals []types.Withdrawal
txs = make(map[common.Hash]*types.Transaction)
txCount = 20000
wxCount = 20
testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
testAddr = crypto.PubkeyToAddress(testKey.PublicKey)
gasLimit uint64 = 10_000_000
@ -160,7 +166,7 @@ func TestOnDemandSpam(t *testing.T) {
defer sub.Unsubscribe()
// generate some withdrawals
for i := 0; i < 20; i++ {
for i := 0; i < wxCount; i++ {
withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)})
if err := mock.withdrawals.add(&withdrawals[i]); err != nil {
t.Fatal("addWithdrawal failed", err)
@ -168,37 +174,37 @@ func TestOnDemandSpam(t *testing.T) {
}
// generate a bunch of transactions
for i := 0; i < 20000; i++ {
tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey)
if err != nil {
t.Fatal("error signing transaction", err)
go func() {
for i := 0; i < txCount; i++ {
tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey)
if err != nil {
panic(fmt.Sprintf("error signing transaction: %v", err))
}
if err := eth.TxPool().Add([]*types.Transaction{tx}, false)[0]; err != nil {
panic(fmt.Sprintf("error adding txs to pool: %v", err))
}
}
txs[tx.Hash()] = tx
if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil {
t.Fatal("error adding txs to pool", err)
}
}
}()
var (
includedTxs = make(map[common.Hash]struct{})
includedWxs []uint64
includedTxs int
includedWxs int
abort = time.NewTimer(10 * time.Second)
)
defer abort.Stop()
for {
select {
case ev := <-chainHeadCh:
block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
for _, itx := range block.Transactions() {
includedTxs[itx.Hash()] = struct{}{}
}
for _, iwx := range block.Withdrawals() {
includedWxs = append(includedWxs, iwx.Index)
}
includedTxs += len(block.Transactions())
includedWxs += len(block.Withdrawals())
// ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10
if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) {
if includedTxs == txCount && includedWxs == wxCount {
return
}
case <-time.After(10 * time.Second):
t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals))
abort.Reset(10 * time.Second)
case <-abort.C:
t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d",
includedTxs, txCount, includedWxs, wxCount)
}
}
}

View File

@ -68,7 +68,7 @@ type txPool interface {
Get(hash common.Hash) *types.Transaction
// Add should add the given transactions to the pool.
Add(txs []*types.Transaction, local bool, sync bool) []error
Add(txs []*types.Transaction, sync bool) []error
// Pending should return pending transactions.
// The slice should be modifiable by the caller.
@ -189,7 +189,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return p.RequestTxs(hashes)
}
addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false, false)
return h.txpool.Add(txs, false)
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer)
return h, nil

View File

@ -299,8 +299,8 @@ func testSendTransactions(t *testing.T, protocol uint) {
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
insert[nonce] = tx
}
go handler.txpool.Add(insert, false, false) // Need goroutine to not block on feed
time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join)
go handler.txpool.Add(insert, false) // Need goroutine to not block on feed
time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join)
// Create a source handler to send messages through and a sink peer to receive them
p2pSrc, p2pSink := p2p.MsgPipe()
@ -419,7 +419,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) {
tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey)
txs[nonce] = tx
}
source.txpool.Add(txs, false, false)
source.txpool.Add(txs, false)
// Iterate through all the sinks and ensure they all got the transactions
for i := range sinks {

View File

@ -80,7 +80,7 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
// Add appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error {
func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error {
p.lock.Lock()
defer p.lock.Unlock()

View File

@ -70,6 +70,7 @@ type Miner struct {
chainConfig *params.ChainConfig
engine consensus.Engine
txpool *txpool.TxPool
prio []common.Address // A list of senders to prioritize
chain *core.BlockChain
pending *pending
pendingMu sync.Mutex // Lock protects the pending block
@ -109,6 +110,13 @@ func (miner *Miner) SetExtra(extra []byte) error {
return nil
}
// SetPrioAddresses sets a list of addresses to prioritize for transaction inclusion.
func (miner *Miner) SetPrioAddresses(prio []common.Address) {
miner.confMu.Lock()
miner.prio = prio
miner.confMu.Unlock()
}
// SetGasCeil sets the gaslimit to strive for when mining blocks post 1559.
// For pre-1559 blocks, it sets the ceiling.
func (miner *Miner) SetGasCeil(ceil uint64) {

View File

@ -138,7 +138,7 @@ func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool }
func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*Miner, *testWorkerBackend) {
backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks)
backend.txPool.Add(pendingTxs, true, true)
backend.txPool.Add(pendingTxs, true)
w := New(backend, testConfig, engine)
return w, backend
}

View File

@ -427,6 +427,7 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran
func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) error {
miner.confMu.RLock()
tip := miner.config.GasPrice
prio := miner.prio
miner.confMu.RUnlock()
// Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees
@ -446,31 +447,31 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment)
pendingBlobTxs := miner.txpool.Pending(filter)
// Split the pending transactions into locals and remotes.
localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs
prioPlainTxs, normalPlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs
prioBlobTxs, normalBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs
for _, account := range miner.txpool.Locals() {
if txs := remotePlainTxs[account]; len(txs) > 0 {
delete(remotePlainTxs, account)
localPlainTxs[account] = txs
for _, account := range prio {
if txs := normalPlainTxs[account]; len(txs) > 0 {
delete(normalPlainTxs, account)
prioPlainTxs[account] = txs
}
if txs := remoteBlobTxs[account]; len(txs) > 0 {
delete(remoteBlobTxs, account)
localBlobTxs[account] = txs
if txs := normalBlobTxs[account]; len(txs) > 0 {
delete(normalBlobTxs, account)
prioBlobTxs[account] = txs
}
}
// Fill the block with all available pending transactions.
if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee)
if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee)
if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err
}
}
if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee)
if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 {
plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee)
blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee)
if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil {
return err