diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 78cf2c05af..51b8b67c61 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1269,7 +1269,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844. // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). -func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { var ( adds = make([]*types.Transaction, 0, len(txs)) errs = make([]error, len(txs)) @@ -1701,13 +1701,6 @@ func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*ty return []*types.Transaction{}, []*types.Transaction{} } -// Locals retrieves the accounts currently considered local by the pool. -// -// There is no notion of local accounts in the blob pool. -func (p *BlobPool) Locals() []common.Address { - return []common.Address{} -} - // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 3d780ad373..1440af5440 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -99,7 +99,6 @@ var ( pendingGauge = metrics.NewRegisteredGauge("txpool/pending", nil) queuedGauge = metrics.NewRegisteredGauge("txpool/queued", nil) - localGauge = metrics.NewRegisteredGauge("txpool/local", nil) slotsGauge = metrics.NewRegisteredGauge("txpool/slots", nil) reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil) @@ -159,10 +158,6 @@ var DefaultConfig = Config{ // unreasonable or unworkable. func (config *Config) sanitize() Config { conf := *config - if conf.Rejournal < time.Second { - log.Warn("Sanitizing invalid txpool journal time", "provided", conf.Rejournal, "updated", time.Second) - conf.Rejournal = time.Second - } if conf.PriceLimit < 1 { log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultConfig.PriceLimit) conf.PriceLimit = DefaultConfig.PriceLimit @@ -214,9 +209,6 @@ type LegacyPool struct { currentState *state.StateDB // Current state in the blockchain head pendingNonces *noncer // Pending state tracking virtual nonces - locals *accountSet // Set of local transaction to exempt from eviction rules - journal *journal // Journal of local transaction to back up to disk - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions @@ -262,16 +254,8 @@ func New(config Config, chain BlockChain) *LegacyPool { reorgShutdownCh: make(chan struct{}), initDoneCh: make(chan struct{}), } - pool.locals = newAccountSet(pool.signer) - for _, addr := range config.Locals { - log.Info("Setting new local account", "address", addr) - pool.locals.add(addr) - } pool.priced = newPricedList(pool.all) - if !config.NoLocals && config.Journal != "" { - pool.journal = newTxJournal(config.Journal) - } return pool } @@ -287,8 +271,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { } // Init sets the gas price needed to keep a transaction in the pool and the chain -// head to allow balance / nonce checks. The transaction journal will be loaded -// from disk and filtered based on the provided starting settings. The internal +// head to allow balance / nonce checks. The internal // goroutines will be spun up and the pool deemed operational afterwards. func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { // Set the address reserver to request exclusive access to pooled accounts @@ -311,20 +294,9 @@ func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.A pool.currentState = statedb pool.pendingNonces = newNoncer(statedb) - // Start the reorg loop early, so it can handle requests generated during - // journal loading. pool.wg.Add(1) go pool.scheduleReorgLoop() - // If local transactions and journaling is enabled, load from disk - if pool.journal != nil { - if err := pool.journal.load(pool.addLocals); err != nil { - log.Warn("Failed to load transaction journal", "err", err) - } - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate transaction journal", "err", err) - } - } pool.wg.Add(1) go pool.loop() return nil @@ -340,13 +312,11 @@ func (pool *LegacyPool) loop() { prevPending, prevQueued, prevStales int // Start the stats reporting and transaction eviction tickers - report = time.NewTicker(statsReportInterval) - evict = time.NewTicker(evictionInterval) - journal = time.NewTicker(pool.config.Rejournal) + report = time.NewTicker(statsReportInterval) + evict = time.NewTicker(evictionInterval) ) defer report.Stop() defer evict.Stop() - defer journal.Stop() // Notify tests that the init phase is done close(pool.initDoneCh) @@ -372,11 +342,7 @@ func (pool *LegacyPool) loop() { case <-evict.C: pool.mu.Lock() for addr := range pool.queue { - // Skip local transactions from the eviction mechanism - if pool.locals.contains(addr) { - continue - } - // Any non-locals old enough should be removed + // Any old enough should be removed if time.Since(pool.beats[addr]) > pool.config.Lifetime { list := pool.queue[addr].Flatten() for _, tx := range list { @@ -386,16 +352,6 @@ func (pool *LegacyPool) loop() { } } pool.mu.Unlock() - - // Handle local transaction journal rotation - case <-journal.C: - if pool.journal != nil { - pool.mu.Lock() - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate local tx journal", "err", err) - } - pool.mu.Unlock() - } } } } @@ -406,9 +362,6 @@ func (pool *LegacyPool) Close() error { close(pool.reorgShutdownCh) pool.wg.Wait() - if pool.journal != nil { - pool.journal.close() - } log.Info("Transaction pool stopped") return nil } @@ -444,7 +397,7 @@ func (pool *LegacyPool) SetGasTip(tip *big.Int) { // If the min miner fee increased, remove transactions below the new threshold if newTip.Cmp(old) > 0 { // pool.priced is sorted by GasFeeCap, so we have to iterate through pool.all instead - drop := pool.all.RemotesBelowTip(tip) + drop := pool.all.TxsBelowTip(tip) for _, tx := range drop { pool.removeTx(tx.Hash(), false, true) } @@ -549,7 +502,7 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] txs := list.Flatten() // If the miner requests tip enforcement, cap the lists now - if minTipBig != nil && !pool.locals.contains(addr) { + if minTipBig != nil { for i, tx := range txs { if tx.EffectiveGasTipIntCmp(minTipBig, baseFeeBig) < 0 { txs = txs[:i] @@ -577,35 +530,11 @@ func (pool *LegacyPool) Pending(filter txpool.PendingFilter) map[common.Address] return pending } -// Locals retrieves the accounts currently considered local by the pool. -func (pool *LegacyPool) Locals() []common.Address { - pool.mu.Lock() - defer pool.mu.Unlock() - - return pool.locals.flatten() -} - -// local retrieves all currently known local transactions, grouped by origin -// account and sorted by nonce. The returned transaction set is a copy and can be -// freely modified by calling code. -func (pool *LegacyPool) local() map[common.Address]types.Transactions { - txs := make(map[common.Address]types.Transactions) - for addr := range pool.locals.accounts { - if pending := pool.pending[addr]; pending != nil { - txs[addr] = append(txs[addr], pending.Flatten()...) - } - if queued := pool.queue[addr]; queued != nil { - txs[addr] = append(txs[addr], queued.Flatten()...) - } - } - return txs -} - // validateTxBasics checks whether a transaction is valid according to the consensus // rules, but does not check state-dependent validation such as sufficient balance. // This check is meant as an early check which only needs to be performed once, // and does not require the pool mutex to be held. -func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) error { +func (pool *LegacyPool) validateTxBasics(tx *types.Transaction) error { opts := &txpool.ValidationOptions{ Config: pool.chainconfig, Accept: 0 | @@ -615,9 +544,6 @@ func (pool *LegacyPool) validateTxBasics(tx *types.Transaction, local bool) erro MaxSize: txMaxSize, MinTip: pool.gasTip.Load().ToBig(), } - if local { - opts.MinTip = new(big.Int) - } if err := txpool.ValidateTransaction(tx, pool.currentHead.Load(), pool.signer, opts); err != nil { return err } @@ -665,11 +591,7 @@ func (pool *LegacyPool) validateTx(tx *types.Transaction) error { // add validates a transaction and inserts it into the non-executable queue for later // pending promotion and execution. If the transaction is a replacement for an already // pending or queued one, it overwrites the previous transaction if its price is higher. -// -// If a newly added transaction is marked as local, its sending account will be -// added to the allowlist, preventing any associated transaction from being dropped -// out of the pool due to pricing constraints. -func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { +func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { @@ -677,9 +599,6 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e knownTxMeter.Mark(1) return false, txpool.ErrAlreadyKnown } - // Make the local flag. If it's from local source or it's from the network but - // the sender is marked as local previously, treat it as the local transaction. - isLocal := local || pool.locals.containsTx(tx) // If the transaction fails basic validation, discard it if err := pool.validateTx(tx); err != nil { @@ -715,7 +634,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Slots()+numSlots(tx)) > pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it - if !isLocal && pool.priced.Underpriced(tx) { + if pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) return false, txpool.ErrUnderpriced @@ -731,19 +650,18 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e } // New transaction is better than our worse ones, make room for it. - // If it's a local transaction, forcibly discard all available transactions. - // Otherwise if we can't make enough room for new one, abort the operation. - drop, success := pool.priced.Discard(pool.all.Slots()-int(pool.config.GlobalSlots+pool.config.GlobalQueue)+numSlots(tx), isLocal) + // If we can't make enough room for new one, abort the operation. + drop, success := pool.priced.Discard(pool.all.Slots() - int(pool.config.GlobalSlots+pool.config.GlobalQueue) + numSlots(tx)) // Special case, we still can't make the room for the new remote one. - if !isLocal && !success { + if !success { log.Trace("Discarding overflown transaction", "hash", hash) overflowedTxMeter.Mark(1) return false, ErrTxPoolOverflow } // If the new transaction is a future transaction it should never churn pending transactions - if !isLocal && pool.isGapped(from, tx) { + if pool.isGapped(from, tx) { var replacesPending bool for _, dropTx := range drop { dropSender, _ := types.Sender(pool.signer, dropTx) @@ -755,7 +673,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e // Add all transactions back to the priced queue if replacesPending { for _, dropTx := range drop { - pool.priced.Put(dropTx, false) + pool.priced.Put(dropTx) } log.Trace("Discarding future transaction replacing pending tx", "hash", hash) return false, txpool.ErrFutureReplacePending @@ -788,9 +706,8 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e pool.priced.Removed(1) pendingReplaceMeter.Mark(1) } - pool.all.Add(tx, isLocal) - pool.priced.Put(tx, isLocal) - pool.journalTx(from, tx) + pool.all.Add(tx) + pool.priced.Put(tx) pool.queueTxEvent(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) @@ -799,20 +716,10 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e return old != nil, nil } // New transaction isn't replacing a pending one, push into queue - replaced, err = pool.enqueueTx(hash, tx, isLocal, true) + replaced, err = pool.enqueueTx(hash, tx, true) if err != nil { return false, err } - // Mark local addresses and journal local transactions - if local && !pool.locals.contains(from) { - log.Info("Setting new local account", "address", from) - pool.locals.add(from) - pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time. - } - if isLocal { - localGauge.Inc(1) - } - pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) return replaced, nil @@ -845,7 +752,7 @@ func (pool *LegacyPool) isGapped(from common.Address, tx *types.Transaction) boo // enqueueTx inserts a new transaction into the non-executable transaction queue. // // Note, this method assumes the pool lock is held! -func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local bool, addAll bool) (bool, error) { +func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, addAll bool) (bool, error) { // Try to insert the transaction into the future queue from, _ := types.Sender(pool.signer, tx) // already validated if pool.queue[from] == nil { @@ -872,8 +779,8 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local log.Error("Missing transaction in lookup set, please report the issue", "hash", hash) } if addAll { - pool.all.Add(tx, local) - pool.priced.Put(tx, local) + pool.all.Add(tx) + pool.priced.Put(tx) } // If we never record the heartbeat, do it right now. if _, exist := pool.beats[from]; !exist { @@ -882,18 +789,6 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local return old != nil, nil } -// journalTx adds the specified transaction to the local disk journal if it is -// deemed to have been sent from a local account. -func (pool *LegacyPool) journalTx(from common.Address, tx *types.Transaction) { - // Only journal if it's enabled and the transaction is local - if pool.journal == nil || !pool.locals.contains(from) { - return - } - if err := pool.journal.insert(tx); err != nil { - log.Warn("Failed to journal local transaction", "err", err) - } -} - // promoteTx adds a transaction to the pending (processable) list of transactions // and returns whether it was inserted or an older was better. // @@ -930,28 +825,13 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ return true } -// addLocals enqueues a batch of transactions into the pool if they are valid, marking the -// senders as local ones, ensuring they go around the local pricing constraints. -// -// This method is used to add transactions from the RPC API and performs synchronous pool -// reorganization and event propagation. -func (pool *LegacyPool) addLocals(txs []*types.Transaction) []error { - return pool.Add(txs, !pool.config.NoLocals, true) -} - -// addLocal enqueues a single local transaction into the pool if it is valid. This is -// a convenience wrapper around addLocals. -func (pool *LegacyPool) addLocal(tx *types.Transaction) error { - return pool.addLocals([]*types.Transaction{tx})[0] -} - -// addRemotes enqueues a batch of transactions into the pool if they are valid. If the -// senders are not among the locally tracked ones, full pricing constraints will apply. +// addRemotes enqueues a batch of transactions into the pool if they are valid. +// Full pricing constraints will apply. // // This method is used to add transactions from the p2p network and does not wait for pool // reorganization and internal event propagation. func (pool *LegacyPool) addRemotes(txs []*types.Transaction) []error { - return pool.Add(txs, false, false) + return pool.Add(txs, false) } // addRemote enqueues a single transaction into the pool if it is valid. This is a convenience @@ -962,23 +842,19 @@ func (pool *LegacyPool) addRemote(tx *types.Transaction) error { // addRemotesSync is like addRemotes, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemotesSync(txs []*types.Transaction) []error { - return pool.Add(txs, false, true) + return pool.Add(txs, true) } // This is like addRemotes with a single transaction, but waits for pool reorganization. Tests use this method. func (pool *LegacyPool) addRemoteSync(tx *types.Transaction) error { - return pool.Add([]*types.Transaction{tx}, false, true)[0] + return pool.Add([]*types.Transaction{tx}, true)[0] } -// Add enqueues a batch of transactions into the pool if they are valid. Depending -// on the local flag, full pricing constraints will or will not be applied. +// Add enqueues a batch of transactions into the pool if they are valid. // // If sync is set, the method will block until all internal maintenance related // to the add is finished. Only use this during tests for determinism! -func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error { - // Do not treat as local if local transactions have been disabled - local = local && !pool.config.NoLocals - +func (pool *LegacyPool) Add(txs []*types.Transaction, sync bool) []error { // Filter out known ones without obtaining the pool lock or recovering signatures var ( errs = make([]error, len(txs)) @@ -994,7 +870,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // Exclude transactions with basic errors, e.g invalid signatures and // insufficient intrinsic gas as soon as possible and cache senders // in transactions before obtaining lock - if err := pool.validateTxBasics(tx, local); err != nil { + if err := pool.validateTxBasics(tx); err != nil { errs[i] = err log.Trace("Discarding invalid transaction", "hash", tx.Hash(), "err", err) invalidTxMeter.Mark(1) @@ -1009,7 +885,7 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // Process all the new transaction and merge any errors into the original slice pool.mu.Lock() - newErrs, dirtyAddrs := pool.addTxsLocked(news, local) + newErrs, dirtyAddrs := pool.addTxsLocked(news) pool.mu.Unlock() var nilSlot = 0 @@ -1030,11 +906,11 @@ func (pool *LegacyPool) Add(txs []*types.Transaction, local, sync bool) []error // addTxsLocked attempts to queue a batch of transactions if they are valid. // The transaction pool lock must be held. -func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { +func (pool *LegacyPool) addTxsLocked(txs []*types.Transaction) ([]error, *accountSet) { dirty := newAccountSet(pool.signer) errs := make([]error, len(txs)) for i, tx := range txs { - replaced, err := pool.add(tx, local) + replaced, err := pool.add(tx) errs[i] = err if err == nil && !replaced { dirty.addTx(tx) @@ -1126,9 +1002,6 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo if outofbound { pool.priced.Removed(1) } - if pool.locals.contains(addr) { - localGauge.Dec(1) - } // Remove the transaction from the pending lists and reset the account nonce if pending := pool.pending[addr]; pending != nil { if removed, invalids := pending.Remove(tx); removed { @@ -1139,7 +1012,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo // Postpone any invalidated transactions for _, tx := range invalids { // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(tx.Hash(), tx, false, false) + pool.enqueueTx(tx.Hash(), tx, false) } // Update the account nonce if needed pool.pendingNonces.setIfLower(addr, tx.Nonce()) @@ -1204,7 +1077,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { launchNextRun bool reset *txpoolResetRequest dirtyAccounts *accountSet - queuedEvents = make(map[common.Address]*sortedMap) + queuedEvents = make(map[common.Address]*SortedMap) ) for { // Launch next background reorg if needed @@ -1217,7 +1090,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { launchNextRun = false reset, dirtyAccounts = nil, nil - queuedEvents = make(map[common.Address]*sortedMap) + queuedEvents = make(map[common.Address]*SortedMap) } select { @@ -1246,7 +1119,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { // request one later if they want the events sent. addr, _ := types.Sender(pool.signer, tx) if _, ok := queuedEvents[addr]; !ok { - queuedEvents[addr] = newSortedMap() + queuedEvents[addr] = NewSortedMap() } queuedEvents[addr].Put(tx) @@ -1265,7 +1138,7 @@ func (pool *LegacyPool) scheduleReorgLoop() { } // runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. -func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*sortedMap) { +func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*SortedMap) { defer func(t0 time.Time) { reorgDurationTimer.Update(time.Since(t0)) }(time.Now()) @@ -1332,7 +1205,7 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, for _, tx := range promoted { addr, _ := types.Sender(pool.signer, tx) if _, ok := events[addr]; !ok { - events[addr] = newSortedMap() + events[addr] = NewSortedMap() } events[addr].Put(tx) } @@ -1441,7 +1314,7 @@ func (pool *LegacyPool) reset(oldHead, newHead *types.Header) { // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) core.SenderCacher().Recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + pool.addTxsLocked(reinject) } // promoteExecutables moves transactions that have become processable from the @@ -1486,22 +1359,17 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit - var caps types.Transactions - if !pool.locals.contains(addr) { - caps = list.Cap(int(pool.config.AccountQueue)) - for _, tx := range caps { - hash := tx.Hash() - pool.all.Remove(hash) - log.Trace("Removed cap-exceeding queued transaction", "hash", hash) - } - queuedRateLimitMeter.Mark(int64(len(caps))) + var caps = list.Cap(int(pool.config.AccountQueue)) + for _, tx := range caps { + hash := tx.Hash() + pool.all.Remove(hash) + log.Trace("Removed cap-exceeding queued transaction", "hash", hash) } + queuedRateLimitMeter.Mark(int64(len(caps))) // Mark all the items dropped as removed pool.priced.Removed(len(forwards) + len(drops) + len(caps)) queuedGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(forwards) + len(drops) + len(caps))) - } + // Delete the entire queue entry if it became empty. if list.Empty() { delete(pool.queue, addr) @@ -1531,14 +1399,14 @@ func (pool *LegacyPool) truncatePending() { spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers - if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { + if uint64(list.Len()) > pool.config.AccountSlots { spammers.Push(addr, int64(list.Len())) } } // Gradually drop transactions from offenders offenders := []common.Address{} for pending > pool.config.GlobalSlots && !spammers.Empty() { - // Retrieve the next offender if not local address + // Retrieve the next offender offender, _ := spammers.Pop() offenders = append(offenders, offender) @@ -1564,9 +1432,7 @@ func (pool *LegacyPool) truncatePending() { } pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) - if pool.locals.contains(offenders[i]) { - localGauge.Dec(int64(len(caps))) - } + pending-- } } @@ -1591,9 +1457,6 @@ func (pool *LegacyPool) truncatePending() { } pool.priced.Removed(len(caps)) pendingGauge.Dec(int64(len(caps))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(caps))) - } pending-- } } @@ -1614,13 +1477,11 @@ func (pool *LegacyPool) truncateQueue() { // Sort all accounts with queued transactions by heartbeat addresses := make(addressesByHeartbeat, 0, len(pool.queue)) for addr := range pool.queue { - if !pool.locals.contains(addr) { // don't drop locals - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) - } + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } sort.Sort(sort.Reverse(addresses)) - // Drop transactions until the total is below the limit or only locals remain + // Drop transactions until the total is below the limit for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { addr := addresses[len(addresses)-1] list := pool.queue[addr.address] @@ -1680,12 +1541,10 @@ func (pool *LegacyPool) demoteUnexecutables() { log.Trace("Demoting pending transaction", "hash", hash) // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(hash, tx, false, false) + pool.enqueueTx(hash, tx, false) } pendingGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) - if pool.locals.contains(addr) { - localGauge.Dec(int64(len(olds) + len(drops) + len(invalids))) - } + // If there's a gap in front, alert (should never happen) and postpone all transactions if list.Len() > 0 && list.txs.Get(nonce) == nil { gapped := list.Cap(0) @@ -1694,7 +1553,7 @@ func (pool *LegacyPool) demoteUnexecutables() { log.Error("Demoting invalidated transaction", "hash", hash) // Internal shuffle shouldn't touch the lookup set. - pool.enqueueTx(hash, tx, false, false) + pool.enqueueTx(hash, tx, false) } pendingGauge.Dec(int64(len(gapped))) } @@ -1741,21 +1600,6 @@ func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet { return as } -// contains checks if a given address is contained within the set. -func (as *accountSet) contains(addr common.Address) bool { - _, exist := as.accounts[addr] - return exist -} - -// containsTx checks if the sender of a given tx is within the set. If the sender -// cannot be derived, this method returns false. -func (as *accountSet) containsTx(tx *types.Transaction) bool { - if addr, err := types.Sender(as.signer, tx); err == nil { - return as.contains(addr) - } - return false -} - // add inserts a new address into the set to track. func (as *accountSet) add(addr common.Address) { as.accounts[addr] = struct{}{} @@ -1793,43 +1637,29 @@ func (as *accountSet) merge(other *accountSet) { // internal mechanisms. The sole purpose of the type is to permit out-of-bound // peeking into the pool in LegacyPool.Get without having to acquire the widely scoped // LegacyPool.mu mutex. -// -// This lookup set combines the notion of "local transactions", which is useful -// to build upper-level structure. type lookup struct { - slots int - lock sync.RWMutex - locals map[common.Hash]*types.Transaction - remotes map[common.Hash]*types.Transaction + slots int + lock sync.RWMutex + txs map[common.Hash]*types.Transaction } // newLookup returns a new lookup structure. func newLookup() *lookup { return &lookup{ - locals: make(map[common.Hash]*types.Transaction), - remotes: make(map[common.Hash]*types.Transaction), + txs: make(map[common.Hash]*types.Transaction), } } // Range calls f on each key and value present in the map. The callback passed // should return the indicator whether the iteration needs to be continued. // Callers need to specify which set (or both) to be iterated. -func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction, local bool) bool, local bool, remote bool) { +func (t *lookup) Range(f func(hash common.Hash, tx *types.Transaction) bool) { t.lock.RLock() defer t.lock.RUnlock() - if local { - for key, value := range t.locals { - if !f(key, value, true) { - return - } - } - } - if remote { - for key, value := range t.remotes { - if !f(key, value, false) { - return - } + for key, value := range t.txs { + if !f(key, value) { + return } } } @@ -1839,26 +1669,7 @@ func (t *lookup) Get(hash common.Hash) *types.Transaction { t.lock.RLock() defer t.lock.RUnlock() - if tx := t.locals[hash]; tx != nil { - return tx - } - return t.remotes[hash] -} - -// GetLocal returns a transaction if it exists in the lookup, or nil if not found. -func (t *lookup) GetLocal(hash common.Hash) *types.Transaction { - t.lock.RLock() - defer t.lock.RUnlock() - - return t.locals[hash] -} - -// GetRemote returns a transaction if it exists in the lookup, or nil if not found. -func (t *lookup) GetRemote(hash common.Hash) *types.Transaction { - t.lock.RLock() - defer t.lock.RUnlock() - - return t.remotes[hash] + return t.txs[hash] } // Count returns the current number of transactions in the lookup. @@ -1866,23 +1677,7 @@ func (t *lookup) Count() int { t.lock.RLock() defer t.lock.RUnlock() - return len(t.locals) + len(t.remotes) -} - -// LocalCount returns the current number of local transactions in the lookup. -func (t *lookup) LocalCount() int { - t.lock.RLock() - defer t.lock.RUnlock() - - return len(t.locals) -} - -// RemoteCount returns the current number of remote transactions in the lookup. -func (t *lookup) RemoteCount() int { - t.lock.RLock() - defer t.lock.RUnlock() - - return len(t.remotes) + return len(t.txs) } // Slots returns the current number of slots used in the lookup. @@ -1894,18 +1689,14 @@ func (t *lookup) Slots() int { } // Add adds a transaction to the lookup. -func (t *lookup) Add(tx *types.Transaction, local bool) { +func (t *lookup) Add(tx *types.Transaction) { t.lock.Lock() defer t.lock.Unlock() t.slots += numSlots(tx) slotsGauge.Update(int64(t.slots)) - if local { - t.locals[tx.Hash()] = tx - } else { - t.remotes[tx.Hash()] = tx - } + t.txs[tx.Hash()] = tx } // Remove removes a transaction from the lookup. @@ -1913,10 +1704,7 @@ func (t *lookup) Remove(hash common.Hash) { t.lock.Lock() defer t.lock.Unlock() - tx, ok := t.locals[hash] - if !ok { - tx, ok = t.remotes[hash] - } + tx, ok := t.txs[hash] if !ok { log.Error("No transaction found to be deleted", "hash", hash) return @@ -1924,36 +1712,18 @@ func (t *lookup) Remove(hash common.Hash) { t.slots -= numSlots(tx) slotsGauge.Update(int64(t.slots)) - delete(t.locals, hash) - delete(t.remotes, hash) + delete(t.txs, hash) } -// RemoteToLocals migrates the transactions belongs to the given locals to locals -// set. The assumption is held the locals set is thread-safe to be used. -func (t *lookup) RemoteToLocals(locals *accountSet) int { - t.lock.Lock() - defer t.lock.Unlock() - - var migrated int - for hash, tx := range t.remotes { - if locals.containsTx(tx) { - t.locals[hash] = tx - delete(t.remotes, hash) - migrated += 1 - } - } - return migrated -} - -// RemotesBelowTip finds all remote transactions below the given tip threshold. -func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions { +// TxsBelowTip finds all remote transactions below the given tip threshold. +func (t *lookup) TxsBelowTip(threshold *big.Int) types.Transactions { found := make(types.Transactions, 0, 128) - t.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { + t.Range(func(hash common.Hash, tx *types.Transaction) bool { if tx.GasTipCapIntCmp(threshold) < 0 { found = append(found, tx) } return true - }, false, true) // Only iterate remotes + }) return found } @@ -1982,24 +1752,13 @@ func (pool *LegacyPool) Clear() { // The transaction addition may attempt to reserve the sender addr which // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. - for _, tx := range pool.all.remotes { + for _, tx := range pool.all.txs { senderAddr, _ := types.Sender(pool.signer, tx) pool.reserve(senderAddr, false) } - for localSender := range pool.locals.accounts { - pool.reserve(localSender, false) - } - pool.all = newLookup() pool.priced = newPricedList(pool.all) pool.pending = make(map[common.Address]*list) pool.queue = make(map[common.Address]*list) pool.pendingNonces = newNoncer(pool.currentState) - - if !pool.config.NoLocals && pool.config.Journal != "" { - pool.journal = newTxJournal(pool.config.Journal) - if err := pool.journal.rotate(pool.local()); err != nil { - log.Warn("Failed to rotate transaction journal", "err", err) - } - } } diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index abbde8cae3..55699e93ee 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -23,7 +23,6 @@ import ( "fmt" "math/big" "math/rand" - "os" "sync" "sync/atomic" "testing" @@ -183,7 +182,7 @@ func validatePoolInternals(pool *LegacyPool) error { return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued) } pool.priced.Reheap() - priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.RemoteCount() + priced, remote := pool.priced.urgent.Len()+pool.priced.floating.Len(), pool.all.Count() if priced != remote { return fmt.Errorf("total priced transaction count %d != %d", priced, remote) } @@ -350,9 +349,6 @@ func TestInvalidTransactions(t *testing.T) { if err, want := pool.addRemote(tx), txpool.ErrUnderpriced; !errors.Is(err, want) { t.Errorf("want %v have %v", want, err) } - if err := pool.addLocal(tx); err != nil { - t.Error("expected", nil, "got", err) - } } func TestQueue(t *testing.T) { @@ -366,7 +362,7 @@ func TestQueue(t *testing.T) { testAddBalance(pool, from, big.NewInt(1000)) <-pool.requestReset(nil, nil) - pool.enqueueTx(tx.Hash(), tx, false, true) + pool.enqueueTx(tx.Hash(), tx, true) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) @@ -375,7 +371,7 @@ func TestQueue(t *testing.T) { tx = transaction(1, 100, key) from, _ = deriveSender(tx) testSetNonce(pool, from, 2) - pool.enqueueTx(tx.Hash(), tx, false, true) + pool.enqueueTx(tx.Hash(), tx, true) <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { @@ -399,9 +395,9 @@ func TestQueue2(t *testing.T) { testAddBalance(pool, from, big.NewInt(1000)) pool.reset(nil, nil) - pool.enqueueTx(tx1.Hash(), tx1, false, true) - pool.enqueueTx(tx2.Hash(), tx2, false, true) - pool.enqueueTx(tx3.Hash(), tx3, false, true) + pool.enqueueTx(tx1.Hash(), tx1, true) + pool.enqueueTx(tx2.Hash(), tx2, true) + pool.enqueueTx(tx3.Hash(), tx3, true) pool.promoteExecutables([]common.Address{from}) if len(pool.pending) != 1 { @@ -476,14 +472,14 @@ func TestChainFork(t *testing.T) { resetState() tx := transaction(0, 100000, key) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } pool.removeTx(tx.Hash(), true, true) // reset the pool's internal state resetState() - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } } @@ -510,10 +506,10 @@ func TestDoubleNonce(t *testing.T) { tx3, _ := types.SignTx(types.NewTransaction(0, common.Address{}, big.NewInt(100), 1000000, big.NewInt(1), nil), signer, key) // Add the first two transaction, ensure higher priced stays only - if replace, err := pool.add(tx1, false); err != nil || replace { + if replace, err := pool.add(tx1); err != nil || replace { t.Errorf("first transaction insert failed (%v) or reported replacement (%v)", err, replace) } - if replace, err := pool.add(tx2, false); err != nil || !replace { + if replace, err := pool.add(tx2); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) @@ -525,7 +521,7 @@ func TestDoubleNonce(t *testing.T) { } // Add the third transaction and ensure it's not saved (smaller price) - pool.add(tx3, false) + pool.add(tx3) <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) @@ -548,7 +544,7 @@ func TestMissingNonce(t *testing.T) { addr := crypto.PubkeyToAddress(key.PublicKey) testAddBalance(pool, addr, big.NewInt(100000000000000)) tx := transaction(1, 100000, key) - if _, err := pool.add(tx, false); err != nil { + if _, err := pool.add(tx); err != nil { t.Error("didn't expect error", err) } if len(pool.pending) != 0 { @@ -607,21 +603,21 @@ func TestDropping(t *testing.T) { tx11 = transaction(11, 200, key) tx12 = transaction(12, 300, key) ) - pool.all.Add(tx0, false) - pool.priced.Put(tx0, false) + pool.all.Add(tx0) + pool.priced.Put(tx0) pool.promoteTx(account, tx0.Hash(), tx0) - pool.all.Add(tx1, false) - pool.priced.Put(tx1, false) + pool.all.Add(tx1) + pool.priced.Put(tx1) pool.promoteTx(account, tx1.Hash(), tx1) - pool.all.Add(tx2, false) - pool.priced.Put(tx2, false) + pool.all.Add(tx2) + pool.priced.Put(tx2) pool.promoteTx(account, tx2.Hash(), tx2) - pool.enqueueTx(tx10.Hash(), tx10, false, true) - pool.enqueueTx(tx11.Hash(), tx11, false, true) - pool.enqueueTx(tx12.Hash(), tx12, false, true) + pool.enqueueTx(tx10.Hash(), tx10, true) + pool.enqueueTx(tx11.Hash(), tx11, true) + pool.enqueueTx(tx12.Hash(), tx12, true) // Check that pre and post validations leave the pool as is if pool.pending[account].Len() != 3 { @@ -899,13 +895,6 @@ func TestQueueAccountLimiting(t *testing.T) { // This logic should not hold for local transactions, unless the local tracking // mechanism is disabled. func TestQueueGlobalLimiting(t *testing.T) { - testQueueGlobalLimiting(t, false) -} -func TestQueueGlobalLimitingNoLocals(t *testing.T) { - testQueueGlobalLimiting(t, true) -} - -func testQueueGlobalLimiting(t *testing.T, nolocals bool) { t.Parallel() // Create the pool to test the limit enforcement with @@ -913,7 +902,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) config := testTxPoolConfig - config.NoLocals = nolocals + config.NoLocals = true config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) pool := New(config, blockchain) @@ -926,7 +915,6 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { keys[i], _ = crypto.GenerateKey() testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) } - local := keys[len(keys)-1] // Generate and queue a batch of transactions nonces := make(map[common.Address]uint64) @@ -952,51 +940,12 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { if queued > int(config.GlobalQueue) { t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue) } - // Generate a batch of transactions from the local account and import them - txs = txs[:0] - for i := uint64(0); i < 3*config.GlobalQueue; i++ { - txs = append(txs, transaction(i+1, 100000, local)) - } - pool.addLocals(txs) - - // If locals are disabled, the previous eviction algorithm should apply here too - if nolocals { - queued := 0 - for addr, list := range pool.queue { - if list.Len() > int(config.AccountQueue) { - t.Errorf("addr %x: queued accounts overflown allowance: %d > %d", addr, list.Len(), config.AccountQueue) - } - queued += list.Len() - } - if queued > int(config.GlobalQueue) { - t.Fatalf("total transactions overflow allowance: %d > %d", queued, config.GlobalQueue) - } - } else { - // Local exemptions are enabled, make sure the local account owned the queue - if len(pool.queue) != 1 { - t.Errorf("multiple accounts in queue: have %v, want %v", len(pool.queue), 1) - } - // Also ensure no local transactions are ever dropped, even if above global limits - if queued := pool.queue[crypto.PubkeyToAddress(local.PublicKey)].Len(); uint64(queued) != 3*config.GlobalQueue { - t.Fatalf("local account queued transaction count mismatch: have %v, want %v", queued, 3*config.GlobalQueue) - } - } } // Tests that if an account remains idle for a prolonged amount of time, any // non-executable transactions queued up are dropped to prevent wasting resources // on shuffling them around. -// -// This logic should not hold for local transactions, unless the local tracking -// mechanism is disabled. func TestQueueTimeLimiting(t *testing.T) { - testQueueTimeLimiting(t, false) -} -func TestQueueTimeLimitingNoLocals(t *testing.T) { - testQueueTimeLimiting(t, true) -} - -func testQueueTimeLimiting(t *testing.T, nolocals bool) { // Reduce the eviction interval to a testable amount defer func(old time.Duration) { evictionInterval = old }(evictionInterval) evictionInterval = time.Millisecond * 100 @@ -1007,23 +956,17 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { config := testTxPoolConfig config.Lifetime = time.Second - config.NoLocals = nolocals pool := New(config, blockchain) pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) defer pool.Close() - // Create two test accounts to ensure remotes expire but locals do not - local, _ := crypto.GenerateKey() + // Create a test account to ensure remotes expire remote, _ := crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) - // Add the two transactions and ensure they both are queued up - if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } + // Add the transaction and ensure it is queued up if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } @@ -1031,7 +974,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if queued != 2 { + if queued != 1 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } if err := validatePoolInternals(pool); err != nil { @@ -1046,7 +989,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if queued != 2 { + if queued != 1 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) } if err := validatePoolInternals(pool); err != nil { @@ -1060,14 +1003,8 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1075,7 +1012,6 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // remove current transactions and increase nonce to prepare for a reset and cleanup statedb.SetNonce(crypto.PubkeyToAddress(remote.PublicKey), 2) - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) <-pool.requestReset(nil, nil) // make sure queue, pending are cleared @@ -1091,18 +1027,12 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { } // Queue gapped transactions - if err := pool.addLocal(pricedTransaction(4, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } if err := pool.addRemoteSync(pricedTransaction(4, 100000, big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } time.Sleep(5 * evictionInterval) // A half lifetime pass // Queue executable transactions, the life cycle should be restarted. - if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } @@ -1110,11 +1040,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // All gapped transactions shouldn't be kicked out pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 1 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1123,17 +1053,11 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // The whole life time pass after last promotion, kick out stale transactions time.Sleep(2 * config.Lifetime) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) @@ -1363,8 +1287,6 @@ func TestPendingMinimumAllowance(t *testing.T) { // Tests that setting the transaction pool gas price to a higher value correctly // discards everything cheaper than that and moves any gapped transactions back // from the pending pool to the queue. -// -// Note, local transactions are never allowed to be dropped. func TestRepricing(t *testing.T) { t.Parallel() @@ -1382,7 +1304,7 @@ func TestRepricing(t *testing.T) { defer sub.Unsubscribe() // Create a number of test accounts and fund them - keys := make([]*ecdsa.PrivateKey, 4) + keys := make([]*ecdsa.PrivateKey, 3) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) @@ -1402,20 +1324,17 @@ func TestRepricing(t *testing.T) { txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) txs = append(txs, pricedTransaction(3, 100000, big.NewInt(2), keys[2])) - ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3]) - // Import the batch and that both pending and queued transactions match up pool.addRemotesSync(txs) - pool.addLocal(ltx) pending, queued := pool.Stats() - if pending != 7 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7) + if pending != 6 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6) } if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } - if err := validateEvents(events, 7); err != nil { + if err := validateEvents(events, 6); err != nil { t.Fatalf("original event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { @@ -1425,8 +1344,8 @@ func TestRepricing(t *testing.T) { pool.SetGasTip(big.NewInt(2)) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } if queued != 5 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5) @@ -1453,21 +1372,7 @@ func TestRepricing(t *testing.T) { if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // However we can add local underpriced transactions - tx := pricedTransaction(1, 100000, big.NewInt(1), keys[3]) - if err := pool.addLocal(tx); err != nil { - t.Fatalf("failed to add underpriced local transaction: %v", err) - } - if pending, _ = pool.Stats(); pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if err := validateEvents(events, 1); err != nil { - t.Fatalf("post-reprice local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // And we can fill gaps with properly priced transactions + // we can fill gaps with properly priced transactions if err := pool.addRemote(pricedTransaction(1, 100000, big.NewInt(2), keys[0])); err != nil { t.Fatalf("failed to add pending transaction: %v", err) } @@ -1504,29 +1409,16 @@ func TestMinGasPriceEnforced(t *testing.T) { tx := pricedTransaction(0, 100000, big.NewInt(2), key) pool.SetGasTip(big.NewInt(tx.GasPrice().Int64() + 1)) - if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) { - t.Fatalf("Min tip not enforced") - } - - if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) { + if err := pool.Add([]*types.Transaction{tx}, true)[0]; !errors.Is(err, txpool.ErrUnderpriced) { t.Fatalf("Min tip not enforced") } tx = dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), key) pool.SetGasTip(big.NewInt(tx.GasTipCap().Int64() + 1)) - if err := pool.addLocal(tx); !errors.Is(err, txpool.ErrUnderpriced) { + if err := pool.Add([]*types.Transaction{tx}, true)[0]; !errors.Is(err, txpool.ErrUnderpriced) { t.Fatalf("Min tip not enforced") } - - if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; !errors.Is(err, txpool.ErrUnderpriced) { - t.Fatalf("Min tip not enforced") - } - // Make sure the tx is accepted if locals are enabled - pool.config.NoLocals = false - if err := pool.Add([]*types.Transaction{tx}, true, false)[0]; err != nil { - t.Fatalf("Min tip enforced with locals enabled, error: %v", err) - } } // Tests that setting the transaction pool gas price to a higher value correctly @@ -1567,20 +1459,17 @@ func TestRepricingDynamicFee(t *testing.T) { txs = append(txs, dynamicFeeTx(2, 100000, big.NewInt(1), big.NewInt(1), keys[2])) txs = append(txs, dynamicFeeTx(3, 100000, big.NewInt(2), big.NewInt(2), keys[2])) - ltx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[3]) - // Import the batch and that both pending and queued transactions match up pool.addRemotesSync(txs) - pool.addLocal(ltx) pending, queued := pool.Stats() - if pending != 7 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 7) + if pending != 6 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 6) } if queued != 3 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 3) } - if err := validateEvents(events, 7); err != nil { + if err := validateEvents(events, 6); err != nil { t.Fatalf("original event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { @@ -1590,8 +1479,8 @@ func TestRepricingDynamicFee(t *testing.T) { pool.SetGasTip(big.NewInt(2)) pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 1 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) } if queued != 5 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 5) @@ -1621,20 +1510,7 @@ func TestRepricingDynamicFee(t *testing.T) { if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // However we can add local underpriced transactions - tx = dynamicFeeTx(1, 100000, big.NewInt(1), big.NewInt(1), keys[3]) - if err := pool.addLocal(tx); err != nil { - t.Fatalf("failed to add underpriced local transaction: %v", err) - } - if pending, _ = pool.Stats(); pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if err := validateEvents(events, 1); err != nil { - t.Fatalf("post-reprice local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } + // And we can fill gaps with properly priced transactions tx = pricedTransaction(1, 100000, big.NewInt(2), keys[0]) if err := pool.addRemote(tx); err != nil { @@ -1656,77 +1532,6 @@ func TestRepricingDynamicFee(t *testing.T) { } } -// Tests that setting the transaction pool gas price to a higher value does not -// remove local transactions (legacy & dynamic fee). -func TestRepricingKeepsLocals(t *testing.T) { - t.Parallel() - - // Create the pool to test the pricing enforcement with - statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) - blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) - - pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) - defer pool.Close() - - // Create a number of test accounts and fund them - keys := make([]*ecdsa.PrivateKey, 3) - for i := 0; i < len(keys); i++ { - keys[i], _ = crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(100000*1000000)) - } - // Create transaction (both pending and queued) with a linearly growing gasprice - for i := uint64(0); i < 500; i++ { - // Add pending transaction. - pendingTx := pricedTransaction(i, 100000, big.NewInt(int64(i)), keys[2]) - if err := pool.addLocal(pendingTx); err != nil { - t.Fatal(err) - } - // Add queued transaction. - queuedTx := pricedTransaction(i+501, 100000, big.NewInt(int64(i)), keys[2]) - if err := pool.addLocal(queuedTx); err != nil { - t.Fatal(err) - } - - // Add pending dynamic fee transaction. - pendingTx = dynamicFeeTx(i, 100000, big.NewInt(int64(i)+1), big.NewInt(int64(i)), keys[1]) - if err := pool.addLocal(pendingTx); err != nil { - t.Fatal(err) - } - // Add queued dynamic fee transaction. - queuedTx = dynamicFeeTx(i+501, 100000, big.NewInt(int64(i)+1), big.NewInt(int64(i)), keys[1]) - if err := pool.addLocal(queuedTx); err != nil { - t.Fatal(err) - } - } - pending, queued := pool.Stats() - expPending, expQueued := 1000, 1000 - validate := func() { - pending, queued = pool.Stats() - if pending != expPending { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, expPending) - } - if queued != expQueued { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, expQueued) - } - - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - } - validate() - - // Reprice the pool and check that nothing is dropped - pool.SetGasTip(big.NewInt(2)) - validate() - - pool.SetGasTip(big.NewInt(2)) - pool.SetGasTip(big.NewInt(4)) - pool.SetGasTip(big.NewInt(8)) - pool.SetGasTip(big.NewInt(100)) - validate() -} - // Tests that when the pool reaches its global transaction limit, underpriced // transactions are gradually shifted out for more expensive ones and any gapped // pending transactions are moved into the queue. @@ -1756,21 +1561,18 @@ func TestUnderpricing(t *testing.T) { keys := make([]*ecdsa.PrivateKey, 5) for i := 0; i < len(keys); i++ { keys[i], _ = crypto.GenerateKey() - testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + testAddBalance(pool, crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(10000000)) } // Generate and queue a batch of transactions, both pending and queued txs := types.Transactions{} - txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0])) - txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) - - txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[1])) - - ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[2]) + txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[0])) // pending + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) // pending + txs = append(txs, pricedTransaction(0, 100000, big.NewInt(1), keys[2])) // pending + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(1), keys[1])) // queued // Import the batch and that both pending and queued transactions match up - pool.addRemotes(txs) - pool.addLocal(ltx) + pool.addRemotesSync(txs) pending, queued := pool.Stats() if pending != 3 { @@ -1790,7 +1592,7 @@ func TestUnderpricing(t *testing.T) { t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) } // Replace a future transaction with a future transaction - if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(2), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1 + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:1 => -K1:1 => Pend K0:0, K0:1, K2:0; Que K1:1 t.Fatalf("failed to add well priced transaction: %v", err) } // Ensure that adding high priced transactions drops cheap ones, but not own @@ -1800,48 +1602,26 @@ func TestUnderpricing(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2 t.Fatalf("failed to add well priced transaction: %v", err) } - if err := pool.addRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 + if err := pool.addRemoteSync(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3 t.Fatalf("failed to add well priced transaction: %v", err) } // Ensure that replacing a pending transaction with a future transaction fails - if err := pool.addRemote(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != txpool.ErrFutureReplacePending { + if err := pool.addRemoteSync(pricedTransaction(5, 100000, big.NewInt(6), keys[1])); err != txpool.ErrFutureReplacePending { t.Fatalf("adding future replace transaction error mismatch: have %v, want %v", err, txpool.ErrFutureReplacePending) } pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 4 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } - if err := validateEvents(events, 2); err != nil { + if err := validateEvents(events, 4); err != nil { t.Fatalf("additional event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // Ensure that adding local transactions can push out even higher priced ones - ltx = pricedTransaction(1, 100000, big.NewInt(0), keys[2]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to append underpriced local transaction: %v", err) - } - ltx = pricedTransaction(0, 100000, big.NewInt(0), keys[3]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to add new underpriced local transaction: %v", err) - } - pending, queued = pool.Stats() - if pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } - if err := validateEvents(events, 2); err != nil { - t.Fatalf("local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } } // Tests that more expensive transactions push out cheap ones from the pool, but @@ -1915,8 +1695,6 @@ func TestStableUnderpricing(t *testing.T) { // Tests that when the pool reaches its global transaction limit, underpriced // transactions (legacy & dynamic fee) are gradually shifted out for more // expensive ones and any gapped pending transactions are moved into the queue. -// -// Note, local transactions are never allowed to be dropped. func TestUnderpricingDynamicFee(t *testing.T) { t.Parallel() @@ -1941,15 +1719,13 @@ func TestUnderpricingDynamicFee(t *testing.T) { // Generate and queue a batch of transactions, both pending and queued txs := types.Transactions{} - txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])) - txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) - txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(2), big.NewInt(1), keys[1])) + txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(3), big.NewInt(2), keys[0])) // pending + txs = append(txs, pricedTransaction(1, 100000, big.NewInt(2), keys[0])) // pending + txs = append(txs, dynamicFeeTx(1, 100000, big.NewInt(2), big.NewInt(1), keys[1])) // queued + txs = append(txs, dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[2])) // pending - ltx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[2]) - - // Import the batch and that both pending and queued transactions match up - pool.addRemotes(txs) // Pend K0:0, K0:1; Que K1:1 - pool.addLocal(ltx) // +K2:0 => Pend K0:0, K0:1, K2:0; Que K1:1 + // Import the batch and check that both pending and queued transactions match up + pool.addRemotesSync(txs) // Pend K0:0, K0:1; Que K1:1 pending, queued := pool.Stats() if pending != 3 { @@ -1967,13 +1743,13 @@ func TestUnderpricingDynamicFee(t *testing.T) { // Ensure that adding an underpriced transaction fails tx := dynamicFeeTx(0, 100000, big.NewInt(2), big.NewInt(1), keys[1]) - if err := pool.addRemote(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1 + if err := pool.addRemoteSync(tx); !errors.Is(err, txpool.ErrUnderpriced) { // Pend K0:0, K0:1, K2:0; Que K1:1 t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, txpool.ErrUnderpriced) } // Ensure that adding high priced transactions drops cheap ones, but not own tx = pricedTransaction(0, 100000, big.NewInt(2), keys[1]) - if err := pool.addRemote(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - + if err := pool.addRemoteSync(tx); err != nil { // +K1:0, -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que - t.Fatalf("failed to add well priced transaction: %v", err) } @@ -1986,40 +1762,18 @@ func TestUnderpricingDynamicFee(t *testing.T) { t.Fatalf("failed to add well priced transaction: %v", err) } pending, queued = pool.Stats() - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) + if pending != 4 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) } - if queued != 2 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2) + if queued != 0 { + t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } - if err := validateEvents(events, 2); err != nil { + if err := validateEvents(events, 3); err != nil { t.Fatalf("additional event firing failed: %v", err) } if err := validatePoolInternals(pool); err != nil { t.Fatalf("pool internal state corrupted: %v", err) } - // Ensure that adding local transactions can push out even higher priced ones - ltx = dynamicFeeTx(1, 100000, big.NewInt(0), big.NewInt(0), keys[2]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to append underpriced local transaction: %v", err) - } - ltx = dynamicFeeTx(0, 100000, big.NewInt(0), big.NewInt(0), keys[3]) - if err := pool.addLocal(ltx); err != nil { - t.Fatalf("failed to add new underpriced local transaction: %v", err) - } - pending, queued = pool.Stats() - if pending != 3 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) - } - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } - if err := validateEvents(events, 2); err != nil { - t.Fatalf("local event firing failed: %v", err) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } } // Tests whether highest fee cap transaction is retained after a batch of high effective @@ -2039,7 +1793,7 @@ func TestDualHeapEviction(t *testing.T) { ) check := func(tx *types.Transaction, name string) { - if pool.all.GetRemote(tx.Hash()) == nil { + if pool.all.Get(tx.Hash()) == nil { t.Fatalf("highest %s transaction evicted from the pool", name) } } @@ -2336,122 +2090,6 @@ func TestReplacementDynamicFee(t *testing.T) { } } -// Tests that local transactions are journaled to disk, but remote transactions -// get discarded between restarts. -func TestJournaling(t *testing.T) { testJournaling(t, false) } -func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) } - -func testJournaling(t *testing.T, nolocals bool) { - t.Parallel() - - // Create a temporary file for the journal - file, err := os.CreateTemp("", "") - if err != nil { - t.Fatalf("failed to create temporary journal: %v", err) - } - journal := file.Name() - defer os.Remove(journal) - - // Clean up the temporary file, we only need the path for now - file.Close() - os.Remove(journal) - - // Create the original pool to inject transaction into the journal - statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) - blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) - - config := testTxPoolConfig - config.NoLocals = nolocals - config.Journal = journal - config.Rejournal = time.Second - - pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) - - // Create two test accounts to ensure remotes expire but locals do not - local, _ := crypto.GenerateKey() - remote, _ := crypto.GenerateKey() - - testAddBalance(pool, crypto.PubkeyToAddress(local.PublicKey), big.NewInt(1000000000)) - testAddBalance(pool, crypto.PubkeyToAddress(remote.PublicKey), big.NewInt(1000000000)) - - // Add three local and a remote transactions and ensure they are queued up - if err := pool.addLocal(pricedTransaction(0, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } - if err := pool.addLocal(pricedTransaction(1, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } - if err := pool.addLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil { - t.Fatalf("failed to add local transaction: %v", err) - } - if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil { - t.Fatalf("failed to add remote transaction: %v", err) - } - pending, queued := pool.Stats() - if pending != 4 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 4) - } - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive - pool.Close() - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) - - pool = New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) - - pending, queued = pool.Stats() - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - if nolocals { - if pending != 0 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) - } - } else { - if pending != 2 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) - } - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - // Bump the nonce temporarily and ensure the newly invalidated transaction is removed - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) - <-pool.requestReset(nil, nil) - time.Sleep(2 * config.Rejournal) - pool.Close() - - statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) - pool = New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) - - pending, queued = pool.Stats() - if pending != 0 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) - } - if nolocals { - if queued != 0 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) - } - } else { - if queued != 1 { - t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1) - } - } - if err := validatePoolInternals(pool); err != nil { - t.Fatalf("pool internal state corrupted: %v", err) - } - pool.Close() -} - // TestStatusCheck tests that the pool can correctly retrieve the // pending status of individual transactions. func TestStatusCheck(t *testing.T) { @@ -2566,7 +2204,7 @@ func benchmarkFuturePromotion(b *testing.B, size int) { for i := 0; i < size; i++ { tx := transaction(uint64(1+i), 100000, key) - pool.enqueueTx(tx.Hash(), tx, false, true) + pool.enqueueTx(tx.Hash(), tx, true) } // Benchmark the speed of pool validation b.ResetTimer() @@ -2576,15 +2214,11 @@ func benchmarkFuturePromotion(b *testing.B, size int) { } // Benchmarks the speed of batched transaction insertion. -func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, false) } -func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, false) } -func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, false) } +func BenchmarkBatchInsert100(b *testing.B) { benchmarkBatchInsert(b, 100) } +func BenchmarkBatchInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000) } +func BenchmarkBatchInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000) } -func BenchmarkBatchLocalInsert100(b *testing.B) { benchmarkBatchInsert(b, 100, true) } -func BenchmarkBatchLocalInsert1000(b *testing.B) { benchmarkBatchInsert(b, 1000, true) } -func BenchmarkBatchLocalInsert10000(b *testing.B) { benchmarkBatchInsert(b, 10000, true) } - -func benchmarkBatchInsert(b *testing.B, size int, local bool) { +func benchmarkBatchInsert(b *testing.B, size int) { // Generate a batch of transactions to enqueue into the pool pool, key := setupPool() defer pool.Close() @@ -2602,46 +2236,7 @@ func benchmarkBatchInsert(b *testing.B, size int, local bool) { // Benchmark importing the transactions into the queue b.ResetTimer() for _, batch := range batches { - if local { - pool.addLocals(batch) - } else { - pool.addRemotes(batch) - } - } -} - -func BenchmarkInsertRemoteWithAllLocals(b *testing.B) { - // Allocate keys for testing - key, _ := crypto.GenerateKey() - account := crypto.PubkeyToAddress(key.PublicKey) - - remoteKey, _ := crypto.GenerateKey() - remoteAddr := crypto.PubkeyToAddress(remoteKey.PublicKey) - - locals := make([]*types.Transaction, 4096+1024) // Occupy all slots - for i := 0; i < len(locals); i++ { - locals[i] = transaction(uint64(i), 100000, key) - } - remotes := make([]*types.Transaction, 1000) - for i := 0; i < len(remotes); i++ { - remotes[i] = pricedTransaction(uint64(i), 100000, big.NewInt(2), remoteKey) // Higher gasprice - } - // Benchmark importing the transactions into the queue - b.ResetTimer() - for i := 0; i < b.N; i++ { - b.StopTimer() - pool, _ := setupPool() - testAddBalance(pool, account, big.NewInt(100000000)) - for _, local := range locals { - pool.addLocal(local) - } - b.StartTimer() - // Assign a high enough balance for testing - testAddBalance(pool, remoteAddr, big.NewInt(100000000)) - for i := 0; i < len(remotes); i++ { - pool.addRemotes([]*types.Transaction{remotes[i]}) - } - pool.Close() + pool.addRemotes(batch) } } diff --git a/core/txpool/legacypool/list.go b/core/txpool/legacypool/list.go index b749db44d4..736c28ec4a 100644 --- a/core/txpool/legacypool/list.go +++ b/core/txpool/legacypool/list.go @@ -52,31 +52,31 @@ func (h *nonceHeap) Pop() interface{} { return x } -// sortedMap is a nonce->transaction hash map with a heap based index to allow +// SortedMap is a nonce->transaction hash map with a heap based index to allow // iterating over the contents in a nonce-incrementing way. -type sortedMap struct { +type SortedMap struct { items map[uint64]*types.Transaction // Hash map storing the transaction data index *nonceHeap // Heap of nonces of all the stored transactions (non-strict mode) cache types.Transactions // Cache of the transactions already sorted cacheMu sync.Mutex // Mutex covering the cache } -// newSortedMap creates a new nonce-sorted transaction map. -func newSortedMap() *sortedMap { - return &sortedMap{ +// NewSortedMap creates a new nonce-sorted transaction map. +func NewSortedMap() *SortedMap { + return &SortedMap{ items: make(map[uint64]*types.Transaction), index: new(nonceHeap), } } // Get retrieves the current transactions associated with the given nonce. -func (m *sortedMap) Get(nonce uint64) *types.Transaction { +func (m *SortedMap) Get(nonce uint64) *types.Transaction { return m.items[nonce] } // Put inserts a new transaction into the map, also updating the map's nonce // index. If a transaction already exists with the same nonce, it's overwritten. -func (m *sortedMap) Put(tx *types.Transaction) { +func (m *SortedMap) Put(tx *types.Transaction) { nonce := tx.Nonce() if m.items[nonce] == nil { heap.Push(m.index, nonce) @@ -89,7 +89,7 @@ func (m *sortedMap) Put(tx *types.Transaction) { // Forward removes all transactions from the map with a nonce lower than the // provided threshold. Every removed transaction is returned for any post-removal // maintenance. -func (m *sortedMap) Forward(threshold uint64) types.Transactions { +func (m *SortedMap) Forward(threshold uint64) types.Transactions { var removed types.Transactions // Pop off heap items until the threshold is reached @@ -112,7 +112,7 @@ func (m *sortedMap) Forward(threshold uint64) types.Transactions { // Filter, as opposed to 'filter', re-initialises the heap after the operation is done. // If you want to do several consecutive filterings, it's therefore better to first // do a .filter(func1) followed by .Filter(func2) or reheap() -func (m *sortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { +func (m *SortedMap) Filter(filter func(*types.Transaction) bool) types.Transactions { removed := m.filter(filter) // If transactions were removed, the heap and cache are ruined if len(removed) > 0 { @@ -121,7 +121,7 @@ func (m *sortedMap) Filter(filter func(*types.Transaction) bool) types.Transacti return removed } -func (m *sortedMap) reheap() { +func (m *SortedMap) reheap() { *m.index = make([]uint64, 0, len(m.items)) for nonce := range m.items { *m.index = append(*m.index, nonce) @@ -134,7 +134,7 @@ func (m *sortedMap) reheap() { // filter is identical to Filter, but **does not** regenerate the heap. This method // should only be used if followed immediately by a call to Filter or reheap() -func (m *sortedMap) filter(filter func(*types.Transaction) bool) types.Transactions { +func (m *SortedMap) filter(filter func(*types.Transaction) bool) types.Transactions { var removed types.Transactions // Collect all the transactions to filter out @@ -154,7 +154,7 @@ func (m *sortedMap) filter(filter func(*types.Transaction) bool) types.Transacti // Cap places a hard limit on the number of items, returning all transactions // exceeding that limit. -func (m *sortedMap) Cap(threshold int) types.Transactions { +func (m *SortedMap) Cap(threshold int) types.Transactions { // Short circuit if the number of items is under the limit if len(m.items) <= threshold { return nil @@ -181,7 +181,7 @@ func (m *sortedMap) Cap(threshold int) types.Transactions { // Remove deletes a transaction from the maintained map, returning whether the // transaction was found. -func (m *sortedMap) Remove(nonce uint64) bool { +func (m *SortedMap) Remove(nonce uint64) bool { // Short circuit if no transaction is present _, ok := m.items[nonce] if !ok { @@ -209,7 +209,7 @@ func (m *sortedMap) Remove(nonce uint64) bool { // Note, all transactions with nonces lower than start will also be returned to // prevent getting into an invalid state. This is not something that should ever // happen but better to be self correcting than failing! -func (m *sortedMap) Ready(start uint64) types.Transactions { +func (m *SortedMap) Ready(start uint64) types.Transactions { // Short circuit if no transactions are available if m.index.Len() == 0 || (*m.index)[0] > start { return nil @@ -229,11 +229,11 @@ func (m *sortedMap) Ready(start uint64) types.Transactions { } // Len returns the length of the transaction map. -func (m *sortedMap) Len() int { +func (m *SortedMap) Len() int { return len(m.items) } -func (m *sortedMap) flatten() types.Transactions { +func (m *SortedMap) flatten() types.Transactions { m.cacheMu.Lock() defer m.cacheMu.Unlock() // If the sorting was not cached yet, create and cache it @@ -250,7 +250,7 @@ func (m *sortedMap) flatten() types.Transactions { // Flatten creates a nonce-sorted slice of transactions based on the loosely // sorted internal representation. The result of the sorting is cached in case // it's requested again before any modifications are made to the contents. -func (m *sortedMap) Flatten() types.Transactions { +func (m *SortedMap) Flatten() types.Transactions { cache := m.flatten() // Copy the cache to prevent accidental modification txs := make(types.Transactions, len(cache)) @@ -260,7 +260,7 @@ func (m *sortedMap) Flatten() types.Transactions { // LastElement returns the last element of a flattened list, thus, the // transaction with the highest nonce -func (m *sortedMap) LastElement() *types.Transaction { +func (m *SortedMap) LastElement() *types.Transaction { cache := m.flatten() return cache[len(cache)-1] } @@ -271,7 +271,7 @@ func (m *sortedMap) LastElement() *types.Transaction { // executable/future queue, with minor behavioral changes. type list struct { strict bool // Whether nonces are strictly continuous or not - txs *sortedMap // Heap indexed sorted hash map of the transactions + txs *SortedMap // Heap indexed sorted hash map of the transactions costcap *uint256.Int // Price of the highest costing transaction (reset only if exceeds balance) gascap uint64 // Gas limit of the highest spending transaction (reset only if exceeds block limit) @@ -283,7 +283,7 @@ type list struct { func newList(strict bool) *list { return &list{ strict: strict, - txs: newSortedMap(), + txs: NewSortedMap(), costcap: new(uint256.Int), totalcost: new(uint256.Int), } @@ -556,10 +556,7 @@ func newPricedList(all *lookup) *pricedList { } // Put inserts a new transaction into the heap. -func (l *pricedList) Put(tx *types.Transaction, local bool) { - if local { - return - } +func (l *pricedList) Put(tx *types.Transaction) { // Insert every new transaction to the urgent heap first; Discard will balance the heaps heap.Push(&l.urgent, tx) } @@ -593,7 +590,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { // Discard stale price points if found at the heap start for len(h.list) > 0 { head := h.list[0] - if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated + if l.all.Get(head.Hash()) == nil { // Removed or migrated l.stales.Add(-1) heap.Pop(h) continue @@ -612,15 +609,13 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { // Discard finds a number of most underpriced transactions, removes them from the // priced list and returns them for further removal from the entire pool. // If noPending is set to true, we will only consider the floating list -// -// Note local transaction won't be considered for eviction. -func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { +func (l *pricedList) Discard(slots int) (types.Transactions, bool) { drop := make(types.Transactions, 0, slots) // Remote underpriced transactions to drop for slots > 0 { if len(l.urgent.list)*floatingRatio > len(l.floating.list)*urgentRatio { // Discard stale transactions if found during cleanup tx := heap.Pop(&l.urgent).(*types.Transaction) - if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated + if l.all.Get(tx.Hash()) == nil { // Removed or migrated l.stales.Add(-1) continue } @@ -633,7 +628,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { } // Discard stale transactions if found during cleanup tx := heap.Pop(&l.floating).(*types.Transaction) - if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated + if l.all.Get(tx.Hash()) == nil { // Removed or migrated l.stales.Add(-1) continue } @@ -643,7 +638,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { } } // If we still can't make enough room for the new transaction - if slots > 0 && !force { + if slots > 0 { for _, tx := range drop { heap.Push(&l.urgent, tx) } @@ -658,11 +653,11 @@ func (l *pricedList) Reheap() { defer l.reheapMu.Unlock() start := time.Now() l.stales.Store(0) - l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) - l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { + l.urgent.list = make([]*types.Transaction, 0, l.all.Count()) + l.all.Range(func(hash common.Hash, tx *types.Transaction) bool { l.urgent.list = append(l.urgent.list, tx) return true - }, false, true) // Only iterate remotes + }) heap.Init(&l.urgent) // balance out the two heaps by moving the worse half of transactions into the diff --git a/core/txpool/legacypool/journal.go b/core/txpool/locals/journal.go similarity index 99% rename from core/txpool/legacypool/journal.go rename to core/txpool/locals/journal.go index 899ed00bcc..46fd6de346 100644 --- a/core/txpool/legacypool/journal.go +++ b/core/txpool/locals/journal.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package legacypool +package locals import ( "errors" diff --git a/core/txpool/locals/tx_tracker.go b/core/txpool/locals/tx_tracker.go new file mode 100644 index 0000000000..a24fcb1f4e --- /dev/null +++ b/core/txpool/locals/tx_tracker.go @@ -0,0 +1,212 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package locals implements tracking for "local" transactions +package locals + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" + "golang.org/x/exp/slices" +) + +var ( + recheckInterval = time.Minute + localGauge = metrics.GetOrRegisterGauge("txpool/local", nil) +) + +// TxTracker is a struct used to track priority transactions; it will check from +// time to time if the main pool has forgotten about any of the transaction +// it is tracking, and if so, submit it again. +// This is used to track 'locals'. +// This struct does not care about transaction validity, price-bumps or account limits, +// but optimistically accepts transactions. +type TxTracker struct { + all map[common.Hash]*types.Transaction // All tracked transactions + byAddr map[common.Address]*legacypool.SortedMap // Transactions by address + + journal *journal // Journal of local transaction to back up to disk + rejournal time.Duration // How often to rotate journal + pool *txpool.TxPool // The tx pool to interact with + signer types.Signer + + shutdownCh chan struct{} + mu sync.Mutex + wg sync.WaitGroup +} + +// New creates a new TxTracker +func New(journalPath string, journalTime time.Duration, chainConfig *params.ChainConfig, next *txpool.TxPool) *TxTracker { + pool := &TxTracker{ + all: make(map[common.Hash]*types.Transaction), + byAddr: make(map[common.Address]*legacypool.SortedMap), + signer: types.LatestSigner(chainConfig), + shutdownCh: make(chan struct{}), + pool: next, + } + if journalPath != "" { + pool.journal = newTxJournal(journalPath) + pool.rejournal = journalTime + } + return pool +} + +// Track adds a transaction to the tracked set. +// Note: blob-type transactions are ignored. +func (tracker *TxTracker) Track(tx *types.Transaction) { + tracker.TrackAll([]*types.Transaction{tx}) +} + +// TrackAll adds a list of transactions to the tracked set. +// Note: blob-type transactions are ignored. +func (tracker *TxTracker) TrackAll(txs []*types.Transaction) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + for _, tx := range txs { + if tx.Type() == types.BlobTxType { + continue + } + // If we're already tracking it, it's a no-op + if _, ok := tracker.all[tx.Hash()]; ok { + continue + } + addr, err := types.Sender(tracker.signer, tx) + if err != nil { // Ignore this tx + continue + } + tracker.all[tx.Hash()] = tx + if tracker.byAddr[addr] == nil { + tracker.byAddr[addr] = legacypool.NewSortedMap() + } + tracker.byAddr[addr].Put(tx) + + if tracker.journal != nil { + _ = tracker.journal.insert(tx) + } + } + localGauge.Update(int64(len(tracker.all))) +} + +// recheck checks and returns any transactions that needs to be resubmitted. +func (tracker *TxTracker) recheck(journalCheck bool) (resubmits []*types.Transaction, rejournal map[common.Address]types.Transactions) { + tracker.mu.Lock() + defer tracker.mu.Unlock() + + var ( + numStales = 0 + numOk = 0 + ) + for sender, txs := range tracker.byAddr { + // Wipe the stales + stales := txs.Forward(tracker.pool.Nonce(sender)) + for _, tx := range stales { + delete(tracker.all, tx.Hash()) + } + numStales += len(stales) + + // Check the non-stale + for _, tx := range txs.Flatten() { + if tracker.pool.Has(tx.Hash()) { + numOk++ + continue + } + resubmits = append(resubmits, tx) + } + } + + if journalCheck { // rejournal + rejournal = make(map[common.Address]types.Transactions) + for _, tx := range tracker.all { + addr, _ := types.Sender(tracker.signer, tx) + rejournal[addr] = append(rejournal[addr], tx) + } + // Sort them + for _, list := range rejournal { + // cmp(a, b) should return a negative number when a < b, + slices.SortFunc(list, func(a, b *types.Transaction) int { + return int(a.Nonce() - b.Nonce()) + }) + } + } + localGauge.Update(int64(len(tracker.all))) + log.Debug("Tx tracker status", "need-resubmit", len(resubmits), "stale", numStales, "ok", numOk) + return resubmits, rejournal +} + +// Start implements node.Lifecycle interface +// Start is called after all services have been constructed and the networking +// layer was also initialized to spawn any goroutines required by the service. +func (tracker *TxTracker) Start() error { + tracker.wg.Add(1) + go tracker.loop() + return nil +} + +// Stop implements node.Lifecycle interface +// Stop terminates all goroutines belonging to the service, blocking until they +// are all terminated. +func (tracker *TxTracker) Stop() error { + close(tracker.shutdownCh) + tracker.wg.Wait() + return nil +} + +func (tracker *TxTracker) loop() { + defer tracker.wg.Done() + + if tracker.journal != nil { + tracker.journal.load(func(transactions []*types.Transaction) []error { + tracker.TrackAll(transactions) + return nil + }) + defer tracker.journal.close() + } + var ( + lastJournal = time.Now() + timer = time.NewTimer(10 * time.Second) // Do initial check after 10 seconds, do rechecks more seldom. + ) + for { + select { + case <-tracker.shutdownCh: + return + case <-timer.C: + checkJournal := tracker.journal != nil && time.Since(lastJournal) > tracker.rejournal + resubmits, rejournal := tracker.recheck(checkJournal) + if len(resubmits) > 0 { + tracker.pool.Add(resubmits, false) + } + if checkJournal { + // Lock to prevent journal.rotate <-> journal.insert (via TrackAll) conflicts + tracker.mu.Lock() + lastJournal = time.Now() + if err := tracker.journal.rotate(rejournal); err != nil { + log.Warn("Transaction journal rotation failed", "err", err) + } + tracker.mu.Unlock() + } + timer.Reset(recheckInterval) + } + } +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 9ee0a69c0b..5ad0f5b0e0 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -132,7 +132,7 @@ type SubPool interface { // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. - Add(txs []*types.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, sync bool) []error // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. @@ -162,9 +162,6 @@ type SubPool interface { // pending as well as queued transactions of this address, grouped by nonce. ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) - // Locals retrieves the accounts currently considered local by the pool. - Locals() []common.Address - // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. Status(hash common.Hash) TxStatus diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 182706d63c..361dbe8b38 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -328,7 +328,7 @@ func (p *TxPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Pr // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. -func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *TxPool) Add(txs []*types.Transaction, sync bool) []error { // Split the input transactions between the subpools. It shouldn't really // happen that we receive merged batches, but better graceful than strange // errors. @@ -355,7 +355,7 @@ func (p *TxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { // back the errors into the original sort order. errsets := make([][]error, len(p.subpools)) for i := 0; i < len(p.subpools); i++ { - errsets[i] = p.subpools[i].Add(txsets[i], local, sync) + errsets[i] = p.subpools[i].Add(txsets[i], sync) } errs := make([]error, len(txs)) for i, split := range splits { @@ -456,23 +456,6 @@ func (p *TxPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*type return []*types.Transaction{}, []*types.Transaction{} } -// Locals retrieves the accounts currently considered local by the pool. -func (p *TxPool) Locals() []common.Address { - // Retrieve the locals from each subpool and deduplicate them - locals := make(map[common.Address]struct{}) - for _, subpool := range p.subpools { - for _, local := range subpool.Locals() { - locals[local] = struct{}{} - } - } - // Flatten and return the deduplicated local set - flat := make([]common.Address, 0, len(locals)) - for local := range locals { - flat = append(flat, local) - } - return flat -} - // Status returns the known status (unknown/pending/queued) of a transaction // identified by its hash. func (p *TxPool) Status(hash common.Hash) TxStatus { diff --git a/eth/api_backend.go b/eth/api_backend.go index 52ecd91789..66621190dd 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -272,7 +272,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri } func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.Add([]*types.Transaction{signedTx}, true, false)[0] + if locals := b.eth.localTxTracker; locals != nil { + locals.Track(signedTx) + } + return b.eth.txPool.Add([]*types.Transaction{signedTx}, false)[0] } func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { diff --git a/eth/backend.go b/eth/backend.go index a3aa0a7b9b..fea7e4e1fe 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -23,6 +23,7 @@ import ( "math/big" "runtime" "sync" + "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -35,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" + "github.com/ethereum/go-ethereum/core/txpool/locals" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/downloader" @@ -67,9 +69,10 @@ type Config = ethconfig.Config // Ethereum implements the Ethereum full node service. type Ethereum struct { // core protocol objects - config *ethconfig.Config - txPool *txpool.TxPool - blockchain *core.BlockChain + config *ethconfig.Config + txPool *txpool.TxPool + localTxTracker *locals.TxTracker + blockchain *core.BlockChain handler *handler discmix *enode.FairMix @@ -237,6 +240,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } + + if !config.TxPool.NoLocals { + rejournal := config.TxPool.Rejournal + if rejournal < time.Second { + log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) + rejournal = time.Second + } + eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) + stack.RegisterLifecycle(eth.localTxTracker) + } // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ @@ -255,6 +268,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) + eth.miner.SetPrioAddresses(config.TxPool.Locals) eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} if eth.APIBackend.allowUnprotectedTxs { diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 9840d9c6ad..8fc3361192 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -115,7 +115,7 @@ func TestEth2AssembleBlock(t *testing.T) { if err != nil { t.Fatalf("error signing transaction, err=%v", err) } - ethservice.TxPool().Add([]*types.Transaction{tx}, true, true) + ethservice.TxPool().Add([]*types.Transaction{tx}, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[9].Time() + 5, } @@ -152,7 +152,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() - api.eth.TxPool().Add(txs, false, true) + api.eth.TxPool().Add(txs, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, } @@ -174,7 +174,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() - ethservice.TxPool().Add(txs, true, true) + ethservice.TxPool().Add(txs, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, } @@ -294,7 +294,7 @@ func TestEth2NewBlock(t *testing.T) { statedb, _ := ethservice.BlockChain().StateAt(parent.Root()) nonce := statedb.GetNonce(testAddr) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) - ethservice.TxPool().Add([]*types.Transaction{tx}, true, true) + ethservice.TxPool().Add([]*types.Transaction{tx}, true) execData, err := assembleWithTransactions(api, parent.Hash(), &engine.PayloadAttributes{ Timestamp: parent.Time() + 5, @@ -463,7 +463,7 @@ func TestFullAPI(t *testing.T) { statedb, _ := ethservice.BlockChain().StateAt(parent.Root) nonce := statedb.GetNonce(testAddr) tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey) - ethservice.TxPool().Add([]*types.Transaction{tx}, true, false) + ethservice.TxPool().Add([]*types.Transaction{tx}, false) } setupBlocks(t, ethservice, 10, parent, callback, nil, nil) @@ -594,7 +594,7 @@ func TestNewPayloadOnInvalidChain(t *testing.T) { GasPrice: big.NewInt(2 * params.InitialBaseFee), Data: logCode, }) - ethservice.TxPool().Add([]*types.Transaction{tx}, false, true) + ethservice.TxPool().Add([]*types.Transaction{tx}, true) var ( params = engine.PayloadAttributes{ Timestamp: parent.Time + 1, @@ -1246,8 +1246,8 @@ func setupBodies(t *testing.T) (*node.Node, *eth.Ethereum, []*types.Block) { // Create tx to trigger deposit generator. tx2, _ = types.SignTx(types.NewTransaction(statedb.GetNonce(testAddr)+1, ethservice.APIBackend.ChainConfig().DepositContractAddress, new(big.Int), 500000, big.NewInt(2*params.InitialBaseFee), nil), types.LatestSigner(ethservice.BlockChain().Config()), testKey) ) - ethservice.TxPool().Add([]*types.Transaction{tx1}, false, false) - ethservice.TxPool().Add([]*types.Transaction{tx2}, false, false) + ethservice.TxPool().Add([]*types.Transaction{tx1}, false) + ethservice.TxPool().Add([]*types.Transaction{tx2}, false) } // Make some withdrawals to include. @@ -1637,7 +1637,7 @@ func TestWitnessCreationAndConsumption(t *testing.T) { // Put the 10th block's tx in the pool and produce a new block txs := blocks[9].Transactions() - ethservice.TxPool().Add(txs, true, true) + ethservice.TxPool().Add(txs, true) blockParams := engine.PayloadAttributes{ Timestamp: blocks[8].Time() + 5, Withdrawals: make([]*types.Withdrawal, 0), diff --git a/eth/catalyst/simulated_beacon_test.go b/eth/catalyst/simulated_beacon_test.go index 79d9ba738e..ea35482896 100644 --- a/eth/catalyst/simulated_beacon_test.go +++ b/eth/catalyst/simulated_beacon_test.go @@ -18,6 +18,7 @@ package catalyst import ( "context" + "fmt" "math/big" "testing" "time" @@ -143,9 +144,14 @@ func TestSimulatedBeaconSendWithdrawals(t *testing.T) { // Tests that zero-period dev mode can handle a lot of simultaneous // transactions/withdrawals func TestOnDemandSpam(t *testing.T) { + // This test is flaky, due to various causes, and the root cause is synchronicity. + // We have optimistic timeouts here and there in the simulated becaon and the worker. + // This test typically fails on 32-bit windows appveyor. + t.Skip("flaky test") var ( withdrawals []types.Withdrawal - txs = make(map[common.Hash]*types.Transaction) + txCount = 20000 + wxCount = 20 testKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") testAddr = crypto.PubkeyToAddress(testKey.PublicKey) gasLimit uint64 = 10_000_000 @@ -160,7 +166,7 @@ func TestOnDemandSpam(t *testing.T) { defer sub.Unsubscribe() // generate some withdrawals - for i := 0; i < 20; i++ { + for i := 0; i < wxCount; i++ { withdrawals = append(withdrawals, types.Withdrawal{Index: uint64(i)}) if err := mock.withdrawals.add(&withdrawals[i]); err != nil { t.Fatal("addWithdrawal failed", err) @@ -168,37 +174,37 @@ func TestOnDemandSpam(t *testing.T) { } // generate a bunch of transactions - for i := 0; i < 20000; i++ { - tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) - if err != nil { - t.Fatal("error signing transaction", err) + go func() { + for i := 0; i < txCount; i++ { + tx, err := types.SignTx(types.NewTransaction(uint64(i), common.Address{byte(i), byte(1)}, big.NewInt(1000), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil), signer, testKey) + if err != nil { + panic(fmt.Sprintf("error signing transaction: %v", err)) + } + if err := eth.TxPool().Add([]*types.Transaction{tx}, false)[0]; err != nil { + panic(fmt.Sprintf("error adding txs to pool: %v", err)) + } } - txs[tx.Hash()] = tx - if err := eth.APIBackend.SendTx(context.Background(), tx); err != nil { - t.Fatal("error adding txs to pool", err) - } - } - + }() var ( - includedTxs = make(map[common.Hash]struct{}) - includedWxs []uint64 + includedTxs int + includedWxs int + abort = time.NewTimer(10 * time.Second) ) + defer abort.Stop() for { select { case ev := <-chainHeadCh: block := eth.BlockChain().GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64()) - for _, itx := range block.Transactions() { - includedTxs[itx.Hash()] = struct{}{} - } - for _, iwx := range block.Withdrawals() { - includedWxs = append(includedWxs, iwx.Index) - } + includedTxs += len(block.Transactions()) + includedWxs += len(block.Withdrawals()) // ensure all withdrawals/txs included. this will take two blocks b/c number of withdrawals > 10 - if len(includedTxs) == len(txs) && len(includedWxs) == len(withdrawals) { + if includedTxs == txCount && includedWxs == wxCount { return } - case <-time.After(10 * time.Second): - t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", len(includedTxs), len(txs), len(includedWxs), len(withdrawals)) + abort.Reset(10 * time.Second) + case <-abort.C: + t.Fatalf("timed out without including all withdrawals/txs: have txs %d, want %d, have wxs %d, want %d", + includedTxs, txCount, includedWxs, wxCount) } } } diff --git a/eth/handler.go b/eth/handler.go index 8893920497..6ac890902b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -68,7 +68,7 @@ type txPool interface { Get(hash common.Hash) *types.Transaction // Add should add the given transactions to the pool. - Add(txs []*types.Transaction, local bool, sync bool) []error + Add(txs []*types.Transaction, sync bool) []error // Pending should return pending transactions. // The slice should be modifiable by the caller. @@ -189,7 +189,7 @@ func newHandler(config *handlerConfig) (*handler, error) { return p.RequestTxs(hashes) } addTxs := func(txs []*types.Transaction) []error { - return h.txpool.Add(txs, false, false) + return h.txpool.Add(txs, false) } h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) return h, nil diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 622880b097..8d572ca966 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -299,8 +299,8 @@ func testSendTransactions(t *testing.T, protocol uint) { tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) insert[nonce] = tx } - go handler.txpool.Add(insert, false, false) // Need goroutine to not block on feed - time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join) + go handler.txpool.Add(insert, false) // Need goroutine to not block on feed + time.Sleep(250 * time.Millisecond) // Wait until tx events get out of the system (can't use events, tx broadcaster races with peer join) // Create a source handler to send messages through and a sink peer to receive them p2pSrc, p2pSink := p2p.MsgPipe() @@ -419,7 +419,7 @@ func testTransactionPropagation(t *testing.T, protocol uint) { tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) txs[nonce] = tx } - source.txpool.Add(txs, false, false) + source.txpool.Add(txs, false) // Iterate through all the sinks and ensure they all got the transactions for i := range sinks { diff --git a/eth/handler_test.go b/eth/handler_test.go index b63d3e8592..d5d46a3c65 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -80,7 +80,7 @@ func (p *testTxPool) Get(hash common.Hash) *types.Transaction { // Add appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil -func (p *testTxPool) Add(txs []*types.Transaction, local bool, sync bool) []error { +func (p *testTxPool) Add(txs []*types.Transaction, sync bool) []error { p.lock.Lock() defer p.lock.Unlock() diff --git a/miner/miner.go b/miner/miner.go index 9892c08ed6..595ef8081c 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -70,6 +70,7 @@ type Miner struct { chainConfig *params.ChainConfig engine consensus.Engine txpool *txpool.TxPool + prio []common.Address // A list of senders to prioritize chain *core.BlockChain pending *pending pendingMu sync.Mutex // Lock protects the pending block @@ -109,6 +110,13 @@ func (miner *Miner) SetExtra(extra []byte) error { return nil } +// SetPrioAddresses sets a list of addresses to prioritize for transaction inclusion. +func (miner *Miner) SetPrioAddresses(prio []common.Address) { + miner.confMu.Lock() + miner.prio = prio + miner.confMu.Unlock() +} + // SetGasCeil sets the gaslimit to strive for when mining blocks post 1559. // For pre-1559 blocks, it sets the ceiling. func (miner *Miner) SetGasCeil(ceil uint64) { diff --git a/miner/payload_building_test.go b/miner/payload_building_test.go index e5eb0297a1..307024c6bb 100644 --- a/miner/payload_building_test.go +++ b/miner/payload_building_test.go @@ -138,7 +138,7 @@ func (b *testWorkerBackend) TxPool() *txpool.TxPool { return b.txPool } func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*Miner, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) - backend.txPool.Add(pendingTxs, true, true) + backend.txPool.Add(pendingTxs, true) w := New(backend, testConfig, engine) return w, backend } diff --git a/miner/worker.go b/miner/worker.go index f8f4bae833..16ac7de9a9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -427,6 +427,7 @@ func (miner *Miner) commitTransactions(env *environment, plainTxs, blobTxs *tran func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) error { miner.confMu.RLock() tip := miner.config.GasPrice + prio := miner.prio miner.confMu.RUnlock() // Retrieve the pending transactions pre-filtered by the 1559/4844 dynamic fees @@ -446,31 +447,31 @@ func (miner *Miner) fillTransactions(interrupt *atomic.Int32, env *environment) pendingBlobTxs := miner.txpool.Pending(filter) // Split the pending transactions into locals and remotes. - localPlainTxs, remotePlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs - localBlobTxs, remoteBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs + prioPlainTxs, normalPlainTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingPlainTxs + prioBlobTxs, normalBlobTxs := make(map[common.Address][]*txpool.LazyTransaction), pendingBlobTxs - for _, account := range miner.txpool.Locals() { - if txs := remotePlainTxs[account]; len(txs) > 0 { - delete(remotePlainTxs, account) - localPlainTxs[account] = txs + for _, account := range prio { + if txs := normalPlainTxs[account]; len(txs) > 0 { + delete(normalPlainTxs, account) + prioPlainTxs[account] = txs } - if txs := remoteBlobTxs[account]; len(txs) > 0 { - delete(remoteBlobTxs, account) - localBlobTxs[account] = txs + if txs := normalBlobTxs[account]; len(txs) > 0 { + delete(normalBlobTxs, account) + prioBlobTxs[account] = txs } } // Fill the block with all available pending transactions. - if len(localPlainTxs) > 0 || len(localBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, localPlainTxs, env.header.BaseFee) - blobTxs := newTransactionsByPriceAndNonce(env.signer, localBlobTxs, env.header.BaseFee) + if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 { + plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee) + blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee) if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { return err } } - if len(remotePlainTxs) > 0 || len(remoteBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, remotePlainTxs, env.header.BaseFee) - blobTxs := newTransactionsByPriceAndNonce(env.signer, remoteBlobTxs, env.header.BaseFee) + if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 { + plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee) + blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee) if err := miner.commitTransactions(env, plainTxs, blobTxs, interrupt); err != nil { return err