// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package blobpool implements the EIP-4844 blob transaction pool.
package blobpool

import (
	"container/heap"
	"errors"
	"fmt"
	"math"
	"math/big"
	"os"
	"path/filepath"
	"sort"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
	"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/state"
	"github.com/ethereum/go-ethereum/core/txpool"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto/kzg4844"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/metrics"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/holiman/billy"
	"github.com/holiman/uint256"
)

const (
	// blobSize is the protocol constrained byte size of a single blob in a
	// transaction. There can be multiple of these embedded into a single tx.
	blobSize = params.BlobTxFieldElementsPerBlob * params.BlobTxBytesPerFieldElement

	// maxBlobsPerTransaction is the maximum number of blobs a single transaction
	// is allowed to contain. Whilst the spec states it's unlimited, the block
	// data slots are protocol bound, which implicitly also limit this.
	maxBlobsPerTransaction = params.MaxBlobGasPerBlock / params.BlobTxBlobGasPerBlob

	// txAvgSize is an approximate byte size of a transaction metadata to avoid
	// tiny overflows causing all txs to move a shelf higher, wasting disk space.
	txAvgSize = 4 * 1024

	// txMaxSize is the maximum size a single transaction can have, outside
	// the included blobs. Since blob transactions are pulled instead of pushed,
	// and only a small metadata is kept in ram, the rest is on disk, there is
	// no critical limit that should be enforced. Still, capping it to some sane
	// limit can never hurt.
	txMaxSize = 1024 * 1024

	// maxTxsPerAccount is the maximum number of blob transactions admitted from
	// a single account. The limit is enforced to minimize the DoS potential of
	// a private tx cancelling publicly propagated blobs.
	//
	// Note, transactions resurrected by a reorg are also subject to this limit,
	// so pushing it down too aggressively might make resurrections non-functional.
	maxTxsPerAccount = 16

	// pendingTransactionStore is the subfolder containing the currently queued
	// blob transactions.
	pendingTransactionStore = "queue"

	// limboedTransactionStore is the subfolder containing the currently included
	// but not yet finalized transaction blobs.
	limboedTransactionStore = "limbo"
)

// blobTxMeta is the minimal subset of types.BlobTx necessary to validate and
// schedule the blob transactions into the following blocks. Only ever add the
// bare minimum needed fields to keep the size down (and thus number of entries
// larger with the same memory consumption).
type blobTxMeta struct {
	hash    common.Hash   // Transaction hash to maintain the lookup table
	vhashes []common.Hash // Blob versioned hashes to maintain the lookup table

	id   uint64 // Storage ID in the pool's persistent store
	size uint32 // Byte size in the pool's persistent store

	nonce      uint64       // Needed to prioritize inclusion order within an account
	costCap    *uint256.Int // Needed to validate cumulative balance sufficiency
	execTipCap *uint256.Int // Needed to prioritize inclusion order across accounts and validate replacement price bump
	execFeeCap *uint256.Int // Needed to validate replacement price bump
	blobFeeCap *uint256.Int // Needed to validate replacement price bump
	execGas    uint64       // Needed to check inclusion validity before reading the blob
	blobGas    uint64       // Needed to check inclusion validity before reading the blob

	basefeeJumps float64 // Absolute number of 1559 fee adjustments needed to reach the tx's fee cap
	blobfeeJumps float64 // Absolute number of 4844 fee adjustments needed to reach the tx's blob fee cap

	// The eviction fields below are rolling minimums: recheck seeds them from
	// the account's first transaction and then carries the smaller of the
	// previous threshold and the current tx's own value forward nonce by nonce.
	evictionExecTip      *uint256.Int // Worst gas tip across all previous nonces
	evictionExecFeeJumps float64      // Worst base fee (converted to fee jumps) across all previous nonces
	evictionBlobFeeJumps float64      // Worst blob fee (converted to fee jumps) across all previous nonces
}

// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
// and assembles a helper struct to track in memory.
func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta { meta := &blobTxMeta{ hash: tx.Hash(), vhashes: tx.BlobHashes(), id: id, size: size, nonce: tx.Nonce(), costCap: uint256.MustFromBig(tx.Cost()), execTipCap: uint256.MustFromBig(tx.GasTipCap()), execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), execGas: tx.Gas(), blobGas: tx.BlobGas(), } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicFeeJumps(meta.blobFeeCap) return meta } // BlobPool is the transaction pool dedicated to EIP-4844 blob transactions. // // Blob transactions are special snowflakes that are designed for a very specific // purpose (rollups) and are expected to adhere to that specific use case. These // behavioural expectations allow us to design a transaction pool that is more robust // (i.e. resending issues) and more resilient to DoS attacks (e.g. replace-flush // attacks) than the generic tx pool. These improvements will also mean, however, // that we enforce a significantly more aggressive strategy on entering and exiting // the pool: // // - Blob transactions are large. With the initial design aiming for 128KB blobs, // we must ensure that these only traverse the network the absolute minimum // number of times. Broadcasting to sqrt(peers) is out of the question, rather // these should only ever be announced and the remote side should request it if // it wants to. // // - Block blob-space is limited. With blocks being capped to a few blob txs, we // can make use of the very low expected churn rate within the pool. Notably, // we should be able to use a persistent disk backend for the pool, solving // the tx resend issue that plagues the generic tx pool, as long as there's no // artificial churn (i.e. pool wars). // // - Purpose of blobs are layer-2s. 
//     Layer-2s are meant to use blob transactions to
//     commit to their own current state, which is independent of Ethereum mainnet
//     (state, txs). This means that there's no reason for blob tx cancellation or
//     replacement, apart from a potential basefee / miner tip adjustment.
//
//   - Replacements are expensive. Given their size, propagating a replacement
//     blob transaction to an existing one should be aggressively discouraged.
//     Whilst generic transactions can start at 1 Wei gas cost and require a 10%
//     fee bump to replace, we suggest requiring a higher min cost (e.g. 1 gwei)
//     and a more aggressive bump (100%).
//
//   - Cancellation is prohibitive. Evicting an already propagated blob tx is a huge
//     DoS vector. As such, a) replacement (higher-fee) blob txs mustn't invalidate
//     already propagated (future) blob txs (cumulative fee); b) nonce-gapped blob
//     txs are disallowed; c) the presence of blob transactions exclude non-blob
//     transactions.
//
//   - Malicious cancellations are possible. Although the pool might prevent txs
//     that cancel blobs, blocks might contain such transaction (malicious miner
//     or flashbotter). The pool should cap the total number of blob transactions
//     per account as to prevent propagating too much data before cancelling it
//     via a normal transaction. It should nonetheless be high enough to support
//     resurrecting reorged transactions. Perhaps 4-16.
//
//   - Local txs are meaningless. Mining pools historically used local transactions
//     for payouts or for backdoor deals. With 1559 in place, the basefee usually
//     dominates the final price, so 0 or non-0 tip doesn't change much. Blob txs
//     retain the 1559 2D gas pricing (and introduce on top a dynamic blob gas fee),
//     so locality is moot. With a disk backed blob pool avoiding the resend issue,
//     there's also no need to save own transactions for later.
//
//   - No-blob blob-txs are bad. Theoretically there's no strong reason to disallow
//     blob txs containing 0 blobs. In practice, admitting such txs into the pool
//     breaks the low-churn invariant as blob constraints don't apply anymore. Even
//     though we could accept blocks containing such txs, a reorg would require moving
//     them back into the blob pool, which can break invariants.
//
//   - Dropping blobs needs delay. When normal transactions are included, they
//     are immediately evicted from the pool since they are contained in the
//     including block. Blobs however are not included in the execution chain,
//     so a mini reorg cannot re-pool "lost" blob transactions. To support reorgs,
//     blobs are retained on disk until they are finalised.
//
//   - Blobs can arrive via flashbots. Blocks might contain blob transactions we
//     have never seen on the network. Since we cannot recover them from blocks
//     either, the engine_newPayload needs to give them to us, and we cache them
//     until finality to support reorgs without tx losses.
//
// Whilst some constraints above might sound overly aggressive, the general idea is
// that the blob pool should work robustly for its intended use case and whilst
// anyone is free to use blob transactions for arbitrary non-rollup use cases,
// they should not be allowed to run amok on the network.
//
// Implementation wise there are a few interesting design choices:
//
//   - Adding a transaction to the pool blocks until persisted to disk. This is
//     viable because TPS is low (2-4 blobs per block initially, maybe 8-16 at
//     peak), so natural churn is a couple MB per block. Replacements doing O(n)
//     updates are forbidden and transaction propagation is pull based (i.e. no
//     pileup of pending data).
//
//   - When transactions are chosen for inclusion, the primary criteria is the
//     signer tip (and having a basefee/data fee high enough of course). However,
//     same-tip transactions will be split by their basefee/datafee, preferring
//     those that are closer to the current network limits. The idea being that
//     very relaxed ones can be included even if the fees go up, when the closer
//     ones could already be invalid.
//
// When the pool eventually reaches saturation, some old transactions - that may
// never execute - will need to be evicted in favor of newer ones. The eviction
// strategy is quite complex:
//
//   - Exceeding capacity evicts the highest-nonce of the account with the lowest
//     paying blob transaction anywhere in the pooled nonce-sequence, as that tx
//     would be executed the furthest in the future and is thus blocking anything
//     after it. The smallest is deliberately not evicted to avoid a nonce-gap.
//
//   - Analogously, if the pool is full, the consideration price of a new tx for
//     evicting an old one is the smallest price in the entire nonce-sequence of
//     the account. This avoids malicious users DoSing the pool with seemingly
//     high paying transactions hidden behind a low-paying blocked one.
//
//   - Since blob transactions have 3 price parameters: execution tip, execution
//     fee cap and data fee cap, there's no singular parameter to create a total
//     price ordering on. What's more, since the base fee and blob fee can move
//     independently of one another, there's no pre-defined way to combine them
//     into a stable order either. This leads to a multi-dimensional problem to
//     solve after every block.
//
//   - The first observation is that comparing 1559 base fees or 4844 blob fees
//     needs to happen in the context of their dynamism. Since these fees jump
//     up or down in ~1.125 multipliers (at max) across blocks, comparing fees
//     in two transactions should be based on log1.125(fee) to eliminate noise.
//
//   - The second observation is that the basefee and blobfee move independently,
//     so there's no way to split mixed txs on their own (A has higher base fee,
//     B has higher blob fee). Rather than look at the absolute fees, the useful
//     metric is the max time it can take to exceed the transaction's fee caps.
//     Specifically, we're interested in the number of jumps needed to go from
//     the current fee to the transaction's cap:
//
//     jumps = log1.125(txfee) - log1.125(basefee)
//
//   - The third observation is that the base fee tends to hover around rather
//     than swing wildly. The number of jumps needed from the current fee starts
//     to get less relevant the higher it is. To remove the noise here too, the
//     pool will use log(jumps) as the delta for comparing transactions.
//
//     delta = sign(jumps) * log(abs(jumps))
//
//   - To establish a total order, we need to reduce the dimensionality of the
//     two base fees (log jumps) to a single value. The interesting aspect from
//     the pool's perspective is how fast will a tx get executable (fees going
//     down, crossing the smaller negative jump counter) or non-executable (fees
//     going up, crossing the smaller positive jump counter). As such, the pool
//     cares only about the min of the two delta values for eviction priority.
//
//     priority = min(deltaBasefee, deltaBlobfee)
//
//   - The above very aggressive dimensionality and noise reduction should result
//     in transactions being grouped into a small number of buckets, the further
//     the fees the larger the buckets. This is good because it allows us to use
//     the miner tip meaningfully as a splitter.
//
//   - For the scenario where the pool does not contain non-executable blob txs
//     anymore, it does not make sense to grant a later eviction priority to txs
//     with high fee caps since it could enable pool wars. As such, any positive
//     priority will be grouped together.
//
//     priority = min(deltaBasefee, deltaBlobfee, 0)
//
// Optimisation tradeoffs:
//
//   - Eviction relies on 3 fee minimums per account (exec tip, exec cap and blob
//     cap). Maintaining these values across all transactions from the account is
//     problematic as each transaction replacement or inclusion would require a
//     rescan of all other transactions to recalculate the minimum.
//     Instead, the pool maintains a rolling minimum across the nonce range.
//     Updating all the minimums will need to be done only starting at the
//     swapped in/out nonce and leading up to the first no-change.
type BlobPool struct {
	config  Config                 // Pool configuration
	reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools

	store  billy.Database // Persistent data store for the tx metadata and blobs
	stored uint64         // Useful data size of all transactions on disk
	limbo  *limbo         // Persistent data store for the non-finalized blobs

	signer types.Signer // Transaction signer to use for sender recovery
	chain  BlockChain   // Chain object to access the state through

	head   *types.Header  // Current head of the chain
	state  *state.StateDB // Current state at the head of the chain
	gasTip *uint256.Int   // Currently accepted minimum gas tip

	lookup *lookup                          // Lookup table mapping blobs to txs and txs to billy entries
	index  map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce
	spent  map[common.Address]*uint256.Int  // Expenditure tracking for individual accounts
	evict  *evictHeap                       // Heap of cheapest accounts for eviction when full

	discoverFeed event.Feed // Event feed to send out new tx events on pool discovery (reorg excluded)
	insertFeed   event.Feed // Event feed to send out new tx events on pool inclusion (reorg included)

	// txValidationFn defaults to txpool.ValidateTransaction, but can be
	// overridden for testing purposes.
	txValidationFn txpool.ValidationFunction

	lock sync.RWMutex // Mutex protecting the pool during reorg handling
}

// New creates a new blob transaction pool to gather, sort and filter inbound
// blob transactions from the network.
func New(config Config, chain BlockChain) *BlobPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings return &BlobPool{ config: config, signer: types.LatestSigner(chain.Config()), chain: chain, lookup: newLookup(), index: make(map[common.Address][]*blobTxMeta), spent: make(map[common.Address]*uint256.Int), txValidationFn: txpool.ValidateTransaction, } } // Filter returns whether the given transaction can be consumed by the blob pool. func (p *BlobPool) Filter(tx *types.Transaction) bool { return tx.Type() == types.BlobTxType } // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { p.reserve = reserve var ( queuedir string limbodir string ) if p.config.Datadir != "" { queuedir = filepath.Join(p.config.Datadir, pendingTransactionStore) if err := os.MkdirAll(queuedir, 0700); err != nil { return err } limbodir = filepath.Join(p.config.Datadir, limboedTransactionStore) if err := os.MkdirAll(limbodir, 0700); err != nil { return err } } // Initialize the state with head block, or fallback to empty one in // case the head state is not available (might occur when node is not // fully synced). 
state, err := p.chain.StateAt(head.Root) if err != nil { state, err = p.chain.StateAt(types.EmptyRootHash) } if err != nil { return err } p.head, p.state = head, state // Index all transactions on disk and delete anything unprocessable var fails []uint64 index := func(id uint64, size uint32, blob []byte) { if p.parseTransaction(id, size, blob) != nil { fails = append(fails, id) } } store, err := billy.Open(billy.Options{Path: queuedir, Repair: true}, newSlotter(), index) if err != nil { return err } p.store = store if len(fails) > 0 { log.Warn("Dropping invalidated blob transactions", "ids", fails) dropInvalidMeter.Mark(int64(len(fails))) for _, id := range fails { if err := p.store.Delete(id); err != nil { p.Close() return err } } } // Sort the indexed transactions by nonce and delete anything gapped, create // the eviction heap of anyone still standing for addr := range p.index { p.recheck(addr, nil) } var ( basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head)) blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice) ) if p.head.ExcessBlobGas != nil { blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(*p.head.ExcessBlobGas)) } p.evict = newPriceHeap(basefee, blobfee, p.index) // Pool initialized, attach the blob limbo to it to track blobs included // recently but not yet finalized p.limbo, err = newLimbo(limbodir) if err != nil { p.Close() return err } // Set the configured gas tip, triggering a filtering of anything just loaded basefeeGauge.Update(int64(basefee.Uint64())) blobfeeGauge.Update(int64(blobfee.Uint64())) p.SetGasTip(new(big.Int).SetUint64(gasTip)) // Since the user might have modified their pool's capacity, evict anything // above the current allowance for p.stored > p.config.Datacap { p.drop() } // Update the metrics and return the constructed pool datacapGauge.Update(int64(p.config.Datacap)) p.updateStorageMetrics() return nil } // Close closes down the underlying persistent store. 
func (p *BlobPool) Close() error {
	// Both stores are closed even if the first one fails; the failures are
	// collected and reported together.
	var errs []error
	if p.limbo != nil { // Close might be invoked due to error in constructor, before p.limbo is set
		if err := p.limbo.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	if err := p.store.Close(); err != nil {
		errs = append(errs, err)
	}
	// A single failure is returned as is, multiple ones are flattened into
	// one formatted error.
	switch {
	case errs == nil:
		return nil
	case len(errs) == 1:
		return errs[0]
	default:
		return fmt.Errorf("%v", errs)
	}
}

// parseTransaction is a callback method on pool creation that gets called for
// each transaction on disk to create the in-memory metadata index.
//
// A non-nil error is returned for entries that cannot be decoded, lack a blob
// sidecar, duplicate an already tracked hash, have an unrecoverable sender or
// whose sender address cannot be reserved. Init deletes such entries.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
	tx := new(types.Transaction)
	if err := rlp.DecodeBytes(blob, tx); err != nil {
		// This path is impossible unless the disk data representation changes
		// across restarts. For that ever improbable case, recover gracefully
		// by ignoring this data entry.
		log.Error("Failed to decode blob pool entry", "id", id, "err", err)
		return err
	}
	if tx.BlobTxSidecar() == nil {
		log.Error("Missing sidecar in blob pool entry", "id", id, "hash", tx.Hash())
		return errors.New("missing blob sidecar")
	}

	meta := newBlobTxMeta(id, size, tx)
	if p.lookup.exists(meta.hash) {
		// This path is only possible after a crash, where deleted items are not
		// removed via the normal shutdown-startup procedure and thus may get
		// partially resurrected.
		log.Error("Rejecting duplicate blob pool entry", "id", id, "hash", tx.Hash())
		return errors.New("duplicate blob entry")
	}
	sender, err := types.Sender(p.signer, tx)
	if err != nil {
		// This path is impossible unless the signature validity changes across
		// restarts. For that ever improbable case, recover gracefully by ignoring
		// this data entry.
		log.Error("Failed to recover blob tx sender", "id", id, "hash", tx.Hash(), "err", err)
		return err
	}
	// First transaction of this account: reserve the address and create the
	// per-account bookkeeping entries.
	if _, ok := p.index[sender]; !ok {
		if err := p.reserve(sender, true); err != nil {
			return err
		}
		p.index[sender] = []*blobTxMeta{}
		p.spent[sender] = new(uint256.Int)
	}
	p.index[sender] = append(p.index[sender], meta)
	p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap)

	p.lookup.track(meta)
	p.stored += uint64(meta.size)
	return nil
}

// recheck verifies the pool's content for a specific account and drops anything
// that does not fit anymore (dangling or filled nonce, overdraft).
//
// The inclusions map is nil when invoked from Init, in which case the eviction
// heap is not yet built and is left untouched. When non-nil, it maps the hashes
// of included transactions to a uint64 that offload passes on to the limbo as
// the including block, and the eviction heap is kept in sync with the changes
// made to the account.
func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint64) {
	// Sort the transactions belonging to the account so reinjects can be simpler
	txs := p.index[addr]
	if inclusions != nil && txs == nil { // during reorgs, we might find new accounts
		return
	}
	sort.Slice(txs, func(i, j int) bool {
		return txs[i].nonce < txs[j].nonce
	})
	// If there is a gap between the chain state and the blob pool, drop
	// all the transactions as they are non-executable. Similarly, if the
	// entire tx range was included, drop all.
	var (
		next   = p.state.GetNonce(addr)
		gapped = txs[0].nonce > next
		filled = txs[len(txs)-1].nonce < next
	)
	if gapped || filled {
		var (
			ids    []uint64
			nonces []uint64
		)
		for i := 0; i < len(txs); i++ {
			ids = append(ids, txs[i].id)
			nonces = append(nonces, txs[i].nonce)

			p.stored -= uint64(txs[i].size)
			p.lookup.untrack(txs[i])

			// Included transactions blobs need to be moved to the limbo
			if filled && inclusions != nil {
				p.offload(addr, txs[i].nonce, txs[i].id, inclusions)
			}
		}
		// The account has nothing left in the pool, remove all of its
		// bookkeeping and release the address reservation.
		delete(p.index, addr)
		delete(p.spent, addr)
		if inclusions != nil { // only during reorgs will the heap be initialized
			heap.Remove(p.evict, p.evict.index[addr])
		}
		p.reserve(addr, false)

		if gapped {
			log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
			dropDanglingMeter.Mark(int64(len(ids)))
		} else {
			log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
			dropFilledMeter.Mark(int64(len(ids)))
		}
		// Disk entries are deleted only after offload had the chance to read them.
		for _, id := range ids {
			if err := p.store.Delete(id); err != nil {
				log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
			}
		}
		return
	}
	// If there is overlap between the chain state and the blob pool, drop
	// anything below the current state
	if txs[0].nonce < next {
		var (
			ids    []uint64
			nonces []uint64
		)
		for len(txs) > 0 && txs[0].nonce < next {
			ids = append(ids, txs[0].id)
			nonces = append(nonces, txs[0].nonce)

			p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap)
			p.stored -= uint64(txs[0].size)
			p.lookup.untrack(txs[0])

			// Included transactions blobs need to be moved to the limbo
			if inclusions != nil {
				p.offload(addr, txs[0].nonce, txs[0].id, inclusions)
			}
			txs = txs[1:]
		}
		log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
		dropOverlappedMeter.Mark(int64(len(ids)))

		for _, id := range ids {
			if err := p.store.Delete(id); err != nil {
				log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
			}
		}
		p.index[addr] = txs
	}
	// Iterate over the transactions to initialize their eviction thresholds
	// and to detect any nonce gaps
	txs[0].evictionExecTip = txs[0].execTipCap
	txs[0].evictionExecFeeJumps = txs[0].basefeeJumps
	txs[0].evictionBlobFeeJumps = txs[0].blobfeeJumps

	for i := 1; i < len(txs); i++ {
		// If there's no nonce gap, initialize the eviction thresholds as the
		// minimum between the cumulative thresholds and the current tx fees
		if txs[i].nonce == txs[i-1].nonce+1 {
			txs[i].evictionExecTip = txs[i-1].evictionExecTip
			if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 {
				txs[i].evictionExecTip = txs[i].execTipCap
			}
			txs[i].evictionExecFeeJumps = txs[i-1].evictionExecFeeJumps
			if txs[i].evictionExecFeeJumps > txs[i].basefeeJumps {
				txs[i].evictionExecFeeJumps = txs[i].basefeeJumps
			}
			txs[i].evictionBlobFeeJumps = txs[i-1].evictionBlobFeeJumps
			if txs[i].evictionBlobFeeJumps > txs[i].blobfeeJumps {
				txs[i].evictionBlobFeeJumps = txs[i].blobfeeJumps
			}
			continue
		}
		// Sanity check that there's no double nonce. This case would generally
		// be a coding error, so better know about it.
		//
		// Also, Billy behind the blobpool does not journal deletes. A process
		// crash would result in previously deleted entities being resurrected.
		// That could potentially cause a duplicate nonce to appear.
		if txs[i].nonce == txs[i-1].nonce {
			id, _ := p.lookup.storeidOfTx(txs[i].hash)

			log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id)
			dropRepeatedMeter.Mark(1)

			p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap)
			p.stored -= uint64(txs[i].size)
			p.lookup.untrack(txs[i])

			if err := p.store.Delete(id); err != nil {
				log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
			}
			txs = append(txs[:i], txs[i+1:]...)
			p.index[addr] = txs

			// Step back so the element shifted into slot i gets examined too
			i--
			continue
		}
		// Otherwise if there's a nonce gap evict all later transactions
		var (
			ids    []uint64
			nonces []uint64
		)
		for j := i; j < len(txs); j++ {
			ids = append(ids, txs[j].id)
			nonces = append(nonces, txs[j].nonce)

			p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap)
			p.stored -= uint64(txs[j].size)
			p.lookup.untrack(txs[j])
		}
		txs = txs[:i]

		log.Error("Dropping gapped blob transactions", "from", addr, "missing", txs[i-1].nonce+1, "drop", nonces, "ids", ids)
		dropGappedMeter.Mark(int64(len(ids)))

		for _, id := range ids {
			if err := p.store.Delete(id); err != nil {
				log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
			}
		}
		p.index[addr] = txs
		break
	}
	// Ensure that there's no over-draft, this is expected to happen when some
	// transactions get included without publishing on the network
	var (
		balance = p.state.GetBalance(addr)
		spent   = p.spent[addr]
	)
	if spent.Cmp(balance) > 0 {
		// Evict the highest nonce transactions until the pending set falls under
		// the account's available balance
		var (
			ids    []uint64
			nonces []uint64
		)
		for p.spent[addr].Cmp(balance) > 0 {
			last := txs[len(txs)-1]
			txs[len(txs)-1] = nil
			txs = txs[:len(txs)-1]

			ids = append(ids, last.id)
			nonces = append(nonces, last.nonce)

			p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
			p.stored -= uint64(last.size)
			p.lookup.untrack(last)
		}
		if len(txs) == 0 {
			delete(p.index, addr)
			delete(p.spent, addr)
			if inclusions != nil { // only during reorgs will the heap be initialized
				heap.Remove(p.evict, p.evict.index[addr])
			}
			p.reserve(addr, false)
		} else {
			p.index[addr] = txs
		}
		// Note, spent still refers to the expenditure before the evictions above
		log.Warn("Dropping overdrafted blob transactions", "from", addr, "balance", balance, "spent", spent, "drop", nonces, "ids", ids)
		dropOverdraftedMeter.Mark(int64(len(ids)))

		for _, id := range ids {
			if err := p.store.Delete(id); err != nil {
				log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
			}
		}
	}
	// Sanity check that no account can have more queued transactions than the
	// DoS protection threshold.
	if len(txs) > maxTxsPerAccount {
		// Evict the highest nonce transactions until the pending set falls under
		// the account's transaction cap
		var (
			ids    []uint64
			nonces []uint64
		)
		for len(txs) > maxTxsPerAccount {
			last := txs[len(txs)-1]
			txs[len(txs)-1] = nil
			txs = txs[:len(txs)-1]

			ids = append(ids, last.id)
			nonces = append(nonces, last.nonce)

			p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap)
			p.stored -= uint64(last.size)
			p.lookup.untrack(last)
		}
		p.index[addr] = txs

		log.Warn("Dropping overcapped blob transactions", "from", addr, "kept", len(txs), "drop", nonces, "ids", ids)
		dropOvercappedMeter.Mark(int64(len(ids)))

		for _, id := range ids {
			if err := p.store.Delete(id); err != nil {
				log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
			}
		}
	}
	// Included cheap transactions might have left the remaining ones better from
	// an eviction point, fix any potential issues in the heap.
	if _, ok := p.index[addr]; ok && inclusions != nil {
		heap.Fix(p.evict, p.evict.index[addr])
	}
}

// offload removes a tracked blob transaction from the pool and moves it into the
// limbo for tracking until finality.
//
// The method may log errors for various unexpected scenarios but will not return
// any of it since there's no clear error case. Some errors may be due to coding
// issues, others caused by signers mining MEV stuff or swapping transactions. In
// all cases, the pool needs to continue operating.
func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusions map[common.Hash]uint64) { data, err := p.store.Get(id) if err != nil { log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) return } var tx types.Transaction if err = rlp.DecodeBytes(data, &tx); err != nil { log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) return } block, ok := inclusions[tx.Hash()] if !ok { log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", nonce, "id", id) return } if err := p.limbo.push(&tx, block); err != nil { log.Warn("Failed to offload blob tx into limbo", "err", err) return } } // Reset implements txpool.SubPool, allowing the blob pool's internal state to be // kept in sync with the main transaction pool's internal state. func (p *BlobPool) Reset(oldHead, newHead *types.Header) { waitStart := time.Now() p.lock.Lock() resetwaitHist.Update(time.Since(waitStart).Nanoseconds()) defer p.lock.Unlock() defer func(start time.Time) { resettimeHist.Update(time.Since(start).Nanoseconds()) }(time.Now()) statedb, err := p.chain.StateAt(newHead.Root) if err != nil { log.Error("Failed to reset blobpool state", "err", err) return } p.head = newHead p.state = statedb // Run the reorg between the old and new head and figure out which accounts // need to be rechecked and which transactions need to be readded if reinject, inclusions := p.reorg(oldHead, newHead); reinject != nil { var adds []*types.Transaction for addr, txs := range reinject { // Blindly push all the lost transactions back into the pool for _, tx := range txs { if err := p.reinject(addr, tx.Hash()); err == nil { adds = append(adds, tx.WithoutBlobTxSidecar()) } } // Recheck the account's pooled transactions to drop included and // invalidated ones p.recheck(addr, inclusions) } if len(adds) > 0 { p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) } } // Flush out any blobs from limbo that 
are older than the latest finality if p.chain.Config().IsCancun(p.head.Number, p.head.Time) { p.limbo.finalize(p.chain.CurrentFinalBlock()) } // Reset the price heap for the new set of basefee/blobfee pairs var ( basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), newHead)) blobfee = uint256.MustFromBig(big.NewInt(params.BlobTxMinBlobGasprice)) ) if newHead.ExcessBlobGas != nil { blobfee = uint256.MustFromBig(eip4844.CalcBlobFee(*newHead.ExcessBlobGas)) } p.evict.reinit(basefee, blobfee, false) basefeeGauge.Update(int64(basefee.Uint64())) blobfeeGauge.Update(int64(blobfee.Uint64())) p.updateStorageMetrics() } // reorg assembles all the transactors and missing transactions between an old // and new head to figure out which account's tx set needs to be rechecked and // which transactions need to be requeued. // // The transactionblock inclusion infos are also returned to allow tracking any // just-included blocks by block number in the limbo. func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*types.Transaction, map[common.Hash]uint64) { // If the pool was not yet initialized, don't do anything if oldHead == nil { return nil, nil } // If the reorg is too deep, avoid doing it (will happen during snap sync) oldNum := oldHead.Number.Uint64() newNum := newHead.Number.Uint64() if depth := uint64(math.Abs(float64(oldNum) - float64(newNum))); depth > 64 { return nil, nil } // Reorg seems shallow enough to pull in all transactions into memory var ( transactors = make(map[common.Address]struct{}) discarded = make(map[common.Address][]*types.Transaction) included = make(map[common.Address][]*types.Transaction) inclusions = make(map[common.Hash]uint64) rem = p.chain.GetBlock(oldHead.Hash(), oldHead.Number.Uint64()) add = p.chain.GetBlock(newHead.Hash(), newHead.Number.Uint64()) ) if add == nil { // if the new head is nil, it means that something happened between // the firing of newhead-event and _now_: most likely a // reorg caused 
by sync-reversion or explicit sethead back to an // earlier block. log.Warn("Blobpool reset with missing new head", "number", newHead.Number, "hash", newHead.Hash()) return nil, nil } if rem == nil { // This can happen if a setHead is performed, where we simply discard // the old head from the chain. If that is the case, we don't have the // lost transactions anymore, and there's nothing to add. if newNum >= oldNum { // If we reorged to a same or higher number, then it's not a case // of setHead log.Warn("Blobpool reset with missing old head", "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) return nil, nil } // If the reorg ended up on a lower number, it's indicative of setHead // being the cause log.Debug("Skipping blobpool reset caused by setHead", "old", oldHead.Hash(), "oldnum", oldNum, "new", newHead.Hash(), "newnum", newNum) return nil, nil } // Both old and new blocks exist, traverse through the progression chain // and accumulate the transactors and transactions for rem.NumberU64() > add.NumberU64() { for _, tx := range rem.Transactions() { from, _ := types.Sender(p.signer, tx) discarded[from] = append(discarded[from], tx) transactors[from] = struct{}{} } if rem = p.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by blobpool", "block", oldHead.Number, "hash", oldHead.Hash()) return nil, nil } } for add.NumberU64() > rem.NumberU64() { for _, tx := range add.Transactions() { from, _ := types.Sender(p.signer, tx) included[from] = append(included[from], tx) inclusions[tx.Hash()] = add.NumberU64() transactors[from] = struct{}{} } if add = p.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by blobpool", "block", newHead.Number, "hash", newHead.Hash()) return nil, nil } } for rem.Hash() != add.Hash() { for _, tx := range rem.Transactions() { from, _ := types.Sender(p.signer, tx) discarded[from] = append(discarded[from], tx) 
transactors[from] = struct{}{} } if rem = p.chain.GetBlock(rem.ParentHash(), rem.NumberU64()-1); rem == nil { log.Error("Unrooted old chain seen by blobpool", "block", oldHead.Number, "hash", oldHead.Hash()) return nil, nil } for _, tx := range add.Transactions() { from, _ := types.Sender(p.signer, tx) included[from] = append(included[from], tx) inclusions[tx.Hash()] = add.NumberU64() transactors[from] = struct{}{} } if add = p.chain.GetBlock(add.ParentHash(), add.NumberU64()-1); add == nil { log.Error("Unrooted new chain seen by blobpool", "block", newHead.Number, "hash", newHead.Hash()) return nil, nil } } // Generate the set of transactions per address to pull back into the pool, // also updating the rest along the way reinject := make(map[common.Address][]*types.Transaction, len(transactors)) for addr := range transactors { // Generate the set that was lost to reinject into the pool lost := make([]*types.Transaction, 0, len(discarded[addr])) for _, tx := range types.TxDifference(discarded[addr], included[addr]) { if p.Filter(tx) { lost = append(lost, tx) } } reinject[addr] = lost // Update the set that was already reincluded to track the blocks in limbo for _, tx := range types.TxDifference(included[addr], discarded[addr]) { if p.Filter(tx) { p.limbo.update(tx.Hash(), inclusions[tx.Hash()]) } } } return reinject, inclusions } // reinject blindly pushes a transaction previously included in the chain - and // just reorged out - into the pool. The transaction is assumed valid (having // been in the chain), thus the only validation needed is nonce sorting and over- // draft checks after injection. // // Note, the method will not initialize the eviction cache values as those will // be done once for all transactions belonging to an account after all individual // transactions are injected back into the pool. func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Retrieve the associated blob from the limbo. 
Without the blobs, we cannot // add the transaction back into the pool as it is not mineable. tx, err := p.limbo.pull(txhash) if err != nil { log.Error("Blobs unavailable, dropping reorged tx", "err", err) return err } // TODO: seems like an easy optimization here would be getting the serialized tx // from limbo instead of re-serializing it here. // Serialize the transaction back into the primary datastore. blob, err := rlp.EncodeToBytes(tx) if err != nil { log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) return err } id, err := p.store.Put(blob) if err != nil { log.Error("Failed to write transaction into storage", "hash", tx.Hash(), "err", err) return err } // Update the indices and metrics meta := newBlobTxMeta(id, p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { if err := p.reserve(addr, true); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) return err } p.index[addr] = []*blobTxMeta{meta} p.spent[addr] = meta.costCap p.evict.Push(addr) } else { p.index[addr] = append(p.index[addr], meta) p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap) } p.lookup.track(meta) p.stored += uint64(meta.size) return nil } // SetGasTip implements txpool.SubPool, allowing the blob pool's gas requirements // to be kept in sync with the main transaction pool's gas requirements. 
func (p *BlobPool) SetGasTip(tip *big.Int) { p.lock.Lock() defer p.lock.Unlock() // Store the new minimum gas tip old := p.gasTip p.gasTip = uint256.MustFromBig(tip) // If the min miner fee increased, remove transactions below the new threshold if old == nil || p.gasTip.Cmp(old) > 0 { for addr, txs := range p.index { for i, tx := range txs { if tx.execTipCap.Cmp(p.gasTip) < 0 { // Drop the offending transaction var ( ids = []uint64{tx.id} nonces = []uint64{tx.nonce} ) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) p.stored -= uint64(tx.size) p.lookup.untrack(tx) txs[i] = nil // Drop everything afterwards, no gaps allowed for j, tx := range txs[i+1:] { ids = append(ids, tx.id) nonces = append(nonces, tx.nonce) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap) p.stored -= uint64(tx.size) p.lookup.untrack(tx) txs[i+1+j] = nil } // Clear out the dropped transactions from the index if i > 0 { p.index[addr] = txs[:i] heap.Fix(p.evict, p.evict.index[addr]) } else { delete(p.index, addr) delete(p.spent, addr) heap.Remove(p.evict, p.evict.index[addr]) p.reserve(addr, false) } // Clear out the transactions from the data store log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids) dropUnderpricedMeter.Mark(int64(len(ids))) for _, id := range ids { if err := p.store.Delete(id); err != nil { log.Error("Failed to delete dropped transaction", "id", id, "err", err) } } break } } } } log.Debug("Blobpool tip threshold updated", "tip", tip) pooltipGauge.Update(tip.Int64()) p.updateStorageMetrics() } // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). 
func (p *BlobPool) validateTx(tx *types.Transaction) error { // Ensure the transaction adheres to basic pool filters (type, size, tip) and // consensus rules baseOpts := &txpool.ValidationOptions{ Config: p.chain.Config(), Accept: 1 << types.BlobTxType, MaxSize: txMaxSize, MinTip: p.gasTip.ToBig(), } if err := p.txValidationFn(tx, p.head, p.signer, baseOpts); err != nil { return err } // Ensure the transaction adheres to the stateful pool filters (nonce, balance) stateOpts := &txpool.ValidationOptionsWithState{ State: p.state, FirstNonceGap: func(addr common.Address) uint64 { // Nonce gaps are not permitted in the blob pool, the first gap will // be the next nonce shifted by however many transactions we already // have pooled. return p.state.GetNonce(addr) + uint64(len(p.index[addr])) }, UsedAndLeftSlots: func(addr common.Address) (int, int) { have := len(p.index[addr]) if have >= maxTxsPerAccount { return have, 0 } return have, maxTxsPerAccount - have }, ExistingExpenditure: func(addr common.Address) *big.Int { if spent := p.spent[addr]; spent != nil { return spent.ToBig() } return new(big.Int) }, ExistingCost: func(addr common.Address, nonce uint64) *big.Int { next := p.state.GetNonce(addr) if uint64(len(p.index[addr])) > nonce-next { return p.index[addr][int(nonce-next)].costCap.ToBig() } return nil }, } if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil { return err } // If the transaction replaces an existing one, ensure that price bumps are // adhered to. 
var ( from, _ = types.Sender(p.signer, tx) // already validated above next = p.state.GetNonce(from) ) if uint64(len(p.index[from])) > tx.Nonce()-next { prev := p.index[from][int(tx.Nonce()-next)] // Ensure the transaction is different than the one tracked locally if prev.hash == tx.Hash() { return txpool.ErrAlreadyKnown } // Account can support the replacement, but the price bump must also be met switch { case tx.GasFeeCapIntCmp(prev.execFeeCap.ToBig()) <= 0: return fmt.Errorf("%w: new tx gas fee cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.GasFeeCap(), prev.execFeeCap) case tx.GasTipCapIntCmp(prev.execTipCap.ToBig()) <= 0: return fmt.Errorf("%w: new tx gas tip cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.GasTipCap(), prev.execTipCap) case tx.BlobGasFeeCapIntCmp(prev.blobFeeCap.ToBig()) <= 0: return fmt.Errorf("%w: new tx blob gas fee cap %v <= %v queued", txpool.ErrReplaceUnderpriced, tx.BlobGasFeeCap(), prev.blobFeeCap) } var ( multiplier = uint256.NewInt(100 + p.config.PriceBump) onehundred = uint256.NewInt(100) minGasFeeCap = new(uint256.Int).Div(new(uint256.Int).Mul(multiplier, prev.execFeeCap), onehundred) minGasTipCap = new(uint256.Int).Div(new(uint256.Int).Mul(multiplier, prev.execTipCap), onehundred) minBlobGasFeeCap = new(uint256.Int).Div(new(uint256.Int).Mul(multiplier, prev.blobFeeCap), onehundred) ) switch { case tx.GasFeeCapIntCmp(minGasFeeCap.ToBig()) < 0: return fmt.Errorf("%w: new tx gas fee cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, tx.GasFeeCap(), prev.execFeeCap, p.config.PriceBump) case tx.GasTipCapIntCmp(minGasTipCap.ToBig()) < 0: return fmt.Errorf("%w: new tx gas tip cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, tx.GasTipCap(), prev.execTipCap, p.config.PriceBump) case tx.BlobGasFeeCapIntCmp(minBlobGasFeeCap.ToBig()) < 0: return fmt.Errorf("%w: new tx blob gas fee cap %v < %v queued + %d%% replacement penalty", txpool.ErrReplaceUnderpriced, 
tx.BlobGasFeeCap(), prev.blobFeeCap, p.config.PriceBump) } } return nil } // Has returns an indicator whether subpool has a transaction cached with the // given hash. func (p *BlobPool) Has(hash common.Hash) bool { p.lock.RLock() defer p.lock.RUnlock() return p.lookup.exists(hash) } // Get returns a transaction if it is contained in the pool, or nil otherwise. func (p *BlobPool) Get(hash common.Hash) *types.Transaction { // Track the amount of time waiting to retrieve a fully resolved blob tx from // the pool and the amount of time actually spent on pulling the data from disk. getStart := time.Now() p.lock.RLock() getwaitHist.Update(time.Since(getStart).Nanoseconds()) defer p.lock.RUnlock() defer func(start time.Time) { gettimeHist.Update(time.Since(start).Nanoseconds()) }(time.Now()) // Pull the blob from disk and return an assembled response id, ok := p.lookup.storeidOfTx(hash) if !ok { return nil } data, err := p.store.Get(id) if err != nil { log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err) return nil } item := new(types.Transaction) if err = rlp.DecodeBytes(data, item); err != nil { log.Error("Blobs corrupted for traced transaction", "hash", hash, "id", id, "err", err) return nil } return item } // GetBlobs returns a number of blobs are proofs for the given versioned hashes. // This is a utility method for the engine API, enabling consensus clients to // retrieve blobs from the pools directly instead of the network. func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { // Create a map of the blob hash to indices for faster fills var ( blobs = make([]*kzg4844.Blob, len(vhashes)) proofs = make([]*kzg4844.Proof, len(vhashes)) ) index := make(map[common.Hash]int) for i, vhash := range vhashes { index[vhash] = i } // Iterate over the blob hashes, pulling transactions that fill it. Take care // to also fill anything else the transaction might include (probably will). 
for i, vhash := range vhashes { // If already filled by a previous fetch, skip if blobs[i] != nil { continue } // Unfilled, retrieve the datastore item (in a short lock) p.lock.RLock() id, exists := p.lookup.storeidOfBlob(vhash) if !exists { p.lock.RUnlock() continue } data, err := p.store.Get(id) p.lock.RUnlock() // After releasing the lock, try to fill any blobs requested if err != nil { log.Error("Tracked blob transaction missing from store", "id", id, "err", err) continue } item := new(types.Transaction) if err = rlp.DecodeBytes(data, item); err != nil { log.Error("Blobs corrupted for traced transaction", "id", id, "err", err) continue } // Fill anything requested, not just the current versioned hash sidecar := item.BlobTxSidecar() for j, blobhash := range item.BlobHashes() { if idx, ok := index[blobhash]; ok { blobs[idx] = &sidecar.Blobs[j] proofs[idx] = &sidecar.Proofs[j] } } } return blobs, proofs } // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { var ( adds = make([]*types.Transaction, 0, len(txs)) errs = make([]error, len(txs)) ) for i, tx := range txs { errs[i] = p.add(tx) if errs[i] == nil { adds = append(adds, tx.WithoutBlobTxSidecar()) } } if len(adds) > 0 { p.discoverFeed.Send(core.NewTxsEvent{Txs: adds}) p.insertFeed.Send(core.NewTxsEvent{Txs: adds}) } return errs } // add inserts a new blob transaction into the pool if it passes validation (both // consensus validity and pool restrictions). func (p *BlobPool) add(tx *types.Transaction) (err error) { // The blob pool blocks on adding a transaction. This is because blob txs are // only even pulled from the network, so this method will act as the overload // protection for fetches. 
waitStart := time.Now() p.lock.Lock() addwaitHist.Update(time.Since(waitStart).Nanoseconds()) defer p.lock.Unlock() defer func(start time.Time) { addtimeHist.Update(time.Since(start).Nanoseconds()) }(time.Now()) // Ensure the transaction is valid from all perspectives if err := p.validateTx(tx); err != nil { log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err) switch { case errors.Is(err, txpool.ErrUnderpriced): addUnderpricedMeter.Mark(1) case errors.Is(err, core.ErrNonceTooLow): addStaleMeter.Mark(1) case errors.Is(err, core.ErrNonceTooHigh): addGappedMeter.Mark(1) case errors.Is(err, core.ErrInsufficientFunds): addOverdraftedMeter.Mark(1) case errors.Is(err, txpool.ErrAccountLimitExceeded): addOvercappedMeter.Mark(1) case errors.Is(err, txpool.ErrReplaceUnderpriced): addNoreplaceMeter.Mark(1) default: addInvalidMeter.Mark(1) } return err } // If the address is not yet known, request exclusivity to track the account // only by this subpool until all transactions are evicted from, _ := types.Sender(p.signer, tx) // already validated above if _, ok := p.index[from]; !ok { if err := p.reserve(from, true); err != nil { addNonExclusiveMeter.Mark(1) return err } defer func() { // If the transaction is rejected by some post-validation check, remove // the lock on the reservation set. // // Note, `err` here is the named error return, which will be initialized // by a return statement before running deferred methods. Take care with // removing or subscoping err as it will break this clause. 
if err != nil { p.reserve(from, false) } }() } // Transaction permitted into the pool from a nonce and cost perspective, // insert it into the database and update the indices blob, err := rlp.EncodeToBytes(tx) if err != nil { log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) return err } id, err := p.store.Put(blob) if err != nil { return err } meta := newBlobTxMeta(id, p.store.Size(id), tx) var ( next = p.state.GetNonce(from) offset = int(tx.Nonce() - next) newacc = false ) var oldEvictionExecFeeJumps, oldEvictionBlobFeeJumps float64 if txs, ok := p.index[from]; ok { oldEvictionExecFeeJumps = txs[len(txs)-1].evictionExecFeeJumps oldEvictionBlobFeeJumps = txs[len(txs)-1].evictionBlobFeeJumps } if len(p.index[from]) > offset { // Transaction replaces a previously queued one dropReplacedMeter.Mark(1) prev := p.index[from][offset] if err := p.store.Delete(prev.id); err != nil { // Shitty situation, but try to recover gracefully instead of going boom log.Error("Failed to delete replaced transaction", "id", prev.id, "err", err) } // Update the transaction index p.index[from][offset] = meta p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap) p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) p.lookup.untrack(prev) p.lookup.track(meta) p.stored += uint64(meta.size) - uint64(prev.size) } else { // Transaction extends previously scheduled ones p.index[from] = append(p.index[from], meta) if _, ok := p.spent[from]; !ok { p.spent[from] = new(uint256.Int) newacc = true } p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) p.lookup.track(meta) p.stored += uint64(meta.size) } // Recompute the rolling eviction fields. In case of a replacement, this will // recompute all subsequent fields. In case of an append, this will only do // the fresh calculation. 
txs := p.index[from] for i := offset; i < len(txs); i++ { // The first transaction will always use itself if i == 0 { txs[0].evictionExecTip = txs[0].execTipCap txs[0].evictionExecFeeJumps = txs[0].basefeeJumps txs[0].evictionBlobFeeJumps = txs[0].blobfeeJumps continue } // Subsequent transactions will use a rolling calculation txs[i].evictionExecTip = txs[i-1].evictionExecTip if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 { txs[i].evictionExecTip = txs[i].execTipCap } txs[i].evictionExecFeeJumps = txs[i-1].evictionExecFeeJumps if txs[i].evictionExecFeeJumps > txs[i].basefeeJumps { txs[i].evictionExecFeeJumps = txs[i].basefeeJumps } txs[i].evictionBlobFeeJumps = txs[i-1].evictionBlobFeeJumps if txs[i].evictionBlobFeeJumps > txs[i].blobfeeJumps { txs[i].evictionBlobFeeJumps = txs[i].blobfeeJumps } } // Update the eviction heap with the new information: // - If the transaction is from a new account, add it to the heap // - If the account had a singleton tx replaced, update the heap (new price caps) // - If the account has a transaction replaced or appended, update the heap if significantly changed switch { case newacc: heap.Push(p.evict, from) case len(txs) == 1: // 1 tx and not a new acc, must be replacement heap.Fix(p.evict, p.evict.index[from]) default: // replacement or new append evictionExecFeeDiff := oldEvictionExecFeeJumps - txs[len(txs)-1].evictionExecFeeJumps evictionBlobFeeDiff := oldEvictionBlobFeeJumps - txs[len(txs)-1].evictionBlobFeeJumps if math.Abs(evictionExecFeeDiff) > 0.001 || math.Abs(evictionBlobFeeDiff) > 0.001 { // need math.Abs, can go up and down heap.Fix(p.evict, p.evict.index[from]) } } // If the pool went over the allowed data limit, evict transactions until // we're again below the threshold for p.stored > p.config.Datacap { p.drop() } p.updateStorageMetrics() addValidMeter.Mark(1) return nil } // drop removes the worst transaction from the pool. 
It is primarily used when a // freshly added transaction overflows the pool and needs to evict something. The // method is also called on startup if the user resizes their storage, might be an // expensive run but it should be fine-ish. func (p *BlobPool) drop() { // Peek at the account with the worse transaction set to evict from (Go's heap // stores the minimum at index zero of the heap slice) and retrieve it's last // transaction. var ( from = p.evict.addrs[0] // cannot call drop on empty pool txs = p.index[from] drop = txs[len(txs)-1] last = len(txs) == 1 ) // Remove the transaction from the pool's index if last { delete(p.index, from) delete(p.spent, from) p.reserve(from, false) } else { txs[len(txs)-1] = nil txs = txs[:len(txs)-1] p.index[from] = txs p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap) } p.stored -= uint64(drop.size) p.lookup.untrack(drop) // Remove the transaction from the pool's eviction heap: // - If the entire account was dropped, pop off the address // - Otherwise, if the new tail has better eviction caps, fix the heap if last { heap.Pop(p.evict) } else { tail := txs[len(txs)-1] // new tail, surely exists evictionExecFeeDiff := tail.evictionExecFeeJumps - drop.evictionExecFeeJumps evictionBlobFeeDiff := tail.evictionBlobFeeJumps - drop.evictionBlobFeeJumps if evictionExecFeeDiff > 0.001 || evictionBlobFeeDiff > 0.001 { // no need for math.Abs, monotonic decreasing heap.Fix(p.evict, 0) } } // Remove the transaction from the data store log.Debug("Evicting overflown blob transaction", "from", from, "evicted", drop.nonce, "id", drop.id) dropOverflownMeter.Mark(1) if err := p.store.Delete(drop.id); err != nil { log.Error("Failed to drop evicted transaction", "id", drop.id, "err", err) } } // Pending retrieves all currently processable transactions, grouped by origin // account and sorted by nonce. 
// // The transactions can also be pre-filtered by the dynamic fee components to // reduce allocations and load on downstream subsystems. func (p *BlobPool) Pending(filter txpool.PendingFilter) map[common.Address][]*txpool.LazyTransaction { // If only plain transactions are requested, this pool is unsuitable as it // contains none, don't even bother. if filter.OnlyPlainTxs { return nil } // Track the amount of time waiting to retrieve the list of pending blob txs // from the pool and the amount of time actually spent on assembling the data. // The latter will be pretty much moot, but we've kept it to have symmetric // across all user operations. pendStart := time.Now() p.lock.RLock() pendwaitHist.Update(time.Since(pendStart).Nanoseconds()) defer p.lock.RUnlock() execStart := time.Now() defer func() { pendtimeHist.Update(time.Since(execStart).Nanoseconds()) }() pending := make(map[common.Address][]*txpool.LazyTransaction, len(p.index)) for addr, txs := range p.index { lazies := make([]*txpool.LazyTransaction, 0, len(txs)) for _, tx := range txs { // If transaction filtering was requested, discard badly priced ones if filter.MinTip != nil && filter.BaseFee != nil { if tx.execFeeCap.Lt(filter.BaseFee) { break // basefee too low, cannot be included, discard rest of txs from the account } tip := new(uint256.Int).Sub(tx.execFeeCap, filter.BaseFee) if tip.Gt(tx.execTipCap) { tip = tx.execTipCap } if tip.Lt(filter.MinTip) { break // allowed or remaining tip too low, cannot be included, discard rest of txs from the account } } if filter.BlobFee != nil { if tx.blobFeeCap.Lt(filter.BlobFee) { break // blobfee too low, cannot be included, discard rest of txs from the account } } // Transaction was accepted according to the filter, append to the pending list lazies = append(lazies, &txpool.LazyTransaction{ Pool: p, Hash: tx.hash, Time: execStart, // TODO(karalabe): Maybe save these and use that? 
GasFeeCap: tx.execFeeCap, GasTipCap: tx.execTipCap, Gas: tx.execGas, BlobGas: tx.blobGas, }) } if len(lazies) > 0 { pending[addr] = lazies } } return pending } // updateStorageMetrics retrieves a bunch of stats from the data store and pushes // them out as metrics. func (p *BlobPool) updateStorageMetrics() { stats := p.store.Infos() var ( dataused uint64 datareal uint64 slotused uint64 oversizedDataused uint64 oversizedDatagaps uint64 oversizedSlotused uint64 oversizedSlotgaps uint64 ) for _, shelf := range stats.Shelves { slotDataused := shelf.FilledSlots * uint64(shelf.SlotSize) slotDatagaps := shelf.GappedSlots * uint64(shelf.SlotSize) dataused += slotDataused datareal += slotDataused + slotDatagaps slotused += shelf.FilledSlots metrics.GetOrRegisterGauge(fmt.Sprintf(shelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused)) metrics.GetOrRegisterGauge(fmt.Sprintf(shelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps)) metrics.GetOrRegisterGauge(fmt.Sprintf(shelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots)) metrics.GetOrRegisterGauge(fmt.Sprintf(shelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots)) if shelf.SlotSize/blobSize > maxBlobsPerTransaction { oversizedDataused += slotDataused oversizedDatagaps += slotDatagaps oversizedSlotused += shelf.FilledSlots oversizedSlotgaps += shelf.GappedSlots } } datausedGauge.Update(int64(dataused)) datarealGauge.Update(int64(datareal)) slotusedGauge.Update(int64(slotused)) oversizedDatausedGauge.Update(int64(oversizedDataused)) oversizedDatagapsGauge.Update(int64(oversizedDatagaps)) oversizedSlotusedGauge.Update(int64(oversizedSlotused)) oversizedSlotgapsGauge.Update(int64(oversizedSlotgaps)) p.updateLimboMetrics() } // updateLimboMetrics retrieves a bunch of stats from the limbo store and pushes // them out as metrics. 
func (p *BlobPool) updateLimboMetrics() { stats := p.limbo.store.Infos() var ( dataused uint64 datareal uint64 slotused uint64 ) for _, shelf := range stats.Shelves { slotDataused := shelf.FilledSlots * uint64(shelf.SlotSize) slotDatagaps := shelf.GappedSlots * uint64(shelf.SlotSize) dataused += slotDataused datareal += slotDataused + slotDatagaps slotused += shelf.FilledSlots metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused)) metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps)) metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots)) metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots)) } limboDatausedGauge.Update(int64(dataused)) limboDatarealGauge.Update(int64(datareal)) limboSlotusedGauge.Update(int64(slotused)) } // SubscribeTransactions registers a subscription for new transaction events, // supporting feeding only newly seen or also resurrected transactions. func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription { if reorgs { return p.insertFeed.Subscribe(ch) } else { return p.discoverFeed.Subscribe(ch) } } // Nonce returns the next nonce of an account, with all transactions executable // by the pool already applied on top. func (p *BlobPool) Nonce(addr common.Address) uint64 { // We need a write lock here, since state.GetNonce might write the cache. p.lock.Lock() defer p.lock.Unlock() if txs, ok := p.index[addr]; ok { return txs[len(txs)-1].nonce + 1 } return p.state.GetNonce(addr) } // Stats retrieves the current pool stats, namely the number of pending and the // number of queued (non-executable) transactions. 
func (p *BlobPool) Stats() (int, int) { p.lock.RLock() defer p.lock.RUnlock() var pending int for _, txs := range p.index { pending += len(txs) } return pending, 0 // No non-executable txs in the blob pool } // Content retrieves the data content of the transaction pool, returning all the // pending as well as queued transactions, grouped by account and sorted by nonce. // // For the blob pool, this method will return nothing for now. // TODO(karalabe): Abstract out the returned metadata. func (p *BlobPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { return make(map[common.Address][]*types.Transaction), make(map[common.Address][]*types.Transaction) } // ContentFrom retrieves the data content of the transaction pool, returning the // pending as well as queued transactions of this address, grouped by nonce. // // For the blob pool, this method will return nothing for now. // TODO(karalabe): Abstract out the returned metadata. func (p *BlobPool) ContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { return []*types.Transaction{}, []*types.Transaction{} } // Locals retrieves the accounts currently considered local by the pool. // // There is no notion of local accounts in the blob pool. func (p *BlobPool) Locals() []common.Address { return []common.Address{} } // Status returns the known status (unknown/pending/queued) of a transaction // identified by their hashes. func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus { if p.Has(hash) { return txpool.TxStatusPending } return txpool.TxStatusUnknown } // Clear implements txpool.SubPool, removing all tracked transactions // from the blob pool and persistent store. func (p *BlobPool) Clear() { p.lock.Lock() defer p.lock.Unlock() // manually iterating and deleting every entry is super sub-optimal // However, Clear is not currently used in production so // performance is not critical at the moment. 
for hash := range p.lookup.txIndex { id, _ := p.lookup.storeidOfTx(hash) if err := p.store.Delete(id); err != nil { log.Warn("failed to delete blob tx from backing store", "err", err) } } for hash := range p.lookup.blobIndex { id, _ := p.lookup.storeidOfBlob(hash) if err := p.store.Delete(id); err != nil { log.Warn("failed to delete blob from backing store", "err", err) } } // unreserve each tracked account. Ideally, we could just clear the // reservation map in the parent txpool context. However, if we clear in // parent context, to avoid exposing the subpool lock, we have to lock the // reservations and then lock each subpool. // // This creates the potential for a deadlock situation: // // * TxPool.Clear locks the reservations // * a new transaction is received which locks the subpool mutex // * TxPool.Clear attempts to lock subpool mutex // // The transaction addition may attempt to reserve the sender addr which // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. for acct := range p.index { p.reserve(acct, false) } p.lookup = newLookup() p.index = make(map[common.Address][]*blobTxMeta) p.spent = make(map[common.Address]*uint256.Int) var ( basefee = uint256.MustFromBig(eip1559.CalcBaseFee(p.chain.Config(), p.head)) blobfee = uint256.NewInt(params.BlobTxMinBlobGasprice) ) p.evict = newPriceHeap(basefee, blobfee, p.index) }