core/filtermaps: use unindexed search as a fallback

This commit is contained in:
Zsolt Felfoldi 2024-10-01 02:14:00 +02:00
parent 00d8c9ba5c
commit 4ad24e0b07
5 changed files with 180 additions and 183 deletions

View File

@ -57,21 +57,23 @@ func (f *FilterMaps) updateLoop() {
head = f.chain.CurrentBlock()
stop bool
syncMatcher *FilterMapsMatcherBackend
fmr = f.getRange()
)
matcherSync := func() {
if syncMatcher != nil && fmr.headBlockHash == head.Hash() {
syncMatcher.synced(head)
syncMatcher = nil
}
}
defer func() {
sub.Unsubscribe()
if syncMatcher != nil {
syncMatcher.synced(head)
syncMatcher = nil
}
matcherSync()
}()
wait := func() {
if syncMatcher != nil {
syncMatcher.synced(head)
syncMatcher = nil
}
matcherSync()
if stop {
return
}
@ -98,7 +100,7 @@ func (f *FilterMaps) updateLoop() {
return
}
}
fmr := f.getRange()
fmr = f.getRange()
for !stop {
if !fmr.initialized {
@ -106,10 +108,6 @@ func (f *FilterMaps) updateLoop() {
return
}
if syncMatcher != nil {
syncMatcher.synced(head)
syncMatcher = nil
}
fmr = f.getRange()
if !fmr.initialized {
wait()
@ -127,10 +125,7 @@ func (f *FilterMaps) updateLoop() {
continue
}
}
if syncMatcher != nil {
syncMatcher.synced(head)
syncMatcher = nil
}
matcherSync()
// log index head is at latest chain head; process tail blocks if possible
if f.tryUpdateTail(head, func() bool {
// return true if tail processing needs to be stopped

View File

@ -47,99 +47,11 @@ type SyncRange struct {
}
// GetPotentialMatches returns a list of logs that are potential matches for the
// given filter criteria. If parts of the requested range are not indexed then
// an error is returned. If parts of the requested range are changed during the
// search process then potentially incorrect logs are discarded and searched
// again, ensuring that the returned results are always consistent with the latest
// state of the chain.
// If firstBlock or lastBlock are bigger than the head block number then they are
// substituted with the latest head of the chain, ensuring that a search until
// the head block is still consistent with the latest canonical chain if a new
// head has been added during the process.
// Note that the returned list may still contain false positives.
//
// On success the results are, in order: the potential matches, the chain head
// reported by the last SyncLogIndex call, the firstBlock argument as passed in
// by the caller, and the last block number covered by the returned matches.
// On any failure all results except the error are zero values.
// NOTE(review): the third result is the unclamped firstBlock argument even when
// the search itself started at the head block; confirm callers expect this.
func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, *types.Header, uint64, uint64, error) {
if firstBlock > lastBlock {
return nil, nil, 0, 0, errors.New("invalid search range")
}
// enforce a consistent state before starting the search in order to be able
// to determine valid range later
syncRange, err := backend.SyncLogIndex(ctx)
if err != nil {
return nil, nil, 0, 0, err
}
headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil
// if haveMatches == true then matches correspond to the block number range
// between matchFirst and matchLast
var (
matches []*types.Log
haveMatches bool
matchFirst, matchLast uint64
)
// keep searching until a match set exists and it reaches either the requested
// last block or the current head, whichever is lower
for !haveMatches || (matchLast < lastBlock && matchLast < headBlock) {
// determine range to be searched; for simplicity we only extend the most
// recent end of the existing match set by matching between searchFirst
// and searchLast.
searchFirst, searchLast := firstBlock, lastBlock
// clamp both ends of the requested range to the current head
if searchFirst > headBlock {
searchFirst = headBlock
}
if searchLast > headBlock {
searchLast = headBlock
}
if haveMatches && matchFirst != searchFirst {
// searchFirst might change if firstBlock > headBlock
// in that case the existing matches start at a different block, so
// they are dropped and the whole range is searched again
matches, haveMatches = nil, false
}
if haveMatches && matchLast >= searchFirst {
// only the section after the already matched range needs searching
searchFirst = matchLast + 1
}
// check if indexed range covers the requested range
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst || syncRange.LastIndexed < searchLast {
return nil, nil, 0, 0, errors.New("log index not available for requested range")
}
// search for matches in the required range
newMatches, err := getPotentialMatches(ctx, backend, searchFirst, searchLast, addresses, topics)
if err != nil {
return nil, nil, 0, 0, err
}
// enforce a consistent state again in order to determine the guaranteed
// valid range in which the log index has not been changed since the last
// sync.
syncRange, err = backend.SyncLogIndex(ctx)
if err != nil {
return nil, nil, 0, 0, err
}
// the head may have moved during the search; refresh it for the next
// clamping step and for the loop condition
headBlock = syncRange.Head.Number.Uint64()
// return with error if the beginning of the recently searched range might
// be invalid due to removed log index
if !syncRange.Valid || syncRange.FirstValid > searchFirst || syncRange.LastValid < searchFirst {
return nil, nil, 0, 0, errors.New("log index not available for requested range")
}
// roll back most recent matches if they are not covered by the guaranteed
// valid range
if syncRange.LastValid < searchLast {
// drop logs from the end of the list while their block number lies beyond
// the valid range (presumably newMatches is ordered by block number;
// verify against getPotentialMatches)
for len(newMatches) > 0 && newMatches[len(newMatches)-1].BlockNumber > syncRange.LastValid {
newMatches = newMatches[:len(newMatches)-1]
}
searchLast = syncRange.LastValid
}
// append new matches to existing ones if there were any
if haveMatches {
matches = append(matches, newMatches...)
} else {
matches, haveMatches = newMatches, true
}
// record how far the match set now extends; the loop condition compares
// this against lastBlock and the refreshed head
matchLast = searchLast
}
return matches, syncRange.Head, firstBlock, matchLast, nil
}
// getPotentialMatches returns a list of logs that are potential matches for the
// given filter criteria. If parts of the log index in the searched range are
// missing or changed during the search process then the resulting logs belonging
// to that block range might be missing or incorrect.
// Also note that the returned list may contain false positives.
func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
params := backend.GetParams()
// find the log value index range to search
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)

View File

@ -111,8 +111,8 @@ func (fm *FilterMapsMatcherBackend) synced(head *types.Header) {
}
// SyncLogIndex ensures that the log index is consistent with the current state
// of the chain (note that it may or may not be actually synced up to the head).
// It blocks until this state is achieved.
// of the chain and is synced up to the current head. It blocks until this state
// is achieved or the context is cancelled.
// If successful, it returns a SyncRange that contains the latest chain head,
// the indexed range that is currently consistent with the chain and the valid
// range that has not been changed and has been consistent with all states of the

View File

@ -22,7 +22,6 @@ import (
"math"
"math/big"
"slices"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/filtermaps"
@ -93,117 +92,202 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
return nil, errPendingLogsUnsupported
}
resolveSpecial := func(number int64) (int64, error) {
resolveSpecial := func(number int64) (uint64, error) {
switch number {
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
// we should return head here since we've already captured
// that we need to get the pending logs in the pending boolean above
return math.MaxInt64, nil
case rpc.LatestBlockNumber.Int64():
// when searching from and/or until the current head, we resolve it
// to MaxUint64 which is translated by rangeLogs to the actual head
// in each iteration, ensuring that the head block will be searched
// even if the chain is updated during search.
return math.MaxUint64, nil
case rpc.FinalizedBlockNumber.Int64():
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
if hdr == nil {
return 0, errors.New("finalized header not found")
}
return hdr.Number.Int64(), nil
return hdr.Number.Uint64(), nil
case rpc.SafeBlockNumber.Int64():
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber)
if hdr == nil {
return 0, errors.New("safe header not found")
}
return hdr.Number.Int64(), nil
default:
return number, nil
return hdr.Number.Uint64(), nil
}
if number < 0 {
return 0, errors.New("negative block number")
}
return uint64(number), nil
}
var err error
// range query need to resolve the special begin/end block number
if f.begin, err = resolveSpecial(f.begin); err != nil {
begin, err := resolveSpecial(f.begin)
if err != nil {
return nil, err
}
if f.end, err = resolveSpecial(f.end); err != nil {
end, err := resolveSpecial(f.end)
if err != nil {
return nil, err
}
return f.rangeLogs(ctx, begin, end)
}
func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) {
if firstBlock > lastBlock {
return nil, errors.New("invalid search range")
}
mb := f.sys.backend.NewMatcherBackend()
defer mb.Close()
// enforce a consistent state before starting the search in order to be able
// to determine valid range later
syncRange, err := mb.SyncLogIndex(ctx)
if err != nil {
return nil, err
}
headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil
// if haveMatches == true then matches correspond to the block number range
// between matchFirst and matchLast
var (
matches []*types.Log
haveMatches, forceUnindexed bool
matchFirst, matchLast uint64
)
trimMatches := func(trimFirst, trimLast uint64) {
if !haveMatches {
return
}
if trimLast < matchFirst || trimFirst > matchLast {
matches, haveMatches = nil, false
return
}
if trimFirst > matchFirst {
for len(matches) > 0 && matches[0].BlockNumber < trimFirst {
matches = matches[1:]
}
matchFirst = trimFirst
}
if trimLast < matchLast {
for len(matches) > 0 && matches[len(matches)-1].BlockNumber > trimLast {
matches = matches[:len(matches)-1]
}
matchLast = trimLast
}
}
start := time.Now()
mb := f.sys.backend.NewMatcherBackend()
logs, _, _, _, err := filtermaps.GetPotentialMatches(ctx, mb, uint64(f.begin), uint64(f.end), f.addresses, f.topics)
mb.Close()
if err == filtermaps.ErrMatchAll {
// revert to legacy filter
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if hdr == nil {
return nil, errors.New("latest header not found")
for {
// determine range to be searched; for simplicity we only extend the most
// recent end of the existing match set by matching between searchFirst
// and searchLast.
searchFirst, searchLast := firstBlock, lastBlock
if searchFirst > headBlock {
searchFirst = headBlock
}
headNumber := hdr.Number.Int64()
if f.begin > headNumber {
f.begin = headNumber
if searchLast > headBlock {
searchLast = headBlock
}
if f.end > headNumber {
f.end = headNumber
trimMatches(searchFirst, searchLast)
if haveMatches && matchFirst == searchFirst && matchLast == searchLast {
return matches, nil
}
logChan, errChan := f.rangeLogsAsync(ctx)
var logs []*types.Log
for {
select {
case log := <-logChan:
logs = append(logs, log)
case err := <-errChan:
return logs, err
var trimTailIfNotValid uint64
if haveMatches && matchFirst > searchFirst {
// missing tail section; do unindexed search
tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1)
if err != nil {
return matches, err
}
matches = append(tailMatches, matches...)
matchFirst = searchFirst
// unindexed results are not affected by valid tail; do not trim tail
trimTailIfNotValid = math.MaxUint64
}
// now if we have matches, they start at searchFirst
if haveMatches {
searchFirst = matchLast + 1
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst {
forceUnindexed = true
}
}
var newMatches []*types.Log
if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst {
	// the indexed range does not overlap the range to be searched at all
	forceUnindexed = true
}
if !forceUnindexed {
	// restrict the indexed search to the intersection of the requested range
	// and the indexed range; any remaining section is picked up by a later
	// iteration (missing tail or not yet covered head part)
	if syncRange.FirstIndexed > searchFirst {
		searchFirst = syncRange.FirstIndexed
	}
	// clamp the upper end downwards; never extend the search beyond the
	// requested last block
	if syncRange.LastIndexed < searchLast {
		searchLast = syncRange.LastIndexed
	}
	newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast)
	// trim tail if it affects the indexed search range
	trimTailIfNotValid = searchFirst
	if errors.Is(err, filtermaps.ErrMatchAll) {
		// "match all" filters are not supported by filtermaps; fall back
		// to unindexed search which is the most efficient in this case
		forceUnindexed = true
	}
}
if forceUnindexed {
	// this assignment also replaces an ErrMatchAll left in err by the
	// indexed attempt above
	newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast)
	// unindexed results are not affected by valid tail; do not trim tail
	trimTailIfNotValid = math.MaxUint64
}
if err != nil {
	return matches, err
}
// decide on haveMatches rather than on matches being nil: a previous search
// that legitimately found no logs leaves matches nil while matchFirst and
// matchLast still describe a covered range that must be extended, not reset
if !haveMatches {
	matches = newMatches
	haveMatches, matchFirst, matchLast = true, searchFirst, searchLast
} else {
	matches = append(matches, newMatches...)
	matchLast = searchLast
}
syncRange, err = mb.SyncLogIndex(ctx)
if err != nil {
return matches, err
}
if !syncRange.Valid {
matches, haveMatches = nil, false
} else {
if syncRange.FirstValid > trimTailIfNotValid {
trimMatches(syncRange.FirstValid, syncRange.LastValid)
} else {
trimMatches(0, syncRange.LastValid)
}
}
}
fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics)
log.Debug("Finished log search", "run time", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs))
return fmLogs, err
}
// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously,
// it creates and returns two channels: one for delivering log data, and one for reporting errors.
func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) {
var (
logChan = make(chan *types.Log)
errChan = make(chan error)
)
go func() {
defer func() {
close(errChan)
close(logChan)
}()
if err := f.unindexedLogs(ctx, uint64(f.end), logChan); err != nil {
errChan <- err
return
}
errChan <- nil
}()
return logChan, errChan
// indexedLogs asks the log index for potential matches in the block range
// between begin and end, then passes them through filterLogs with the filter's
// addresses and topics. The error from GetPotentialMatches is returned
// unchanged alongside the filtered list.
func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) {
	potentialMatches, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics)
	return filterLogs(potentialMatches, nil, nil, f.addresses, f.topics), err
}
// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
for ; f.begin <= int64(end); f.begin++ {
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
log.Warn("Performing unindexed log search", "begin", begin, "end", end)
var logs []*types.Log
for blockNumber := begin; blockNumber <= end; blockNumber++ {
select {
case <-ctx.Done():
return logs, ctx.Err()
default:
}
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber))
if header == nil || err != nil {
return err
return logs, err
}
found, err := f.blockLogs(ctx, header)
if err != nil {
return err
}
for _, log := range found {
select {
case logChan <- log:
case <-ctx.Done():
return ctx.Err()
}
return logs, err
}
logs = append(logs, found...)
}
return nil
return logs, nil
}
// blockLogs returns the logs matching the filter criteria within a single block.

View File

@ -20,7 +20,6 @@ import (
"context"
"errors"
"math/big"
"math/rand"
"reflect"
"runtime"
"testing"
@ -29,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
@ -136,6 +136,12 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
// NewMatcherBackend creates a FilterMaps instance on top of the test backend's
// database using the default parameters, starts it and returns a matcher
// backend obtained from it.
// NOTE(review): every call creates and starts a new FilterMaps and nothing in
// this method stops it; confirm this is acceptable for the tests using it.
func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
	indexer := filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, 0, false)
	indexer.Start()
	return indexer.NewMatcherBackend()
}
func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {
b.pendingBlock = block
b.pendingReceipts = receipts