From d1f492e511b3f3f8617efd708c00f84eac1318a0 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Tue, 1 Oct 2024 05:53:35 +0200 Subject: [PATCH] eth/filters: fixed tests, added more --- core/filtermaps/filtermaps.go | 18 ++- core/filtermaps/indexer.go | 57 +++++-- core/filtermaps/indexer_test.go | 74 ++++++++- core/filtermaps/matcher_backend.go | 7 + core/rawdb/accessors_indexes_test.go | 2 - core/rawdb/database.go | 4 + eth/filters/filter.go | 170 ++++++++++++++------- eth/filters/filter_system_test.go | 36 ++++- eth/filters/filter_test.go | 184 +++++++++++++++++++++-- internal/ethapi/api_test.go | 4 + internal/ethapi/transaction_args_test.go | 3 + 11 files changed, 460 insertions(+), 99 deletions(-) diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 75ad51ada9..03eb341ffd 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -20,7 +20,7 @@ const headCacheSize = 8 // maximum number of recent filter maps cached in memory // blockchain defines functions required by the FilterMaps log indexer. type blockchain interface { CurrentBlock() *types.Header - SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetHeader(hash common.Hash, number uint64) *types.Header GetCanonicalHash(number uint64) common.Hash GetReceiptsByHash(hash common.Hash) types.Receipts @@ -55,7 +55,8 @@ type FilterMaps struct { lvPointerCache *lru.Cache[uint64, uint64] revertPoints map[uint64]*revertPoint - testHook func(int) + waitIdleCh chan chan bool + testHook func(int) } // filterMap is a full or partial in-memory representation of a filter map where @@ -104,12 +105,13 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist } params.deriveFields() fm := &FilterMaps{ - db: db, - chain: chain, - closeCh: make(chan struct{}), - history: history, - noHistory: noHistory, - Params: params, + db: db, + chain: chain, + closeCh: make(chan struct{}), + waitIdleCh: make(chan chan bool), + history: history, + noHistory: noHistory, + Params: params, filterMapsRange: filterMapsRange{ initialized: rs.Initialized, headLvPointer: rs.HeadLvPointer, diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 36094ba555..b324c75b4f 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -17,7 +17,9 @@ const ( removedPointer = math.MaxUint64 // used in updateBatch to signal removed items revertPointFrequency = 256 // frequency of revert points in database cachedRevertPoints = 64 // revert points for most recent blocks in memory +) +const ( testHookInit = iota testHookUpdateHeadEpoch testHookUpdateHead @@ -52,8 +54,8 @@ func (f *FilterMaps) updateLoop() { } var ( - headEventCh = make(chan core.ChainHeadEvent, 10) - sub = f.chain.SubscribeChainHeadEvent(headEventCh) + headEventCh = make(chan core.ChainEvent, 10) + sub = f.chain.SubscribeChainEvent(headEventCh) head = f.chain.CurrentBlock() stop bool syncMatcher *FilterMapsMatcherBackend @@ -61,7 +63,7 @@ func (f *FilterMaps) updateLoop() { ) matcherSync := func() { - if syncMatcher != nil && fmr.headBlockHash == head.Hash() { + if syncMatcher != nil && fmr.initialized && fmr.headBlockHash == head.Hash() { syncMatcher.synced(head) syncMatcher = nil } @@ -79,19 +81,32 @@ func (f *FilterMaps) updateLoop() { } delay := time.Second * 20 if f.testHook != nil { - f.testHook(testHookWait) delay = 0 } - select { - case ev := <-headEventCh: - head = ev.Block.Header() - case syncMatcher = <-f.matcherSyncCh: - head = f.chain.CurrentBlock() - case <-f.closeCh: - stop = true - case <-time.After(delay): - // keep updating log index during syncing - head = f.chain.CurrentBlock() + loop: + for { + select { + case ev := <-headEventCh: + head = ev.Block.Header() + case syncMatcher = <-f.matcherSyncCh: + head = f.chain.CurrentBlock() + case <-f.closeCh: + stop = true + case ch := <-f.waitIdleCh: + head = f.chain.CurrentBlock() + if head.Hash() == f.getRange().headBlockHash { + ch <- true + continue loop + } + ch <- false + case <-time.After(delay): + // keep updating log index during syncing + head = f.chain.CurrentBlock() + if f.testHook != nil { + f.testHook(testHookWait) + } + } + break } } for head == nil { @@ -150,6 +165,18 @@ func (f *FilterMaps) updateLoop() { } } +// WaitIdle blocks until the indexer is in an idle state while synced up to the +// latest chain head. +func (f *FilterMaps) WaitIdle() { + for { + ch := make(chan bool) + f.waitIdleCh <- ch + if <-ch { + return + } + } +} + // getRange returns the current filterMapsRange. func (f *FilterMaps) getRange() filterMapsRange { f.lock.RLock() @@ -804,7 +831,7 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error { batch := f.db.NewBatch() afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap) - if rp.mapIndex >= afterLastMap { + if rp.mapIndex > afterLastMap { return errors.New("cannot revert (head map behind revert point)") } lvPointer := uint64(rp.mapIndex) << f.logValuesPerMap diff --git a/core/filtermaps/indexer_test.go b/core/filtermaps/indexer_test.go index 4c54d0b477..c49a1aa6bb 100644 --- a/core/filtermaps/indexer_test.go +++ b/core/filtermaps/indexer_test.go @@ -1,6 +1,8 @@ package filtermaps import ( + "crypto/sha256" + "fmt" "math/big" "math/rand" "sync" @@ -59,6 +61,25 @@ func TestIndexerRandomSetHistory(t *testing.T) { ts.checkLvRange(50) } +func TestIndexerDbEquality(t *testing.T) { + ts := newTestSetup(t) + ts.setHistory(0, false) + for i := 0; i < 10; i++ { + ts.chain.addBlocks(100, 10, 3, 4, true) + ts.runUntilWait() + } + hash1 := ts.fmDbHash() + fmt.Println(hash1) + ts.setHistory(500, false) + ts.runUntilWait() + hash2 := ts.fmDbHash() + fmt.Println(hash2) + ts.setHistory(0, false) + ts.runUntilWait() + hash3 := ts.fmDbHash() + fmt.Println(hash3) +} + type testSetup struct { t *testing.T fm *FilterMaps @@ -94,9 +115,14 @@ func (ts *testSetup) runUntil(stop func() bool) { } func (ts *testSetup) runUntilWait() { - ts.nextEvent() - for ts.lastEvent != testHookWait { + for { ts.nextEvent() + for ts.lastEvent != testHookWait { + ts.nextEvent() + } + if ts.fm.getRange().headBlockHash == ts.chain.CurrentBlock().Hash() { + return + } } } @@ -146,6 +172,19 @@ func (ts *testSetup) stopFm() { ts.fm.closeWg.Wait() } +func (ts *testSetup) fmDbHash() common.Hash { + hasher := sha256.New() + it := ts.db.NewIterator(nil, nil) + for it.Next() { + hasher.Write(it.Key()) + hasher.Write(it.Value()) + } + it.Release() + var result common.Hash + hasher.Sum(result[:0]) + return result +} + func (ts *testSetup) close() { ts.stopFm() ts.db.Close() @@ -178,7 +217,7 @@ func (tc *testChain) CurrentBlock() *types.Header { return tc.blocks[tc.canonical[len(tc.canonical)-1]].Header() } -func (tc *testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { +func (tc *testChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return tc.chainHeadFeed.Subscribe(ch) } @@ -281,5 +320,32 @@ func (tc *testChain) addBlocks(count, maxTxPerBlock, maxLogsPerReceipt, maxTopic tc.receipts[hash] = types.Receipts{} } } - tc.chainHeadFeed.Send(core.ChainHeadEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]}) + tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]}) +} + +func (tc *testChain) setHead(headNum int) { + tc.lock.Lock() + defer tc.lock.Unlock() + + tc.canonical = tc.canonical[:headNum+1] + tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]}) +} + +func (tc *testChain) getCanonicalChain() []common.Hash { + tc.lock.RLock() + defer tc.lock.RUnlock() + + cc := make([]common.Hash, len(tc.canonical)) + copy(cc, tc.canonical) + return cc +} + +// restore an earlier state of the chain +func (tc *testChain) setCanonicalChain(cc []common.Hash) { + tc.lock.Lock() + defer tc.lock.Unlock() + + tc.canonical = make([]common.Hash, len(cc)) + copy(tc.canonical, cc) + tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]}) } diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index 82dd7c37ef..29d0765056 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -118,6 +118,13 @@ func (fm *FilterMapsMatcherBackend) synced(head *types.Header) { // range that has not been changed and has been consistent with all states of the // chain since the previous SyncLogIndex or the creation of the matcher backend. func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) { + if fm.f.noHistory { + head := fm.f.chain.CurrentBlock() + if head == nil { + return SyncRange{}, errors.New("canonical chain head not available") + } + return SyncRange{Head: head}, nil + } // add SyncRange return channel, ensuring that syncCh := make(chan SyncRange, 1) fm.f.lock.Lock() diff --git a/core/rawdb/accessors_indexes_test.go b/core/rawdb/accessors_indexes_test.go index fc93b4ed19..29b468fb2a 100644 --- a/core/rawdb/accessors_indexes_test.go +++ b/core/rawdb/accessors_indexes_test.go @@ -17,7 +17,6 @@ package rawdb import ( - "bytes" "math/big" "testing" @@ -25,7 +24,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/internal/blocktest" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 1bf5ca8d3f..30c01d5379 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -375,6 +375,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { accountSnaps stat storageSnaps stat preimages stat + filterMaps stat beaconHeaders stat cliqueSnaps stat @@ -425,6 +426,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { codes.Add(size) case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength): txLookups.Add(size) + case bytes.HasPrefix(key, FilterMapsPrefix): + filterMaps.Add(size) case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength): accountSnaps.Add(size) case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength): @@ -499,6 +502,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()}, {"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()}, {"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()}, + {"Key-Value store", "Log search index", filterMaps.Size(), filterMaps.Count()}, {"Key-Value store", "Contract codes", codes.Size(), codes.Count()}, {"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()}, {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, diff --git a/eth/filters/filter.go b/eth/filters/filter.go index e52ffd9287..de03c965aa 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -37,9 +37,10 @@ type Filter struct { addresses []common.Address topics [][]common.Hash - block *common.Hash // Block hash if filtering a single block - begin, end int64 // Range interval if filtering multiple blocks - bbMatchCount uint64 + block *common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks + + rangeLogsTestHook chan rangeLogsTestEvent } // NewRangeFilter creates a new filter which uses a bloom filter on blocks to @@ -131,10 +132,31 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return f.rangeLogs(ctx, begin, end) } +const ( + rangeLogsTestSync = iota + rangeLogsTestTrimmed + rangeLogsTestIndexed + rangeLogsTestUnindexed + rangeLogsTestDone +) + +type rangeLogsTestEvent struct { + event int + begin, end uint64 +} + func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) { - if firstBlock > lastBlock { - return nil, errors.New("invalid search range") + if f.rangeLogsTestHook != nil { + defer func() { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0} + close(f.rangeLogsTestHook) + }() } + + if firstBlock > lastBlock { + return nil, nil + } + mb := f.sys.backend.NewMatcherBackend() defer mb.Close() @@ -144,6 +166,21 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([ if err != nil { return nil, err } + if !syncRange.Indexed { + // fallback to completely unindexed search + headNum := syncRange.Head.Number.Uint64() + if firstBlock > headNum { + firstBlock = headNum + } + if lastBlock > headNum { + lastBlock = headNum + } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, firstBlock, lastBlock} + } + return f.unindexedLogs(ctx, firstBlock, lastBlock) + } + headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil // if haveMatches == true then matches correspond to the block number range // between matchFirst and matchLast @@ -157,7 +194,7 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([ return } if trimLast < matchFirst || trimFirst > matchLast { - matches, haveMatches = nil, false + matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0 return } if trimFirst > matchFirst { @@ -192,6 +229,9 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([ var trimTailIfNotValid uint64 if haveMatches && matchFirst > searchFirst { // missing tail section; do unindexed search + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, matchFirst - 1} + } tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1) if err != nil { return matches, err @@ -200,56 +240,67 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([ matchFirst = searchFirst // unindexed results are not affected by valid tail; do not trim tail trimTailIfNotValid = math.MaxUint64 - } - // now if we have matches, they start at searchFirst - if haveMatches { - searchFirst = matchLast + 1 - if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst { - forceUnindexed = true - } - } - var newMatches []*types.Log - if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst { - forceUnindexed = true - } - if !forceUnindexed { - if syncRange.FirstIndexed > searchFirst { - searchFirst = syncRange.FirstIndexed - } - if syncRange.LastIndexed > searchLast { - searchLast = syncRange.LastIndexed - } - newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast) - // trim tail if it affects the indexed search range - trimTailIfNotValid = searchFirst - if err == filtermaps.ErrMatchAll { - // "match all" filters are not supported by filtermaps; fall back - // to unindexed search which is the most efficient in this case - forceUnindexed = true - } - } - if forceUnindexed { - newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast) - // unindexed results are not affected by valid tail; do not trim tail - trimTailIfNotValid = math.MaxUint64 - } - if err != nil { - return matches, err - } - if matches == nil { - matches = newMatches - haveMatches, matchFirst, matchLast = true, searchFirst, searchLast } else { - matches = append(matches, newMatches...) - matchLast = searchLast + // if we have matches, they start at searchFirst + if haveMatches { + searchFirst = matchLast + 1 + if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst { + forceUnindexed = true + } + } + var newMatches []*types.Log + if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst { + forceUnindexed = true + } + if !forceUnindexed { + if syncRange.FirstIndexed > searchFirst { + searchFirst = syncRange.FirstIndexed + } + if syncRange.LastIndexed < searchLast { + searchLast = syncRange.LastIndexed + } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, searchFirst, searchLast} + } + newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast) + // trim tail if it affects the indexed search range + trimTailIfNotValid = searchFirst + if err == filtermaps.ErrMatchAll { + // "match all" filters are not supported by filtermaps; fall back + // to unindexed search which is the most efficient in this case + forceUnindexed = true + } + } + if forceUnindexed { + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, searchLast} + } + newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast) + // unindexed results are not affected by valid tail; do not trim tail + trimTailIfNotValid = math.MaxUint64 + } + if err != nil { + return matches, err + } + if !haveMatches { + matches = newMatches + haveMatches, matchFirst, matchLast = true, searchFirst, searchLast + } else { + matches = append(matches, newMatches...) + matchLast = searchLast + } } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: matchFirst, end: matchLast} + } syncRange, err = mb.SyncLogIndex(ctx) if err != nil { return matches, err } + headBlock = syncRange.Head.Number.Uint64() // Head is guaranteed != nil if !syncRange.Valid { - matches, haveMatches = nil, false + matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0 } else { if syncRange.FirstValid > trimTailIfNotValid { trimMatches(syncRange.FirstValid, syncRange.LastValid) @@ -257,37 +308,42 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([ trimMatches(0, syncRange.LastValid) } } + if f.rangeLogsTestHook != nil { + f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: matchFirst, end: matchLast} + } } } func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) { - logs, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics) - logs = filterLogs(logs, nil, nil, f.addresses, f.topics) - return logs, err + potentialMatches, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics) + matches := filterLogs(potentialMatches, nil, nil, f.addresses, f.topics) + log.Trace("Performed indexed log search", "begin", begin, "end", end, "true matches", len(matches), "false positives", len(potentialMatches)-len(matches)) + return matches, err } // unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) { log.Warn("Performing unindexed log search", "begin", begin, "end", end) - var logs []*types.Log + var matches []*types.Log for blockNumber := begin; blockNumber <= end; blockNumber++ { select { case <-ctx.Done(): - return logs, ctx.Err() + return matches, ctx.Err() default: } header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber)) if header == nil || err != nil { - return logs, err + return matches, err } found, err := f.blockLogs(ctx, header) if err != nil { - return logs, err + return matches, err } - logs = append(logs, found...) + matches = append(matches, found...) } - return logs, nil + log.Trace("Performed unindexed log search", "begin", begin, "end", end, "matches", len(matches)) + return matches, nil } // blockLogs returns the logs matching the filter criteria within a single block. diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index ea31fb5396..83e93494f0 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -40,6 +40,7 @@ import ( type testBackend struct { db ethdb.Database + fm *filtermaps.FilterMaps sections uint64 txFeed event.Feed logsFeed event.Feed @@ -58,10 +59,28 @@ func (b *testBackend) CurrentHeader() *types.Header { return hdr } +func (b *testBackend) CurrentBlock() *types.Header { + return b.CurrentHeader() +} + func (b *testBackend) ChainDb() ethdb.Database { return b.db } +func (b *testBackend) GetCanonicalHash(number uint64) common.Hash { + return rawdb.ReadCanonicalHash(b.db, number) +} + +func (b *testBackend) GetHeader(hash common.Hash, number uint64) *types.Header { + hdr, _ := b.HeaderByHash(context.Background(), hash) + return hdr +} + +func (b *testBackend) GetReceiptsByHash(hash common.Hash) types.Receipts { + r, _ := b.GetReceipts(context.Background(), hash) + return r +} + func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { var ( hash common.Hash @@ -137,9 +156,20 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc } func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend { - fm := filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, 0, false) - fm.Start() - return fm.NewMatcherBackend() + return b.fm.NewMatcherBackend() +} + +func (b *testBackend) startFilterMaps(history uint64, noHistory bool) { + b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory) + b.fm.Start() + if !noHistory { + b.fm.WaitIdle() + } +} + +func (b *testBackend) stopFilterMaps() { + b.fm.Stop() + b.fm = nil } func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 6a3057326d..33a7656d3b 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -46,15 +46,27 @@ func makeReceipt(addr common.Address) *types.Receipt { return receipt } -func BenchmarkFilters(b *testing.B) { +func BenchmarkFiltersIndexed(b *testing.B) { + benchmarkFilters(b, 0, false) +} + +func BenchmarkFiltersHalfIndexed(b *testing.B) { + benchmarkFilters(b, 50000, false) +} + +func BenchmarkFiltersUnindexed(b *testing.B) { + benchmarkFilters(b, 0, true) +} + +func benchmarkFilters(b *testing.B, history uint64, noHistory bool) { var ( - db = rawdb.NewMemoryDatabase() - _, sys = newTestFilterSystem(b, db, Config{}) - key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - addr1 = crypto.PubkeyToAddress(key1.PublicKey) - addr2 = common.BytesToAddress([]byte("jeff")) - addr3 = common.BytesToAddress([]byte("ethereum")) - addr4 = common.BytesToAddress([]byte("random addresses please")) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(b, db, Config{}) + key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + addr1 = crypto.PubkeyToAddress(key1.PublicKey) + addr2 = common.BytesToAddress([]byte("jeff")) + addr3 = common.BytesToAddress([]byte("ethereum")) + addr4 = common.BytesToAddress([]byte("random addresses please")) gspec = &core.Genesis{ Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000)}}, @@ -94,9 +106,12 @@ func BenchmarkFilters(b *testing.B) { rawdb.WriteHeadBlockHash(db, block.Hash()) rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i]) } + backend.startFilterMaps(history, noHistory) + defer backend.stopFilterMaps() + b.ResetTimer() - filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil) + filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil) for i := 0; i < b.N; i++ { filter.begin = 0 @@ -107,7 +122,19 @@ func BenchmarkFilters(b *testing.B) { } } -func TestFilters(t *testing.T) { +func TestFiltersIndexed(t *testing.T) { + testFilters(t, 0, false) +} + +func TestFiltersHalfIndexed(t *testing.T) { + testFilters(t, 500, false) +} + +func TestFiltersUnindexed(t *testing.T) { + testFilters(t, 0, true) +} + +func testFilters(t *testing.T, history uint64, noHistory bool) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) @@ -279,6 +306,9 @@ func TestFilters(t *testing.T) { }) backend.setPending(pchain[0], preceipts[0]) + backend.startFilterMaps(history, noHistory) + defer backend.stopFilterMaps() + for i, tc := range []struct { f *Filter want string @@ -387,3 +417,137 @@ func TestFilters(t *testing.T) { } }) } + +func TestRangeLogs(t *testing.T) { + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + gspec = &core.Genesis{ + Config: params.TestChainConfig, + Alloc: types.GenesisAlloc{}, + BaseFee: big.NewInt(params.InitialBaseFee), + } + ) + _, err := gspec.Commit(db, triedb.NewDatabase(db, nil)) + if err != nil { + t.Fatal(err) + } + chain, _ := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {}) + var l uint64 + bc, err := core.NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, &l) + if err != nil { + t.Fatal(err) + } + _, err = bc.InsertChain(chain[:600]) + if err != nil { + t.Fatal(err) + } + + backend.startFilterMaps(200, false) + defer backend.stopFilterMaps() + + var ( + testCase, event int + filter *Filter + addresses = []common.Address{common.Address{}} + ) + + newFilter := func(begin, end int64) { + testCase++ + event = 0 + filter = sys.NewRangeFilter(begin, end, addresses, nil) + filter.rangeLogsTestHook = make(chan rangeLogsTestEvent) + go func(filter *Filter) { + filter.Logs(context.Background()) + // ensure that filter will not be blocked if we exit early + for _ = range filter.rangeLogsTestHook { + } + }(filter) + } + + expEvent := func(exp rangeLogsTestEvent) { + event++ + ev := <-filter.rangeLogsTestHook + if ev != exp { + t.Fatalf("Test case #%d: wrong test event #%d received (got %v, expected %v)", testCase, event, ev, exp) + } + } + + // test case #1 + newFilter(300, 500) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400}) + if _, err := bc.InsertChain(chain[600:700]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail + expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + + // test case #2 + newFilter(400, int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700}) + if _, err := bc.InsertChain(chain[700:800]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 700}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 701, 800}) + if err := bc.SetHead(750); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + + // test case #3 + newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750}) + if err := bc.SetHead(740); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740}) + if _, err := bc.InsertChain(chain[740:750]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740}) + // trimmed at the beginning of the next iteration + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 740, 740}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0}) + + // test case #4 + newFilter(400, int64(rpc.LatestBlockNumber)) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550}) + if _, err := bc.InsertChain(chain[750:1000]); err != nil { + t.Fatal(err) + } + backend.fm.WaitIdle() + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750}) + // indexed range affected by tail pruning so we have to discard the entire + // match set + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0}) + expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800}) + expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000}) + expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000}) +} diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 2b5e535b13..2e1d44ef20 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -45,6 +45,7 @@ import ( "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -619,6 +620,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { panic("implement me") } +func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend { + panic("implement me") +} func TestEstimateGas(t *testing.T) { t.Parallel() // Initialize test accounts diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 2855bc1803..7113fde0b8 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -399,3 +400,5 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) } func (b *backendMock) Engine() consensus.Engine { return nil } + +func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil }