diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index efaa900417..36094ba555 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -57,21 +57,23 @@ func (f *FilterMaps) updateLoop() { head = f.chain.CurrentBlock() stop bool syncMatcher *FilterMapsMatcherBackend + fmr = f.getRange() ) + matcherSync := func() { + if syncMatcher != nil && fmr.headBlockHash == head.Hash() { + syncMatcher.synced(head) + syncMatcher = nil + } + } + defer func() { sub.Unsubscribe() - if syncMatcher != nil { - syncMatcher.synced(head) - syncMatcher = nil - } + matcherSync() }() wait := func() { - if syncMatcher != nil { - syncMatcher.synced(head) - syncMatcher = nil - } + matcherSync() if stop { return } @@ -98,7 +100,7 @@ func (f *FilterMaps) updateLoop() { return } } - fmr := f.getRange() + fmr = f.getRange() for !stop { if !fmr.initialized { @@ -106,10 +108,6 @@ func (f *FilterMaps) updateLoop() { return } - if syncMatcher != nil { - syncMatcher.synced(head) - syncMatcher = nil - } fmr = f.getRange() if !fmr.initialized { wait() @@ -127,10 +125,7 @@ func (f *FilterMaps) updateLoop() { continue } } - if syncMatcher != nil { - syncMatcher.synced(head) - syncMatcher = nil - } + matcherSync() // log index head is at latest chain head; process tail blocks if possible if f.tryUpdateTail(head, func() bool { // return true if tail processing needs to be stopped diff --git a/core/filtermaps/matcher.go b/core/filtermaps/matcher.go index 64c7c5efe6..04dfe9751b 100644 --- a/core/filtermaps/matcher.go +++ b/core/filtermaps/matcher.go @@ -47,99 +47,11 @@ type SyncRange struct { } // GetPotentialMatches returns a list of logs that are potential matches for the -// given filter criteria. If parts of the requested range are not indexed then -// an error is returned. If parts of the requested range are changed during the -// search process then potentially incorrect logs are discarded and searched -// again, ensuring that the returned results are always consistent with the latest -// state of the chain. -// If firstBlock or lastBlock are bigger than the head block number then they are -// substituted with the latest head of the chain, ensuring that a search until -// the head block is still consistent with the latest canonical chain if a new -// head has been added during the process. -// Note that the returned list may still contain false positives. -func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, *types.Header, uint64, uint64, error) { - if firstBlock > lastBlock { - return nil, nil, 0, 0, errors.New("invalid search range") - } - // enforce a consistent state before starting the search in order to be able - // to determine valid range later - syncRange, err := backend.SyncLogIndex(ctx) - if err != nil { - return nil, nil, 0, 0, err - } - headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil - // if haveMatches == true then matches correspond to the block number range - // between matchFirst and matchLast - var ( - matches []*types.Log - haveMatches bool - matchFirst, matchLast uint64 - ) - for !haveMatches || (matchLast < lastBlock && matchLast < headBlock) { - // determine range to be searched; for simplicity we only extend the most - // recent end of the existing match set by matching between searchFirst - // and searchLast. - searchFirst, searchLast := firstBlock, lastBlock - if searchFirst > headBlock { - searchFirst = headBlock - } - if searchLast > headBlock { - searchLast = headBlock - } - if haveMatches && matchFirst != searchFirst { - // searchFirst might change if firstBlock > headBlock - matches, haveMatches = nil, false - } - if haveMatches && matchLast >= searchFirst { - searchFirst = matchLast + 1 - } - // check if indexed range covers the requested range - if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst || syncRange.LastIndexed < searchLast { - return nil, nil, 0, 0, errors.New("log index not available for requested range") - } - // search for matches in the required range - newMatches, err := getPotentialMatches(ctx, backend, searchFirst, searchLast, addresses, topics) - if err != nil { - return nil, nil, 0, 0, err - } - // enforce a consistent state again in order to determine the guaranteed - // valid range in which the log index has not been changed since the last - // sync. - syncRange, err = backend.SyncLogIndex(ctx) - if err != nil { - return nil, nil, 0, 0, err - } - headBlock = syncRange.Head.Number.Uint64() - // return with error if the beginning of the recently searched range might - // be invalid due to removed log index - if !syncRange.Valid || syncRange.FirstValid > searchFirst || syncRange.LastValid < searchFirst { - return nil, nil, 0, 0, errors.New("log index not available for requested range") - } - // roll back most recent matches if they are not covered by the guaranteed - // valid range - if syncRange.LastValid < searchLast { - for len(newMatches) > 0 && newMatches[len(newMatches)-1].BlockNumber > syncRange.LastValid { - newMatches = newMatches[:len(newMatches)-1] - } - searchLast = syncRange.LastValid - } - // append new matches to existing ones if the were any - if haveMatches { - matches = append(matches, newMatches...) - } else { - matches, haveMatches = newMatches, true - } - matchLast = searchLast - } - return matches, syncRange.Head, firstBlock, matchLast, nil -} - -// getPotentialMatches returns a list of logs that are potential matches for the // given filter criteria. If parts of the log index in the searched range are // missing or changed during the search process then the resulting logs belonging // to that block range might be missing or incorrect. // Also note that the returned list may contain false positives. -func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) { +func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) { params := backend.GetParams() // find the log value index range to search firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock) diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index 7becdd8093..82dd7c37ef 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -111,8 +111,8 @@ func (fm *FilterMapsMatcherBackend) synced(head *types.Header) { } // SyncLogIndex ensures that the log index is consistent with the current state -// of the chain (note that it may or may not be actually synced up to the head). -// It blocks until this state is achieved. +// of the chain and is synced up to the current head. It blocks until this state +// is achieved or the context is cancelled. // If successful, it returns a SyncRange that contains the latest chain head, // the indexed range that is currently consistent with the chain and the valid // range that has not been changed and has been consistent with all states of the diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 2fcf0945ba..e52ffd9287 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -22,7 +22,6 @@ import ( "math" "math/big" "slices" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/filtermaps" @@ -93,117 +92,202 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return nil, errPendingLogsUnsupported } - resolveSpecial := func(number int64) (int64, error) { + resolveSpecial := func(number int64) (uint64, error) { switch number { - case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64(): - // we should return head here since we've already captured - // that we need to get the pending logs in the pending boolean above - return math.MaxInt64, nil + case rpc.LatestBlockNumber.Int64(): + // when searching from and/or until the current head, we resolve it + // to MaxUint64 which is translated by rangeLogs to the actual head + // in each iteration, ensuring that the head block will be searched + // even if the chain is updated during search. + return math.MaxUint64, nil case rpc.FinalizedBlockNumber.Int64(): hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) if hdr == nil { return 0, errors.New("finalized header not found") } - return hdr.Number.Int64(), nil + return hdr.Number.Uint64(), nil case rpc.SafeBlockNumber.Int64(): hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber) if hdr == nil { return 0, errors.New("safe header not found") } - return hdr.Number.Int64(), nil - default: - return number, nil + return hdr.Number.Uint64(), nil } + if number < 0 { + return 0, errors.New("negative block number") + } + return uint64(number), nil } - var err error // range query need to resolve the special begin/end block number - if f.begin, err = resolveSpecial(f.begin); err != nil { + begin, err := resolveSpecial(f.begin) + if err != nil { return nil, err } - if f.end, err = resolveSpecial(f.end); err != nil { + end, err := resolveSpecial(f.end) + if err != nil { return nil, err } + return f.rangeLogs(ctx, begin, end) +} + +func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) { + if firstBlock > lastBlock { + return nil, errors.New("invalid search range") + } + mb := f.sys.backend.NewMatcherBackend() + defer mb.Close() + + // enforce a consistent state before starting the search in order to be able + // to determine valid range later + syncRange, err := mb.SyncLogIndex(ctx) + if err != nil { + return nil, err + } + headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil + // if haveMatches == true then matches correspond to the block number range + // between matchFirst and matchLast + var ( + matches []*types.Log + haveMatches, forceUnindexed bool + matchFirst, matchLast uint64 + ) + trimMatches := func(trimFirst, trimLast uint64) { + if !haveMatches { + return + } + if trimLast < matchFirst || trimFirst > matchLast { + matches, haveMatches = nil, false + return + } + if trimFirst > matchFirst { + for len(matches) > 0 && matches[0].BlockNumber < trimFirst { + matches = matches[1:] + } + matchFirst = trimFirst + } + if trimLast < matchLast { + for len(matches) > 0 && matches[len(matches)-1].BlockNumber > trimLast { + matches = matches[:len(matches)-1] + } + matchLast = trimLast + } + } - start := time.Now() - mb := f.sys.backend.NewMatcherBackend() - logs, _, _, _, err := filtermaps.GetPotentialMatches(ctx, mb, uint64(f.begin), uint64(f.end), f.addresses, f.topics) - mb.Close() - if err == filtermaps.ErrMatchAll { - // revert to legacy filter - hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) - if hdr == nil { - return nil, errors.New("latest header not found") + for { + // determine range to be searched; for simplicity we only extend the most + // recent end of the existing match set by matching between searchFirst + // and searchLast. + searchFirst, searchLast := firstBlock, lastBlock + if searchFirst > headBlock { + searchFirst = headBlock } - headNumber := hdr.Number.Int64() - if f.begin > headNumber { - f.begin = headNumber + if searchLast > headBlock { + searchLast = headBlock } - if f.end > headNumber { - f.end = headNumber + trimMatches(searchFirst, searchLast) + if haveMatches && matchFirst == searchFirst && matchLast == searchLast { + return matches, nil } - logChan, errChan := f.rangeLogsAsync(ctx) - var logs []*types.Log - for { - select { - case log := <-logChan: - logs = append(logs, log) - case err := <-errChan: - return logs, err + var trimTailIfNotValid uint64 + if haveMatches && matchFirst > searchFirst { + // missing tail section; do unindexed search + tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1) + if err != nil { + return matches, err + } + matches = append(tailMatches, matches...) + matchFirst = searchFirst + // unindexed results are not affected by valid tail; do not trim tail + trimTailIfNotValid = math.MaxUint64 + } + // now if we have matches, they start at searchFirst + if haveMatches { + searchFirst = matchLast + 1 + if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst { + forceUnindexed = true + } + } + var newMatches []*types.Log + if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst { + forceUnindexed = true + } + if !forceUnindexed { + if syncRange.FirstIndexed > searchFirst { + searchFirst = syncRange.FirstIndexed + } + if syncRange.LastIndexed > searchLast { + searchLast = syncRange.LastIndexed + } + newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast) + // trim tail if it affects the indexed search range + trimTailIfNotValid = searchFirst + if err == filtermaps.ErrMatchAll { + // "match all" filters are not supported by filtermaps; fall back + // to unindexed search which is the most efficient in this case + forceUnindexed = true + } + } + if forceUnindexed { + newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast) + // unindexed results are not affected by valid tail; do not trim tail + trimTailIfNotValid = math.MaxUint64 + } + if err != nil { + return matches, err + } + if matches == nil { + matches = newMatches + haveMatches, matchFirst, matchLast = true, searchFirst, searchLast + } else { + matches = append(matches, newMatches...) + matchLast = searchLast + } + + syncRange, err = mb.SyncLogIndex(ctx) + if err != nil { + return matches, err + } + if !syncRange.Valid { + matches, haveMatches = nil, false + } else { + if syncRange.FirstValid > trimTailIfNotValid { + trimMatches(syncRange.FirstValid, syncRange.LastValid) + } else { + trimMatches(0, syncRange.LastValid) } } } - fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics) - log.Debug("Finished log search", "run time", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs)) - return fmLogs, err } -// rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously, -// it creates and returns two channels: one for delivering log data, and one for reporting errors. -func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan error) { - var ( - logChan = make(chan *types.Log) - errChan = make(chan error) - ) - - go func() { - defer func() { - close(errChan) - close(logChan) - }() - - if err := f.unindexedLogs(ctx, uint64(f.end), logChan); err != nil { - errChan <- err - return - } - - errChan <- nil - }() - - return logChan, errChan +func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) { + logs, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics) + logs = filterLogs(logs, nil, nil, f.addresses, f.topics) + return logs, err } // unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. -func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { - for ; f.begin <= int64(end); f.begin++ { - header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) +func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) { + log.Warn("Performing unindexed log search", "begin", begin, "end", end) + var logs []*types.Log + for blockNumber := begin; blockNumber <= end; blockNumber++ { + select { + case <-ctx.Done(): + return logs, ctx.Err() + default: + } + header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber)) if header == nil || err != nil { - return err + return logs, err } found, err := f.blockLogs(ctx, header) if err != nil { - return err - } - for _, log := range found { - select { - case logChan <- log: - case <-ctx.Done(): - return ctx.Err() - } + return logs, err } + logs = append(logs, found...) } - return nil + return logs, nil } // blockLogs returns the logs matching the filter criteria within a single block. diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index b6119448d9..ea31fb5396 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -20,7 +20,6 @@ import ( "context" "errors" "math/big" - "math/rand" "reflect" "runtime" "testing" @@ -29,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -136,6 +136,12 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } +func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend { + fm := filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, 0, false) + fm.Start() + return fm.NewMatcherBackend() +} + func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { b.pendingBlock = block b.pendingReceipts = receipts