core/filtermaps: safe concurrent index update and search
This commit is contained in:
parent
8b5c87e30f
commit
932769e06c
|
@ -1,7 +1,6 @@
|
||||||
package filtermaps
|
package filtermaps
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"errors"
|
"errors"
|
||||||
|
@ -14,6 +13,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,6 +28,14 @@ const (
|
||||||
headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// blockchain defines functions required by the FilterMaps log indexer.
|
||||||
|
type blockchain interface {
|
||||||
|
CurrentBlock() *types.Header
|
||||||
|
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
|
||||||
|
GetHeader(hash common.Hash, number uint64) *types.Header
|
||||||
|
GetCanonicalHash(number uint64) common.Hash
|
||||||
|
}
|
||||||
|
|
||||||
// FilterMaps is the in-memory representation of the log index structure that is
|
// FilterMaps is the in-memory representation of the log index structure that is
|
||||||
// responsible for building and updating the index according to the canonical
|
// responsible for building and updating the index according to the canonical
|
||||||
// chain.
|
// chain.
|
||||||
|
@ -38,10 +46,10 @@ type FilterMaps struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
db ethdb.Database
|
db ethdb.Database
|
||||||
closeCh chan chan struct{}
|
closeCh chan chan struct{}
|
||||||
|
|
||||||
filterMapsRange
|
filterMapsRange
|
||||||
chain *core.BlockChain
|
chain blockchain
|
||||||
|
matcherSyncCh chan *FilterMapsMatcherBackend
|
||||||
|
matchers map[*FilterMapsMatcherBackend]struct{}
|
||||||
// filterMapCache caches certain filter maps (headCacheSize most recent maps
|
// filterMapCache caches certain filter maps (headCacheSize most recent maps
|
||||||
// and one tail map) that are expected to be frequently accessed and modified
|
// and one tail map) that are expected to be frequently accessed and modified
|
||||||
// while updating the structure. Note that the set of cached maps depends
|
// while updating the structure. Note that the set of cached maps depends
|
||||||
|
@ -86,7 +94,7 @@ type filterMapsRange struct {
|
||||||
|
|
||||||
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
||||||
// the structure in sync with the given blockchain.
|
// the structure in sync with the given blockchain.
|
||||||
func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
|
func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
|
||||||
rs, err := rawdb.ReadFilterMapsRange(db)
|
rs, err := rawdb.ReadFilterMapsRange(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error reading log index range", "error", err)
|
log.Error("Error reading log index range", "error", err)
|
||||||
|
@ -104,6 +112,8 @@ func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
|
||||||
headBlockHash: rs.HeadBlockHash,
|
headBlockHash: rs.HeadBlockHash,
|
||||||
tailParentHash: rs.TailParentHash,
|
tailParentHash: rs.TailParentHash,
|
||||||
},
|
},
|
||||||
|
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
|
||||||
|
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
|
||||||
filterMapCache: make(map[uint32]*filterMap),
|
filterMapCache: make(map[uint32]*filterMap),
|
||||||
blockPtrCache: lru.NewCache[uint32, uint64](1000),
|
blockPtrCache: lru.NewCache[uint32, uint64](1000),
|
||||||
lvPointerCache: lru.NewCache[uint64, uint64](1000),
|
lvPointerCache: lru.NewCache[uint64, uint64](1000),
|
||||||
|
@ -129,46 +139,6 @@ func (f *FilterMaps) Close() {
|
||||||
<-ch
|
<-ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterMapsMatcherBackend implements MatcherBackend.
|
|
||||||
type FilterMapsMatcherBackend FilterMaps
|
|
||||||
|
|
||||||
// GetFilterMapRow returns the given row of the given map. If the row is empty
|
|
||||||
// then a non-nil zero length row is returned.
|
|
||||||
// Note that the returned slices should not be modified, they should be copied
|
|
||||||
// on write.
|
|
||||||
// GetFilterMapRow implements MatcherBackend.
|
|
||||||
func (ff *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
|
|
||||||
f := (*FilterMaps)(ff)
|
|
||||||
return f.getFilterMapRow(mapIndex, rowIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBlockLvPointer returns the starting log value index where the log values
|
|
||||||
// generated by the given block are located. If blockNumber is beyond the current
|
|
||||||
// head then the first unoccupied log value index is returned.
|
|
||||||
// GetBlockLvPointer implements MatcherBackend.
|
|
||||||
func (ff *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
|
|
||||||
f := (*FilterMaps)(ff)
|
|
||||||
f.lock.RLock()
|
|
||||||
defer f.lock.RUnlock()
|
|
||||||
|
|
||||||
return f.getBlockLvPointer(blockNumber)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLogByLvIndex returns the log at the given log value index. If the index does
|
|
||||||
// not point to the first log value entry of a log then no log and no error are
|
|
||||||
// returned as this can happen when the log value index was a false positive.
|
|
||||||
// Note that this function assumes that the log index structure is consistent
|
|
||||||
// with the canonical chain at the point where the given log value index points.
|
|
||||||
// If this is not the case then an invalid result or an error may be returned.
|
|
||||||
// GetLogByLvIndex implements MatcherBackend.
|
|
||||||
func (ff *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
|
|
||||||
f := (*FilterMaps)(ff)
|
|
||||||
f.lock.RLock()
|
|
||||||
defer f.lock.RUnlock()
|
|
||||||
|
|
||||||
return f.getLogByLvIndex(lvIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset un-initializes the FilterMaps structure and removes all related data from
|
// reset un-initializes the FilterMaps structure and removes all related data from
|
||||||
// the database.
|
// the database.
|
||||||
// Note that this function assumes that the read/write lock is being held.
|
// Note that this function assumes that the read/write lock is being held.
|
||||||
|
@ -224,6 +194,7 @@ func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
|
||||||
}
|
}
|
||||||
rawdb.WriteFilterMapsRange(batch, rs)
|
rawdb.WriteFilterMapsRange(batch, rs)
|
||||||
f.updateMapCache()
|
f.updateMapCache()
|
||||||
|
f.updateMatchersValidRange()
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateMapCache updates the maps covered by the filterMapCache according to the
|
// updateMapCache updates the maps covered by the filterMapCache according to the
|
||||||
|
@ -266,7 +237,7 @@ func (f *FilterMaps) updateMapCache() {
|
||||||
// Note that this function assumes that the read lock is being held.
|
// Note that this function assumes that the read lock is being held.
|
||||||
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
||||||
if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
|
if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
|
||||||
return nil, errors.New("log value index outside available range")
|
return nil, nil
|
||||||
}
|
}
|
||||||
// find possible block range based on map to block pointers
|
// find possible block range based on map to block pointers
|
||||||
mapIndex := uint32(lvIndex >> logValuesPerMap)
|
mapIndex := uint32(lvIndex >> logValuesPerMap)
|
||||||
|
@ -321,7 +292,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
||||||
lvPointer += uint64(len(log.Topics) + 1)
|
lvPointer += uint64(len(log.Topics) + 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, errors.New("log value index not found")
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getFilterMapRow returns the given row of the given map. If the row is empty
|
// getFilterMapRow returns the given row of the given map. If the row is empty
|
||||||
|
|
|
@ -22,30 +22,35 @@ const (
|
||||||
// updateLoop initializes and updates the log index structure according to the
|
// updateLoop initializes and updates the log index structure according to the
|
||||||
// canonical chain.
|
// canonical chain.
|
||||||
func (f *FilterMaps) updateLoop() {
|
func (f *FilterMaps) updateLoop() {
|
||||||
headEventCh := make(chan core.ChainHeadEvent)
|
var (
|
||||||
sub := f.chain.SubscribeChainHeadEvent(headEventCh)
|
headEventCh = make(chan core.ChainHeadEvent)
|
||||||
defer sub.Unsubscribe()
|
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
|
||||||
|
head *types.Header
|
||||||
|
stop bool
|
||||||
|
syncMatcher *FilterMapsMatcherBackend
|
||||||
|
)
|
||||||
|
|
||||||
head := f.chain.CurrentBlock()
|
defer func() {
|
||||||
if head == nil {
|
sub.Unsubscribe()
|
||||||
select {
|
if syncMatcher != nil {
|
||||||
case ev := <-headEventCh:
|
syncMatcher.synced(head)
|
||||||
head = ev.Block.Header()
|
syncMatcher = nil
|
||||||
case ch := <-f.closeCh:
|
|
||||||
close(ch)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
fmr := f.getRange()
|
|
||||||
|
|
||||||
var stop bool
|
|
||||||
wait := func() {
|
wait := func() {
|
||||||
|
if syncMatcher != nil {
|
||||||
|
syncMatcher.synced(head)
|
||||||
|
syncMatcher = nil
|
||||||
|
}
|
||||||
if stop {
|
if stop {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case ev := <-headEventCh:
|
case ev := <-headEventCh:
|
||||||
head = ev.Block.Header()
|
head = ev.Block.Header()
|
||||||
|
case syncMatcher = <-f.matcherSyncCh:
|
||||||
|
head = f.chain.CurrentBlock()
|
||||||
case <-time.After(time.Second * 20):
|
case <-time.After(time.Second * 20):
|
||||||
// keep updating log index during syncing
|
// keep updating log index during syncing
|
||||||
head = f.chain.CurrentBlock()
|
head = f.chain.CurrentBlock()
|
||||||
|
@ -54,10 +59,21 @@ func (f *FilterMaps) updateLoop() {
|
||||||
stop = true
|
stop = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for head == nil {
|
||||||
|
wait()
|
||||||
|
if stop {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmr := f.getRange()
|
||||||
|
|
||||||
for !stop {
|
for !stop {
|
||||||
if !fmr.initialized {
|
if !fmr.initialized {
|
||||||
f.tryInit(head)
|
f.tryInit(head)
|
||||||
|
if syncMatcher != nil {
|
||||||
|
syncMatcher.synced(head)
|
||||||
|
syncMatcher = nil
|
||||||
|
}
|
||||||
fmr = f.getRange()
|
fmr = f.getRange()
|
||||||
if !fmr.initialized {
|
if !fmr.initialized {
|
||||||
wait()
|
wait()
|
||||||
|
@ -73,12 +89,18 @@ func (f *FilterMaps) updateLoop() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if syncMatcher != nil {
|
||||||
|
syncMatcher.synced(head)
|
||||||
|
syncMatcher = nil
|
||||||
|
}
|
||||||
// log index head is at latest chain head; process tail blocks if possible
|
// log index head is at latest chain head; process tail blocks if possible
|
||||||
f.tryExtendTail(func() bool {
|
f.tryExtendTail(func() bool {
|
||||||
// return true if tail processing needs to be stopped
|
// return true if tail processing needs to be stopped
|
||||||
select {
|
select {
|
||||||
case ev := <-headEventCh:
|
case ev := <-headEventCh:
|
||||||
head = ev.Block.Header()
|
head = ev.Block.Header()
|
||||||
|
case syncMatcher = <-f.matcherSyncCh:
|
||||||
|
head = f.chain.CurrentBlock()
|
||||||
case ch := <-f.closeCh:
|
case ch := <-f.closeCh:
|
||||||
close(ch)
|
close(ch)
|
||||||
stop = true
|
stop = true
|
||||||
|
@ -549,6 +571,9 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
|
||||||
// number from memory cache or from the database if available. If no such revert
|
// number from memory cache or from the database if available. If no such revert
|
||||||
// point is available then it returns no result and no error.
|
// point is available then it returns no result and no error.
|
||||||
func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
|
func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
|
||||||
|
f.lock.RLock()
|
||||||
|
defer f.lock.RUnlock()
|
||||||
|
|
||||||
if blockNumber > f.headBlockNumber {
|
if blockNumber > f.headBlockNumber {
|
||||||
blockNumber = f.headBlockNumber
|
blockNumber = f.headBlockNumber
|
||||||
}
|
}
|
||||||
|
@ -577,6 +602,9 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
|
||||||
|
|
||||||
// revertTo reverts the log index to the given revert point.
|
// revertTo reverts the log index to the given revert point.
|
||||||
func (f *FilterMaps) revertTo(rp *revertPoint) error {
|
func (f *FilterMaps) revertTo(rp *revertPoint) error {
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
batch := f.db.NewBatch()
|
batch := f.db.NewBatch()
|
||||||
afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
|
afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
|
||||||
if rp.mapIndex >= afterLastMap {
|
if rp.mapIndex >= afterLastMap {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package filtermaps
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -18,13 +19,121 @@ type MatcherBackend interface {
|
||||||
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
|
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
|
||||||
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
|
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
|
||||||
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
|
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
|
||||||
|
SyncLogIndex(ctx context.Context) (SyncRange, error)
|
||||||
|
Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncRange is returned by MatcherBackend.SyncLogIndex. It contains the latest
|
||||||
|
// chain head, the indexed range that is currently consistent with the chain
|
||||||
|
// and the valid range that has not been changed and has been consistent with
|
||||||
|
// all states of the chain since the previous SyncLogIndex or the creation of
|
||||||
|
// the matcher backend.
|
||||||
|
type SyncRange struct {
|
||||||
|
Head *types.Header
|
||||||
|
// block range where the index has not changed since the last matcher sync
|
||||||
|
// and therefore the set of matches found in this region is guaranteed to
|
||||||
|
// be valid and complete.
|
||||||
|
Valid bool
|
||||||
|
FirstValid, LastValid uint64
|
||||||
|
// block range indexed according to the given chain head.
|
||||||
|
Indexed bool
|
||||||
|
FirstIndexed, LastIndexed uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPotentialMatches returns a list of logs that are potential matches for the
|
// GetPotentialMatches returns a list of logs that are potential matches for the
|
||||||
// given filter criteria. Note that the returned list may still contain false
|
// given filter criteria. If parts of the requested range are not indexed then
|
||||||
// positives.
|
// an error is returned. If parts of the requested range are changed during the
|
||||||
//TODO add protection against reorgs during search
|
// search process then potentially incorrect logs are discarded and searched
|
||||||
func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
|
// again, ensuring that the returned results are always consistent with the latest
|
||||||
|
// state of the chain.
|
||||||
|
// If firstBlock or lastBlock are bigger than the head block number then they are
|
||||||
|
// substituted with the latest head of the chain, ensuring that a search until
|
||||||
|
// the head block is still consistent with the latest canonical chain if a new
|
||||||
|
// head has been added during the process.
|
||||||
|
// Note that the returned list may still contain false positives.
|
||||||
|
func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, *types.Header, uint64, uint64, error) {
|
||||||
|
if firstBlock > lastBlock {
|
||||||
|
return nil, nil, 0, 0, errors.New("invalid search range")
|
||||||
|
}
|
||||||
|
// enforce a consistent state before starting the search in order to be able
|
||||||
|
// to determine valid range later
|
||||||
|
syncRange, err := backend.SyncLogIndex(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, 0, err
|
||||||
|
}
|
||||||
|
headBlock := syncRange.Head.Number.Uint64() // Head is guaranteed != nil
|
||||||
|
// if haveMatches == true then matches correspond to the block number range
|
||||||
|
// between matchFirst and matchLast
|
||||||
|
var (
|
||||||
|
matches []*types.Log
|
||||||
|
haveMatches bool
|
||||||
|
matchFirst, matchLast uint64
|
||||||
|
)
|
||||||
|
for !haveMatches || (matchLast < lastBlock && matchLast < headBlock) {
|
||||||
|
// determine range to be searched; for simplicity we only extend the most
|
||||||
|
// recent end of the existing match set by matching between searchFirst
|
||||||
|
// and searchLast.
|
||||||
|
searchFirst, searchLast := firstBlock, lastBlock
|
||||||
|
if searchFirst > headBlock {
|
||||||
|
searchFirst = headBlock
|
||||||
|
}
|
||||||
|
if searchLast > headBlock {
|
||||||
|
searchLast = headBlock
|
||||||
|
}
|
||||||
|
if haveMatches && matchFirst != searchFirst {
|
||||||
|
// searchFirst might change if firstBlock > headBlock
|
||||||
|
matches, haveMatches = nil, false
|
||||||
|
}
|
||||||
|
if haveMatches && matchLast >= searchFirst {
|
||||||
|
searchFirst = matchLast + 1
|
||||||
|
}
|
||||||
|
// check if indexed range covers the requested range
|
||||||
|
if !syncRange.Indexed || syncRange.FirstIndexed > searchFirst || syncRange.LastIndexed < searchLast {
|
||||||
|
return nil, nil, 0, 0, errors.New("log index not available for requested range")
|
||||||
|
}
|
||||||
|
// search for matches in the required range
|
||||||
|
newMatches, err := getPotentialMatches(ctx, backend, searchFirst, searchLast, addresses, topics)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, 0, err
|
||||||
|
}
|
||||||
|
// enforce a consistent state again in order to determine the guaranteed
|
||||||
|
// valid range in which the log index has not been changed since the last
|
||||||
|
// sync.
|
||||||
|
syncRange, err = backend.SyncLogIndex(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, 0, 0, err
|
||||||
|
}
|
||||||
|
headBlock = syncRange.Head.Number.Uint64()
|
||||||
|
// return with error if the beginning of the recently searched range might
|
||||||
|
// be invalid due to removed log index
|
||||||
|
if !syncRange.Valid || syncRange.FirstValid > searchFirst || syncRange.LastValid < searchFirst {
|
||||||
|
return nil, nil, 0, 0, errors.New("log index not available for requested range")
|
||||||
|
}
|
||||||
|
// roll back most recent matches if they are not covered by the guaranteed
|
||||||
|
// valid range
|
||||||
|
if syncRange.LastValid < searchLast {
|
||||||
|
for len(newMatches) > 0 && newMatches[len(newMatches)-1].BlockNumber > syncRange.LastValid {
|
||||||
|
newMatches = newMatches[:len(newMatches)-1]
|
||||||
|
}
|
||||||
|
searchLast = syncRange.LastValid
|
||||||
|
}
|
||||||
|
// append new matches to existing ones if the were any
|
||||||
|
if haveMatches {
|
||||||
|
matches = append(matches, newMatches...)
|
||||||
|
} else {
|
||||||
|
matches, haveMatches = newMatches, true
|
||||||
|
}
|
||||||
|
matchLast = searchLast
|
||||||
|
}
|
||||||
|
return matches, syncRange.Head, firstBlock, matchLast, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getPotentialMatches returns a list of logs that are potential matches for the
|
||||||
|
// given filter criteria. If parts of the log index in the searched range are
|
||||||
|
// missing or changed during the search process then the resulting logs belonging
|
||||||
|
// to that block range might be missing or incorrect.
|
||||||
|
// Also note that the returned list may contain false positives.
|
||||||
|
func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
|
||||||
// find the log value index range to search
|
// find the log value index range to search
|
||||||
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
|
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -0,0 +1,158 @@
|
||||||
|
package filtermaps
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FilterMapsMatcherBackend implements MatcherBackend.
|
||||||
|
type FilterMapsMatcherBackend struct {
|
||||||
|
f *FilterMaps
|
||||||
|
valid bool
|
||||||
|
firstValid, lastValid uint64
|
||||||
|
syncCh chan SyncRange
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMatcherBackend returns a FilterMapsMatcherBackend after registering it in
|
||||||
|
// the active matcher set.
|
||||||
|
// Note that Close should always be called when the matcher is no longer used.
|
||||||
|
func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend {
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
|
fm := &FilterMapsMatcherBackend{
|
||||||
|
f: f,
|
||||||
|
valid: f.initialized,
|
||||||
|
firstValid: f.tailBlockNumber,
|
||||||
|
lastValid: f.headBlockNumber,
|
||||||
|
}
|
||||||
|
f.matchers[fm] = struct{}{}
|
||||||
|
return fm
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateMatchersValidRange iterates through active matchers and limits their
|
||||||
|
// valid range with the current indexed range. This function should be called
|
||||||
|
// whenever a part of the log index has been removed, before adding new blocks
|
||||||
|
// to it.
|
||||||
|
func (f *FilterMaps) updateMatchersValidRange() {
|
||||||
|
for fm := range f.matchers {
|
||||||
|
if !f.initialized {
|
||||||
|
fm.valid = false
|
||||||
|
}
|
||||||
|
if !fm.valid {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if fm.firstValid < f.tailBlockNumber {
|
||||||
|
fm.firstValid = f.tailBlockNumber
|
||||||
|
}
|
||||||
|
if fm.lastValid > f.headBlockNumber {
|
||||||
|
fm.lastValid = f.headBlockNumber
|
||||||
|
}
|
||||||
|
if fm.firstValid > fm.lastValid {
|
||||||
|
fm.valid = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close removes the matcher from the set of active matchers and ensures that
|
||||||
|
// any SyncLogIndex calls are cancelled.
|
||||||
|
func (fm *FilterMapsMatcherBackend) Close() {
|
||||||
|
fm.f.lock.Lock()
|
||||||
|
defer fm.f.lock.Unlock()
|
||||||
|
|
||||||
|
delete(fm.f.matchers, fm)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetFilterMapRow returns the given row of the given map. If the row is empty
|
||||||
|
// then a non-nil zero length row is returned.
|
||||||
|
// Note that the returned slices should not be modified, they should be copied
|
||||||
|
// on write.
|
||||||
|
// GetFilterMapRow implements MatcherBackend.
|
||||||
|
func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
|
||||||
|
return fm.f.getFilterMapRow(mapIndex, rowIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBlockLvPointer returns the starting log value index where the log values
|
||||||
|
// generated by the given block are located. If blockNumber is beyond the current
|
||||||
|
// head then the first unoccupied log value index is returned.
|
||||||
|
// GetBlockLvPointer implements MatcherBackend.
|
||||||
|
func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
|
||||||
|
fm.f.lock.RLock()
|
||||||
|
defer fm.f.lock.RUnlock()
|
||||||
|
|
||||||
|
return fm.f.getBlockLvPointer(blockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLogByLvIndex returns the log at the given log value index.
|
||||||
|
// Note that this function assumes that the log index structure is consistent
|
||||||
|
// with the canonical chain at the point where the given log value index points.
|
||||||
|
// If this is not the case then an invalid result may be returned or certain
|
||||||
|
// logs might not be returned at all.
|
||||||
|
// No error is returned though because of an inconsistency between the chain and
|
||||||
|
// the log index. It is the caller's responsibility to verify this consistency
|
||||||
|
// using SyncLogIndex and re-process certain blocks if necessary.
|
||||||
|
// GetLogByLvIndex implements MatcherBackend.
|
||||||
|
func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
|
||||||
|
fm.f.lock.RLock()
|
||||||
|
defer fm.f.lock.RUnlock()
|
||||||
|
|
||||||
|
return fm.f.getLogByLvIndex(lvIndex)
|
||||||
|
}
|
||||||
|
|
||||||
|
// synced signals to the matcher that has triggered a synchronisation that it
|
||||||
|
// has been finished and the log index is consistent with the chain head passed
|
||||||
|
// as a parameter.
|
||||||
|
// Note that if the log index head was far behind the chain head then it might not
|
||||||
|
// be synced up to the given head in a single step. Still, the latest chain head
|
||||||
|
// should be passed as a parameter and the existing log index should be consistent
|
||||||
|
// with that chain.
|
||||||
|
func (fm *FilterMapsMatcherBackend) synced(head *types.Header) {
|
||||||
|
fm.f.lock.Lock()
|
||||||
|
defer fm.f.lock.Unlock()
|
||||||
|
|
||||||
|
fm.syncCh <- SyncRange{
|
||||||
|
Head: head,
|
||||||
|
Valid: fm.valid,
|
||||||
|
FirstValid: fm.firstValid,
|
||||||
|
LastValid: fm.lastValid,
|
||||||
|
Indexed: fm.f.initialized,
|
||||||
|
FirstIndexed: fm.f.tailBlockNumber,
|
||||||
|
LastIndexed: fm.f.headBlockNumber,
|
||||||
|
}
|
||||||
|
fm.valid = fm.f.initialized
|
||||||
|
fm.firstValid = fm.f.tailBlockNumber
|
||||||
|
fm.lastValid = fm.f.headBlockNumber
|
||||||
|
fm.syncCh = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncLogIndex ensures that the log index is consistent with the current state
|
||||||
|
// of the chain (note that it may or may not be actually synced up to the head).
|
||||||
|
// It blocks until this state is achieved.
|
||||||
|
// If successful, it returns a SyncRange that contains the latest chain head,
|
||||||
|
// the indexed range that is currently consistent with the chain and the valid
|
||||||
|
// range that has not been changed and has been consistent with all states of the
|
||||||
|
// chain since the previous SyncLogIndex or the creation of the matcher backend.
|
||||||
|
func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange, error) {
|
||||||
|
// add SyncRange return channel, ensuring that
|
||||||
|
syncCh := make(chan SyncRange, 1)
|
||||||
|
fm.f.lock.Lock()
|
||||||
|
fm.syncCh = syncCh
|
||||||
|
fm.f.lock.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case fm.f.matcherSyncCh <- fm:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return SyncRange{}, ctx.Err()
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case vr := <-syncCh:
|
||||||
|
if vr.Head == nil {
|
||||||
|
return SyncRange{}, errors.New("canonical chain head not available")
|
||||||
|
}
|
||||||
|
return vr, nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return SyncRange{}, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
|
@ -45,7 +45,6 @@ import (
|
||||||
|
|
||||||
// EthAPIBackend implements ethapi.Backend and tracers.Backend for full nodes
|
// EthAPIBackend implements ethapi.Backend and tracers.Backend for full nodes
|
||||||
type EthAPIBackend struct {
|
type EthAPIBackend struct {
|
||||||
*filtermaps.FilterMapsMatcherBackend
|
|
||||||
extRPCEnabled bool
|
extRPCEnabled bool
|
||||||
allowUnprotectedTxs bool
|
allowUnprotectedTxs bool
|
||||||
eth *Ethereum
|
eth *Ethereum
|
||||||
|
@ -413,6 +412,10 @@ func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.Ma
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
|
||||||
|
return b.eth.filterMaps.NewMatcherBackend()
|
||||||
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) Engine() consensus.Engine {
|
func (b *EthAPIBackend) Engine() consensus.Engine {
|
||||||
return b.eth.engine
|
return b.eth.engine
|
||||||
}
|
}
|
||||||
|
|
|
@ -261,7 +261,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||||
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
|
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
|
||||||
|
|
||||||
eth.APIBackend = &EthAPIBackend{
|
eth.APIBackend = &EthAPIBackend{
|
||||||
FilterMapsMatcherBackend: (*filtermaps.FilterMapsMatcherBackend)(eth.filterMaps),
|
|
||||||
extRPCEnabled: stack.Config().ExtRPCEnabled(),
|
extRPCEnabled: stack.Config().ExtRPCEnabled(),
|
||||||
allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs,
|
allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs,
|
||||||
eth: eth,
|
eth: eth,
|
||||||
|
|
|
@ -19,10 +19,8 @@ package filters
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"math"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
//"reflect"
|
|
||||||
"slices"
|
"slices"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -30,6 +28,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
"github.com/ethereum/go-ethereum/core/filtermaps"
|
"github.com/ethereum/go-ethereum/core/filtermaps"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -120,29 +119,26 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
resolveSpecial := func(number int64) (int64, error) {
|
resolveSpecial := func(number int64) (int64, error) {
|
||||||
var hdr *types.Header
|
|
||||||
switch number {
|
switch number {
|
||||||
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
|
case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
|
||||||
// we should return head here since we've already captured
|
// we should return head here since we've already captured
|
||||||
// that we need to get the pending logs in the pending boolean above
|
// that we need to get the pending logs in the pending boolean above
|
||||||
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
|
return math.MaxInt64, nil
|
||||||
if hdr == nil {
|
|
||||||
return 0, errors.New("latest header not found")
|
|
||||||
}
|
|
||||||
case rpc.FinalizedBlockNumber.Int64():
|
case rpc.FinalizedBlockNumber.Int64():
|
||||||
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
|
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
|
||||||
if hdr == nil {
|
if hdr == nil {
|
||||||
return 0, errors.New("finalized header not found")
|
return 0, errors.New("finalized header not found")
|
||||||
}
|
}
|
||||||
|
return hdr.Number.Int64(), nil
|
||||||
case rpc.SafeBlockNumber.Int64():
|
case rpc.SafeBlockNumber.Int64():
|
||||||
hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber)
|
hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.SafeBlockNumber)
|
||||||
if hdr == nil {
|
if hdr == nil {
|
||||||
return 0, errors.New("safe header not found")
|
return 0, errors.New("safe header not found")
|
||||||
}
|
}
|
||||||
|
return hdr.Number.Int64(), nil
|
||||||
default:
|
default:
|
||||||
return number, nil
|
return number, nil
|
||||||
}
|
}
|
||||||
return hdr.Number.Int64(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
@ -155,26 +151,11 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
logs, err := filtermaps.GetPotentialMatches(ctx, f.sys.backend, uint64(f.begin), uint64(f.end), f.addresses, f.topics)
|
mb := f.sys.backend.NewMatcherBackend()
|
||||||
|
logs, _, _, _, err := filtermaps.GetPotentialMatches(ctx, mb, uint64(f.begin), uint64(f.end), f.addresses, f.topics)
|
||||||
|
mb.Close()
|
||||||
fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics)
|
fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics)
|
||||||
fmt.Println("filtermaps (new) runtime", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs))
|
log.Debug("Finished log search", "run time", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs))
|
||||||
|
|
||||||
//TODO remove
|
|
||||||
/*f.bbMatchCount = 0
|
|
||||||
start = time.Now()
|
|
||||||
logChan, errChan := f.rangeLogsAsync(ctx)
|
|
||||||
var bbLogs []*types.Log
|
|
||||||
loop:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case log := <-logChan:
|
|
||||||
bbLogs = append(bbLogs, log)
|
|
||||||
case <-errChan:
|
|
||||||
break loop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Println("bloombits (old) runtime", time.Since(start), "true matches", len(bbLogs), "false positives", f.bbMatchCount-uint64(len(bbLogs)))
|
|
||||||
fmt.Println("DeepEqual", reflect.DeepEqual(fmLogs, bbLogs))*/
|
|
||||||
return fmLogs, err
|
return fmLogs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,9 +73,7 @@ type Backend interface {
|
||||||
BloomStatus() (uint64, uint64)
|
BloomStatus() (uint64, uint64)
|
||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||||
|
|
||||||
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
|
NewMatcherBackend() filtermaps.MatcherBackend
|
||||||
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error)
|
|
||||||
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterSystem holds resources shared by all filters.
|
// FilterSystem holds resources shared by all filters.
|
||||||
|
|
|
@ -97,9 +97,7 @@ type Backend interface {
|
||||||
BloomStatus() (uint64, uint64)
|
BloomStatus() (uint64, uint64)
|
||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||||
|
|
||||||
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
|
NewMatcherBackend() filtermaps.MatcherBackend
|
||||||
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error)
|
|
||||||
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAPIs(apiBackend Backend) []rpc.API {
|
func GetAPIs(apiBackend Backend) []rpc.API {
|
||||||
|
|
Loading…
Reference in New Issue