diff --git a/core/tx_pool.go b/core/tx_pool.go index b16825332b..5d37fcffc0 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -208,16 +208,14 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - scope event.SubscriptionScope - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + scope event.SubscriptionScope + signer types.Signer + mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces @@ -232,9 +230,18 @@ type TxPool struct { all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price - wg sync.WaitGroup // for shutdown sync + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + reqResetCh chan *txpoolResetRequest + reqPromoteCh chan *accountSet + queueTxEventCh chan *types.Transaction + reorgDoneCh chan chan struct{} + reorgShutdownCh chan struct{} // requests shutdown of scheduleReorgLoop + wg sync.WaitGroup // tracks loop, scheduleReorgLoop +} - homestead bool +type txpoolResetRequest struct { + oldHead, newHead *types.Header } // NewTxPool creates a new transaction pool to gather, sort and filter inbound @@ -245,16 +252,21 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.NewEIP155Signer(chainconfig.ChainID), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainID), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + reqResetCh: make(chan *txpoolResetRequest), + reqPromoteCh: make(chan *accountSet), + queueTxEventCh: make(chan *types.Transaction), + reorgDoneCh: make(chan chan struct{}), + reorgShutdownCh: make(chan struct{}), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -264,6 +276,10 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pool.priced = newTxPricedList(pool.all) pool.reset(nil, chain.CurrentBlock().Header()) + // Start the reorg loop early so it can handle requests generated during journal loading. + pool.wg.Add(1) + go pool.scheduleReorgLoop() + // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) @@ -275,10 +291,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block log.Warn("Failed to rotate transaction journal", "err", err) } } - // Subscribe events from blockchain - pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) - // Start the event loop and return + // Subscribe events from blockchain and start the main event loop. + pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh) pool.wg.Add(1) go pool.loop() @@ -291,38 +306,31 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block func (pool *TxPool) loop() { defer pool.wg.Done() - // Start the stats reporting and transaction eviction tickers - var prevPending, prevQueued, prevStales int - - report := time.NewTicker(statsReportInterval) + var ( + prevPending, prevQueued, prevStales int + // Start the stats reporting and transaction eviction tickers + report = time.NewTicker(statsReportInterval) + evict = time.NewTicker(evictionInterval) + journal = time.NewTicker(pool.config.Rejournal) + // Track the previous head headers for transaction reorgs + head = pool.chain.CurrentBlock() + ) defer report.Stop() - - evict := time.NewTicker(evictionInterval) defer evict.Stop() - - journal := time.NewTicker(pool.config.Rejournal) defer journal.Stop() - // Track the previous head headers for transaction reorgs - head := pool.chain.CurrentBlock() - - // Keep waiting for and reacting to the various events for { select { // Handle ChainHeadEvent case ev := <-pool.chainHeadCh: if ev.Block != nil { - pool.mu.Lock() - if pool.chainconfig.IsHomestead(ev.Block.Number()) { - pool.homestead = true - } - pool.reset(head.Header(), ev.Block.Header()) + pool.requestReset(head.Header(), ev.Block.Header()) head = ev.Block - - pool.mu.Unlock() } - // Be unsubscribed due to system stopped + + // System shutdown. case <-pool.chainHeadSub.Err(): + close(pool.reorgShutdownCh) return // Handle stats reporting ticks @@ -367,114 +375,6 @@ func (pool *TxPool) loop() { } } -// lockedReset is a wrapper around reset to allow calling it in a thread safe -// manner. This method is only ever used in the tester! -func (pool *TxPool) lockedReset(oldHead, newHead *types.Header) { - pool.mu.Lock() - defer pool.mu.Unlock() - - pool.reset(oldHead, newHead) -} - -// reset retrieves the current state of the blockchain and ensures the content -// of the transaction pool is valid with regard to the chain state. -func (pool *TxPool) reset(oldHead, newHead *types.Header) { - // If we're reorging an old state, reinject all dropped transactions - var reinject types.Transactions - - if oldHead != nil && oldHead.Hash() != newHead.ParentHash { - // If the reorg is too deep, avoid doing it (will happen during fast sync) - oldNum := oldHead.Number.Uint64() - newNum := newHead.Number.Uint64() - - if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { - log.Debug("Skipping deep transaction reorg", "depth", depth) - } else { - // Reorg seems shallow enough to pull in all transactions into memory - var discarded, included types.Transactions - var ( - rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) - add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) - ) - if rem == nil { - // This can happen if a setHead is performed, where we simply discard the old - // head from the chain. - // If that is the case, we don't have the lost transactions any more, and - // there's nothing to add - if newNum < oldNum { - // If the reorg ended up on a lower number, it's indicative of setHead being the cause - log.Debug("Skipping transaction reset caused by setHead", - "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) - } else { - // If we reorged to a same or higher number, then it's not a case of setHead - log.Warn("Transaction pool reset with missing oldhead", - "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) - } - return - } - for rem.NumberU64() > add.NumberU64() { - discarded = append(discarded, rem.Transactions()...) - if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { - log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return - } - } - for add.NumberU64() > rem.NumberU64() { - included = append(included, add.Transactions()...) - if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { - log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return - } - } - for rem.Hash() != add.Hash() { - discarded = append(discarded, rem.Transactions()...) - if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { - log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) - return - } - included = append(included, add.Transactions()...) - if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { - log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) - return - } - } - reinject = types.TxDifference(discarded, included) - } - } - // Initialize the internal state to the current head - if newHead == nil { - newHead = pool.chain.CurrentBlock().Header() // Special case during testing - } - statedb, err := pool.chain.StateAt(newHead.Root) - if err != nil { - log.Error("Failed to reset txpool state", "err", err) - return - } - pool.currentState = statedb - pool.pendingState = state.ManageState(statedb) - pool.currentMaxGas = newHead.GasLimit - - // Inject any transactions discarded due to reorgs - log.Debug("Reinjecting stale transactions", "count", len(reinject)) - senderCacher.recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) - - // validate the pool of pending transactions, this will remove - // any transactions that have been included in the block or - // have been invalidated because of another transaction (e.g. - // higher gas price) - pool.demoteUnexecutables() - - // Update all accounts to the latest known pending nonce - for addr, list := range pool.pending { - txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway - pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1) - } - // Check the queue and move transactions over to the pending if possible - // or remove those that have become invalid - pool.promoteExecutables(nil) -} - // Stop terminates the transaction pool. func (pool *TxPool) Stop() { // Unsubscribe all subscriptions registered from txpool @@ -638,7 +538,8 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } - intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) + // Ensure the transaction has more gas than the basic tx fee. + intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, true) if err != nil { return err } @@ -648,27 +549,28 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return nil } -// add validates a transaction and inserts it into the non-executable queue for -// later pending promotion and execution. If the transaction is a replacement for -// an already pending or queued one, it overwrites the previous and returns this -// so outer code doesn't uselessly call promote. +// add validates a transaction and inserts it into the non-executable queue for later +// pending promotion and execution. If the transaction is a replacement for an already +// pending or queued one, it overwrites the previous transaction if its price is higher. // // If a newly added transaction is marked as local, its sending account will be -// whitelisted, preventing any associated transaction from being dropped out of -// the pool due to pricing constraints. -func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { +// whitelisted, preventing any associated transaction from being dropped out of the pool +// due to pricing constraints. +func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) { // If the transaction is already known, discard it hash := tx.Hash() if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } + // If the transaction fails basic validation, discard it if err := pool.validateTx(tx, local); err != nil { log.Trace("Discarding invalid transaction", "hash", hash, "err", err) invalidTxMeter.Mark(1) return false, err } + // If the transaction pool is full, discard underpriced transactions if uint64(pool.all.Count()) >= pool.config.GlobalSlots+pool.config.GlobalQueue { // If the new transaction is underpriced, don't accept it @@ -685,7 +587,8 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.removeTx(tx.Hash(), false) } } - // If the transaction is replacing an already pending one, do directly + + // Try to replace an existing transaction in the pending pool from, _ := types.Sender(pool.signer, tx) // already validated if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met @@ -703,19 +606,17 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.all.Add(tx) pool.priced.Put(tx) pool.journalTx(from, tx) - + pool.queueTxEvent(tx) log.Trace("Pooled new executable transaction", "hash", hash, "from", from, "to", tx.To()) - - // We've directly injected a replacement transaction, notify subsystems - go pool.txFeed.Send(NewTxsEvent{types.Transactions{tx}}) - return old != nil, nil } + // New transaction isn't replacing a pending one, push into queue - replace, err := pool.enqueueTx(hash, tx) + replaced, err = pool.enqueueTx(hash, tx) if err != nil { return false, err } + // Mark local addresses and journal local transactions if local { if !pool.locals.contains(from) { @@ -729,7 +630,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.journalTx(from, tx) log.Trace("Pooled new future transaction", "hash", hash, "from", from, "to", tx.To()) - return replace, nil + return replaced, nil } // enqueueTx inserts a new transaction into the non-executable transaction queue. @@ -817,96 +718,85 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T return true } -// AddLocal enqueues a single transaction into the pool if it is valid, marking -// the sender as a local one in the mean time, ensuring it goes around the local -// pricing constraints. -func (pool *TxPool) AddLocal(tx *types.Transaction) error { - return pool.addTx(tx, !pool.config.NoLocals) -} - -// AddRemote enqueues a single transaction into the pool if it is valid. If the -// sender is not among the locally tracked ones, full pricing constraints will -// apply. -func (pool *TxPool) AddRemote(tx *types.Transaction) error { - return pool.addTx(tx, false) -} - -// AddLocals enqueues a batch of transactions into the pool if they are valid, -// marking the senders as a local ones in the mean time, ensuring they go around -// the local pricing constraints. +// AddLocals enqueues a batch of transactions into the pool if they are valid, marking the +// senders as a local ones, ensuring they go around the local pricing constraints. +// +// This method is used to add transactions from the RPC API and performs synchronous pool +// reorganization and event propagation. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { - return pool.addTxs(txs, !pool.config.NoLocals) + return pool.addTxs(txs, !pool.config.NoLocals, true) } -// AddRemotes enqueues a batch of transactions into the pool if they are valid. -// If the senders are not among the locally tracked ones, full pricing constraints -// will apply. +// AddLocal enqueues a single local transaction into the pool if it is valid. This is +// a convenience wrapper aroundd AddLocals. +func (pool *TxPool) AddLocal(tx *types.Transaction) error { + errs := pool.AddLocals([]*types.Transaction{tx}) + return errs[0] +} + +// AddRemotes enqueues a batch of transactions into the pool if they are valid. If the +// senders are not among the locally tracked ones, full pricing constraints will apply. +// +// This method is used to add transactions from the p2p network and does not wait for pool +// reorganization and internal event propagation. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { - return pool.addTxs(txs, false) + return pool.addTxs(txs, false, false) } -// addTx enqueues a single transaction into the pool if it is valid. -func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { - // Cache sender in transaction before obtaining lock (pool.signer is immutable) - types.Sender(pool.signer, tx) +// This is like AddRemotes, but waits for pool reorganization. Tests use this method. +func (pool *TxPool) addRemotesSync(txs []*types.Transaction) []error { + return pool.addTxs(txs, false, true) +} - pool.mu.Lock() - defer pool.mu.Unlock() +// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. +func (pool *TxPool) addRemoteSync(tx *types.Transaction) error { + errs := pool.addRemotesSync([]*types.Transaction{tx}) + return errs[0] +} - // Try to inject the transaction and update any state - replace, err := pool.add(tx, local) - if err != nil { - return err - } - validMeter.Mark(1) - - // If we added a new transaction, run promotion checks and return - if !replace { - from, _ := types.Sender(pool.signer, tx) // already validated - pool.promoteExecutables([]common.Address{from}) - } - return nil +// AddRemote enqueues a single transaction into the pool if it is valid. This is a convenience +// wrapper around AddRemotes. +// +// Deprecated: use AddRemotes +func (pool *TxPool) AddRemote(tx *types.Transaction) error { + errs := pool.AddRemotes([]*types.Transaction{tx}) + return errs[0] } // addTxs attempts to queue a batch of transactions if they are valid. -func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { +func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { // Cache senders in transactions before obtaining lock (pool.signer is immutable) for _, tx := range txs { types.Sender(pool.signer, tx) } + pool.mu.Lock() - defer pool.mu.Unlock() + errs, dirtyAddrs := pool.addTxsLocked(txs, local) + pool.mu.Unlock() - return pool.addTxsLocked(txs, local) -} - -// addTxsLocked attempts to queue a batch of transactions if they are valid, -// whilst assuming the transaction pool lock is already held. -func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { - // Add the batch of transactions, tracking the accepted ones - dirty := make(map[common.Address]struct{}) - errs := make([]error, len(txs)) - - for i, tx := range txs { - var replace bool - if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace { - from, _ := types.Sender(pool.signer, tx) // already validated - dirty[from] = struct{}{} - } - } - validMeter.Mark(int64(len(dirty))) - - // Only reprocess the internal state if something was actually added - if len(dirty) > 0 { - addrs := make([]common.Address, 0, len(dirty)) - for addr := range dirty { - addrs = append(addrs, addr) - } - pool.promoteExecutables(addrs) + done := pool.requestPromoteExecutables(dirtyAddrs) + if sync { + <-done } return errs } +// addTxsLocked attempts to queue a batch of transactions if they are valid. +// The transaction pool lock must be held. +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, *accountSet) { + dirty := newAccountSet(pool.signer) + errs := make([]error, len(txs)) + for i, tx := range txs { + replaced, err := pool.add(tx, local) + errs[i] = err + if err == nil && !replaced { + dirty.addTx(tx) + } + } + validMeter.Mark(int64(len(dirty.accounts))) + return errs, dirty +} + // Status returns the status (unknown/pending/queued) of a batch of transactions // identified by their hashes. func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { @@ -927,8 +817,7 @@ func (pool *TxPool) Status(hashes []common.Hash) []TxStatus { return status } -// Get returns a transaction if it is contained in the pool -// and nil otherwise. +// Get returns a transaction if it is contained in the pool and nil otherwise. func (pool *TxPool) Get(hash common.Hash) *types.Transaction { return pool.all.Get(hash) } @@ -984,20 +873,261 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } } +// requestPromoteExecutables requests a pool reset to the new head block. +// The returned channel is closed when the reset has occurred. +func (pool *TxPool) requestReset(oldHead *types.Header, newHead *types.Header) chan struct{} { + select { + case pool.reqResetCh <- &txpoolResetRequest{oldHead, newHead}: + return <-pool.reorgDoneCh + case <-pool.reorgShutdownCh: + return pool.reorgShutdownCh + } +} + +// requestPromoteExecutables requests transaction promotion checks for the given addresses. +// The returned channel is closed when the promotion checks have occurred. +func (pool *TxPool) requestPromoteExecutables(set *accountSet) chan struct{} { + select { + case pool.reqPromoteCh <- set: + return <-pool.reorgDoneCh + case <-pool.reorgShutdownCh: + return pool.reorgShutdownCh + } +} + +// queueTxEvent enqueues a transaction event to be sent in the next reorg run. +func (pool *TxPool) queueTxEvent(tx *types.Transaction) { + select { + case pool.queueTxEventCh <- tx: + case <-pool.reorgShutdownCh: + } +} + +// scheduleReorgLoop schedules runs of reset and promoteExecutables. Code above should not +// call those methods directly, but request them being run using requestReset and +// requestPromoteExecutables instead. +func (pool *TxPool) scheduleReorgLoop() { + defer pool.wg.Done() + + var ( + curDone chan struct{} // non-nil while runReorg is active + nextDone = make(chan struct{}) + launchNextRun bool + reset *txpoolResetRequest + dirtyAccounts *accountSet + queuedEvents = make(map[common.Address]*txSortedMap) + ) + for { + // Launch next background reorg if needed + if curDone == nil && launchNextRun { + // Run the background reorg and announcements + go pool.runReorg(nextDone, reset, dirtyAccounts, queuedEvents) + + // Prepare everything for the next round of reorg + curDone, nextDone = nextDone, make(chan struct{}) + launchNextRun = false + + reset, dirtyAccounts = nil, nil + queuedEvents = make(map[common.Address]*txSortedMap) + } + + select { + case req := <-pool.reqResetCh: + // Reset request: update head if request is already pending. + if reset == nil { + reset = req + } else { + reset.newHead = req.newHead + } + launchNextRun = true + pool.reorgDoneCh <- nextDone + + case req := <-pool.reqPromoteCh: + // Promote request: update address set if request is already pending. + if dirtyAccounts == nil { + dirtyAccounts = req + } else { + dirtyAccounts.merge(req) + } + launchNextRun = true + pool.reorgDoneCh <- nextDone + + case tx := <-pool.queueTxEventCh: + // Queue up the event, but don't schedule a reorg. It's up to the caller to + // request one later if they want the events sent. + addr, _ := types.Sender(pool.signer, tx) + if _, ok := queuedEvents[addr]; !ok { + queuedEvents[addr] = newTxSortedMap() + } + queuedEvents[addr].Put(tx) + + case <-curDone: + curDone = nil + + case <-pool.reorgShutdownCh: + // Wait for current run to finish. + if curDone != nil { + <-curDone + } + close(nextDone) + return + } + } +} + +// runReorg runs reset and promoteExecutables on behalf of scheduleReorgLoop. +func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirtyAccounts *accountSet, events map[common.Address]*txSortedMap) { + defer close(done) + + var promoteAddrs []common.Address + if dirtyAccounts != nil { + promoteAddrs = dirtyAccounts.flatten() + } + pool.mu.Lock() + if reset != nil { + // Reset from the old head to the new, rescheduling any reorged transactions + pool.reset(reset.oldHead, reset.newHead) + + // Nonces were reset, discard any events that became stale + for addr := range events { + events[addr].Forward(pool.pendingState.GetNonce(addr)) + if events[addr].Len() == 0 { + delete(events, addr) + } + } + // Reset needs promote for all addresses + promoteAddrs = promoteAddrs[:0] + for addr := range pool.queue { + promoteAddrs = append(promoteAddrs, addr) + } + } + // Check for pending transactions for every account that sent new ones + promoted := pool.promoteExecutables(promoteAddrs) + for _, tx := range promoted { + addr, _ := types.Sender(pool.signer, tx) + if _, ok := events[addr]; !ok { + events[addr] = newTxSortedMap() + } + events[addr].Put(tx) + } + // If a new block appeared, validate the pool of pending transactions. This will + // remove any transaction that has been included in the block or was invalidated + // because of another transaction (e.g. higher gas price). + if reset != nil { + pool.demoteUnexecutables() + } + // Ensure pool.queue and pool.pending sizes stay within the configured limits. + pool.truncatePending() + pool.truncateQueue() + + // Update all accounts to the latest known pending nonce + for addr, list := range pool.pending { + txs := list.Flatten() // Heavy but will be cached and is needed by the miner anyway + pool.pendingState.SetNonce(addr, txs[len(txs)-1].Nonce()+1) + } + pool.mu.Unlock() + + // Notify subsystems for newly added transactions + if len(events) > 0 { + var txs []*types.Transaction + for _, set := range events { + txs = append(txs, set.Flatten()...) + } + pool.txFeed.Send(NewTxsEvent{txs}) + } +} + +// reset retrieves the current state of the blockchain and ensures the content +// of the transaction pool is valid with regard to the chain state. +func (pool *TxPool) reset(oldHead, newHead *types.Header) { + // If we're reorging an old state, reinject all dropped transactions + var reinject types.Transactions + + if oldHead != nil && oldHead.Hash() != newHead.ParentHash { + // If the reorg is too deep, avoid doing it (will happen during fast sync) + oldNum := oldHead.Number.Uint64() + newNum := newHead.Number.Uint64() + + if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { + log.Debug("Skipping deep transaction reorg", "depth", depth) + } else { + // Reorg seems shallow enough to pull in all transactions into memory + var discarded, included types.Transactions + var ( + rem = pool.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) + add = pool.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) + ) + if rem == nil { + // This can happen if a setHead is performed, where we simply discard the old + // head from the chain. + // If that is the case, we don't have the lost transactions any more, and + // there's nothing to add + if newNum < oldNum { + // If the reorg ended up on a lower number, it's indicative of setHead being the cause + log.Debug("Skipping transaction reset caused by setHead", + "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) + } else { + // If we reorged to a same or higher number, then it's not a case of setHead + log.Warn("Transaction pool reset with missing oldhead", + "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) + } + return + } + for rem.NumberU64() > add.NumberU64() { + discarded = append(discarded, rem.Transactions()...) + if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + return + } + } + for add.NumberU64() > rem.NumberU64() { + included = append(included, add.Transactions()...) + if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + return + } + } + for rem.Hash() != add.Hash() { + discarded = append(discarded, rem.Transactions()...) + if rem = pool.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { + log.Error("Unrooted old chain seen by tx pool", "block", oldHead.Number, "hash", oldHead.Hash()) + return + } + included = append(included, add.Transactions()...) + if add = pool.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { + log.Error("Unrooted new chain seen by tx pool", "block", newHead.Number, "hash", newHead.Hash()) + return + } + } + reinject = types.TxDifference(discarded, included) + } + } + // Initialize the internal state to the current head + if newHead == nil { + newHead = pool.chain.CurrentBlock().Header() // Special case during testing + } + statedb, err := pool.chain.StateAt(newHead.Root) + if err != nil { + log.Error("Failed to reset txpool state", "err", err) + return + } + pool.currentState = statedb + pool.pendingState = state.ManageState(statedb) + pool.currentMaxGas = newHead.GasLimit + + // Inject any transactions discarded due to reorgs + log.Debug("Reinjecting stale transactions", "count", len(reinject)) + senderCacher.recover(pool.signer, reinject) + pool.addTxsLocked(reinject, false) +} + // promoteExecutables moves transactions that have become processable from the // future queue to the set of pending transactions. During this process, all // invalidated transactions (low nonce, low balance) are deleted. -func (pool *TxPool) promoteExecutables(accounts []common.Address) { +func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Transaction { // Track the promoted transactions to broadcast them at once var promoted []*types.Transaction - // Gather all the accounts potentially needing updates - if accounts == nil { - accounts = make([]common.Address, 0, len(pool.queue)) - for addr := range pool.queue { - accounts = append(accounts, addr) - } - } // Iterate over all accounts and promote any executable transactions for _, addr := range accounts { list := pool.queue[addr] @@ -1053,69 +1183,46 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { delete(pool.queue, addr) } } - // Notify subsystem for new promoted transactions. - if len(promoted) > 0 { - go pool.txFeed.Send(NewTxsEvent{promoted}) - } - // If the pending limit is overflown, start equalizing allowances + return promoted +} + +// truncatePending removes transactions from the pending queue if the pool is above the +// pending limit. The algorithm tries to reduce transaction counts by an approximately +// equal number for all for accounts with many pending transactions. +func (pool *TxPool) truncatePending() { pending := uint64(0) for _, list := range pool.pending { pending += uint64(list.Len()) } - if pending > pool.config.GlobalSlots { - pendingBeforeCap := pending - // Assemble a spam order to penalize large transactors first - spammers := prque.New(nil) - for addr, list := range pool.pending { - // Only evict transactions from high rollers - if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { - spammers.Push(addr, int64(list.Len())) - } + if pending <= pool.config.GlobalSlots { + return + } + + pendingBeforeCap := pending + // Assemble a spam order to penalize large transactors first + spammers := prque.New(nil) + for addr, list := range pool.pending { + // Only evict transactions from high rollers + if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { + spammers.Push(addr, int64(list.Len())) } - // Gradually drop transactions from offenders - offenders := []common.Address{} - for pending > pool.config.GlobalSlots && !spammers.Empty() { - // Retrieve the next offender if not local address - offender, _ := spammers.Pop() - offenders = append(offenders, offender.(common.Address)) + } + // Gradually drop transactions from offenders + offenders := []common.Address{} + for pending > pool.config.GlobalSlots && !spammers.Empty() { + // Retrieve the next offender if not local address + offender, _ := spammers.Pop() + offenders = append(offenders, offender.(common.Address)) - // Equalize balances until all the same or below threshold - if len(offenders) > 1 { - // Calculate the equalization threshold for all current offenders - threshold := pool.pending[offender.(common.Address)].Len() + // Equalize balances until all the same or below threshold + if len(offenders) > 1 { + // Calculate the equalization threshold for all current offenders + threshold := pool.pending[offender.(common.Address)].Len() - // Iteratively reduce all offenders until below limit or threshold reached - for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { - for i := 0; i < len(offenders)-1; i++ { - list := pool.pending[offenders[i]] - - caps := list.Cap(list.Len() - 1) - for _, tx := range caps { - // Drop the transaction from the global pools too - hash := tx.Hash() - pool.all.Remove(hash) - - // Update the account nonce to the dropped transaction - if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { - pool.pendingState.SetNonce(offenders[i], nonce) - } - log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) - } - pool.priced.Removed(len(caps)) - pendingCounter.Dec(int64(len(caps))) - if pool.locals.contains(offenders[i]) { - localCounter.Dec(int64(len(caps))) - } - pending-- - } - } - } - } - // If still above threshold, reduce to limit or min allowance - if pending > pool.config.GlobalSlots && len(offenders) > 0 { - for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { - for _, addr := range offenders { - list := pool.pending[addr] + // Iteratively reduce all offenders until below limit or threshold reached + for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { + for i := 0; i < len(offenders)-1; i++ { + list := pool.pending[offenders[i]] caps := list.Cap(list.Len() - 1) for _, tx := range caps { @@ -1124,60 +1231,93 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) { pool.all.Remove(hash) // Update the account nonce to the dropped transaction - if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { - pool.pendingState.SetNonce(addr, nonce) + if nonce := tx.Nonce(); pool.pendingState.GetNonce(offenders[i]) > nonce { + pool.pendingState.SetNonce(offenders[i], nonce) } log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) } pool.priced.Removed(len(caps)) pendingCounter.Dec(int64(len(caps))) - if pool.locals.contains(addr) { + if pool.locals.contains(offenders[i]) { localCounter.Dec(int64(len(caps))) } pending-- } } } - pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending)) } - // If we've queued more transactions than the hard limit, drop oldest ones + + // If still above threshold, reduce to limit or min allowance + if pending > pool.config.GlobalSlots && len(offenders) > 0 { + for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots { + for _, addr := range offenders { + list := pool.pending[addr] + + caps := list.Cap(list.Len() - 1) + for _, tx := range caps { + // Drop the transaction from the global pools too + hash := tx.Hash() + pool.all.Remove(hash) + + // Update the account nonce to the dropped transaction + if nonce := tx.Nonce(); pool.pendingState.GetNonce(addr) > nonce { + pool.pendingState.SetNonce(addr, nonce) + } + log.Trace("Removed fairness-exceeding pending transaction", "hash", hash) + } + pool.priced.Removed(len(caps)) + pendingCounter.Dec(int64(len(caps))) + if pool.locals.contains(addr) { + localCounter.Dec(int64(len(caps))) + } + pending-- + } + } + } + pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending)) +} + +// truncateQueue drops the oldes transactions in the queue if the pool is above the global queue limit. +func (pool *TxPool) truncateQueue() { queued := uint64(0) for _, list := range pool.queue { queued += uint64(list.Len()) } - if queued > pool.config.GlobalQueue { - // Sort all accounts with queued transactions by heartbeat - addresses := make(addressesByHeartbeat, 0, len(pool.queue)) - for addr := range pool.queue { - if !pool.locals.contains(addr) { // don't drop locals - addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) - } + if queued <= pool.config.GlobalQueue { + return + } + + // Sort all accounts with queued transactions by heartbeat + addresses := make(addressesByHeartbeat, 0, len(pool.queue)) + for addr := range pool.queue { + if !pool.locals.contains(addr) { // don't drop locals + addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]}) } - sort.Sort(addresses) + } + sort.Sort(addresses) - // Drop transactions until the total is below the limit or only locals remain - for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { - addr := addresses[len(addresses)-1] - list := pool.queue[addr.address] + // Drop transactions until the total is below the limit or only locals remain + for drop := queued - pool.config.GlobalQueue; drop > 0 && len(addresses) > 0; { + addr := addresses[len(addresses)-1] + list := pool.queue[addr.address] - addresses = addresses[:len(addresses)-1] + addresses = addresses[:len(addresses)-1] - // Drop all transactions if they are less than the overflow - if size := uint64(list.Len()); size <= drop { - for _, tx := range list.Flatten() { - pool.removeTx(tx.Hash(), true) - } - drop -= size - queuedRateLimitMeter.Mark(int64(size)) - continue - } - // Otherwise drop only last few transactions - txs := list.Flatten() - for i := len(txs) - 1; i >= 0 && drop > 0; i-- { - pool.removeTx(txs[i].Hash(), true) - drop-- - queuedRateLimitMeter.Mark(1) + // Drop all transactions if they are less than the overflow + if size := uint64(list.Len()); size <= drop { + for _, tx := range list.Flatten() { + pool.removeTx(tx.Hash(), true) } + drop -= size + queuedRateLimitMeter.Mark(int64(size)) + continue + } + // Otherwise drop only last few transactions + txs := list.Flatten() + for i := len(txs) - 1; i >= 0 && drop > 0; i-- { + pool.removeTx(txs[i].Hash(), true) + drop-- + queuedRateLimitMeter.Mark(1) } } } @@ -1224,7 +1364,7 @@ func (pool *TxPool) demoteUnexecutables() { log.Error("Demoting invalidated transaction", "hash", hash) pool.enqueueTx(hash, tx) } - pendingCounter.Inc(int64(len(gapped))) + pendingCounter.Dec(int64(len(gapped))) } // Delete the entire queue entry if it became empty. if list.Empty() { @@ -1256,11 +1396,15 @@ type accountSet struct { // newAccountSet creates a new address set with an associated signer for sender // derivations. -func newAccountSet(signer types.Signer) *accountSet { - return &accountSet{ +func newAccountSet(signer types.Signer, addrs ...common.Address) *accountSet { + as := &accountSet{ accounts: make(map[common.Address]struct{}), signer: signer, } + for _, addr := range addrs { + as.add(addr) + } + return as } // contains checks if a given address is contained within the set. @@ -1284,6 +1428,13 @@ func (as *accountSet) add(addr common.Address) { as.cache = nil } +// addTx adds the sender of tx into the set. +func (as *accountSet) addTx(tx *types.Transaction) { + if addr, err := types.Sender(as.signer, tx); err == nil { + as.add(addr) + } +} + // flatten returns the list of addresses within this set, also caching it for later // reuse. The returned slice should not be changed! func (as *accountSet) flatten() []common.Address { @@ -1297,6 +1448,14 @@ func (as *accountSet) flatten() []common.Address { return *as.cache } +// merge adds all addresses from the 'other' set into 'as'. +func (as *accountSet) merge(other *accountSet) { + for addr := range other.accounts { + as.accounts[addr] = struct{}{} + } + as.cache = nil +} + // txLookup is used internally by TxPool to track transactions while allowing lookup without // mutex contention. // diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 50c73cf535..7f1832a43f 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -200,7 +200,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { t.Fatalf("Invalid nonce, want 0, got %d", nonce) } - pool.AddRemotes(types.Transactions{tx0, tx1}) + pool.addRemotesSync([]*types.Transaction{tx0, tx1}) nonce = pool.State().GetNonce(address) if nonce != 2 { @@ -209,8 +209,7 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) { // trigger state change in the background trigger = true - - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) _, err := pool.Pending() if err != nil { @@ -268,10 +267,10 @@ func TestTransactionQueue(t *testing.T) { tx := transaction(0, 100, key) from, _ := deriveSender(tx) pool.currentState.AddBalance(from, big.NewInt(1000)) - pool.lockedReset(nil, nil) - pool.enqueueTx(tx.Hash(), tx) + <-pool.requestReset(nil, nil) - pool.promoteExecutables([]common.Address{from}) + pool.enqueueTx(tx.Hash(), tx) + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if len(pool.pending) != 1 { t.Error("expected valid txs to be 1 is", len(pool.pending)) } @@ -280,33 +279,36 @@ func TestTransactionQueue(t *testing.T) { from, _ = deriveSender(tx) pool.currentState.SetNonce(from, 2) pool.enqueueTx(tx.Hash(), tx) - pool.promoteExecutables([]common.Address{from}) + + <-pool.requestPromoteExecutables(newAccountSet(pool.signer, from)) if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok { t.Error("expected transaction to be in tx pool") } - if len(pool.queue) > 0 { t.Error("expected transaction queue to be empty. is", len(pool.queue)) } +} - pool, key = setupTxPool() +func TestTransactionQueue2(t *testing.T) { + t.Parallel() + + pool, key := setupTxPool() defer pool.Stop() tx1 := transaction(0, 100, key) tx2 := transaction(10, 100, key) tx3 := transaction(11, 100, key) - from, _ = deriveSender(tx1) + from, _ := deriveSender(tx1) pool.currentState.AddBalance(from, big.NewInt(1000)) - pool.lockedReset(nil, nil) + pool.reset(nil, nil) pool.enqueueTx(tx1.Hash(), tx1) pool.enqueueTx(tx2.Hash(), tx2) pool.enqueueTx(tx3.Hash(), tx3) pool.promoteExecutables([]common.Address{from}) - if len(pool.pending) != 1 { - t.Error("expected tx pool to be 1, got", len(pool.pending)) + t.Error("expected pending length to be 1, got", len(pool.pending)) } if pool.queue[from].Len() != 2 { t.Error("expected len(queue) == 2, got", pool.queue[from].Len()) @@ -339,7 +341,7 @@ func TestTransactionChainFork(t *testing.T) { statedb.AddBalance(addr, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) } resetState() @@ -368,7 +370,7 @@ func TestTransactionDoubleNonce(t *testing.T) { statedb.AddBalance(addr, big.NewInt(100000000000000)) pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)} - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) } resetState() @@ -384,16 +386,17 @@ func TestTransactionDoubleNonce(t *testing.T) { if replace, err := pool.add(tx2, false); err != nil || !replace { t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace) } - pool.promoteExecutables([]common.Address{addr}) + <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() { t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash()) } + // Add the third transaction and ensure it's not saved (smaller price) pool.add(tx3, false) - pool.promoteExecutables([]common.Address{addr}) + <-pool.requestPromoteExecutables(newAccountSet(signer, addr)) if pool.pending[addr].Len() != 1 { t.Error("expected 1 pending transactions, got", pool.pending[addr].Len()) } @@ -439,7 +442,7 @@ func TestTransactionNonceRecovery(t *testing.T) { addr := crypto.PubkeyToAddress(key.PublicKey) pool.currentState.SetNonce(addr, n) pool.currentState.AddBalance(addr, big.NewInt(100000000000000)) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) tx := transaction(n, 100000, key) if err := pool.AddRemote(tx); err != nil { @@ -447,7 +450,7 @@ func TestTransactionNonceRecovery(t *testing.T) { } // simulate some weird re-order of transactions and missing nonce(s) pool.currentState.SetNonce(addr, n-1) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if fn := pool.pendingState.GetNonce(addr); fn != n-1 { t.Errorf("expected nonce to be %d, got %d", n-1, fn) } @@ -491,7 +494,7 @@ func TestTransactionDropping(t *testing.T) { if pool.all.Count() != 6 { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6) } - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if pool.pending[account].Len() != 3 { t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3) } @@ -503,7 +506,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the balance of the account, and check that invalidated transactions are dropped pool.currentState.AddBalance(account, big.NewInt(-650)) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -528,7 +531,7 @@ func TestTransactionDropping(t *testing.T) { } // Reduce the block gas limit, check that invalidated transactions are dropped pool.chain.(*testBlockChain).gasLimit = 100 - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { t.Errorf("funded pending transaction missing: %v", tx0) @@ -584,7 +587,7 @@ func TestTransactionPostponing(t *testing.T) { txs = append(txs, tx) } } - for i, err := range pool.AddRemotes(txs) { + for i, err := range pool.addRemotesSync(txs) { if err != nil { t.Fatalf("tx %d: failed to add transactions: %v", i, err) } @@ -599,7 +602,7 @@ func TestTransactionPostponing(t *testing.T) { if pool.all.Count() != len(txs) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)) } - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) { t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs)) } @@ -613,7 +616,7 @@ func TestTransactionPostponing(t *testing.T) { for _, addr := range accs { pool.currentState.AddBalance(addr, big.NewInt(-1)) } - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) // The first account's first transaction remains valid, check that subsequent // ones are either filtered out, or queued up for later. @@ -680,12 +683,10 @@ func TestTransactionGapFilling(t *testing.T) { defer sub.Unsubscribe() // Create a pending and a queued transaction with a nonce-gap in between - if err := pool.AddRemote(transaction(0, 100000, key)); err != nil { - t.Fatalf("failed to add pending transaction: %v", err) - } - if err := pool.AddRemote(transaction(2, 100000, key)); err != nil { - t.Fatalf("failed to add queued transaction: %v", err) - } + pool.addRemotesSync([]*types.Transaction{ + transaction(0, 100000, key), + transaction(2, 100000, key), + }) pending, queued := pool.Stats() if pending != 1 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) @@ -700,7 +701,7 @@ func TestTransactionGapFilling(t *testing.T) { t.Fatalf("pool internal state corrupted: %v", err) } // Fill the nonce gap and ensure all transactions become pending - if err := pool.AddRemote(transaction(1, 100000, key)); err != nil { + if err := pool.addRemoteSync(transaction(1, 100000, key)); err != nil { t.Fatalf("failed to add gapped transaction: %v", err) } pending, queued = pool.Stats() @@ -732,7 +733,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) { // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(1); i <= testTxPoolConfig.AccountQueue+5; i++ { - if err := pool.AddRemote(transaction(i, 100000, key)); err != nil { + if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if len(pool.pending) != 0 { @@ -799,7 +800,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) { nonces[addr]++ } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) queued := 0 for addr, list := range pool.queue { @@ -932,7 +933,7 @@ func TestTransactionPendingLimiting(t *testing.T) { // Keep queuing up transactions and make sure all above a limit are dropped for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { - if err := pool.AddRemote(transaction(i, 100000, key)); err != nil { + if err := pool.addRemoteSync(transaction(i, 100000, key)); err != nil { t.Fatalf("tx %d: failed to add transaction: %v", i, err) } if pool.pending[account].Len() != int(i)+1 { @@ -953,57 +954,6 @@ func TestTransactionPendingLimiting(t *testing.T) { } } -// Tests that the transaction limits are enforced the same way irrelevant whether -// the transactions are added one by one or in batches. -func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 1) } -func TestTransactionPendingLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 0) } - -func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { - t.Parallel() - - // Add a batch of transactions to a pool one by one - pool1, key1 := setupTxPool() - defer pool1.Stop() - - account1, _ := deriveSender(transaction(0, 0, key1)) - pool1.currentState.AddBalance(account1, big.NewInt(1000000)) - - for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { - if err := pool1.AddRemote(transaction(origin+i, 100000, key1)); err != nil { - t.Fatalf("tx %d: failed to add transaction: %v", i, err) - } - } - // Add a batch of transactions to a pool in one big batch - pool2, key2 := setupTxPool() - defer pool2.Stop() - - account2, _ := deriveSender(transaction(0, 0, key2)) - pool2.currentState.AddBalance(account2, big.NewInt(1000000)) - - txs := []*types.Transaction{} - for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ { - txs = append(txs, transaction(origin+i, 100000, key2)) - } - pool2.AddRemotes(txs) - - // Ensure the batch optimization honors the same pool mechanics - if len(pool1.pending) != len(pool2.pending) { - t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending)) - } - if len(pool1.queue) != len(pool2.queue) { - t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue)) - } - if pool1.all.Count() != pool2.all.Count() { - t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", pool1.all.Count(), pool2.all.Count()) - } - if err := validateTxPoolInternals(pool1); err != nil { - t.Errorf("pool 1 internal state corrupted: %v", err) - } - if err := validateTxPoolInternals(pool2); err != nil { - t.Errorf("pool 2 internal state corrupted: %v", err) - } -} - // Tests that if the transaction count belonging to multiple accounts go above // some hard threshold, the higher transactions are dropped to prevent DOS // attacks. @@ -1038,7 +988,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) { } } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pending := 0 for _, list := range pool.pending { @@ -1118,7 +1068,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) { } } // Import the batch and verify that limits have been enforced - pool.AddRemotes(txs) + pool.addRemotesSync(txs) for addr, list := range pool.pending { if list.Len() != int(config.AccountSlots) { @@ -1174,7 +1124,7 @@ func TestTransactionPoolRepricing(t *testing.T) { ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3]) // Import the batch and that both pending and queued transactions match up - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pool.AddLocal(ltx) pending, queued := pool.Stats() @@ -1454,7 +1404,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { for i := uint64(0); i < config.GlobalSlots; i++ { txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0])) } - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pending, queued := pool.Stats() if pending != int(config.GlobalSlots) { @@ -1470,7 +1420,7 @@ func TestTransactionPoolStableUnderpricing(t *testing.T) { t.Fatalf("pool internal state corrupted: %v", err) } // Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap - if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { t.Fatalf("failed to add well priced transaction: %v", err) } pending, queued = pool.Stats() @@ -1513,7 +1463,7 @@ func TestTransactionReplacement(t *testing.T) { price := int64(100) threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100 - if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil { + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil { t.Fatalf("failed to add original cheap pending transaction: %v", err) } if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(1), key)); err != ErrReplaceUnderpriced { @@ -1526,7 +1476,7 @@ func TestTransactionReplacement(t *testing.T) { t.Fatalf("cheap replacement event firing failed: %v", err) } - if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil { + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil { t.Fatalf("failed to add original proper pending transaction: %v", err) } if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(threshold-1), key)); err != ErrReplaceUnderpriced { @@ -1538,6 +1488,7 @@ func TestTransactionReplacement(t *testing.T) { if err := validateEvents(events, 2); err != nil { t.Fatalf("proper replacement event firing failed: %v", err) } + // Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too) if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(1), key)); err != nil { t.Fatalf("failed to add original cheap queued transaction: %v", err) @@ -1615,7 +1566,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil { t.Fatalf("failed to add local transaction: %v", err) } - if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil { + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil { t.Fatalf("failed to add remote transaction: %v", err) } pending, queued := pool.Stats() @@ -1653,7 +1604,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) { } // Bump the nonce temporarily and ensure the newly invalidated transaction is removed statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2) - pool.lockedReset(nil, nil) + <-pool.requestReset(nil, nil) time.Sleep(2 * config.Rejournal) pool.Stop() @@ -1707,7 +1658,7 @@ func TestTransactionStatusCheck(t *testing.T) { txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) // Queued only // Import the transaction and ensure they are correctly added - pool.AddRemotes(txs) + pool.addRemotesSync(txs) pending, queued := pool.Stats() if pending != 2 { @@ -1786,26 +1737,6 @@ func benchmarkFuturePromotion(b *testing.B, size int) { } } -// Benchmarks the speed of iterative transaction insertion. -func BenchmarkPoolInsert(b *testing.B) { - // Generate a batch of transactions to enqueue into the pool - pool, key := setupTxPool() - defer pool.Stop() - - account, _ := deriveSender(transaction(0, 0, key)) - pool.currentState.AddBalance(account, big.NewInt(1000000)) - - txs := make(types.Transactions, b.N) - for i := 0; i < b.N; i++ { - txs[i] = transaction(uint64(i), 100000, key) - } - // Benchmark importing the transactions into the queue - b.ResetTimer() - for _, tx := range txs { - pool.AddRemote(tx) - } -} - // Benchmarks the speed of batched transaction insertion. func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) } func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }