core/types, miner: switch over to the grouped tx sets
This commit is contained in:
parent
0ef327bbee
commit
affffb39b3
|
@ -426,41 +426,58 @@ func (s *TxByPrice) Pop() interface{} {
|
|||
return x
|
||||
}
|
||||
|
||||
// SortByPriceAndNonce sorts the transactions by price in such a way that the
|
||||
// nonce orderings within a single account are maintained.
|
||||
// TransactionsByPriceAndNonce represents a set of transactions that can return
|
||||
// transactions in a profit-maximising sorted order, while supporting removing
|
||||
// entire batches of transactions for non-executable accounts.
|
||||
type TransactionsByPriceAndNonce struct {
|
||||
txs map[common.Address]Transactions // Per account nonce-sorted list of transactions
|
||||
heads TxByPrice // Next transaction for each unique account (price heap)
|
||||
}
|
||||
|
||||
// NewTransactionsByPriceAndNonce creates a transaction set that can retrieve
|
||||
// price sorted transactions in a nonce-honouring way.
|
||||
//
|
||||
// Note, this is not as trivial as it seems from the first look as there are three
|
||||
// different criteria that need to be taken into account (price, nonce, account
|
||||
// match), which cannot be done with any plain sorting method, as certain items
|
||||
// cannot be compared without context.
|
||||
//
|
||||
// This method first sorts the separates the list of transactions into individual
|
||||
// sender accounts and sorts them by nonce. After the account nonce ordering is
|
||||
// satisfied, the results are merged back together by price, always comparing only
|
||||
// the head transaction from each account. This is done via a heap to keep it fast.
|
||||
func SortByPriceAndNonce(txs map[common.Address]Transactions) Transactions {
|
||||
// Note, the input map is reowned so the caller should not interact any more with
|
||||
// if after providng it to the constructor.
|
||||
func NewTransactionsByPriceAndNonce(txs map[common.Address]Transactions) *TransactionsByPriceAndNonce {
|
||||
// Initialize a price based heap with the head transactions
|
||||
byPrice := make(TxByPrice, 0, len(txs))
|
||||
heads := make(TxByPrice, 0, len(txs))
|
||||
for acc, accTxs := range txs {
|
||||
byPrice = append(byPrice, accTxs[0])
|
||||
heads = append(heads, accTxs[0])
|
||||
txs[acc] = accTxs[1:]
|
||||
}
|
||||
heap.Init(&byPrice)
|
||||
heap.Init(&heads)
|
||||
|
||||
// Merge by replacing the best with the next from the same account
|
||||
var sorted Transactions
|
||||
for len(byPrice) > 0 {
|
||||
// Retrieve the next best transaction by price
|
||||
best := heap.Pop(&byPrice).(*Transaction)
|
||||
|
||||
// Push in its place the next transaction from the same account
|
||||
acc, _ := best.From() // we only sort valid txs so this cannot fail
|
||||
if accTxs, ok := txs[acc]; ok && len(accTxs) > 0 {
|
||||
heap.Push(&byPrice, accTxs[0])
|
||||
txs[acc] = accTxs[1:]
|
||||
}
|
||||
// Accumulate the best priced transaction
|
||||
sorted = append(sorted, best)
|
||||
// Assemble and return the transaction set
|
||||
return &TransactionsByPriceAndNonce{
|
||||
txs: txs,
|
||||
heads: heads,
|
||||
}
|
||||
return sorted
|
||||
}
|
||||
|
||||
// Peek returns the next transaction by price.
|
||||
func (t *TransactionsByPriceAndNonce) Peek() *Transaction {
|
||||
if len(t.heads) == 0 {
|
||||
return nil
|
||||
}
|
||||
return t.heads[0]
|
||||
}
|
||||
|
||||
// Shift replaces the current best head with the next one from the same account.
|
||||
func (t *TransactionsByPriceAndNonce) Shift() {
|
||||
acc, _ := t.heads[0].From() // we only sort valid txs so this cannot fail
|
||||
|
||||
if txs, ok := t.txs[acc]; ok && len(txs) > 0 {
|
||||
t.heads[0], t.txs[acc] = txs[0], txs[1:]
|
||||
heap.Fix(&t.heads, 0)
|
||||
} else {
|
||||
heap.Pop(&t.heads)
|
||||
}
|
||||
}
|
||||
|
||||
// Pop removes the best transaction, *not* replacing it with the next one from
|
||||
// the same account. This should be used when a transaction cannot be executed
|
||||
// and hence all subsequent ones should be discarded from the same account.
|
||||
func (t *TransactionsByPriceAndNonce) Pop() {
|
||||
heap.Pop(&t.heads)
|
||||
}
|
||||
|
|
|
@ -137,7 +137,16 @@ func TestTransactionPriceNonceSort(t *testing.T) {
|
|||
}
|
||||
}
|
||||
// Sort the transactions and cross check the nonce ordering
|
||||
txs := SortByPriceAndNonce(groups)
|
||||
txset := NewTransactionsByPriceAndNonce(groups)
|
||||
|
||||
txs := Transactions{}
|
||||
for {
|
||||
if tx := txset.Peek(); tx != nil {
|
||||
txs = append(txs, tx)
|
||||
txset.Shift()
|
||||
}
|
||||
break
|
||||
}
|
||||
for i, txi := range txs {
|
||||
fromi, _ := txi.From()
|
||||
|
||||
|
|
127
miner/worker.go
127
miner/worker.go
|
@ -63,18 +63,16 @@ type uint64RingBuffer struct {
|
|||
// Work is the workers current environment and holds
|
||||
// all of the current state information
|
||||
type Work struct {
|
||||
config *core.ChainConfig
|
||||
state *state.StateDB // apply state changes here
|
||||
ancestors *set.Set // ancestor set (used for checking uncle parent validity)
|
||||
family *set.Set // family set (used for checking uncle invalidity)
|
||||
uncles *set.Set // uncle set
|
||||
tcount int // tx count in cycle
|
||||
ignoredTransactors *set.Set
|
||||
lowGasTransactors *set.Set
|
||||
ownedAccounts *set.Set
|
||||
lowGasTxs types.Transactions
|
||||
failedTxs types.Transactions
|
||||
localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion)
|
||||
config *core.ChainConfig
|
||||
state *state.StateDB // apply state changes here
|
||||
ancestors *set.Set // ancestor set (used for checking uncle parent validity)
|
||||
family *set.Set // family set (used for checking uncle invalidity)
|
||||
uncles *set.Set // uncle set
|
||||
tcount int // tx count in cycle
|
||||
ownedAccounts *set.Set
|
||||
lowGasTxs types.Transactions
|
||||
failedTxs types.Transactions
|
||||
localMinedBlocks *uint64RingBuffer // the most recent block numbers that were mined locally (used to check block inclusion)
|
||||
|
||||
Block *types.Block // the new block
|
||||
|
||||
|
@ -236,7 +234,12 @@ func (self *worker) update() {
|
|||
// Apply transaction to the pending state if we're not mining
|
||||
if atomic.LoadInt32(&self.mining) == 0 {
|
||||
self.currentMu.Lock()
|
||||
self.current.commitTransactions(self.mux, types.Transactions{ev.Tx}, self.gasPrice, self.chain)
|
||||
|
||||
acc, _ := ev.Tx.From()
|
||||
txs := map[common.Address]types.Transactions{acc: types.Transactions{ev.Tx}}
|
||||
txset := types.NewTransactionsByPriceAndNonce(txs)
|
||||
|
||||
self.current.commitTransactions(self.mux, txset, self.gasPrice, self.chain)
|
||||
self.currentMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
@ -384,8 +387,6 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error
|
|||
|
||||
// Keep track of transactions which return errors so they can be removed
|
||||
work.tcount = 0
|
||||
work.ignoredTransactors = set.New()
|
||||
work.lowGasTransactors = set.New()
|
||||
work.ownedAccounts = accountAddressesSet(accounts)
|
||||
if self.current != nil {
|
||||
work.localMinedBlocks = self.current.localMinedBlocks
|
||||
|
@ -494,43 +495,8 @@ func (self *worker) commitNewWork() {
|
|||
if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
|
||||
core.ApplyDAOHardFork(work.state)
|
||||
}
|
||||
|
||||
/* //approach 1
|
||||
transactions := self.eth.TxPool().GetTransactions()
|
||||
sort.Sort(types.TxByNonce(transactions))
|
||||
*/
|
||||
|
||||
//approach 2
|
||||
transactions := types.SortByPriceAndNonce(self.eth.TxPool().Pending())
|
||||
|
||||
/* // approach 3
|
||||
// commit transactions for this run.
|
||||
txPerOwner := make(map[common.Address]types.Transactions)
|
||||
// Sort transactions by owner
|
||||
for _, tx := range self.eth.TxPool().GetTransactions() {
|
||||
from, _ := tx.From() // we can ignore the sender error
|
||||
txPerOwner[from] = append(txPerOwner[from], tx)
|
||||
}
|
||||
var (
|
||||
singleTxOwner types.Transactions
|
||||
multiTxOwner types.Transactions
|
||||
)
|
||||
// Categorise transactions by
|
||||
// 1. 1 owner tx per block
|
||||
// 2. multi txs owner per block
|
||||
for _, txs := range txPerOwner {
|
||||
if len(txs) == 1 {
|
||||
singleTxOwner = append(singleTxOwner, txs[0])
|
||||
} else {
|
||||
multiTxOwner = append(multiTxOwner, txs...)
|
||||
}
|
||||
}
|
||||
sort.Sort(types.TxByPrice(singleTxOwner))
|
||||
sort.Sort(types.TxByNonce(multiTxOwner))
|
||||
transactions := append(singleTxOwner, multiTxOwner...)
|
||||
*/
|
||||
|
||||
work.commitTransactions(self.mux, transactions, self.gasPrice, self.chain)
|
||||
txs := types.NewTransactionsByPriceAndNonce(self.eth.TxPool().Pending())
|
||||
work.commitTransactions(self.mux, txs, self.gasPrice, self.chain)
|
||||
|
||||
self.eth.TxPool().RemoveBatch(work.lowGasTxs)
|
||||
self.eth.TxPool().RemoveBatch(work.failedTxs)
|
||||
|
@ -591,64 +557,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (env *Work) commitTransactions(mux *event.TypeMux, transactions types.Transactions, gasPrice *big.Int, bc *core.BlockChain) {
|
||||
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, gasPrice *big.Int, bc *core.BlockChain) {
|
||||
gp := new(core.GasPool).AddGas(env.header.GasLimit)
|
||||
|
||||
var coalescedLogs vm.Logs
|
||||
for _, tx := range transactions {
|
||||
for {
|
||||
// Retrieve the next transaction and abort if all done
|
||||
tx := txs.Peek()
|
||||
if tx == nil {
|
||||
break
|
||||
}
|
||||
// Error may be ignored here. The error has already been checked
|
||||
// during transaction acceptance is the transaction pool.
|
||||
from, _ := tx.From()
|
||||
|
||||
// Check if it falls within margin. Txs from owned accounts are always processed.
|
||||
// Ignore any transactions (and accounts subsequently) with low gas limits
|
||||
if tx.GasPrice().Cmp(gasPrice) < 0 && !env.ownedAccounts.Has(from) {
|
||||
// ignore the transaction and transactor. We ignore the transactor
|
||||
// because nonce will fail after ignoring this transaction so there's
|
||||
// no point
|
||||
env.lowGasTransactors.Add(from)
|
||||
// Pop the current low-priced transaction without shifting in the next from the account
|
||||
glog.V(logger.Info).Infof("Transaction (%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4])
|
||||
|
||||
glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(gasPrice), from[:4])
|
||||
}
|
||||
env.lowGasTxs = append(env.lowGasTxs, tx)
|
||||
txs.Pop()
|
||||
|
||||
// Continue with the next transaction if the transaction sender is included in
|
||||
// the low gas tx set. This will also remove the tx and all sequential transaction
|
||||
// from this transactor
|
||||
if env.lowGasTransactors.Has(from) {
|
||||
// add tx to the low gas set. This will be removed at the end of the run
|
||||
// owned accounts are ignored
|
||||
if !env.ownedAccounts.Has(from) {
|
||||
env.lowGasTxs = append(env.lowGasTxs, tx)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Move on to the next transaction when the transactor is in ignored transactions set
|
||||
// This may occur when a transaction hits the gas limit. When a gas limit is hit and
|
||||
// the transaction is processed (that could potentially be included in the block) it
|
||||
// will throw a nonce error because the previous transaction hasn't been processed.
|
||||
// Therefor we need to ignore any transaction after the ignored one.
|
||||
if env.ignoredTransactors.Has(from) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Start executing the transaction
|
||||
env.state.StartRecord(tx.Hash(), common.Hash{}, 0)
|
||||
|
||||
err, logs := env.commitTransaction(tx, bc, gp)
|
||||
switch {
|
||||
case core.IsGasLimitErr(err):
|
||||
// ignore the transactor so no nonce errors will be thrown for this account
|
||||
// next time the worker is run, they'll be picked up again.
|
||||
env.ignoredTransactors.Add(from)
|
||||
// Pop the current out-of-gas transaction without shifting in the next from the account
|
||||
glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
|
||||
txs.Pop()
|
||||
|
||||
case err != nil:
|
||||
// Pop the current failed transaction without shifting in the next from the account
|
||||
glog.V(logger.Detail).Infof("Transaction (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
|
||||
env.failedTxs = append(env.failedTxs, tx)
|
||||
if glog.V(logger.Detail) {
|
||||
glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
|
||||
}
|
||||
txs.Pop()
|
||||
|
||||
default:
|
||||
env.tcount++
|
||||
// Everything ok, collect the logs and shift in the next transaction from the same account
|
||||
coalescedLogs = append(coalescedLogs, logs...)
|
||||
env.tcount++
|
||||
txs.Shift()
|
||||
}
|
||||
}
|
||||
if len(coalescedLogs) > 0 || env.tcount > 0 {
|
||||
|
|
Loading…
Reference in New Issue