eth/filters, eth/tracers: add request cancellation checks (#26320)
This ensures that RPC method handlers will react to a timeout or cancelled request soon after the event occurs. Co-authored-by: Sina Mahmoodi <itz.s1na@gmail.com>
This commit is contained in:
parent
f51f6edb40
commit
f53ff0ff4a
|
@ -405,13 +405,13 @@ type storageEntry struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// StorageRangeAt returns the storage at the given block height and transaction index.
|
// StorageRangeAt returns the storage at the given block height and transaction index.
|
||||||
func (api *DebugAPI) StorageRangeAt(blockHash common.Hash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) {
|
func (api *DebugAPI) StorageRangeAt(ctx context.Context, blockHash common.Hash, txIndex int, contractAddress common.Address, keyStart hexutil.Bytes, maxResult int) (StorageRangeResult, error) {
|
||||||
// Retrieve the block
|
// Retrieve the block
|
||||||
block := api.eth.blockchain.GetBlockByHash(blockHash)
|
block := api.eth.blockchain.GetBlockByHash(blockHash)
|
||||||
if block == nil {
|
if block == nil {
|
||||||
return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash)
|
return StorageRangeResult{}, fmt.Errorf("block %#x not found", blockHash)
|
||||||
}
|
}
|
||||||
_, _, statedb, release, err := api.eth.stateAtTransaction(block, txIndex, 0)
|
_, _, statedb, release, err := api.eth.stateAtTransaction(ctx, block, txIndex, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return StorageRangeResult{}, err
|
return StorageRangeResult{}, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -365,9 +365,9 @@ func (b *EthAPIBackend) StartMining(threads int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) {
|
func (b *EthAPIBackend) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (*state.StateDB, tracers.StateReleaseFunc, error) {
|
||||||
return b.eth.StateAtBlock(block, reexec, base, readOnly, preferDisk)
|
return b.eth.StateAtBlock(ctx, block, reexec, base, readOnly, preferDisk)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
|
func (b *EthAPIBackend) StateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
|
||||||
return b.eth.stateAtTransaction(block, txIndex, reexec)
|
return b.eth.stateAtTransaction(ctx, block, txIndex, reexec)
|
||||||
}
|
}
|
||||||
|
|
|
@ -234,6 +234,9 @@ func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, e
|
||||||
var logs []*types.Log
|
var logs []*types.Log
|
||||||
|
|
||||||
for ; f.begin <= int64(end); f.begin++ {
|
for ; f.begin <= int64(end); f.begin++ {
|
||||||
|
if f.begin%10 == 0 && ctx.Err() != nil {
|
||||||
|
return logs, ctx.Err()
|
||||||
|
}
|
||||||
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
|
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
|
||||||
if header == nil || err != nil {
|
if header == nil || err != nil {
|
||||||
return logs, err
|
return logs, err
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
package eth
|
package eth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
@ -56,7 +57,7 @@ var noopReleaser = tracers.StateReleaseFunc(func() {})
|
||||||
// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is
|
// - preferDisk: this arg can be used by the caller to signal that even though the 'base' is
|
||||||
// provided, it would be preferable to start from a fresh state, if we have it
|
// provided, it would be preferable to start from a fresh state, if we have it
|
||||||
// on disk.
|
// on disk.
|
||||||
func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) {
|
func (eth *Ethereum) StateAtBlock(ctx context.Context, block *types.Block, reexec uint64, base *state.StateDB, readOnly bool, preferDisk bool) (statedb *state.StateDB, release tracers.StateReleaseFunc, err error) {
|
||||||
var (
|
var (
|
||||||
current *types.Block
|
current *types.Block
|
||||||
database state.Database
|
database state.Database
|
||||||
|
@ -111,6 +112,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
|
||||||
}
|
}
|
||||||
// Database does not have the state for the given block, try to regenerate
|
// Database does not have the state for the given block, try to regenerate
|
||||||
for i := uint64(0); i < reexec; i++ {
|
for i := uint64(0); i < reexec; i++ {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
if current.NumberU64() == 0 {
|
if current.NumberU64() == 0 {
|
||||||
return nil, nil, errors.New("genesis state is missing")
|
return nil, nil, errors.New("genesis state is missing")
|
||||||
}
|
}
|
||||||
|
@ -142,6 +146,9 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
|
||||||
parent common.Hash
|
parent common.Hash
|
||||||
)
|
)
|
||||||
for current.NumberU64() < origin {
|
for current.NumberU64() < origin {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
// Print progress logs if long enough time elapsed
|
// Print progress logs if long enough time elapsed
|
||||||
if time.Since(logged) > 8*time.Second && report {
|
if time.Since(logged) > 8*time.Second && report {
|
||||||
log.Info("Regenerating historical state", "block", current.NumberU64()+1, "target", origin, "remaining", origin-current.NumberU64()-1, "elapsed", time.Since(start))
|
log.Info("Regenerating historical state", "block", current.NumberU64()+1, "target", origin, "remaining", origin-current.NumberU64()-1, "elapsed", time.Since(start))
|
||||||
|
@ -182,7 +189,7 @@ func (eth *Ethereum) StateAtBlock(block *types.Block, reexec uint64, base *state
|
||||||
}
|
}
|
||||||
|
|
||||||
// stateAtTransaction returns the execution environment of a certain transaction.
|
// stateAtTransaction returns the execution environment of a certain transaction.
|
||||||
func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
|
func (eth *Ethereum) stateAtTransaction(ctx context.Context, block *types.Block, txIndex int, reexec uint64) (core.Message, vm.BlockContext, *state.StateDB, tracers.StateReleaseFunc, error) {
|
||||||
// Short circuit if it's genesis block.
|
// Short circuit if it's genesis block.
|
||||||
if block.NumberU64() == 0 {
|
if block.NumberU64() == 0 {
|
||||||
return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis")
|
return nil, vm.BlockContext{}, nil, nil, errors.New("no transaction in genesis")
|
||||||
|
@ -194,7 +201,7 @@ func (eth *Ethereum) stateAtTransaction(block *types.Block, txIndex int, reexec
|
||||||
}
|
}
|
||||||
// Lookup the statedb of parent block from the live database,
|
// Lookup the statedb of parent block from the live database,
|
||||||
// otherwise regenerate it on the flight.
|
// otherwise regenerate it on the flight.
|
||||||
statedb, release, err := eth.StateAtBlock(parent, reexec, nil, true, false)
|
statedb, release, err := eth.StateAtBlock(ctx, parent, reexec, nil, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, vm.BlockContext{}, nil, nil, err
|
return nil, vm.BlockContext{}, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -549,6 +549,9 @@ func (api *API) IntermediateRoots(ctx context.Context, hash common.Hash, config
|
||||||
deleteEmptyObjects = chainConfig.IsEIP158(block.Number())
|
deleteEmptyObjects = chainConfig.IsEIP158(block.Number())
|
||||||
)
|
)
|
||||||
for i, tx := range block.Transactions() {
|
for i, tx := range block.Transactions() {
|
||||||
|
if err := ctx.Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
var (
|
var (
|
||||||
msg, _ = tx.AsMessage(signer, block.BaseFee())
|
msg, _ = tx.AsMessage(signer, block.BaseFee())
|
||||||
txContext = core.NewEVMTxContext(msg)
|
txContext = core.NewEVMTxContext(msg)
|
||||||
|
@ -609,14 +612,13 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
|
||||||
signer = types.MakeSigner(api.backend.ChainConfig(), block.Number())
|
signer = types.MakeSigner(api.backend.ChainConfig(), block.Number())
|
||||||
txs = block.Transactions()
|
txs = block.Transactions()
|
||||||
results = make([]*txTraceResult, len(txs))
|
results = make([]*txTraceResult, len(txs))
|
||||||
|
pend sync.WaitGroup
|
||||||
pend = new(sync.WaitGroup)
|
|
||||||
jobs = make(chan *txTraceTask, len(txs))
|
|
||||||
)
|
)
|
||||||
threads := runtime.NumCPU()
|
threads := runtime.NumCPU()
|
||||||
if threads > len(txs) {
|
if threads > len(txs) {
|
||||||
threads = len(txs)
|
threads = len(txs)
|
||||||
}
|
}
|
||||||
|
jobs := make(chan *txTraceTask, threads)
|
||||||
blockHash := block.Hash()
|
blockHash := block.Hash()
|
||||||
for th := 0; th < threads; th++ {
|
for th := 0; th < threads; th++ {
|
||||||
pend.Add(1)
|
pend.Add(1)
|
||||||
|
@ -640,12 +642,20 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Feed the transactions into the tracers and return
|
// Feed the transactions into the tracers and return
|
||||||
var failed error
|
var failed error
|
||||||
blockCtx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil)
|
blockCtx := core.NewEVMBlockContext(block.Header(), api.chainContext(ctx), nil)
|
||||||
|
txloop:
|
||||||
for i, tx := range txs {
|
for i, tx := range txs {
|
||||||
// Send the trace task over for execution
|
// Send the trace task over for execution
|
||||||
jobs <- &txTraceTask{statedb: statedb.Copy(), index: i}
|
task := &txTraceTask{statedb: statedb.Copy(), index: i}
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
failed = ctx.Err()
|
||||||
|
break txloop
|
||||||
|
case jobs <- task:
|
||||||
|
}
|
||||||
|
|
||||||
// Generate the next state snapshot fast without tracing
|
// Generate the next state snapshot fast without tracing
|
||||||
msg, _ := tx.AsMessage(signer, block.BaseFee())
|
msg, _ := tx.AsMessage(signer, block.BaseFee())
|
||||||
|
@ -653,12 +663,13 @@ func (api *API) traceBlock(ctx context.Context, block *types.Block, config *Trac
|
||||||
vmenv := vm.NewEVM(blockCtx, core.NewEVMTxContext(msg), statedb, api.backend.ChainConfig(), vm.Config{})
|
vmenv := vm.NewEVM(blockCtx, core.NewEVMTxContext(msg), statedb, api.backend.ChainConfig(), vm.Config{})
|
||||||
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil {
|
if _, err := core.ApplyMessage(vmenv, msg, new(core.GasPool).AddGas(msg.Gas())); err != nil {
|
||||||
failed = err
|
failed = err
|
||||||
break
|
break txloop
|
||||||
}
|
}
|
||||||
// Finalize the state so any modifications are written to the trie
|
// Finalize the state so any modifications are written to the trie
|
||||||
// Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
|
// Only delete empty objects if EIP158/161 (a.k.a Spurious Dragon) is in effect
|
||||||
statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
|
statedb.Finalise(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
}
|
}
|
||||||
|
|
||||||
close(jobs)
|
close(jobs)
|
||||||
pend.Wait()
|
pend.Wait()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue