diff --git a/core/txpool/list.go b/core/txpool/list.go index 724bb6caca..639d69bcbe 100644 --- a/core/txpool/list.go +++ b/core/txpool/list.go @@ -508,10 +508,7 @@ func (h *priceHeap) Pop() interface{} { // the floating heap is better. When baseFee is decreasing they behave similarly. type pricedList struct { // Number of stale price points to (re-heap trigger). - // This field is accessed atomically, and must be the first field - // to ensure it has correct alignment for atomic.AddInt64. - // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG. - stales int64 + stales atomic.Int64 all *lookup // Pointer to the map of all transactions urgent, floating priceHeap // Heaps of prices of all the stored **remote** transactions @@ -545,7 +542,7 @@ func (l *pricedList) Put(tx *types.Transaction, local bool) { // the heap if a large enough ratio of transactions go stale. func (l *pricedList) Removed(count int) { // Bump the stale counter, but exit if still too low (< 25%) - stales := atomic.AddInt64(&l.stales, int64(count)) + stales := l.stales.Add(int64(count)) if int(stales) <= (len(l.urgent.list)+len(l.floating.list))/4 { return } @@ -570,7 +567,7 @@ func (l *pricedList) underpricedFor(h *priceHeap, tx *types.Transaction) bool { for len(h.list) > 0 { head := h.list[0] if l.all.GetRemote(head.Hash()) == nil { // Removed or migrated - atomic.AddInt64(&l.stales, -1) + l.stales.Add(-1) heap.Pop(h) continue } @@ -597,7 +594,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { // Discard stale transactions if found during cleanup tx := heap.Pop(&l.urgent).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - atomic.AddInt64(&l.stales, -1) + l.stales.Add(-1) continue } // Non stale transaction found, move to floating heap @@ -610,7 +607,7 @@ func (l *pricedList) Discard(slots int, force bool) (types.Transactions, bool) { // Discard stale transactions if found during cleanup tx := heap.Pop(&l.floating).(*types.Transaction) if l.all.GetRemote(tx.Hash()) == nil { // Removed or migrated - atomic.AddInt64(&l.stales, -1) + l.stales.Add(-1) continue } // Non stale transaction found, discard it @@ -633,7 +630,7 @@ func (l *pricedList) Reheap() { l.reheapMu.Lock() defer l.reheapMu.Unlock() start := time.Now() - atomic.StoreInt64(&l.stales, 0) + l.stales.Store(0) l.urgent.list = make([]*types.Transaction, 0, l.all.RemoteCount()) l.all.Range(func(hash common.Hash, tx *types.Transaction, local bool) bool { l.urgent.list = append(l.urgent.list, tx) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index ad556f39fb..cc673ccc10 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -23,7 +23,6 @@ import ( "math/big" "sort" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -383,7 +382,7 @@ func (pool *TxPool) loop() { pool.mu.RLock() pending, queued := pool.stats() pool.mu.RUnlock() - stales := int(atomic.LoadInt64(&pool.priced.stales)) + stales := int(pool.priced.stales.Load()) if pending != prevPending || queued != prevQueued || stales != prevStales { log.Debug("Transaction pool status report", "executable", pending, "queued", queued, "stales", stales) diff --git a/core/txpool/txpool2_test.go b/core/txpool/txpool2_test.go index 20d6dd713a..6d84975d83 100644 --- a/core/txpool/txpool2_test.go +++ b/core/txpool/txpool2_test.go @@ -79,7 +79,7 @@ func TestTransactionFutureAttack(t *testing.T) { // Create the pool to test the limit enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalQueue = 100 config.GlobalSlots = 100 @@ -115,7 +115,7 @@ func TestTransactionFuture1559(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain) defer pool.Stop() @@ -147,7 +147,7 @@ func TestTransactionZAttack(t *testing.T) { t.Parallel() // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain) defer pool.Stop() // Create a number of test accounts, fund them and make transactions diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index c4c62db2d6..7771c5f7cd 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -59,15 +59,21 @@ func init() { } type testBlockChain struct { - gasLimit uint64 // must be first field for 64 bit alignment (atomic access) + gasLimit atomic.Uint64 statedb *state.StateDB chainHeadFeed *event.Feed } +func newTestBlockChain(gasLimit uint64, statedb *state.StateDB, chainHeadFeed *event.Feed) *testBlockChain { + bc := testBlockChain{statedb: statedb, chainHeadFeed: new(event.Feed)} + bc.gasLimit.Store(gasLimit) + return &bc +} + func (bc *testBlockChain) CurrentBlock() *types.Header { return &types.Header{ Number: new(big.Int), - GasLimit: atomic.LoadUint64(&bc.gasLimit), + GasLimit: bc.gasLimit.Load(), } } @@ -121,7 +127,7 @@ func setupPool() (*TxPool, *ecdsa.PrivateKey) { func setupPoolWithConfig(config *params.ChainConfig) (*TxPool, *ecdsa.PrivateKey) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{10000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(10000000, statedb, new(event.Feed)) key, _ := crypto.GenerateKey() pool := NewTxPool(testTxPoolConfig, config, blockchain) @@ -236,7 +242,7 @@ func TestStateChangeDuringReset(t *testing.T) { // setup pool with 2 transaction in it statedb.SetBalance(address, new(big.Int).SetUint64(params.Ether)) - blockchain := &testChain{&testBlockChain{1000000000, statedb, new(event.Feed)}, address, &trigger} + blockchain := &testChain{newTestBlockChain(1000000000, statedb, new(event.Feed)), address, &trigger} tx0 := transaction(0, 100000, key) tx1 := transaction(1, 100000, key) @@ -430,7 +436,7 @@ func TestChainFork(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb.AddBalance(addr, big.NewInt(100000000000000)) - pool.chain = &testBlockChain{1000000, statedb, new(event.Feed)} + pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed)) <-pool.requestReset(nil, nil) } resetState() @@ -459,7 +465,7 @@ func TestDoubleNonce(t *testing.T) { statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) statedb.AddBalance(addr, big.NewInt(100000000000000)) - pool.chain = &testBlockChain{1000000, statedb, new(event.Feed)} + pool.chain = newTestBlockChain(1000000, statedb, new(event.Feed)) <-pool.requestReset(nil, nil) } resetState() @@ -629,7 +635,7 @@ func TestDropping(t *testing.T) { t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4) } // Reduce the block gas limit, check that invalidated transactions are dropped - atomic.StoreUint64(&pool.chain.(*testBlockChain).gasLimit, 100) + pool.chain.(*testBlockChain).gasLimit.Store(100) <-pool.requestReset(nil, nil) if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok { @@ -657,7 +663,7 @@ func TestPostponing(t *testing.T) { // Create the pool to test the postponing with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -869,7 +875,7 @@ func testQueueGlobalLimiting(t *testing.T, nolocals bool) { // Create the pool to test the limit enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.NoLocals = nolocals @@ -961,7 +967,7 @@ func testQueueTimeLimiting(t *testing.T, nolocals bool) { // Create the pool to test the non-expiration enforcement statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.Lifetime = time.Second @@ -1146,7 +1152,7 @@ func TestPendingGlobalLimiting(t *testing.T) { // Create the pool to test the limit enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = config.AccountSlots * 10 @@ -1248,7 +1254,7 @@ func TestCapClearsFromAll(t *testing.T) { // Create the pool to test the limit enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.AccountSlots = 2 @@ -1282,7 +1288,7 @@ func TestPendingMinimumAllowance(t *testing.T) { // Create the pool to test the limit enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = 1 @@ -1330,7 +1336,7 @@ func TestRepricing(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -1578,7 +1584,7 @@ func TestRepricingKeepsLocals(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, eip1559Config, blockchain) defer pool.Stop() @@ -1651,7 +1657,7 @@ func TestUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = 2 @@ -1765,7 +1771,7 @@ func TestStableUnderpricing(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.GlobalSlots = 128 @@ -1997,7 +2003,7 @@ func TestDeduplication(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -2063,7 +2069,7 @@ func TestReplacement(t *testing.T) { // Create the pool to test the pricing enforcement with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop() @@ -2268,7 +2274,7 @@ func testJournaling(t *testing.T, nolocals bool) { // Create the original pool to inject transaction into the journal statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) config := testTxPoolConfig config.NoLocals = nolocals @@ -2310,7 +2316,7 @@ func testJournaling(t *testing.T, nolocals bool) { // Terminate the old pool, bump the local nonce, create a new pool and ensure relevant transaction survive pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain = newTestBlockChain(1000000, statedb, new(event.Feed)) pool = NewTxPool(config, params.TestChainConfig, blockchain) @@ -2337,7 +2343,7 @@ func testJournaling(t *testing.T, nolocals bool) { pool.Stop() statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 1) - blockchain = &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain = newTestBlockChain(1000000, statedb, new(event.Feed)) pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() @@ -2366,7 +2372,7 @@ func TestStatusCheck(t *testing.T) { // Create the pool to test the status retrievals with statedb, _ := state.New(common.Hash{}, state.NewDatabase(rawdb.NewMemoryDatabase()), nil) - blockchain := &testBlockChain{1000000, statedb, new(event.Feed)} + blockchain := newTestBlockChain(1000000, statedb, new(event.Feed)) pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain) defer pool.Stop()