eth/filters: fixed tests, added more
This commit is contained in:
parent
d34c1b69c5
commit
d1f492e511
|
@ -20,7 +20,7 @@ const headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
|||
// blockchain defines functions required by the FilterMaps log indexer.
|
||||
type blockchain interface {
|
||||
CurrentBlock() *types.Header
|
||||
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
|
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||
GetHeader(hash common.Hash, number uint64) *types.Header
|
||||
GetCanonicalHash(number uint64) common.Hash
|
||||
GetReceiptsByHash(hash common.Hash) types.Receipts
|
||||
|
@ -55,7 +55,8 @@ type FilterMaps struct {
|
|||
lvPointerCache *lru.Cache[uint64, uint64]
|
||||
revertPoints map[uint64]*revertPoint
|
||||
|
||||
testHook func(int)
|
||||
waitIdleCh chan chan bool
|
||||
testHook func(int)
|
||||
}
|
||||
|
||||
// filterMap is a full or partial in-memory representation of a filter map where
|
||||
|
@ -104,12 +105,13 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
|
|||
}
|
||||
params.deriveFields()
|
||||
fm := &FilterMaps{
|
||||
db: db,
|
||||
chain: chain,
|
||||
closeCh: make(chan struct{}),
|
||||
history: history,
|
||||
noHistory: noHistory,
|
||||
Params: params,
|
||||
db: db,
|
||||
chain: chain,
|
||||
closeCh: make(chan struct{}),
|
||||
waitIdleCh: make(chan chan bool),
|
||||
history: history,
|
||||
noHistory: noHistory,
|
||||
Params: params,
|
||||
filterMapsRange: filterMapsRange{
|
||||
initialized: rs.Initialized,
|
||||
headLvPointer: rs.HeadLvPointer,
|
||||
|
|
|
@ -17,7 +17,9 @@ const (
|
|||
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
|
||||
revertPointFrequency = 256 // frequency of revert points in database
|
||||
cachedRevertPoints = 64 // revert points for most recent blocks in memory
|
||||
)
|
||||
|
||||
const (
|
||||
testHookInit = iota
|
||||
testHookUpdateHeadEpoch
|
||||
testHookUpdateHead
|
||||
|
@ -52,8 +54,8 @@ func (f *FilterMaps) updateLoop() {
|
|||
}
|
||||
|
||||
var (
|
||||
headEventCh = make(chan core.ChainHeadEvent, 10)
|
||||
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
|
||||
headEventCh = make(chan core.ChainEvent, 10)
|
||||
sub = f.chain.SubscribeChainEvent(headEventCh)
|
||||
head = f.chain.CurrentBlock()
|
||||
stop bool
|
||||
syncMatcher *FilterMapsMatcherBackend
|
||||
|
@ -61,7 +63,7 @@ func (f *FilterMaps) updateLoop() {
|
|||
)
|
||||
|
||||
matcherSync := func() {
|
||||
if syncMatcher != nil && fmr.headBlockHash == head.Hash() {
|
||||
if syncMatcher != nil && fmr.initialized && fmr.headBlockHash == head.Hash() {
|
||||
syncMatcher.synced(head)
|
||||
syncMatcher = nil
|
||||
}
|
||||
|
@ -79,19 +81,32 @@ func (f *FilterMaps) updateLoop() {
|
|||
}
|
||||
delay := time.Second * 20
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookWait)
|
||||
delay = 0
|
||||
}
|
||||
select {
|
||||
case ev := <-headEventCh:
|
||||
head = ev.Block.Header()
|
||||
case syncMatcher = <-f.matcherSyncCh:
|
||||
head = f.chain.CurrentBlock()
|
||||
case <-f.closeCh:
|
||||
stop = true
|
||||
case <-time.After(delay):
|
||||
// keep updating log index during syncing
|
||||
head = f.chain.CurrentBlock()
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case ev := <-headEventCh:
|
||||
head = ev.Block.Header()
|
||||
case syncMatcher = <-f.matcherSyncCh:
|
||||
head = f.chain.CurrentBlock()
|
||||
case <-f.closeCh:
|
||||
stop = true
|
||||
case ch := <-f.waitIdleCh:
|
||||
head = f.chain.CurrentBlock()
|
||||
if head.Hash() == f.getRange().headBlockHash {
|
||||
ch <- true
|
||||
continue loop
|
||||
}
|
||||
ch <- false
|
||||
case <-time.After(delay):
|
||||
// keep updating log index during syncing
|
||||
head = f.chain.CurrentBlock()
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookWait)
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
for head == nil {
|
||||
|
@ -150,6 +165,18 @@ func (f *FilterMaps) updateLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// WaitIdle blocks until the indexer is in an idle state while synced up to the
|
||||
// latest chain head.
|
||||
func (f *FilterMaps) WaitIdle() {
|
||||
for {
|
||||
ch := make(chan bool)
|
||||
f.waitIdleCh <- ch
|
||||
if <-ch {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getRange returns the current filterMapsRange.
|
||||
func (f *FilterMaps) getRange() filterMapsRange {
|
||||
f.lock.RLock()
|
||||
|
@ -804,7 +831,7 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
|
|||
|
||||
batch := f.db.NewBatch()
|
||||
afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap)
|
||||
if rp.mapIndex >= afterLastMap {
|
||||
if rp.mapIndex > afterLastMap {
|
||||
return errors.New("cannot revert (head map behind revert point)")
|
||||
}
|
||||
lvPointer := uint64(rp.mapIndex) << f.logValuesPerMap
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package filtermaps
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
@ -59,6 +61,25 @@ func TestIndexerRandomSetHistory(t *testing.T) {
|
|||
ts.checkLvRange(50)
|
||||
}
|
||||
|
||||
func TestIndexerDbEquality(t *testing.T) {
|
||||
ts := newTestSetup(t)
|
||||
ts.setHistory(0, false)
|
||||
for i := 0; i < 10; i++ {
|
||||
ts.chain.addBlocks(100, 10, 3, 4, true)
|
||||
ts.runUntilWait()
|
||||
}
|
||||
hash1 := ts.fmDbHash()
|
||||
fmt.Println(hash1)
|
||||
ts.setHistory(500, false)
|
||||
ts.runUntilWait()
|
||||
hash2 := ts.fmDbHash()
|
||||
fmt.Println(hash2)
|
||||
ts.setHistory(0, false)
|
||||
ts.runUntilWait()
|
||||
hash3 := ts.fmDbHash()
|
||||
fmt.Println(hash3)
|
||||
}
|
||||
|
||||
type testSetup struct {
|
||||
t *testing.T
|
||||
fm *FilterMaps
|
||||
|
@ -94,9 +115,14 @@ func (ts *testSetup) runUntil(stop func() bool) {
|
|||
}
|
||||
|
||||
func (ts *testSetup) runUntilWait() {
|
||||
ts.nextEvent()
|
||||
for ts.lastEvent != testHookWait {
|
||||
for {
|
||||
ts.nextEvent()
|
||||
for ts.lastEvent != testHookWait {
|
||||
ts.nextEvent()
|
||||
}
|
||||
if ts.fm.getRange().headBlockHash == ts.chain.CurrentBlock().Hash() {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -146,6 +172,19 @@ func (ts *testSetup) stopFm() {
|
|||
ts.fm.closeWg.Wait()
|
||||
}
|
||||
|
||||
func (ts *testSetup) fmDbHash() common.Hash {
|
||||
hasher := sha256.New()
|
||||
it := ts.db.NewIterator(nil, nil)
|
||||
for it.Next() {
|
||||
hasher.Write(it.Key())
|
||||
hasher.Write(it.Value())
|
||||
}
|
||||
it.Release()
|
||||
var result common.Hash
|
||||
hasher.Sum(result[:0])
|
||||
return result
|
||||
}
|
||||
|
||||
func (ts *testSetup) close() {
|
||||
ts.stopFm()
|
||||
ts.db.Close()
|
||||
|
@ -178,7 +217,7 @@ func (tc *testChain) CurrentBlock() *types.Header {
|
|||
return tc.blocks[tc.canonical[len(tc.canonical)-1]].Header()
|
||||
}
|
||||
|
||||
func (tc *testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
|
||||
func (tc *testChain) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
|
||||
return tc.chainHeadFeed.Subscribe(ch)
|
||||
}
|
||||
|
||||
|
@ -281,5 +320,32 @@ func (tc *testChain) addBlocks(count, maxTxPerBlock, maxLogsPerReceipt, maxTopic
|
|||
tc.receipts[hash] = types.Receipts{}
|
||||
}
|
||||
}
|
||||
tc.chainHeadFeed.Send(core.ChainHeadEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
|
||||
tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
|
||||
}
|
||||
|
||||
func (tc *testChain) setHead(headNum int) {
|
||||
tc.lock.Lock()
|
||||
defer tc.lock.Unlock()
|
||||
|
||||
tc.canonical = tc.canonical[:headNum+1]
|
||||
tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
|
||||
}
|
||||
|
||||
func (tc *testChain) getCanonicalChain() []common.Hash {
|
||||
tc.lock.RLock()
|
||||
defer tc.lock.RUnlock()
|
||||
|
||||
cc := make([]common.Hash, len(tc.canonical))
|
||||
copy(cc, tc.canonical)
|
||||
return cc
|
||||
}
|
||||
|
||||
// restore an earlier state of the chain
|
||||
func (tc *testChain) setCanonicalChain(cc []common.Hash) {
|
||||
tc.lock.Lock()
|
||||
defer tc.lock.Unlock()
|
||||
|
||||
tc.canonical = make([]common.Hash, len(cc))
|
||||
copy(tc.canonical, cc)
|
||||
tc.chainHeadFeed.Send(core.ChainEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
|
||||
}
|
||||
|
|
|
@ -118,6 +118,13 @@ func (fm *FilterMapsMatcherBackend) synced(head *types.Header) {
|
|||
// range that has not been changed and has been consistent with all states of the
|
||||
// chain since the previous SyncLogIndex or the creation of the matcher backend.
|
||||
func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) {
|
||||
if fm.f.noHistory {
|
||||
head := fm.f.chain.CurrentBlock()
|
||||
if head == nil {
|
||||
return SyncRange{}, errors.New("canonical chain head not available")
|
||||
}
|
||||
return SyncRange{Head: head}, nil
|
||||
}
|
||||
// add SyncRange return channel, ensuring that
|
||||
syncCh := make(chan SyncRange, 1)
|
||||
fm.f.lock.Lock()
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package rawdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/big"
|
||||
"testing"
|
||||
|
||||
|
@ -25,7 +24,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/internal/blocktest"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
|
|
|
@ -375,6 +375,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
accountSnaps stat
|
||||
storageSnaps stat
|
||||
preimages stat
|
||||
filterMaps stat
|
||||
beaconHeaders stat
|
||||
cliqueSnaps stat
|
||||
|
||||
|
@ -425,6 +426,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
codes.Add(size)
|
||||
case bytes.HasPrefix(key, txLookupPrefix) && len(key) == (len(txLookupPrefix)+common.HashLength):
|
||||
txLookups.Add(size)
|
||||
case bytes.HasPrefix(key, FilterMapsPrefix):
|
||||
filterMaps.Add(size)
|
||||
case bytes.HasPrefix(key, SnapshotAccountPrefix) && len(key) == (len(SnapshotAccountPrefix)+common.HashLength):
|
||||
accountSnaps.Add(size)
|
||||
case bytes.HasPrefix(key, SnapshotStoragePrefix) && len(key) == (len(SnapshotStoragePrefix)+2*common.HashLength):
|
||||
|
@ -499,6 +502,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
{"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()},
|
||||
{"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()},
|
||||
{"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()},
|
||||
{"Key-Value store", "Log search index", filterMaps.Size(), filterMaps.Count()},
|
||||
{"Key-Value store", "Contract codes", codes.Size(), codes.Count()},
|
||||
{"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()},
|
||||
{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},
|
||||
|
|
|
@ -37,9 +37,10 @@ type Filter struct {
|
|||
addresses []common.Address
|
||||
topics [][]common.Hash
|
||||
|
||||
block *common.Hash // Block hash if filtering a single block
|
||||
begin, end int64 // Range interval if filtering multiple blocks
|
||||
bbMatchCount uint64
|
||||
block *common.Hash // Block hash if filtering a single block
|
||||
begin, end int64 // Range interval if filtering multiple blocks
|
||||
|
||||
rangeLogsTestHook chan rangeLogsTestEvent
|
||||
}
|
||||
|
||||
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
|
||||
|
@ -131,10 +132,31 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
|||
return f.rangeLogs(ctx, begin, end)
|
||||
}
|
||||
|
||||
const (
|
||||
rangeLogsTestSync = iota
|
||||
rangeLogsTestTrimmed
|
||||
rangeLogsTestIndexed
|
||||
rangeLogsTestUnindexed
|
||||
rangeLogsTestDone
|
||||
)
|
||||
|
||||
type rangeLogsTestEvent struct {
|
||||
event int
|
||||
begin, end uint64
|
||||
}
|
||||
|
||||
func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([]*types.Log, error) {
|
||||
if firstBlock > lastBlock {
|
||||
return nil, errors.New("invalid search range")
|
||||
if f.rangeLogsTestHook != nil {
|
||||
defer func() {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestDone, 0, 0}
|
||||
close(f.rangeLogsTestHook)
|
||||
}()
|
||||
}
|
||||
|
||||
if firstBlock > lastBlock {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
mb := f.sys.backend.NewMatcherBackend()
|
||||
defer mb.Close()
|
||||
|
||||
|
@ -144,6 +166,21 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !syncRange.Indexed {
|
||||
// fallback to completely unindexed search
|
||||
headNum := syncRange.Head.Number.Uint64()
|
||||
if firstBlock > headNum {
|
||||
firstBlock = headNum
|
||||
}
|
||||
if lastBlock > headNum {
|
||||
lastBlock = headNum
|
||||
}
|
||||
if f.rangeLogsTestHook != nil {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, firstBlock, lastBlock}
|
||||
}
|
||||
return f.unindexedLogs(ctx, firstBlock, lastBlock)
|
||||
}
|
||||
|
||||
headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil
|
||||
// if haveMatches == true then matches correspond to the block number range
|
||||
// between matchFirst and matchLast
|
||||
|
@ -157,7 +194,7 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
|
|||
return
|
||||
}
|
||||
if trimLast < matchFirst || trimFirst > matchLast {
|
||||
matches, haveMatches = nil, false
|
||||
matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0
|
||||
return
|
||||
}
|
||||
if trimFirst > matchFirst {
|
||||
|
@ -192,6 +229,9 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
|
|||
var trimTailIfNotValid uint64
|
||||
if haveMatches && matchFirst > searchFirst {
|
||||
// missing tail section; do unindexed search
|
||||
if f.rangeLogsTestHook != nil {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, matchFirst - 1}
|
||||
}
|
||||
tailMatches, err := f.unindexedLogs(ctx, searchFirst, matchFirst-1)
|
||||
if err != nil {
|
||||
return matches, err
|
||||
|
@ -200,56 +240,67 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
|
|||
matchFirst = searchFirst
|
||||
// unindexed results are not affected by valid tail; do not trim tail
|
||||
trimTailIfNotValid = math.MaxUint64
|
||||
}
|
||||
// now if we have matches, they start at searchFirst
|
||||
if haveMatches {
|
||||
searchFirst = matchLast + 1
|
||||
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst {
|
||||
forceUnindexed = true
|
||||
}
|
||||
}
|
||||
var newMatches []*types.Log
|
||||
if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst {
|
||||
forceUnindexed = true
|
||||
}
|
||||
if !forceUnindexed {
|
||||
if syncRange.FirstIndexed > searchFirst {
|
||||
searchFirst = syncRange.FirstIndexed
|
||||
}
|
||||
if syncRange.LastIndexed > searchLast {
|
||||
searchLast = syncRange.LastIndexed
|
||||
}
|
||||
newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast)
|
||||
// trim tail if it affects the indexed search range
|
||||
trimTailIfNotValid = searchFirst
|
||||
if err == filtermaps.ErrMatchAll {
|
||||
// "match all" filters are not supported by filtermaps; fall back
|
||||
// to unindexed search which is the most efficient in this case
|
||||
forceUnindexed = true
|
||||
}
|
||||
}
|
||||
if forceUnindexed {
|
||||
newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast)
|
||||
// unindexed results are not affected by valid tail; do not trim tail
|
||||
trimTailIfNotValid = math.MaxUint64
|
||||
}
|
||||
if err != nil {
|
||||
return matches, err
|
||||
}
|
||||
if matches == nil {
|
||||
matches = newMatches
|
||||
haveMatches, matchFirst, matchLast = true, searchFirst, searchLast
|
||||
} else {
|
||||
matches = append(matches, newMatches...)
|
||||
matchLast = searchLast
|
||||
// if we have matches, they start at searchFirst
|
||||
if haveMatches {
|
||||
searchFirst = matchLast + 1
|
||||
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst {
|
||||
forceUnindexed = true
|
||||
}
|
||||
}
|
||||
var newMatches []*types.Log
|
||||
if !syncRange.Indexed || syncRange.FirstIndexed > searchLast || syncRange.LastIndexed < searchFirst {
|
||||
forceUnindexed = true
|
||||
}
|
||||
if !forceUnindexed {
|
||||
if syncRange.FirstIndexed > searchFirst {
|
||||
searchFirst = syncRange.FirstIndexed
|
||||
}
|
||||
if syncRange.LastIndexed < searchLast {
|
||||
searchLast = syncRange.LastIndexed
|
||||
}
|
||||
if f.rangeLogsTestHook != nil {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestIndexed, searchFirst, searchLast}
|
||||
}
|
||||
newMatches, err = f.indexedLogs(ctx, mb, searchFirst, searchLast)
|
||||
// trim tail if it affects the indexed search range
|
||||
trimTailIfNotValid = searchFirst
|
||||
if err == filtermaps.ErrMatchAll {
|
||||
// "match all" filters are not supported by filtermaps; fall back
|
||||
// to unindexed search which is the most efficient in this case
|
||||
forceUnindexed = true
|
||||
}
|
||||
}
|
||||
if forceUnindexed {
|
||||
if f.rangeLogsTestHook != nil {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{rangeLogsTestUnindexed, searchFirst, searchLast}
|
||||
}
|
||||
newMatches, err = f.unindexedLogs(ctx, searchFirst, searchLast)
|
||||
// unindexed results are not affected by valid tail; do not trim tail
|
||||
trimTailIfNotValid = math.MaxUint64
|
||||
}
|
||||
if err != nil {
|
||||
return matches, err
|
||||
}
|
||||
if !haveMatches {
|
||||
matches = newMatches
|
||||
haveMatches, matchFirst, matchLast = true, searchFirst, searchLast
|
||||
} else {
|
||||
matches = append(matches, newMatches...)
|
||||
matchLast = searchLast
|
||||
}
|
||||
}
|
||||
|
||||
if f.rangeLogsTestHook != nil {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestSync, begin: matchFirst, end: matchLast}
|
||||
}
|
||||
syncRange, err = mb.SyncLogIndex(ctx)
|
||||
if err != nil {
|
||||
return matches, err
|
||||
}
|
||||
headBlock = syncRange.Head.Number.Uint64() // Head is guaranteed != nil
|
||||
if !syncRange.Valid {
|
||||
matches, haveMatches = nil, false
|
||||
matches, haveMatches, matchFirst, matchLast = nil, false, 0, 0
|
||||
} else {
|
||||
if syncRange.FirstValid > trimTailIfNotValid {
|
||||
trimMatches(syncRange.FirstValid, syncRange.LastValid)
|
||||
|
@ -257,37 +308,42 @@ func (f *Filter) rangeLogs(ctx context.Context, firstBlock, lastBlock uint64) ([
|
|||
trimMatches(0, syncRange.LastValid)
|
||||
}
|
||||
}
|
||||
if f.rangeLogsTestHook != nil {
|
||||
f.rangeLogsTestHook <- rangeLogsTestEvent{event: rangeLogsTestTrimmed, begin: matchFirst, end: matchLast}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *Filter) indexedLogs(ctx context.Context, mb filtermaps.MatcherBackend, begin, end uint64) ([]*types.Log, error) {
|
||||
logs, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics)
|
||||
logs = filterLogs(logs, nil, nil, f.addresses, f.topics)
|
||||
return logs, err
|
||||
potentialMatches, err := filtermaps.GetPotentialMatches(ctx, mb, begin, end, f.addresses, f.topics)
|
||||
matches := filterLogs(potentialMatches, nil, nil, f.addresses, f.topics)
|
||||
log.Trace("Performed indexed log search", "begin", begin, "end", end, "true matches", len(matches), "false positives", len(potentialMatches)-len(matches))
|
||||
return matches, err
|
||||
}
|
||||
|
||||
// unindexedLogs returns the logs matching the filter criteria based on raw block
|
||||
// iteration and bloom matching.
|
||||
func (f *Filter) unindexedLogs(ctx context.Context, begin, end uint64) ([]*types.Log, error) {
|
||||
log.Warn("Performing unindexed log search", "begin", begin, "end", end)
|
||||
var logs []*types.Log
|
||||
var matches []*types.Log
|
||||
for blockNumber := begin; blockNumber <= end; blockNumber++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return logs, ctx.Err()
|
||||
return matches, ctx.Err()
|
||||
default:
|
||||
}
|
||||
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(blockNumber))
|
||||
if header == nil || err != nil {
|
||||
return logs, err
|
||||
return matches, err
|
||||
}
|
||||
found, err := f.blockLogs(ctx, header)
|
||||
if err != nil {
|
||||
return logs, err
|
||||
return matches, err
|
||||
}
|
||||
logs = append(logs, found...)
|
||||
matches = append(matches, found...)
|
||||
}
|
||||
return logs, nil
|
||||
log.Trace("Performed unindexed log search", "begin", begin, "end", end, "matches", len(matches))
|
||||
return matches, nil
|
||||
}
|
||||
|
||||
// blockLogs returns the logs matching the filter criteria within a single block.
|
||||
|
|
|
@ -40,6 +40,7 @@ import (
|
|||
|
||||
type testBackend struct {
|
||||
db ethdb.Database
|
||||
fm *filtermaps.FilterMaps
|
||||
sections uint64
|
||||
txFeed event.Feed
|
||||
logsFeed event.Feed
|
||||
|
@ -58,10 +59,28 @@ func (b *testBackend) CurrentHeader() *types.Header {
|
|||
return hdr
|
||||
}
|
||||
|
||||
func (b *testBackend) CurrentBlock() *types.Header {
|
||||
return b.CurrentHeader()
|
||||
}
|
||||
|
||||
func (b *testBackend) ChainDb() ethdb.Database {
|
||||
return b.db
|
||||
}
|
||||
|
||||
func (b *testBackend) GetCanonicalHash(number uint64) common.Hash {
|
||||
return rawdb.ReadCanonicalHash(b.db, number)
|
||||
}
|
||||
|
||||
func (b *testBackend) GetHeader(hash common.Hash, number uint64) *types.Header {
|
||||
hdr, _ := b.HeaderByHash(context.Background(), hash)
|
||||
return hdr
|
||||
}
|
||||
|
||||
func (b *testBackend) GetReceiptsByHash(hash common.Hash) types.Receipts {
|
||||
r, _ := b.GetReceipts(context.Background(), hash)
|
||||
return r
|
||||
}
|
||||
|
||||
func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
|
||||
var (
|
||||
hash common.Hash
|
||||
|
@ -137,9 +156,20 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
|
|||
}
|
||||
|
||||
func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
|
||||
fm := filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, 0, false)
|
||||
fm.Start()
|
||||
return fm.NewMatcherBackend()
|
||||
return b.fm.NewMatcherBackend()
|
||||
}
|
||||
|
||||
func (b *testBackend) startFilterMaps(history uint64, noHistory bool) {
|
||||
b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory)
|
||||
b.fm.Start()
|
||||
if !noHistory {
|
||||
b.fm.WaitIdle()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *testBackend) stopFilterMaps() {
|
||||
b.fm.Stop()
|
||||
b.fm = nil
|
||||
}
|
||||
|
||||
func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {
|
||||
|
|
|
@ -46,15 +46,27 @@ func makeReceipt(addr common.Address) *types.Receipt {
|
|||
return receipt
|
||||
}
|
||||
|
||||
func BenchmarkFilters(b *testing.B) {
|
||||
func BenchmarkFiltersIndexed(b *testing.B) {
|
||||
benchmarkFilters(b, 0, false)
|
||||
}
|
||||
|
||||
func BenchmarkFiltersHalfIndexed(b *testing.B) {
|
||||
benchmarkFilters(b, 50000, false)
|
||||
}
|
||||
|
||||
func BenchmarkFiltersUnindexed(b *testing.B) {
|
||||
benchmarkFilters(b, 0, true)
|
||||
}
|
||||
|
||||
func benchmarkFilters(b *testing.B, history uint64, noHistory bool) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
_, sys = newTestFilterSystem(b, db, Config{})
|
||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
||||
addr2 = common.BytesToAddress([]byte("jeff"))
|
||||
addr3 = common.BytesToAddress([]byte("ethereum"))
|
||||
addr4 = common.BytesToAddress([]byte("random addresses please"))
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(b, db, Config{})
|
||||
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
||||
addr2 = common.BytesToAddress([]byte("jeff"))
|
||||
addr3 = common.BytesToAddress([]byte("ethereum"))
|
||||
addr4 = common.BytesToAddress([]byte("random addresses please"))
|
||||
|
||||
gspec = &core.Genesis{
|
||||
Alloc: types.GenesisAlloc{addr1: {Balance: big.NewInt(1000000)}},
|
||||
|
@ -94,9 +106,12 @@ func BenchmarkFilters(b *testing.B) {
|
|||
rawdb.WriteHeadBlockHash(db, block.Hash())
|
||||
rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), receipts[i])
|
||||
}
|
||||
backend.startFilterMaps(history, noHistory)
|
||||
defer backend.stopFilterMaps()
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
filter := sys.NewRangeFilter(0, -1, []common.Address{addr1, addr2, addr3, addr4}, nil)
|
||||
filter := sys.NewRangeFilter(0, int64(rpc.LatestBlockNumber), []common.Address{addr1, addr2, addr3, addr4}, nil)
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
filter.begin = 0
|
||||
|
@ -107,7 +122,19 @@ func BenchmarkFilters(b *testing.B) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestFilters(t *testing.T) {
|
||||
func TestFiltersIndexed(t *testing.T) {
|
||||
testFilters(t, 0, false)
|
||||
}
|
||||
|
||||
func TestFiltersHalfIndexed(t *testing.T) {
|
||||
testFilters(t, 500, false)
|
||||
}
|
||||
|
||||
func TestFiltersUnindexed(t *testing.T) {
|
||||
testFilters(t, 0, true)
|
||||
}
|
||||
|
||||
func testFilters(t *testing.T, history uint64, noHistory bool) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||
|
@ -279,6 +306,9 @@ func TestFilters(t *testing.T) {
|
|||
})
|
||||
backend.setPending(pchain[0], preceipts[0])
|
||||
|
||||
backend.startFilterMaps(history, noHistory)
|
||||
defer backend.stopFilterMaps()
|
||||
|
||||
for i, tc := range []struct {
|
||||
f *Filter
|
||||
want string
|
||||
|
@ -387,3 +417,137 @@ func TestFilters(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestRangeLogs(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
backend, sys = newTestFilterSystem(t, db, Config{})
|
||||
gspec = &core.Genesis{
|
||||
Config: params.TestChainConfig,
|
||||
Alloc: types.GenesisAlloc{},
|
||||
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||
}
|
||||
)
|
||||
_, err := gspec.Commit(db, triedb.NewDatabase(db, nil))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
chain, _ := core.GenerateChain(gspec.Config, gspec.ToBlock(), ethash.NewFaker(), db, 1000, func(i int, gen *core.BlockGen) {})
|
||||
var l uint64
|
||||
bc, err := core.NewBlockChain(db, nil, gspec, nil, ethash.NewFaker(), vm.Config{}, &l)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, err = bc.InsertChain(chain[:600])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
backend.startFilterMaps(200, false)
|
||||
defer backend.stopFilterMaps()
|
||||
|
||||
var (
|
||||
testCase, event int
|
||||
filter *Filter
|
||||
addresses = []common.Address{common.Address{}}
|
||||
)
|
||||
|
||||
newFilter := func(begin, end int64) {
|
||||
testCase++
|
||||
event = 0
|
||||
filter = sys.NewRangeFilter(begin, end, addresses, nil)
|
||||
filter.rangeLogsTestHook = make(chan rangeLogsTestEvent)
|
||||
go func(filter *Filter) {
|
||||
filter.Logs(context.Background())
|
||||
// ensure that filter will not be blocked if we exit early
|
||||
for _ = range filter.rangeLogsTestHook {
|
||||
}
|
||||
}(filter)
|
||||
}
|
||||
|
||||
expEvent := func(exp rangeLogsTestEvent) {
|
||||
event++
|
||||
ev := <-filter.rangeLogsTestHook
|
||||
if ev != exp {
|
||||
t.Fatalf("Test case #%d: wrong test event #%d received (got %v, expected %v)", testCase, event, ev, exp)
|
||||
}
|
||||
}
|
||||
|
||||
// test case #1
|
||||
newFilter(300, 500)
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 401, 500})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 401, 500})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 401, 500})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 300, 400})
|
||||
if _, err := bc.InsertChain(chain[600:700]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
backend.fm.WaitIdle()
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 300, 500})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 300, 500}) // unindexed search is not affected by trimmed tail
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
|
||||
|
||||
// test case #2
|
||||
newFilter(400, int64(rpc.LatestBlockNumber))
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 501, 700})
|
||||
if _, err := bc.InsertChain(chain[700:800]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
backend.fm.WaitIdle()
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 501, 700})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 601, 700})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 600})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 700})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 700})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 701, 800})
|
||||
if err := bc.SetHead(750); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
backend.fm.WaitIdle()
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 800})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
|
||||
|
||||
// test case #3
|
||||
newFilter(int64(rpc.LatestBlockNumber), int64(rpc.LatestBlockNumber))
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
|
||||
if err := bc.SetHead(740); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
backend.fm.WaitIdle()
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 740, 740})
|
||||
if _, err := bc.InsertChain(chain[740:750]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
backend.fm.WaitIdle()
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 740, 740})
|
||||
// trimmed at the beginning of the next iteration
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 740, 740})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 750, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 750, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 750, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestDone, 0, 0})
|
||||
|
||||
// test case #4
|
||||
newFilter(400, int64(rpc.LatestBlockNumber))
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 551, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 551, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 551, 750})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 550})
|
||||
if _, err := bc.InsertChain(chain[750:1000]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
backend.fm.WaitIdle()
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 750})
|
||||
// indexed range affected by tail pruning so we have to discard the entire
|
||||
// match set
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 0, 0})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestIndexed, 801, 1000})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 801, 1000})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 801, 1000})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestUnindexed, 400, 800})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestSync, 400, 1000})
|
||||
expEvent(rangeLogsTestEvent{rangeLogsTestTrimmed, 400, 1000})
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/consensus/beacon"
|
||||
"github.com/ethereum/go-ethereum/consensus/ethash"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/filtermaps"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
|
@ -619,6 +620,9 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
|
|||
func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
|
||||
panic("implement me")
|
||||
}
|
||||
func (b testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
|
||||
panic("implement me")
|
||||
}
|
||||
func TestEstimateGas(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Initialize test accounts
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/consensus"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/filtermaps"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/core/vm"
|
||||
|
@ -399,3 +400,5 @@ func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
|
|||
}
|
||||
|
||||
func (b *backendMock) Engine() consensus.Engine { return nil }
|
||||
|
||||
func (b *backendMock) NewMatcherBackend() filtermaps.MatcherBackend { return nil }
|
||||
|
|
Loading…
Reference in New Issue