// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package blobpool implements the EIP-4844 blob transaction pool.
package blobpool

import (
	"container/heap"
	"fmt"
	"math"
	"math/big"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus/misc"
	"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/billy"
"github.com/holiman/uint256"
)
const (
	// blobSize is the protocol constrained byte size of a single blob in a
	// transaction. There can be multiple of these embedded into a single tx.
	blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement

	// maxBlobsPerTransaction is the maximum number of blobs a single transaction
	// is allowed to contain. Whilst the spec states it's unlimited, the block
	// data slots are protocol bound, which implicitly also limits this.
	maxBlobsPerTransaction = params.BlobTxMaxDataGasPerBlock / params.BlobTxDataGasPerBlob

	// txAvgSize is an approximate byte size of transaction metadata, used to avoid
	// tiny overflows causing all txs to move a shelf higher, wasting disk space.
	txAvgSize = 4 * 1024

	// txMaxSize is the maximum size a single transaction can have, outside
	// the included blobs. Since blob transactions are pulled instead of pushed,
	// and only a small amount of metadata is kept in RAM whilst the rest lives
	// on disk, there is no critical limit that should be enforced. Still, capping
	// it to some sane limit can never hurt.
	txMaxSize = 1024 * 1024

	// maxTxsPerAccount is the maximum number of blob transactions admitted from
	// a single account. The limit is enforced to minimize the DoS potential of
	// a private tx cancelling publicly propagated blobs.
	//
	// Note, transactions resurrected by a reorg are also subject to this limit,
	// so pushing it down too aggressively might make resurrections non-functional.
	maxTxsPerAccount = 16

	// pendingTransactionStore is the subfolder containing the currently queued
	// blob transactions.
	pendingTransactionStore = "queue"

	// limboedTransactionStore is the subfolder containing the currently included
	// but not yet finalized transaction blobs.
	limboedTransactionStore = "limbo"
)
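
// For a rough sense of scale (derived from the params referenced above, not
// hard-coded here): a blob is 4096 field elements of 32 bytes each, i.e. 128KiB,
// dwarfing the ~4KiB average metadata size budgeted per transaction via txAvgSize.
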
// blobTx is a wrapper around types.BlobTx which also contains the literal blob
// data along with all the transaction metadata.
type blobTx struct {
	Tx      *types.Transaction
	Blobs   []kzg4844.Blob
	Commits []kzg4844.Commitment
	Proofs  []kzg4844.Proof
}
// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
// schedule the blob transactions into the following blocks. Only ever add the
// bare minimum needed fields to keep the size down (and thus the number of
// entries larger with the same memory consumption).
type blobTxMeta struct {
	hash common.Hash // Transaction hash to maintain the lookup table
	id   uint64      // Storage ID in the pool's persistent store
	size uint32      // Byte size in the pool's persistent store

	nonce      uint64       // Needed to prioritize inclusion order within an account
	costCap    *uint256.Int // Needed to validate cumulative balance sufficiency
	execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
	execFeeCap *uint256.Int // Needed to validate replacement price bump
	blobFeeCap *uint256.Int // Needed to validate replacement price bump

	basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
	blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap

	evictionExecTip      *uint256.Int // Worst gas tip across all previous nonces
	evictionExecFeeJumps float64      // Worst base fee (converted to fee jumps) across all previous nonces
	evictionBlobFeeJumps float64      // Worst blob fee (converted to fee jumps) across all previous nonces
}
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
// and assembles a helper struct to track in memory.
func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta {
	meta := &blobTxMeta{
		hash:       tx.Hash(),
		id:         id,
		size:       size,
		nonce:      tx.Nonce(),
		costCap:    uint256.MustFromBig(tx.Cost()),
		execTipCap: uint256.MustFromBig(tx.GasTipCap()),
		execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
		blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
	}
	meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
	meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap)

	return meta
}
// BlobPool is the transaction pool dedicated to EIP-4844 blob transactions.
//
// Blob transactions are special snowflakes that are designed for a very specific
// purpose (rollups) and are expected to adhere to that specific use case. These
// behavioural expectations allow us to design a transaction pool that is more robust
// (i.e. resending issues) and more resilient to DoS attacks (e.g. replace-flush
// attacks) than the generic tx pool. These improvements will also mean, however,
// that we enforce a significantly more aggressive strategy on entering and exiting
// the pool:
//
// - Blob transactions are large. With the initial design aiming for 128KB blobs,
// we must ensure that these only traverse the network the absolute minimum
// number of times. Broadcasting to sqrt(peers) is out of the question, rather
// these should only ever be announced and the remote side should request it if
// it wants to.
//
// - Block blob-space is limited. With blocks being capped to a few blob txs, we
// can make use of the very low expected churn rate within the pool. Notably,
// we should be able to use a persistent disk backend for the pool, solving
// the tx resend issue that plagues the generic tx pool, as long as there's no
// artificial churn (i.e. pool wars).
//
// - The purpose of blobs is layer-2s. Layer-2s are meant to use blob transactions to
// commit to their own current state, which is independent of Ethereum mainnet
// (state, txs). This means that there's no reason for blob tx cancellation or
// replacement, apart from a potential basefee / miner tip adjustment.
//
// - Replacements are expensive. Given their size, propagating a replacement
// blob transaction to an existing one should be aggressively discouraged.
// Whilst generic transactions can start at 1 Wei gas cost and require a 10%
// fee bump to replace, we suggest requiring a higher min cost (e.g. 1 gwei)
// and a more aggressive bump (100%).
//
// - Cancellation is prohibitive. Evicting an already propagated blob tx is a huge
// DoS vector. As such, a) replacement (higher-fee) blob txs mustn't invalidate
// already propagated (future) blob txs (cumulative fee); b) nonce-gapped blob
// txs are disallowed; c) the presence of blob transactions exclude non-blob
// transactions.
//
// - Malicious cancellations are possible. Although the pool might prevent txs
// that cancel blobs, blocks might contain such transactions (malicious miner
// or flashbotter). The pool should cap the total number of blob transactions
// per account so as to prevent propagating too much data before cancelling it
// via a normal transaction. It should nonetheless be high enough to support
// resurrecting reorged transactions. Perhaps 4-16.
//
// - Local txs are meaningless. Mining pools historically used local transactions
// for payouts or for backdoor deals. With 1559 in place, the basefee usually
// dominates the final price, so 0 or non-0 tip doesn't change much. Blob txs
// retain the 1559 2D gas pricing (and introduce on top a dynamic data gas fee),
// so locality is moot. With a disk backed blob pool avoiding the resend issue,
// there's also no need to save own transactions for later.
//
// - No-blob blob-txs are bad. Theoretically there's no strong reason to disallow
// blob txs containing 0 blobs. In practice, admitting such txs into the pool
// breaks the low-churn invariant as blob constraints don't apply anymore. Even
// though we could accept blocks containing such txs, a reorg would require moving
// them back into the blob pool, which can break invariants.
//
// - Dropping blobs needs delay. When normal transactions are included, they
// are immediately evicted from the pool since they are contained in the
// including block. Blobs however are not included in the execution chain,
// so a mini reorg cannot re-pool "lost" blob transactions. To support reorgs,
// blobs are retained on disk until they are finalised.
//
// - Blobs can arrive via flashbots. Blocks might contain blob transactions we
// have never seen on the network. Since we cannot recover them from blocks
// either, the engine_newPayload needs to give them to us, and we cache them
// until finality to support reorgs without tx losses.
//
// Whilst some constraints above might sound overly aggressive, the general idea is
// that the blob pool should work robustly for its intended use case and whilst
// anyone is free to use blob transactions for arbitrary non-rollup use cases,
// they should not be allowed to run amok on the network.
//
// Implementation wise there are a few interesting design choices:
//
// - Adding a transaction to the pool blocks until persisted to disk. This is
// viable because TPS is low (2-4 blobs per block initially, maybe 8-16 at
// peak), so natural churn is a couple MB per block. Replacements doing O(n)
// updates are forbidden and transaction propagation is pull based (i.e. no
// pileup of pending data).
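//
// (Back-of-the-envelope, not a measured figure: at 128KB per blob, 2-4 blobs
// per block is roughly 0.25-0.5MB of fresh data per block, which a local disk
// absorbs trivially.)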
//
// - When transactions are chosen for inclusion, the primary criteria is the
// signer tip (and having a basefee/data fee high enough of course). However,
// same-tip transactions will be split by their basefee/datafee, preferring
// those that are closer to the current network limits. The idea being that
// very relaxed ones can be included even if the fees go up, when the closer
// ones could already be invalid.
//
// When the pool eventually reaches saturation, some old transactions - that may
// never execute - will need to be evicted in favor of newer ones. The eviction
// strategy is quite complex:
//
// - Exceeding capacity evicts the highest-nonce of the account with the lowest
// paying blob transaction anywhere in the pooled nonce-sequence, as that tx
// would be executed the furthest in the future and is thus blocking anything
// after it. The smallest is deliberately not evicted to avoid a nonce-gap.
//
// - Analogously, if the pool is full, the consideration price of a new tx for
// evicting an old one is the smallest price in the entire nonce-sequence of
// the account. This avoids malicious users DoSing the pool with seemingly
// high paying transactions hidden behind a low-paying blocked one.
//
// - Since blob transactions have 3 price parameters: execution tip, execution
// fee cap and data fee cap, there's no singular parameter to create a total
// price ordering on. What's more, since the base fee and blob fee can move
// independently of one another, there's no pre-defined way to combine them
// into a stable order either. This leads to a multi-dimensional problem to
// solve after every block.
//
// - The first observation is that comparing 1559 base fees or 4844 blob fees
// needs to happen in the context of their dynamism. Since these fees jump
// up or down in ~1.125 multipliers (at max) across blocks, comparing fees
// in two transactions should be based on log1.125(fee) to eliminate noise.
//
// - The second observation is that the basefee and blobfee move independently,
// so there's no way to split mixed txs on their own (A has higher base fee,
// B has higher blob fee). Rather than look at the absolute fees, the useful
// metric is the max time it can take to exceed the transaction's fee caps.
// Specifically, we're interested in the number of jumps needed to go from
// the current fee to the transaction's cap:
//
// jumps = log1.125(txfee) - log1.125(basefee)
//
// - The third observation is that the base fee tends to hover around rather
// than swing wildly. The number of jumps needed from the current fee starts
// to get less relevant the higher it is. To remove the noise here too, the
// pool will use log(jumps) as the delta for comparing transactions.
//
// delta = sign(jumps) * log(abs(jumps))
//
// - To establish a total order, we need to reduce the dimensionality of the
// two base fees (log jumps) to a single value. The interesting aspect from
// the pool's perspective is how fast will a tx get executable (fees going
// down, crossing the smaller negative jump counter) or non-executable (fees
// going up, crossing the smaller positive jump counter). As such, the pool
// cares only about the min of the two delta values for eviction priority.
//
// priority = min(delta-basefee, delta-blobfee)
//
// - The above very aggressive dimensionality and noise reduction should result
// in transactions being grouped into a small number of buckets, the further
// the fees the larger the buckets. This is good because it allows us to use
// the miner tip meaningfully as a splitter.
//
// - For the scenario where the pool does not contain non-executable blob txs
// anymore, it does not make sense to grant a later eviction priority to txs
// with high fee caps since it could enable pool wars. As such, any positive
// priority will be grouped together.
//
// priority = min(delta-basefee, delta-blobfee, 0)
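//
// As an illustration (hypothetical numbers, natural logarithm assumed for the
// log in the delta formula): with the basefee at 10 gwei, a tx capping its
// execution fee at 20 gwei is ~5.9 jumps away (log1.125(2) ≈ 5.9), so its
// basefee delta is ≈ +1.8; if its blob fee cap is only 1 jump above the
// current blob fee, that delta is ≈ 0, and the overall eviction priority is
// min(1.8, 0, 0) = 0.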
//
// Optimisation tradeoffs:
//
// - Eviction relies on 3 fee minimums per account (exec tip, exec cap and blob
// cap). Maintaining these values across all transactions from the account is
// problematic as each transaction replacement or inclusion would require a
// rescan of all other transactions to recalculate the minimum. Instead, the
// pool maintains a rolling minimum across the nonce range. Updating all the
// minimums will need to be done only starting at the swapped in/out nonce
// and leading up to the first no-change.
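//
// For illustration (hypothetical values): with per-nonce exec tips of
// [5, 3, 7, 4] gwei, the rolling minimums are [5, 3, 3, 3]; replacing the
// nonce-2 tx with a 2 gwei tip only re-walks nonces 2..3, yielding
// [5, 3, 2, 2].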
type BlobPool struct {
	config  Config                 // Pool configuration
	reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools

	store  billy.Database // Persistent data store for the tx metadata and blobs
	stored uint64         // Useful data size of all transactions on disk
	limbo  *limbo         // Persistent data store for the non-finalized blobs

	signer types.Signer // Transaction signer to use for sender recovery
	chain  BlockChain   // Chain object to access the state through

	head   *types.Header  // Current head of the chain
	state  *state.StateDB // Current state at the head of the chain
	gasTip *uint256.Int   // Currently accepted minimum gas tip

	lookup map[common.Hash]uint64           // Lookup table mapping hashes to tx billy entries
	index  map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
	spent  map[common.Address]*uint256.Int  // Expenditure tracking for individual accounts
	evict  *evictHeap                       // Heap of cheapest accounts for eviction when full

	eventFeed  event.Feed              // Event feed to send out new tx events on pool inclusion
	eventScope event.SubscriptionScope // Event scope to track and mass unsubscribe on termination

	lock sync.RWMutex // Mutex protecting the pool during reorg handling
}
// New creates a new blob transaction pool to gather, sort and filter inbound
// blob transactions from the network.
func New(config Config, chain BlockChain) *BlobPool {
	// Sanitize the input to ensure no vulnerable gas prices are set
	config = (&config).sanitize()

	// Create the transaction pool with its initial settings
	return &BlobPool{
		config: config,
		signer: types.LatestSigner(chain.Config()),
		chain:  chain,
		lookup: make(map[common.Hash]uint64),
		index:  make(map[common.Address][]*blobTxMeta),
		spent:  make(map[common.Address]*uint256.Int),
	}
}

// Filter returns whether the given transaction can be consumed by the blob pool.
func (p *BlobPool) Filter(tx *types.Transaction) bool {
	return tx.Type() == types.BlobTxType
}
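
// A minimal usage sketch of the subpool lifecycle (illustrative only; the chain,
// head, reserver, gas tip and config values below are assumptions, not defined
// in this file):
//
//	pool := blobpool.New(blobpool.Config{Datadir: datadir, PriceBump: 100}, chain)
//	if err := pool.Init(big.NewInt(1), head, reserver); err != nil {
//		log.Crit("Failed to initialize blob pool", "err", err)
//	}
//	defer pool.Close()
//	errs := pool.Add(txs, false, false)
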
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings.
func ( p * BlobPool ) Init ( gasTip * big . Int , head * types . Header , reserve txpool . AddressReserver ) error {
p . reserve = reserve
var (
queuedir string
limbodir string
)
if p . config . Datadir != "" {
queuedir = filepath . Join ( p . config . Datadir , pendingTransactionStore )
if err := os . MkdirAll ( queuedir , 0700 ) ; err != nil {
return err
}
limbodir = filepath . Join ( p . config . Datadir , limboedTransactionStore )
if err := os . MkdirAll ( limbodir , 0700 ) ; err != nil {
return err
}
}
state , err := p . chain . StateAt ( head . Root )
if err != nil {
return err
}
p . head , p . state = head , state
// Index all transactions on disk and delete anything unprocessable
var fails [ ] uint64
index := func ( id uint64 , size uint32 , blob [ ] byte ) {
if p . parseTransaction ( id , size , blob ) != nil {
fails = append ( fails , id )
}
}
store , err := billy . Open ( billy . Options { Path : queuedir } , newSlotter ( ) , index )
if err != nil {
return err
}
p . store = store
if len ( fails ) > 0 {
log . Warn ( "Dropping invalidated blob transactions" , "ids" , fails )
for _ , id := range fails {
if err := p . store . Delete ( id ) ; err != nil {
p . Close ( )
return err
}
}
}
// Sort the indexed transactions by nonce and delete anything gapped, create
// the eviction heap of anyone still standing
for addr := range p . index {
p . recheck ( addr , nil )
}
var (
basefee = uint256 . MustFromBig ( misc . CalcBaseFee ( p . chain . Config ( ) , p . head ) )
blobfee = uint256 . MustFromBig ( big . NewInt ( params . BlobTxMinDataGasprice ) )
)
if p . head . ExcessDataGas != nil {
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(*p.head.ExcessDataGas))
}
p . evict = newPriceHeap ( basefee , blobfee , & p . index )
// Pool initialized, attach the blob limbo to it to track blobs included
// recently but not yet finalized
p . limbo , err = newLimbo ( limbodir )
if err != nil {
p . Close ( )
return err
}
// Set the configured gas tip, triggering a filtering of anything just loaded
basefeeGauge . Update ( int64 ( basefee . Uint64 ( ) ) )
blobfeeGauge . Update ( int64 ( blobfee . Uint64 ( ) ) )
p . SetGasTip ( gasTip )
// Since the user might have modified their pool's capacity, evict anything
// above the current allowance
for p . stored > p . config . Datacap {
p . drop ( )
}
// Update the metrics and return the constructed pool
datacapGauge . Update ( int64 ( p . config . Datacap ) )
p . updateStorageMetrics ( )
return nil
}
// Close closes down the underlying persistent store.
func (p *BlobPool) Close() error {
	var errs []error
	if err := p.limbo.Close(); err != nil {
		errs = append(errs, err)
	}
	if err := p.store.Close(); err != nil {
		errs = append(errs, err)
	}
	p.eventScope.Close()

	switch {
	case errs == nil:
		return nil
	case len(errs) == 1:
		return errs[0]
	default:
		return fmt.Errorf("%v", errs)
	}
}
// parseTransaction is a callback method on pool creation that gets called for
// each transaction on disk to create the in-memory metadata index.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
	item := new(blobTx)
	if err := rlp.DecodeBytes(blob, item); err != nil {
		// This path is impossible unless the disk data representation changes
		// across restarts. For that ever so improbable case, recover gracefully
		// by ignoring this data entry.
		log.Error("Failed to decode blob pool entry", "id", id, "err", err)
		return err
	}
	meta := newBlobTxMeta(id, size, item.Tx)

	sender, err := p.signer.Sender(item.Tx)
	if err != nil {
		// This path is impossible unless the signature validity changes across
		// restarts. For that ever so improbable case, recover gracefully by
		// ignoring this data entry.
		log.Error("Failed to recover blob tx sender", "id", id, "hash", item.Tx.Hash(), "err", err)
		return err
	}
	if _, ok := p.index[sender]; !ok {
		if err := p.reserve(sender, true); err != nil {
			return err
		}
		p.index[sender] = []*blobTxMeta{}
		p.spent[sender] = new(uint256.Int)
	}
	p.index[sender] = append(p.index[sender], meta)
	p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap)

	p.lookup[meta.hash] = meta.id
	p.stored += uint64(meta.size)
	return nil
}
// recheck verifies the pool's content for a specific account and drops anything
// that does not fit anymore (dangling or filled nonce, overdraft).
func ( p * BlobPool ) recheck ( addr common . Address , inclusions map [ common . Hash ] uint64 ) {
// Sort the transactions belonging to the account so reinjects can be simpler
txs := p . index [ addr ]
if inclusions != nil && txs == nil { // during reorgs, we might find new accounts
return
}
sort . Slice ( txs , func ( i , j int ) bool {
return txs [ i ] . nonce < txs [ j ] . nonce
} )
// If there is a gap between the chain state and the blob pool, drop
// all the transactions as they are non-executable. Similarly, if the
// entire tx range was included, drop all.
var (
next = p . state . GetNonce ( addr )
gapped = txs [ 0 ] . nonce > next
filled = txs [ len ( txs ) - 1 ] . nonce < next
)
if gapped || filled {
var (
ids [ ] uint64
nonces [ ] uint64
)
for i := 0 ; i < len ( txs ) ; i ++ {
ids = append ( ids , txs [ i ] . id )
nonces = append ( nonces , txs [ i ] . nonce )
p . stored -= uint64 ( txs [ i ] . size )
delete ( p . lookup , txs [ i ] . hash )
// Included transactions' blobs need to be moved to the limbo
if filled && inclusions != nil {
p . offload ( addr , txs [ i ] . nonce , txs [ i ] . id , inclusions )
}
}
delete ( p . index , addr )
delete ( p . spent , addr )
if inclusions != nil { // only during reorgs will the heap be initialized
heap . Remove ( p . evict , p . evict . index [ addr ] )
}
p . reserve ( addr , false )
if gapped {
log . Warn ( "Dropping dangling blob transactions" , "from" , addr , "missing" , next , "drop" , nonces , "ids" , ids )
} else {
log . Trace ( "Dropping filled blob transactions" , "from" , addr , "filled" , nonces , "ids" , ids )
}
for _ , id := range ids {
if err := p . store . Delete ( id ) ; err != nil {
log . Error ( "Failed to delete blob transaction" , "from" , addr , "id" , id , "err" , err )
}
}
return
}
// If there is overlap between the chain state and the blob pool, drop
// anything below the current state
if txs [ 0 ] . nonce < next {
var (
ids [ ] uint64
nonces [ ] uint64
)
for txs [ 0 ] . nonce < next {
ids = append ( ids , txs [ 0 ] . id )
nonces = append ( nonces , txs [ 0 ] . nonce )
p . spent [ addr ] = new ( uint256 . Int ) . Sub ( p . spent [ addr ] , txs [ 0 ] . costCap )
p . stored -= uint64 ( txs [ 0 ] . size )
delete ( p . lookup , txs [ 0 ] . hash )
// Included transactions' blobs need to be moved to the limbo
if inclusions != nil {
p . offload ( addr , txs [ 0 ] . nonce , txs [ 0 ] . id , inclusions )
}
txs = txs [ 1 : ]
}
log . Trace ( "Dropping overlapped blob transactions" , "from" , addr , "overlapped" , nonces , "ids" , ids , "left" , len ( txs ) )
for _ , id := range ids {
if err := p . store . Delete ( id ) ; err != nil {
log . Error ( "Failed to delete blob transaction" , "from" , addr , "id" , id , "err" , err )
}
}
p . index [ addr ] = txs
}
// Iterate over the transactions to initialize their eviction thresholds
// and to detect any nonce gaps
txs [ 0 ] . evictionExecTip = txs [ 0 ] . execTipCap
txs [ 0 ] . evictionExecFeeJumps = txs [ 0 ] . basefeeJumps
txs [ 0 ] . evictionBlobFeeJumps = txs [ 0 ] . blobfeeJumps
for i := 1 ; i < len ( txs ) ; i ++ {
// If there's no nonce gap, initialize the eviction thresholds as the
// minimum between the cumulative thresholds and the current tx fees
if txs [ i ] . nonce == txs [ i - 1 ] . nonce + 1 {
txs [ i ] . evictionExecTip = txs [ i - 1 ] . evictionExecTip
if txs [ i ] . evictionExecTip . Cmp ( txs [ i ] . execTipCap ) > 0 {
txs [ i ] . evictionExecTip = txs [ i ] . execTipCap
}
txs [ i ] . evictionExecFeeJumps = txs [ i - 1 ] . evictionExecFeeJumps
if txs [ i ] . evictionExecFeeJumps > txs [ i ] . basefeeJumps {
txs [ i ] . evictionExecFeeJumps = txs [ i ] . basefeeJumps
}
txs [ i ] . evictionBlobFeeJumps = txs [ i - 1 ] . evictionBlobFeeJumps
if txs [ i ] . evictionBlobFeeJumps > txs [ i ] . blobfeeJumps {
txs [ i ] . evictionBlobFeeJumps = txs [ i ] . blobfeeJumps
}
continue
}
// Sanity check that there's no double nonce. This case would be a coding
// error, but better to know about it
if txs [ i ] . nonce == txs [ i - 1 ] . nonce {
log . Error ( "Duplicate nonce blob transaction" , "from" , addr , "nonce" , txs [ i ] . nonce )
}
// Otherwise if there's a nonce gap evict all later transactions
var (
ids [ ] uint64
nonces [ ] uint64
)
for j := i ; j < len ( txs ) ; j ++ {
ids = append ( ids , txs [ j ] . id )
nonces = append ( nonces , txs [ j ] . nonce )
p . spent [ addr ] = new ( uint256 . Int ) . Sub ( p . spent [ addr ] , txs [ j ] . costCap )
p . stored -= uint64 ( txs [ j ] . size )
delete ( p . lookup , txs [ j ] . hash )
}
txs = txs [ : i ]
log . Error ( "Dropping gapped blob transactions" , "from" , addr , "missing" , txs [ i - 1 ] . nonce + 1 , "drop" , nonces , "ids" , ids )
for _ , id := range ids {
if err := p . store . Delete ( id ) ; err != nil {
log . Error ( "Failed to delete blob transaction" , "from" , addr , "id" , id , "err" , err )
}
}
p . index [ addr ] = txs
break
}
// Ensure that there's no over-draft, this is expected to happen when some
// transactions get included without publishing on the network
var (
balance = uint256 . MustFromBig ( p . state . GetBalance ( addr ) )
spent = p . spent [ addr ]
)
if spent . Cmp ( balance ) > 0 {
// Evict the highest nonce transactions until the pending set falls under
// the account's available balance
var (
ids [ ] uint64
nonces [ ] uint64
)
for p . spent [ addr ] . Cmp ( balance ) > 0 {
last := txs [ len ( txs ) - 1 ]
txs [ len ( txs ) - 1 ] = nil
txs = txs [ : len ( txs ) - 1 ]
ids = append ( ids , last . id )
nonces = append ( nonces , last . nonce )
p . spent [ addr ] = new ( uint256 . Int ) . Sub ( p . spent [ addr ] , last . costCap )
p . stored -= uint64 ( last . size )
delete ( p . lookup , last . hash )
}
if len ( txs ) == 0 {
delete ( p . index , addr )
delete ( p . spent , addr )
if inclusions != nil { // only during reorgs will the heap be initialized
heap . Remove ( p . evict , p . evict . index [ addr ] )
}
p . reserve ( addr , false )
} else {
p . index [ addr ] = txs
}
log . Warn ( "Dropping overdrafted blob transactions" , "from" , addr , "balance" , balance , "spent" , spent , "drop" , nonces , "ids" , ids )
for _ , id := range ids {
if err := p . store . Delete ( id ) ; err != nil {
log . Error ( "Failed to delete blob transaction" , "from" , addr , "id" , id , "err" , err )
}
}
}
// Sanity check that no account can have more queued transactions than the
// DoS protection threshold.
if len ( txs ) > maxTxsPerAccount {
// Evict the highest nonce transactions until the pending set falls under
// the account's transaction cap
var (
ids [ ] uint64
nonces [ ] uint64
)
for len ( txs ) > maxTxsPerAccount {
last := txs [ len ( txs ) - 1 ]
txs [ len ( txs ) - 1 ] = nil
txs = txs [ : len ( txs ) - 1 ]
ids = append ( ids , last . id )
nonces = append ( nonces , last . nonce )
p . spent [ addr ] = new ( uint256 . Int ) . Sub ( p . spent [ addr ] , last . costCap )
p . stored -= uint64 ( last . size )
delete ( p . lookup , last . hash )
}
p . index [ addr ] = txs
log . Warn ( "Dropping overcapped blob transactions" , "from" , addr , "kept" , len ( txs ) , "drop" , nonces , "ids" , ids )
for _ , id := range ids {
if err := p . store . Delete ( id ) ; err != nil {
log . Error ( "Failed to delete blob transaction" , "from" , addr , "id" , id , "err" , err )
}
}
}
// Included cheap transactions might have left the remaining ones better from
// an eviction point of view; fix any potential issues in the heap.
if _ , ok := p . index [ addr ] ; ok && inclusions != nil {
heap . Fix ( p . evict , p . evict . index [ addr ] )
}
}
// offload removes a tracked blob transaction from the pool and moves it into the
// limbo for tracking until finality.
//
// The method may log errors for various unexpected scenarios but will not return
// any of them since there's no clear error case. Some errors may be due to coding
// issues, others caused by signers mining MEV stuff or swapping transactions. In
// all cases, the pool needs to continue operating.
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) {
	data, err := p.store.Get(id)
	if err != nil {
		log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
		return
	}
	item := new(blobTx)
	if err = rlp.DecodeBytes(data, item); err != nil {
		log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
		return
	}
	block, ok := inclusions[item.Tx.Hash()]
	if !ok {
		log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id)
		return
	}
	if err := p.limbo.push(item.Tx.Hash(), block, item.Blobs, item.Commits, item.Proofs); err != nil {
		log.Warn("Failed to offload blob tx into limbo", "err", err)
		return
	}
}
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
// kept in sync with the main transaction pool's internal state.
func ( p * BlobPool ) Reset ( oldHead , newHead * types . Header ) {
waitStart := time . Now ( )
p . lock . Lock ( )
resetwaitHist . Update ( time . Since ( waitStart ) . Nanoseconds ( ) )
defer p . lock . Unlock ( )
defer func ( start time . Time ) {
resettimeHist . Update ( time . Since ( start ) . Nanoseconds ( ) )
} ( time . Now ( ) )
statedb , err := p . chain . StateAt ( newHead . Root )
if err != nil {
log . Error ( "Failed to reset blobpool state" , "err" , err )
return
}
p . head = newHead
p . state = statedb
// Run the reorg between the old and new head and figure out which accounts
// need to be rechecked and which transactions need to be readded
if reinject , inclusions := p . reorg ( oldHead , newHead ) ; reinject != nil {
for addr , txs := range reinject {
// Blindly push all the lost transactions back into the pool
for _ , tx := range txs {
p . reinject ( addr , tx )
}
// Recheck the account's pooled transactions to drop included and
// invalidated ones
p . recheck ( addr , inclusions )
}
}
// Flush out any blobs from limbo that are older than the latest finality
p . limbo . finalize ( p . chain . CurrentFinalBlock ( ) )
// Reset the price heap for the new set of basefee/blobfee pairs
var (
basefee = uint256 . MustFromBig ( misc . CalcBaseFee ( p . chain . Config ( ) , newHead ) )
blobfee = uint256 . MustFromBig ( big . NewInt ( params . BlobTxMinDataGasprice ) )
)
if newHead . ExcessDataGas != nil {
blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(*newHead.ExcessDataGas))
}
p . evict . reinit ( basefee , blobfee , false )
basefeeGauge . Update ( int64 ( basefee . Uint64 ( ) ) )
blobfeeGauge . Update ( int64 ( blobfee . Uint64 ( ) ) )
p . updateStorageMetrics ( )
}
// reorg assembles all the transactors and missing transactions between an old
// and new head to figure out which account's tx set needs to be rechecked and
// which transactions need to be requeued.
//
// The transaction-to-block inclusion infos are also returned to allow tracking any
// just-included blocks by block number in the limbo.
func ( p * BlobPool ) reorg ( oldHead , newHead * types . Header ) ( map [ common . Address ] [ ] * types . Transaction , map [ common . Hash ] uint64 ) {
// If the pool was not yet initialized, don't do anything
if oldHead == nil {
return nil , nil
}
// If the reorg is too deep, avoid doing it (will happen during snap sync)
oldNum := oldHead . Number . Uint64 ( )
newNum := newHead . Number . Uint64 ( )
if depth := uint64 ( math . Abs ( float64 ( oldNum ) - float64 ( newNum ) ) ) ; depth > 64 {
return nil , nil
}
// Reorg seems shallow enough to pull in all transactions into memory
var (
transactors = make ( map [ common . Address ] struct { } )
discarded = make ( map [ common . Address ] [ ] * types . Transaction )
included = make ( map [ common . Address ] [ ] * types . Transaction )
inclusions = make ( map [ common . Hash ] uint64 )
rem = p . chain . GetBlock ( oldHead . Hash ( ) , oldHead . Number . Uint64 ( ) )
add = p . chain . GetBlock ( newHead . Hash ( ) , newHead . Number . Uint64 ( ) )
)
if add == nil {
// if the new head is nil, it means that something happened between
// the firing of newhead-event and _now_: most likely a
// reorg caused by sync-reversion or explicit sethead back to an
// earlier block.
log . Warn ( "Blobpool reset with missing new head" , "number" , newHead . Number , "hash" , newHead . Hash ( ) )
return nil , nil
}
if rem == nil {
// This can happen if a setHead is performed, where we simply discard
// the old head from the chain. If that is the case, we don't have the
// lost transactions anymore, and there's nothing to add.
if newNum >= oldNum {
// If we reorged to a same or higher number, then it's not a case
// of setHead
log . Warn ( "Blobpool reset with missing old head" ,
"old" , oldHead . Hash ( ) , "oldnum" , oldNum , "new" , newHead . Hash ( ) , "newnum" , newNum )
return nil , nil
}
// If the reorg ended up on a lower number, it's indicative of setHead
// being the cause
log . Debug ( "Skipping blobpool reset caused by setHead" ,
"old" , oldHead . Hash ( ) , "oldnum" , oldNum , "new" , newHead . Hash ( ) , "newnum" , newNum )
return nil , nil
}
// Both old and new blocks exist, traverse through the progression chain
// and accumulate the transactors and transactions
for rem . NumberU64 ( ) > add . NumberU64 ( ) {
for _ , tx := range rem . Transactions ( ) {
from , _ := p . signer . Sender ( tx )
discarded [ from ] = append ( discarded [ from ] , tx )
transactors [ from ] = struct { } { }
}
if rem = p . chain . GetBlock ( rem . ParentHash ( ) , rem . NumberU64 ( ) - 1 ) ; rem == nil {
log . Error ( "Unrooted old chain seen by blobpool" , "block" , oldHead . Number , "hash" , oldHead . Hash ( ) )
return nil , nil
}
}
for add . NumberU64 ( ) > rem . NumberU64 ( ) {
for _ , tx := range add . Transactions ( ) {
from , _ := p . signer . Sender ( tx )
included [ from ] = append ( included [ from ] , tx )
inclusions [ tx . Hash ( ) ] = add . NumberU64 ( )
transactors [ from ] = struct { } { }
}
if add = p . chain . GetBlock ( add . ParentHash ( ) , add . NumberU64 ( ) - 1 ) ; add == nil {
log . Error ( "Unrooted new chain seen by blobpool" , "block" , newHead . Number , "hash" , newHead . Hash ( ) )
return nil , nil
}
}
for rem . Hash ( ) != add . Hash ( ) {
for _ , tx := range rem . Transactions ( ) {
from , _ := p . signer . Sender ( tx )
discarded [ from ] = append ( discarded [ from ] , tx )
transactors [ from ] = struct { } { }
}
if rem = p . chain . GetBlock ( rem . ParentHash ( ) , rem . NumberU64 ( ) - 1 ) ; rem == nil {
log . Error ( "Unrooted old chain seen by blobpool" , "block" , oldHead . Number , "hash" , oldHead . Hash ( ) )
return nil , nil
}
for _ , tx := range add . Transactions ( ) {
from , _ := p . signer . Sender ( tx )
included [ from ] = append ( included [ from ] , tx )
inclusions [ tx . Hash ( ) ] = add . NumberU64 ( )
transactors [ from ] = struct { } { }
}
if add = p . chain . GetBlock ( add . ParentHash ( ) , add . NumberU64 ( ) - 1 ) ; add == nil {
log . Error ( "Unrooted new chain seen by blobpool" , "block" , newHead . Number , "hash" , newHead . Hash ( ) )
return nil , nil
}
}
// Generate the set of transactions per address to pull back into the pool,
// also updating the rest along the way
reinject := make ( map [ common . Address ] [ ] * types . Transaction )
for addr := range transactors {
// Generate the set that was lost to reinject into the pool
lost := make ( [ ] * types . Transaction , 0 , len ( discarded [ addr ] ) )
for _ , tx := range types . TxDifference ( discarded [ addr ] , included [ addr ] ) {
if p . Filter ( tx ) {
lost = append ( lost , tx )
}
}
reinject [ addr ] = lost
// Update the set that was already reincluded to track the blocks in limbo
for _ , tx := range types . TxDifference ( included [ addr ] , discarded [ addr ] ) {
if p . Filter ( tx ) {
p . limbo . update ( tx . Hash ( ) , inclusions [ tx . Hash ( ) ] )
}
}
}
return reinject , inclusions
}
// reinject blindly pushes a transaction previously included in the chain - and
// just reorged out - into the pool. The transaction is assumed valid (having
// been in the chain), thus the only validation needed is nonce sorting and over-
// draft checks after injection.
//
// Note, the method will not initialize the eviction cache values as those will
// be done once for all transactions belonging to an account after all individual
// transactions are injected back into the pool.
func ( p * BlobPool ) reinject ( addr common . Address , tx * types . Transaction ) {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
blobs , commits , proofs , err := p . limbo . pull ( tx . Hash ( ) )
if err != nil {
log . Error ( "Blobs unavailable, dropping reorged tx" , "err" , err )
return
}
// Serialize the transaction back into the primary datastore
blob , err := rlp . EncodeToBytes ( & blobTx { Tx : tx , Blobs : blobs , Commits : commits , Proofs : proofs } )
if err != nil {
log . Error ( "Failed to encode transaction for storage" , "hash" , tx . Hash ( ) , "err" , err )
return
}
id , err := p . store . Put ( blob )
if err != nil {
log . Error ( "Failed to write transaction into storage" , "hash" , tx . Hash ( ) , "err" , err )
return
}
// Update the indices and metrics
meta := newBlobTxMeta ( id , p . store . Size ( id ) , tx )
if _ , ok := p . index [ addr ] ; ! ok {
if err := p . reserve ( addr , true ) ; err != nil {
log . Warn ( "Failed to reserve account for blob pool" , "tx" , tx . Hash ( ) , "from" , addr , "err" , err )
return
}
p . index [ addr ] = [ ] * blobTxMeta { meta }
p . spent [ addr ] = meta . costCap
p . evict . Push ( addr )
} else {
p . index [ addr ] = append ( p . index [ addr ] , meta )
p . spent [ addr ] = new ( uint256 . Int ) . Add ( p . spent [ addr ] , meta . costCap )
}
p . lookup [ meta . hash ] = meta . id
p . stored += uint64 ( meta . size )
}
// SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements
// to be kept in sync with the main transaction pool's gas requirements.
func ( p * BlobPool ) SetGasTip ( tip * big . Int ) {
p . lock . Lock ( )
defer p . lock . Unlock ( )
// Store the new minimum gas tip
old := p . gasTip
p . gasTip = uint256 . MustFromBig ( tip )
// If the min miner fee increased, remove transactions below the new threshold
if old == nil || p . gasTip . Cmp ( old ) > 0 {
for addr , txs := range p . index {
for i , tx := range txs {
if tx . execTipCap . Cmp ( p . gasTip ) < 0 {
// Drop the offending transaction
var (
ids = [ ] uint64 { tx . id }
nonces = [ ] uint64 { tx . nonce }
)
p . spent [ addr ] = new ( uint256 . Int ) . Sub ( p . spent [ addr ] , txs [ i ] . costCap )
p . stored -= uint64 ( tx . size )
delete ( p . lookup , tx . hash )
txs [ i ] = nil
// Drop everything afterwards, no gaps allowed
for j , tx := range txs [ i + 1 : ] {
ids = append ( ids , tx . id )
nonces = append ( nonces , tx . nonce )
p . spent [ addr ] = new ( uint256 . Int ) . Sub ( p . spent [ addr ] , tx . costCap )
p . stored -= uint64 ( tx . size )
delete ( p . lookup , tx . hash )
txs [ i + 1 + j ] = nil
}
// Clear out the dropped transactions from the index
if i > 0 {
p . index [ addr ] = txs [ : i ]
heap . Fix ( p . evict , p . evict . index [ addr ] )
} else {
delete ( p . index , addr )
delete ( p . spent , addr )
heap . Remove ( p . evict , p . evict . index [ addr ] )
p . reserve ( addr , false )
}
// Clear out the transactions from the data store
log . Warn ( "Dropping underpriced blob transaction" , "from" , addr , "rejected" , tx . nonce , "tip" , tx . execTipCap , "want" , tip , "drop" , nonces , "ids" , ids )
for _ , id := range ids {
if err := p . store . Delete ( id ) ; err != nil {
log . Error ( "Failed to delete dropped transaction" , "id" , id , "err" , err )
}
}
break
}
}
}
}
log . Debug ( "Blobpool tip threshold updated" , "tip" , tip )
pooltipGauge . Update ( tip . Int64 ( ) )
p . updateStorageMetrics ( )
}
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func ( p * BlobPool ) validateTx ( tx * types . Transaction , blobs [ ] kzg4844 . Blob , commits [ ] kzg4844 . Commitment , proofs [ ] kzg4844 . Proof ) error {
// Ensure the transaction adheres to basic pool filters (type, size, tip) and
// consensus rules
baseOpts := & txpool . ValidationOptions {
Config : p . chain . Config ( ) ,
Accept : 1 << types . BlobTxType ,
MaxSize : txMaxSize ,
MinTip : p . gasTip . ToBig ( ) ,
}
if err := txpool . ValidateTransaction ( tx , blobs , commits , proofs , p . head , p . signer , baseOpts ) ; err != nil {
return err
}
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
stateOpts := & txpool . ValidationOptionsWithState {
State : p . state ,
FirstNonceGap : func ( addr common . Address ) uint64 {
// Nonce gaps are not permitted in the blob pool, the first gap will
// be the next nonce shifted by however many transactions we already
// have pooled.
return p . state . GetNonce ( addr ) + uint64 ( len ( p . index [ addr ] ) )
} ,
UsedAndLeftSlots : func ( addr common . Address ) ( int , int ) {
have := len ( p . index [ addr ] )
if have >= maxTxsPerAccount {
return have , 0
}
return have , maxTxsPerAccount - have
} ,
ExistingExpenditure : func ( addr common . Address ) * big . Int {
if spent := p . spent [ addr ] ; spent != nil {
return spent . ToBig ( )
}
return new ( big . Int )
} ,
ExistingCost : func ( addr common . Address , nonce uint64 ) * big . Int {
next := p . state . GetNonce ( addr )
if uint64 ( len ( p . index [ addr ] ) ) > nonce - next {
return p . index [ addr ] [ int ( tx . Nonce ( ) - next ) ] . costCap . ToBig ( )
}
return nil
} ,
}
if err := txpool . ValidateTransactionWithState ( tx , p . signer , stateOpts ) ; err != nil {
return err
}
// If the transaction replaces an existing one, ensure that price bumps are
// adhered to.
var (
from , _ = p . signer . Sender ( tx ) // already validated above
next = p . state . GetNonce ( from )
)
if uint64 ( len ( p . index [ from ] ) ) > tx . Nonce ( ) - next {
// Account can support the replacement, but the price bump must also be met
prev := p . index [ from ] [ int ( tx . Nonce ( ) - next ) ]
switch {
case tx . GasFeeCapIntCmp ( prev . execFeeCap . ToBig ( ) ) <= 0 :
return fmt . Errorf ( "%w: new tx gas fee cap %v <= %v queued" , txpool . ErrReplaceUnderpriced , tx . GasFeeCap ( ) , prev . execFeeCap )
case tx . GasTipCapIntCmp ( prev . execTipCap . ToBig ( ) ) <= 0 :
return fmt . Errorf ( "%w: new tx gas tip cap %v <= %v queued" , txpool . ErrReplaceUnderpriced , tx . GasTipCap ( ) , prev . execTipCap )
case tx . BlobGasFeeCapIntCmp ( prev . blobFeeCap . ToBig ( ) ) <= 0 :
return fmt . Errorf ( "%w: new tx blob gas fee cap %v <= %v queued" , txpool . ErrReplaceUnderpriced , tx . BlobGasFeeCap ( ) , prev . blobFeeCap )
}
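// As an illustration (hypothetical numbers): with PriceBump set to 100, a queued
// tx with a 2 gwei exec fee cap, 1 gwei tip and 10 gwei blob fee cap can only be
// replaced by one offering at least 4 gwei, 2 gwei and 20 gwei respectively;
// bumping a single component is not sufficient.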
var (
multiplier = uint256 . NewInt ( 100 + p . config . PriceBump )
onehundred = uint256 . NewInt ( 100 )
minGasFeeCap = new ( uint256 . Int ) . Div ( new ( uint256 . Int ) . Mul ( multiplier , prev . execFeeCap ) , onehundred )
minGasTipCap = new ( uint256 . Int ) . Div ( new ( uint256 . Int ) . Mul ( multiplier , prev . execTipCap ) , onehundred )
minBlobGasFeeCap = new ( uint256 . Int ) . Div ( new ( uint256 . Int ) . Mul ( multiplier , prev . blobFeeCap ) , onehundred )
)
switch {
case tx . GasFeeCapIntCmp ( minGasFeeCap . ToBig ( ) ) < 0 :
return fmt . Errorf ( "%w: new tx gas fee cap %v <= %v queued + %d%% replacement penalty" , txpool . ErrReplaceUnderpriced , tx . GasFeeCap ( ) , prev . execFeeCap , p . config . PriceBump )
case tx . GasTipCapIntCmp ( minGasTipCap . ToBig ( ) ) < 0 :
return fmt . Errorf ( "%w: new tx gas tip cap %v <= %v queued + %d%% replacement penalty" , txpool . ErrReplaceUnderpriced , tx . GasTipCap ( ) , prev . execTipCap , p . config . PriceBump )
case tx . BlobGasFeeCapIntCmp ( minBlobGasFeeCap . ToBig ( ) ) < 0 :
return fmt . Errorf ( "%w: new tx blob gas fee cap %v <= %v queued + %d%% replacement penalty" , txpool . ErrReplaceUnderpriced , tx . BlobGasFeeCap ( ) , prev . blobFeeCap , p . config . PriceBump )
}
}
return nil
}
// Has returns an indicator whether subpool has a transaction cached with the
// given hash.
func (p *BlobPool) Has(hash common.Hash) bool {
	p.lock.RLock()
	defer p.lock.RUnlock()

	_, ok := p.lookup[hash]
	return ok
}
// Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *txpool.Transaction {
	// Track the amount of time waiting to retrieve a fully resolved blob tx from
	// the pool and the amount of time actually spent on pulling the data from disk.
	getStart := time.Now()
	p.lock.RLock()
	getwaitHist.Update(time.Since(getStart).Nanoseconds())
	defer p.lock.RUnlock()
	defer func(start time.Time) {
		gettimeHist.Update(time.Since(start).Nanoseconds())
	}(time.Now())

	// Pull the blob from disk and return an assembled response
	id, ok := p.lookup[hash]
	if !ok {
		return nil
	}
	data, err := p.store.Get(id)
	if err != nil {
		log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err)
		return nil
	}
	item := new(blobTx)
	if err = rlp.DecodeBytes(data, item); err != nil {
		log.Error("Blobs corrupted for tracked transaction", "hash", hash, "id", id, "err", err)
		return nil
	}
	return &txpool.Transaction{
		Tx:            item.Tx,
		BlobTxBlobs:   item.Blobs,
		BlobTxCommits: item.Commits,
		BlobTxProofs:  item.Proofs,
	}
}
// Add inserts a set of blob transactions into the pool if they pass validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) Add(txs []*txpool.Transaction, local bool, sync bool) []error {
	errs := make([]error, len(txs))
	for i, tx := range txs {
		errs[i] = p.add(tx.Tx, tx.BlobTxBlobs, tx.BlobTxCommits, tx.BlobTxProofs)
	}
	return errs
}
// add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions).
func ( p * BlobPool ) add ( tx * types . Transaction , blobs [ ] kzg4844 . Blob , commits [ ] kzg4844 . Commitment , proofs [ ] kzg4844 . Proof ) ( err error ) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only ever pulled from the network, so this method will act as the overload
// protection for fetches.
waitStart := time . Now ( )
p . lock . Lock ( )
addwaitHist . Update ( time . Since ( waitStart ) . Nanoseconds ( ) )
defer p . lock . Unlock ( )
defer func ( start time . Time ) {
addtimeHist . Update ( time . Since ( start ) . Nanoseconds ( ) )
} ( time . Now ( ) )
// Ensure the transaction is valid from all perspectives
if err := p . validateTx ( tx , blobs , commits , proofs ) ; err != nil {
log . Trace ( "Transaction validation failed" , "hash" , tx . Hash ( ) , "err" , err )
return err
}
// If the address is not yet known, request exclusivity to track the account
// only by this subpool until all transactions are evicted
from , _ := types . Sender ( p . signer , tx ) // already validated above
if _ , ok := p . index [ from ] ; ! ok {
if err := p . reserve ( from , true ) ; err != nil {
return err
}
defer func ( ) {
// If the transaction is rejected by some post-validation check, remove
// the lock on the reservation set.
//
// Note, `err` here is the named error return, which will be initialized
// by a return statement before running deferred methods. Take care with
// removing or subscoping err as it will break this clause.
if err != nil {
p . reserve ( from , false )
}
} ( )
}
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
blob , err := rlp . EncodeToBytes ( & blobTx { Tx : tx , Blobs : blobs , Commits : commits , Proofs : proofs } )
if err != nil {
log . Error ( "Failed to encode transaction for storage" , "hash" , tx . Hash ( ) , "err" , err )
return err
}
id , err := p . store . Put ( blob )
if err != nil {
return err
}
meta := newBlobTxMeta ( id , p . store . Size ( id ) , tx )
var (
next = p . state . GetNonce ( from )
offset = int ( tx . Nonce ( ) - next )
newacc = false
)
var oldEvictionExecFeeJumps , oldEvictionBlobFeeJumps float64
if txs , ok := p . index [ from ] ; ok {
oldEvictionExecFeeJumps = txs [ len ( txs ) - 1 ] . evictionExecFeeJumps
oldEvictionBlobFeeJumps = txs [ len ( txs ) - 1 ] . evictionBlobFeeJumps
}
if len ( p . index [ from ] ) > offset {
// Transaction replaces a previously queued one
prev := p . index [ from ] [ offset ]
if err := p . store . Delete ( prev . id ) ; err != nil {
// Shitty situation, but try to recover gracefully instead of going boom
log . Error ( "Failed to delete replaced transaction" , "id" , prev . id , "err" , err )
}
// Update the transaction index
p . index [ from ] [ offset ] = meta
p . spent [ from ] = new ( uint256 . Int ) . Sub ( p . spent [ from ] , prev . costCap )
p . spent [ from ] = new ( uint256 . Int ) . Add ( p . spent [ from ] , meta . costCap )
delete ( p . lookup , prev . hash )
p . lookup [ meta . hash ] = meta . id
p . stored += uint64 ( meta . size ) - uint64 ( prev . size )
} else {
// Transaction extends previously scheduled ones
p . index [ from ] = append ( p . index [ from ] , meta )
if _ , ok := p . spent [ from ] ; ! ok {
p . spent [ from ] = new ( uint256 . Int )
newacc = true
}
p . spent [ from ] = new ( uint256 . Int ) . Add ( p . spent [ from ] , meta . costCap )
p . lookup [ meta . hash ] = meta . id
p . stored += uint64 ( meta . size )
}
// Recompute the rolling eviction fields. In case of a replacement, this will
// recompute all subsequent fields. In case of an append, this will only do
// the fresh calculation.
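// As an illustration (hypothetical values): appending a tx with a 4 gwei tip
// behind a rolling minimum of 3 gwei only computes the new tail, min(3, 4) = 3,
// whereas replacing the first of five queued txs re-walks all five entries.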
txs := p . index [ from ]
for i := offset ; i < len ( txs ) ; i ++ {
// The first transaction will always use itself
if i == 0 {
txs [ 0 ] . evictionExecTip = txs [ 0 ] . execTipCap
txs [ 0 ] . evictionExecFeeJumps = txs [ 0 ] . basefeeJumps
txs [ 0 ] . evictionBlobFeeJumps = txs [ 0 ] . blobfeeJumps
continue
}
// Subsequent transactions will use a rolling calculation
txs [ i ] . evictionExecTip = txs [ i - 1 ] . evictionExecTip
if txs [ i ] . evictionExecTip . Cmp ( txs [ i ] . execTipCap ) > 0 {
txs [ i ] . evictionExecTip = txs [ i ] . execTipCap
}
txs [ i ] . evictionExecFeeJumps = txs [ i - 1 ] . evictionExecFeeJumps
if txs [ i ] . evictionExecFeeJumps > txs [ i ] . basefeeJumps {
txs [ i ] . evictionExecFeeJumps = txs [ i ] . basefeeJumps
}
txs [ i ] . evictionBlobFeeJumps = txs [ i - 1 ] . evictionBlobFeeJumps
if txs [ i ] . evictionBlobFeeJumps > txs [ i ] . blobfeeJumps {
txs [ i ] . evictionBlobFeeJumps = txs [ i ] . blobfeeJumps
}
}
// Update the eviction heap with the new information:
// - If the transaction is from a new account, add it to the heap
// - If the account had a singleton tx replaced, update the heap (new price caps)
// - If the account has a transaction replaced or appended, update the heap if significantly changed
switch {
case newacc :
heap . Push ( p . evict , from )
case len ( txs ) == 1 : // 1 tx and not a new acc, must be replacement
heap . Fix ( p . evict , p . evict . index [ from ] )
default : // replacement or new append
evictionExecFeeDiff := oldEvictionExecFeeJumps - txs [ len ( txs ) - 1 ] . evictionExecFeeJumps
evictionBlobFeeDiff := oldEvictionBlobFeeJumps - txs [ len ( txs ) - 1 ] . evictionBlobFeeJumps
if math . Abs ( evictionExecFeeDiff ) > 0.001 || math . Abs ( evictionBlobFeeDiff ) > 0.001 { // need math.Abs, can go up and down
heap . Fix ( p . evict , p . evict . index [ from ] )
}
}
// If the pool went over the allowed data limit, evict transactions until
// we're again below the threshold
for p . stored > p . config . Datacap {
p . drop ( )
}
p . updateStorageMetrics ( )
return nil
}
// drop removes the worst transaction from the pool. It is primarily used when a
// freshly added transaction overflows the pool and needs to evict something. The
// method is also called on startup if the user resizes their storage, might be an
// expensive run but it should be fine-ish.
func ( p * BlobPool ) drop ( ) {
// Peek at the account with the worst transaction set to evict from (Go's heap
// stores the minimum at index zero of the heap slice) and retrieve its last
// transaction.
var (
from = p . evict . addrs [ 0 ] // cannot call drop on empty pool
txs = p . index [ from ]
drop = txs [ len ( txs ) - 1 ]
last = len ( txs ) == 1
)
// Remove the transaction from the pool's index
if last {
delete ( p . index , from )
delete ( p . spent , from )
p . reserve ( from , false )
} else {
txs [ len ( txs ) - 1 ] = nil
txs = txs [ : len ( txs ) - 1 ]
p . index [ from ] = txs
p . spent [ from ] = new ( uint256 . Int ) . Sub ( p . spent [ from ] , drop . costCap )
}
p . stored -= uint64 ( drop . size )
delete ( p . lookup , drop . hash )
// Remove the transaction from the pool's eviction heap:
// - If the entire account was dropped, pop off the address
// - Otherwise, if the new tail has better eviction caps, fix the heap
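// (Illustration, hypothetical values: if the dropped tail tolerated only 2 fee
// jumps while the new tail tolerates 7, the account's floor improved, so its heap
// position must be re-evaluated; the floor can only rise, hence no math.Abs.)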
if last {
heap . Pop ( p . evict )
} else {
tail := txs [ len ( txs ) - 1 ] // new tail, surely exists
evictionExecFeeDiff := tail . evictionExecFeeJumps - drop . evictionExecFeeJumps
evictionBlobFeeDiff := tail . evictionBlobFeeJumps - drop . evictionBlobFeeJumps
if evictionExecFeeDiff > 0.001 || evictionBlobFeeDiff > 0.001 { // no need for math.Abs, monotonic decreasing
heap . Fix ( p . evict , 0 )
}
}
// Remove the transaction from the data store
log . Warn ( "Evicting overflown blob transaction" , "from" , from , "evicted" , drop . nonce , "id" , drop . id )
if err := p . store . Delete ( drop . id ) ; err != nil {
log . Error ( "Failed to drop evicted transaction" , "id" , drop . id , "err" , err )
}
}
// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce.
func ( p * BlobPool ) Pending ( enforceTips bool ) map [ common . Address ] [ ] * txpool . LazyTransaction {
// Track the amount of time waiting to retrieve the list of pending blob txs
// from the pool and the amount of time actually spent on assembling the data.
// The latter will be pretty much moot, but we've kept it to have symmetry
// across all user operations.
pendStart := time . Now ( )
p . lock . RLock ( )
pendwaitHist . Update ( time . Since ( pendStart ) . Nanoseconds ( ) )
defer p . lock . RUnlock ( )
defer func ( start time . Time ) {
pendtimeHist . Update ( time . Since ( start ) . Nanoseconds ( ) )
} ( time . Now ( ) )
pending := make ( map [ common . Address ] [ ] * txpool . LazyTransaction )
for addr , txs := range p . index {
var lazies [ ] * txpool . LazyTransaction
for _ , tx := range txs {
lazies = append ( lazies , & txpool . LazyTransaction {
Pool : p ,
Hash : tx . hash ,
Time : time . Now ( ) , // TODO(karalabe): Maybe save these and use that?
GasFeeCap : tx . execFeeCap . ToBig ( ) ,
GasTipCap : tx . execTipCap . ToBig ( ) ,
} )
}
if len ( lazies ) > 0 {
pending [ addr ] = lazies
}
}
return pending
}
// updateStorageMetrics retrieves a bunch of stats from the data store and pushes
// them out as metrics.
func ( p * BlobPool ) updateStorageMetrics ( ) {
stats := p . store . Infos ( )
var (
dataused uint64
datareal uint64
slotused uint64
oversizedDataused uint64
oversizedDatagaps uint64
oversizedSlotused uint64
oversizedSlotgaps uint64
)
for _ , shelf := range stats . Shelves {
slotDataused := shelf . FilledSlots * uint64 ( shelf . SlotSize )
slotDatagaps := shelf . GappedSlots * uint64 ( shelf . SlotSize )
dataused += slotDataused
datareal += slotDataused + slotDatagaps
slotused += shelf . FilledSlots
metrics . GetOrRegisterGauge ( fmt . Sprintf ( shelfDatausedGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( slotDataused ) )
metrics . GetOrRegisterGauge ( fmt . Sprintf ( shelfDatagapsGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( slotDatagaps ) )
metrics . GetOrRegisterGauge ( fmt . Sprintf ( shelfSlotusedGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( shelf . FilledSlots ) )
metrics . GetOrRegisterGauge ( fmt . Sprintf ( shelfSlotgapsGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( shelf . GappedSlots ) )
if shelf . SlotSize / blobSize > maxBlobsPerTransaction {
oversizedDataused += slotDataused
oversizedDatagaps += slotDatagaps
oversizedSlotused += shelf . FilledSlots
oversizedSlotgaps += shelf . GappedSlots
}
}
datausedGauge . Update ( int64 ( dataused ) )
datarealGauge . Update ( int64 ( datareal ) )
slotusedGauge . Update ( int64 ( slotused ) )
oversizedDatausedGauge . Update ( int64 ( oversizedDataused ) )
oversizedDatagapsGauge . Update ( int64 ( oversizedDatagaps ) )
oversizedSlotusedGauge . Update ( int64 ( oversizedSlotused ) )
oversizedSlotgapsGauge . Update ( int64 ( oversizedSlotgaps ) )
p . updateLimboMetrics ( )
}
// updateLimboMetrics retrieves a bunch of stats from the limbo store and pushes
// them out as metrics.
func ( p * BlobPool ) updateLimboMetrics ( ) {
stats := p . limbo . store . Infos ( )
var (
dataused uint64
datareal uint64
slotused uint64
)
for _ , shelf := range stats . Shelves {
slotDataused := shelf . FilledSlots * uint64 ( shelf . SlotSize )
slotDatagaps := shelf . GappedSlots * uint64 ( shelf . SlotSize )
dataused += slotDataused
datareal += slotDataused + slotDatagaps
slotused += shelf . FilledSlots
metrics . GetOrRegisterGauge ( fmt . Sprintf ( limboShelfDatausedGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( slotDataused ) )
metrics . GetOrRegisterGauge ( fmt . Sprintf ( limboShelfDatagapsGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( slotDatagaps ) )
metrics . GetOrRegisterGauge ( fmt . Sprintf ( limboShelfSlotusedGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( shelf . FilledSlots ) )
metrics . GetOrRegisterGauge ( fmt . Sprintf ( limboShelfSlotgapsGaugeName , shelf . SlotSize / blobSize ) , nil ) . Update ( int64 ( shelf . GappedSlots ) )
}
limboDatausedGauge . Update ( int64 ( dataused ) )
limboDatarealGauge . Update ( int64 ( datareal ) )
limboSlotusedGauge . Update ( int64 ( slotused ) )
}
// SubscribeTransactions registers a subscription of NewTxsEvent and
// starts sending events to the given channel.
func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent) event.Subscription {
	return p.eventScope.Track(p.eventFeed.Subscribe(ch))
}

// Nonce returns the next nonce of an account, with all transactions executable
// by the pool already applied on top.
func (p *BlobPool) Nonce(addr common.Address) uint64 {
	p.lock.Lock()
	defer p.lock.Unlock()

	if txs, ok := p.index[addr]; ok {
		return txs[len(txs)-1].nonce + 1
	}
	return p.state.GetNonce(addr)
}

// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (p *BlobPool) Stats() (int, int) {
	p.lock.Lock()
	defer p.lock.Unlock()

	var pending int
	for _, txs := range p.index {
		pending += len(txs)
	}
	return pending, 0 // No non-executable txs in the blob pool
}

// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and sorted by nonce.
//
// For the blob pool, this method will return nothing for now.
// TODO(karalabe): Abstract out the returned metadata.
func (p *BlobPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) {
	return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction)
}

// ContentFrom retrieves the data content of the transaction pool, returning the
// pending as well as queued transactions of this address, grouped by nonce.
//
// For the blob pool, this method will return nothing for now.
// TODO(karalabe): Abstract out the returned metadata.
func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
	return []*types.Transaction{}, []*types.Transaction{}
}

// Locals retrieves the accounts currently considered local by the pool.
//
// There is no notion of local accounts in the blob pool.
func (p *BlobPool) Locals() []common.Address {
	return []common.Address{}
}

// Status returns the known status (unknown/pending/queued) of a transaction
// identified by its hash.
func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
	if p.Has(hash) {
		return txpool.TxStatusPending
	}
	return txpool.TxStatusUnknown
}