From 1901c7d3d9fd0c0b4e4cfdce51f60db07945f105 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 3 Oct 2024 12:34:07 +0200 Subject: [PATCH] core/filtermaps: added more tests --- core/filtermaps/filtermaps.go | 1 - core/filtermaps/indexer.go | 63 ++----- core/filtermaps/indexer_test.go | 267 +++++++++++++++++------------- core/filtermaps/matcher_test.go | 86 ++++++++++ core/rawdb/schema.go | 4 +- eth/filters/filter_system_test.go | 4 +- 6 files changed, 255 insertions(+), 170 deletions(-) create mode 100644 core/filtermaps/matcher_test.go diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 489b6e5e64..ab15482401 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -72,7 +72,6 @@ type FilterMaps struct { revertPoints map[uint64]*revertPoint waitIdleCh chan chan bool - testHook func(int) } // filterMap is a full or partial in-memory representation of a filter map where diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 47edd55da5..c04d8623a4 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -35,28 +35,10 @@ const ( cachedRevertPoints = 64 // revert points for most recent blocks in memory ) -const ( - testHookInit = iota - testHookUpdateHeadEpoch - testHookUpdateHead - testHookExtendTailEpoch - testHookExtendTail - testHookPruneTail - testHookPruneTailMaps - testHookRevert - testHookWait - testHookStop -) - // updateLoop initializes and updates the log index structure according to the // canonical chain. func (f *FilterMaps) updateLoop() { - defer func() { - f.closeWg.Done() - if f.testHook != nil { - f.testHook(testHookStop) - } - }() + defer f.closeWg.Done() if f.noHistory { f.reset() @@ -95,10 +77,6 @@ func (f *FilterMaps) updateLoop() { if stop { return } - delay := time.Second * 20 - if f.testHook != nil { - delay = 0 - } loop: for { select { @@ -115,12 +93,9 @@ func (f *FilterMaps) updateLoop() { continue loop } ch <- false - case <-time.After(delay): + case <-time.After(time.Second * 20): // keep updating log index during syncing head = f.chain.CurrentBlock() - if f.testHook != nil { - f.testHook(testHookWait) - } } break } @@ -184,6 +159,10 @@ func (f *FilterMaps) updateLoop() { // WaitIdle blocks until the indexer is in an idle state while synced up to the // latest chain head. func (f *FilterMaps) WaitIdle() { + if f.noHistory { + f.closeWg.Wait() + return + } for { ch := make(chan bool) f.waitIdleCh <- ch @@ -219,9 +198,6 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { log.Error("Could not initialize log index", "error", err) } f.applyUpdateBatch(update) - if f.testHook != nil { - f.testHook(testHookInit) - } return true } @@ -295,16 +271,10 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { if update.updatedRangeLength() >= f.mapsPerEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) - if f.testHook != nil { - f.testHook(testHookUpdateHeadEpoch) - } update = f.newUpdateBatch() } } f.applyUpdateBatch(update) - if f.testHook != nil { - f.testHook(testHookUpdateHead) - } return true } @@ -342,9 +312,6 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) - if f.testHook != nil { - f.testHook(testHookExtendTailEpoch) - } update = f.newUpdateBatch() lastTailEpoch = tailEpoch } @@ -365,9 +332,6 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { number, parentHash = newTail.Number.Uint64(), newTail.ParentHash } f.applyUpdateBatch(update) - if f.testHook != nil { - f.testHook(testHookExtendTail) - } return number <= tailTarget } @@ -406,9 +370,6 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) { fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash fmr.tailBlockLvPointer = targetLvPointer f.setRange(f.db, fmr) - if f.testHook != nil { - f.testHook(testHookPruneTail) - } } // tryPruneTailMaps removes unused filter maps and corresponding log index @@ -461,6 +422,9 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) { batch := f.db.NewBatch() for *removeLvPtr < nextBlockNumber { f.deleteBlockLvPointer(batch, *removeLvPtr) + if (*removeLvPtr)%revertPointFrequency == 0 { + rawdb.DeleteRevertPoint(batch, *removeLvPtr) + } (*removeLvPtr)++ } for mapIndex := first; mapIndex < afterLast; mapIndex++ { @@ -481,9 +445,6 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) { if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } - if f.testHook != nil { - f.testHook(testHookPruneTailMaps) - } } // updateBatch is a memory overlay collecting changes to the index log structure @@ -873,6 +834,9 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error { } for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ { f.deleteBlockLvPointer(batch, blockNumber) + if blockNumber%revertPointFrequency == 0 { + rawdb.DeleteRevertPoint(batch, blockNumber) + } } newRange := f.filterMapsRange newRange.headLvPointer = lvPointer @@ -882,8 +846,5 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error { if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } - if f.testHook != nil { - f.testHook(testHookRevert) - } return nil } diff --git a/core/filtermaps/indexer_test.go b/core/filtermaps/indexer_test.go index 562fe79273..6a1f25fc1a 100644 --- a/core/filtermaps/indexer_test.go +++ b/core/filtermaps/indexer_test.go @@ -18,7 +18,6 @@ package filtermaps import ( "crypto/sha256" - "fmt" "math/big" "math/rand" "sync" @@ -40,72 +39,156 @@ var testParams = Params{ logValuesPerMap: 4, } -func TestIndexerSetHistory(t *testing.T) { +func TestIndexerRandomRange(t *testing.T) { ts := newTestSetup(t) - ts.setHistory(0, false) + defer ts.close() + + forks := make([][]common.Hash, 10) ts.chain.addBlocks(1000, 5, 2, 4, false) // 50 log values per block - ts.runUntilWait() - ts.checkLvRange(50) - ts.setHistory(100, false) - ts.runUntil(func() bool { - l := ts.lastRange.headLvPointer - ts.lastRange.tailLvPointer - return l > 44000 && l < 45000 - }) - ts.setHistory(200, false) - ts.runUntilWait() - ts.checkLvRange(50) - ts.setHistory(0, false) - ts.runUntilWait() - ts.checkLvRange(50) -} - -func TestIndexerRandomSetHistory(t *testing.T) { - ts := newTestSetup(t) - ts.chain.addBlocks(100, 5, 2, 4, false) // 50 log values per block - for i := 0; i < 3000; i++ { - ts.setHistory(uint64(rand.Intn(1001)), false) - ts.nextEvent() - for rand.Intn(20) != 0 && ts.lastEvent != testHookWait { - ts.nextEvent() - } - if ts.lastEvent == testHookWait { - ts.checkLvRange(50) + for i := range forks { + if i != 0 { + forkBlock := rand.Intn(1000) + ts.chain.setHead(forkBlock) + ts.chain.addBlocks(1000-forkBlock, 5, 2, 4, false) // 50 log values per block } + forks[i] = ts.chain.getCanonicalChain() } ts.setHistory(0, false) - ts.runUntilWait() - ts.checkLvRange(50) + var ( + history int + noHistory bool + fork, head = len(forks) - 1, 1000 + ) + ts.fm.WaitIdle() + for i := 0; i < 200; i++ { + switch rand.Intn(2) { + case 0: + // change history settings + switch rand.Intn(10) { + case 0: + history, noHistory = 0, false + case 1: + history, noHistory = 0, true + default: + history, noHistory = rand.Intn(1000)+1, false + } + ts.setHistory(uint64(history), noHistory) + case 1: + // change head + fork, head = rand.Intn(len(forks)), rand.Intn(1001) + ts.chain.setCanonicalChain(forks[fork][:head+1]) + } + ts.fm.WaitIdle() + fmr := ts.fm.getRange() + if noHistory { + if fmr.initialized { + t.Fatalf("filterMapsRange initialized while indexing is disabled") + } + continue + } + if !fmr.initialized { + t.Fatalf("filterMapsRange not initialized while indexing is enabled") + } + var ( + tail int + tpHash common.Hash + ) + if history > 0 && history <= head { + tail = head + 1 - history + } + if tail > 0 { + tpHash = forks[fork][tail-1] + } + if fmr.headBlockNumber != uint64(head) || fmr.headBlockHash != forks[fork][head] { + ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], fmr.headBlockNumber, fmr.headBlockHash) + } + if fmr.tailBlockNumber != uint64(tail) || fmr.tailParentHash != tpHash { + ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, fmr.tailBlockNumber, fmr.tailParentHash) + } + expLvCount := uint64(head+1-tail) * 50 + if tail == 0 { + expLvCount -= 50 // no logs in genesis block + } + if fmr.headLvPointer-fmr.tailBlockLvPointer != expLvCount { + ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, fmr.headLvPointer-fmr.tailBlockLvPointer) + } + if fmr.tailBlockLvPointer-fmr.tailLvPointer >= ts.params.valuesPerMap { + ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, fmr.tailBlockLvPointer-fmr.tailLvPointer) + } + } } -func TestIndexerDbEquality(t *testing.T) { +func TestIndexerCompareDb(t *testing.T) { ts := newTestSetup(t) + defer ts.close() + ts.setHistory(0, false) - for i := 0; i < 10; i++ { - ts.chain.addBlocks(100, 10, 3, 4, true) - ts.runUntilWait() - } - hash1 := ts.fmDbHash() - fmt.Println(hash1) - ts.setHistory(500, false) - ts.runUntilWait() - hash2 := ts.fmDbHash() - fmt.Println(hash2) + ts.chain.addBlocks(500, 10, 3, 4, true) + ts.fm.WaitIdle() + // revert points are stored after block 500 + ts.chain.addBlocks(500, 10, 3, 4, true) + ts.fm.WaitIdle() + chain1 := ts.chain.getCanonicalChain() + ts.storeDbHash("chain 1 [0, 1000]") + + ts.chain.setHead(600) + ts.fm.WaitIdle() + ts.storeDbHash("chain 1/2 [0, 600]") + + ts.chain.addBlocks(600, 10, 3, 4, true) + ts.fm.WaitIdle() + chain2 := ts.chain.getCanonicalChain() + ts.storeDbHash("chain 2 [0, 1200]") + + ts.setHistory(800, false) + ts.fm.WaitIdle() + ts.storeDbHash("chain 2 [401, 1200]") + + ts.chain.setHead(600) + ts.fm.WaitIdle() + ts.checkDbHash("chain 1/2 [0, 600]") + + ts.chain.setCanonicalChain(chain1) + ts.fm.WaitIdle() + ts.storeDbHash("chain 1 [201, 1000]") + ts.setHistory(0, false) - ts.runUntilWait() - hash3 := ts.fmDbHash() - fmt.Println(hash3) + ts.fm.WaitIdle() + ts.checkDbHash("chain 1 [0, 1000]") + + ts.setHistory(0, true) + ts.fm.WaitIdle() + ts.storeDbHash("no index") + + ts.chain.setCanonicalChain(chain2[:501]) + ts.setHistory(0, false) + ts.fm.WaitIdle() + ts.chain.setCanonicalChain(chain2) + ts.fm.WaitIdle() + ts.checkDbHash("chain 2 [0, 1200]") + + ts.chain.setCanonicalChain(chain1) + ts.fm.WaitIdle() + ts.setHistory(800, false) + ts.fm.WaitIdle() + ts.checkDbHash("chain 1 [201, 1000]") + + ts.chain.setCanonicalChain(chain2) + ts.fm.WaitIdle() + ts.checkDbHash("chain 2 [401, 1200]") + + ts.setHistory(0, true) + ts.fm.WaitIdle() + ts.checkDbHash("no index") } type testSetup struct { - t *testing.T - fm *FilterMaps - db ethdb.Database - chain *testChain - params Params - eventCh chan int - resumeCh chan struct{} - lastEvent int - lastRange filterMapsRange + t *testing.T + fm *FilterMaps + db ethdb.Database + chain *testChain + params Params + dbHashes map[string]common.Hash } func newTestSetup(t *testing.T) *testSetup { @@ -116,76 +199,32 @@ func newTestSetup(t *testing.T) *testSetup { chain: newTestChain(), db: rawdb.NewMemoryDatabase(), params: params, - eventCh: make(chan int), - resumeCh: make(chan struct{}), - } -} - -func (ts *testSetup) runUntil(stop func() bool) { - for !stop() { - ts.nextEvent() - for ts.lastEvent == testHookWait { - ts.t.Fatalf("Indexer in waiting state before runUntil condition is met") - } - } -} - -func (ts *testSetup) runUntilWait() { - for { - ts.nextEvent() - for ts.lastEvent != testHookWait { - ts.nextEvent() - } - if ts.fm.getRange().headBlockHash == ts.chain.CurrentBlock().Hash() { - return - } - } -} - -func (ts *testSetup) checkLvRange(lvPerBlock uint64) { - expBlockCount := uint64(len(ts.chain.canonical) - 1) - if ts.fm.history != 0 && ts.fm.history < expBlockCount { - expBlockCount = ts.fm.history - } - if ts.lastRange.headLvPointer-ts.lastRange.tailBlockLvPointer != expBlockCount*lvPerBlock { - ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expBlockCount*lvPerBlock, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer) - } - if ts.lastRange.tailBlockLvPointer-ts.lastRange.tailLvPointer >= ts.params.valuesPerMap { - ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, ts.lastRange.tailBlockLvPointer-ts.lastRange.tailLvPointer) + dbHashes: make(map[string]common.Hash), } } func (ts *testSetup) setHistory(history uint64, noHistory bool) { if ts.fm != nil { - ts.stopFm() + ts.fm.Stop() } ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, noHistory) - ts.fm.testHook = ts.testHook ts.fm.Start() - ts.lastEvent = <-ts.eventCh } -func (ts *testSetup) testHook(event int) { - ts.eventCh <- event - <-ts.resumeCh -} - -func (ts *testSetup) nextEvent() { - ts.resumeCh <- struct{}{} - ts.lastEvent = <-ts.eventCh - ts.lastRange = ts.fm.getRange() -} - -func (ts *testSetup) stopFm() { - close(ts.fm.closeCh) - for { - ts.nextEvent() - if ts.lastEvent == testHookStop { - break +func (ts *testSetup) storeDbHash(id string) { + dbHash := ts.fmDbHash() + for otherId, otherHash := range ts.dbHashes { + if otherHash == dbHash { + ts.t.Fatalf("Unexpected equal database hashes `%s` and `%s`", id, otherId) } } - ts.resumeCh <- struct{}{} - ts.fm.closeWg.Wait() + ts.dbHashes[id] = dbHash +} + +func (ts *testSetup) checkDbHash(id string) { + if ts.fmDbHash() != ts.dbHashes[id] { + ts.t.Fatalf("Database `%s` hash mismatch", id) + } } func (ts *testSetup) fmDbHash() common.Hash { @@ -202,7 +241,9 @@ func (ts *testSetup) fmDbHash() common.Hash { } func (ts *testSetup) close() { - ts.stopFm() + if ts.fm != nil { + ts.fm.Stop() + } ts.db.Close() ts.chain.db.Close() } diff --git a/core/filtermaps/matcher_test.go b/core/filtermaps/matcher_test.go new file mode 100644 index 0000000000..21265d5f0e --- /dev/null +++ b/core/filtermaps/matcher_test.go @@ -0,0 +1,86 @@ +// Copyright 2024 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package filtermaps + +import ( + "context" + "math/rand" + "testing" + + "github.com/ethereum/go-ethereum/common" +) + +func TestMatcher(t *testing.T) { + ts := newTestSetup(t) + defer ts.close() + + ts.chain.addBlocks(1000, 10, 10, 4, true) + ts.setHistory(0, false) + ts.fm.WaitIdle() + + for i := 0; i < 500; i++ { + bhash := ts.chain.canonical[rand.Intn(len(ts.chain.canonical))] + receipts := ts.chain.receipts[bhash] + if len(receipts) == 0 { + continue + } + receipt := receipts[rand.Intn(len(receipts))] + if len(receipt.Logs) == 0 { + continue + } + log := receipt.Logs[rand.Intn(len(receipt.Logs))] + var ok bool + addresses := make([]common.Address, rand.Intn(3)) + for i := range addresses { + rand.Read(addresses[i][:]) + } + if len(addresses) > 0 { + addresses[rand.Intn(len(addresses))] = log.Address + ok = true + } + topics := make([][]common.Hash, rand.Intn(len(log.Topics)+1)) + for j := range topics { + topics[j] = make([]common.Hash, rand.Intn(3)) + for i := range topics[j] { + rand.Read(topics[j][i][:]) + } + if len(topics[j]) > 0 { + topics[j][rand.Intn(len(topics[j]))] = log.Topics[j] + ok = true + } + } + if !ok { + continue // cannot search for match-all pattern + } + mb := ts.fm.NewMatcherBackend() + logs, err := GetPotentialMatches(context.Background(), mb, 0, 1000, addresses, topics) + mb.Close() + if err != nil { + t.Fatalf("Log search error: %v", err) + } + var found bool + for _, l := range logs { + if l == log { + found = true + break + } + } + if !found { + t.Fatalf("Log search did not return expected log (addresses: %v, topics: %v, expected log: %v)", addresses, topics, *log) + } + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 0948fa9d98..332c32a0ee 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -347,14 +347,14 @@ func IsStorageTrieNode(key []byte) bool { // filterMapRowKey = filterMapRowPrefix + mapRowIndex (uint64 big endian) func filterMapRowKey(mapRowIndex uint64) []byte { key := append(filterMapRowPrefix, make([]byte, 8)...) - binary.BigEndian.PutUint64(key[1:], mapRowIndex) + binary.BigEndian.PutUint64(key[len(filterMapRowPrefix):], mapRowIndex) return key } // filterMapBlockPtrKey = filterMapBlockPtrPrefix + mapIndex (uint32 big endian) func filterMapBlockPtrKey(mapIndex uint32) []byte { key := append(filterMapBlockPtrPrefix, make([]byte, 4)...) - binary.BigEndian.PutUint32(key[1:], mapIndex) + binary.BigEndian.PutUint32(key[len(filterMapBlockPtrPrefix):], mapIndex) return key } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 83e93494f0..dfd2b8e52d 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -162,9 +162,7 @@ func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend { func (b *testBackend) startFilterMaps(history uint64, noHistory bool) { b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory) b.fm.Start() - if !noHistory { - b.fm.WaitIdle() - } + b.fm.WaitIdle() } func (b *testBackend) stopFilterMaps() {