core/filtermaps: moved math stuff to separate file, added Params
This commit is contained in:
parent
39ab872d17
commit
950ca52de8
|
@ -1,10 +1,7 @@
|
||||||
package filtermaps
|
package filtermaps
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
"errors"
|
||||||
"sort"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -18,16 +15,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
||||||
logMapHeight = 12 // log2(mapHeight)
|
|
||||||
mapHeight = 1 << logMapHeight // filter map height (number of rows)
|
|
||||||
logMapsPerEpoch = 6 // log2(mmapsPerEpochapsPerEpoch)
|
|
||||||
mapsPerEpoch = 1 << logMapsPerEpoch // number of maps in an epoch
|
|
||||||
logValuesPerMap = 16 // log2(logValuesPerMap)
|
|
||||||
valuesPerMap = 1 << logValuesPerMap // number of log values marked on each filter map
|
|
||||||
|
|
||||||
headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
|
||||||
)
|
|
||||||
|
|
||||||
// blockchain defines functions required by the FilterMaps log indexer.
|
// blockchain defines functions required by the FilterMaps log indexer.
|
||||||
type blockchain interface {
|
type blockchain interface {
|
||||||
|
@ -51,6 +39,7 @@ type FilterMaps struct {
|
||||||
history uint64
|
history uint64
|
||||||
noHistory bool
|
noHistory bool
|
||||||
|
|
||||||
|
Params
|
||||||
filterMapsRange
|
filterMapsRange
|
||||||
chain blockchain
|
chain blockchain
|
||||||
matcherSyncCh chan *FilterMapsMatcherBackend
|
matcherSyncCh chan *FilterMapsMatcherBackend
|
||||||
|
@ -60,7 +49,7 @@ type FilterMaps struct {
|
||||||
// while updating the structure. Note that the set of cached maps depends
|
// while updating the structure. Note that the set of cached maps depends
|
||||||
// only on filterMapsRange and rows of other maps are not cached here.
|
// only on filterMapsRange and rows of other maps are not cached here.
|
||||||
filterMapLock sync.Mutex
|
filterMapLock sync.Mutex
|
||||||
filterMapCache map[uint32]*filterMap
|
filterMapCache map[uint32]filterMap
|
||||||
blockPtrCache *lru.Cache[uint32, uint64]
|
blockPtrCache *lru.Cache[uint32, uint64]
|
||||||
lvPointerCache *lru.Cache[uint64, uint64]
|
lvPointerCache *lru.Cache[uint64, uint64]
|
||||||
revertPoints map[uint64]*revertPoint
|
revertPoints map[uint64]*revertPoint
|
||||||
|
@ -73,7 +62,7 @@ type FilterMaps struct {
|
||||||
// It can be used as a memory cache or an overlay while preparing a batch of
|
// It can be used as a memory cache or an overlay while preparing a batch of
|
||||||
// changes to the structure. In either case a nil value should be interpreted
|
// changes to the structure. In either case a nil value should be interpreted
|
||||||
// as transparent (uncached/unchanged).
|
// as transparent (uncached/unchanged).
|
||||||
type filterMap [mapHeight]FilterRow
|
type filterMap []FilterRow
|
||||||
|
|
||||||
// FilterRow encodes a single row of a filter map as a list of column indices.
|
// FilterRow encodes a single row of a filter map as a list of column indices.
|
||||||
// Note that the values are always stored in the same order as they were added
|
// Note that the values are always stored in the same order as they were added
|
||||||
|
@ -105,17 +94,19 @@ type filterMapsRange struct {
|
||||||
|
|
||||||
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
||||||
// the structure in sync with the given blockchain.
|
// the structure in sync with the given blockchain.
|
||||||
func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistory bool) *FilterMaps {
|
func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
|
||||||
rs, err := rawdb.ReadFilterMapsRange(db)
|
rs, err := rawdb.ReadFilterMapsRange(db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error reading log index range", "error", err)
|
log.Error("Error reading log index range", "error", err)
|
||||||
}
|
}
|
||||||
|
params.deriveFields()
|
||||||
fm := &FilterMaps{
|
fm := &FilterMaps{
|
||||||
db: db,
|
db: db,
|
||||||
chain: chain,
|
chain: chain,
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
history: history,
|
history: history,
|
||||||
noHistory: noHistory,
|
noHistory: noHistory,
|
||||||
|
Params: params,
|
||||||
filterMapsRange: filterMapsRange{
|
filterMapsRange: filterMapsRange{
|
||||||
initialized: rs.Initialized,
|
initialized: rs.Initialized,
|
||||||
headLvPointer: rs.HeadLvPointer,
|
headLvPointer: rs.HeadLvPointer,
|
||||||
|
@ -127,7 +118,7 @@ func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistor
|
||||||
},
|
},
|
||||||
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
|
matcherSyncCh: make(chan *FilterMapsMatcherBackend),
|
||||||
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
|
matchers: make(map[*FilterMapsMatcherBackend]struct{}),
|
||||||
filterMapCache: make(map[uint32]*filterMap),
|
filterMapCache: make(map[uint32]filterMap),
|
||||||
blockPtrCache: lru.NewCache[uint32, uint64](1000),
|
blockPtrCache: lru.NewCache[uint32, uint64](1000),
|
||||||
lvPointerCache: lru.NewCache[uint64, uint64](1000),
|
lvPointerCache: lru.NewCache[uint64, uint64](1000),
|
||||||
revertPoints: make(map[uint64]*revertPoint),
|
revertPoints: make(map[uint64]*revertPoint),
|
||||||
|
@ -154,7 +145,7 @@ func (f *FilterMaps) Close() {
|
||||||
func (f *FilterMaps) reset() bool {
|
func (f *FilterMaps) reset() bool {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
f.filterMapsRange = filterMapsRange{}
|
f.filterMapsRange = filterMapsRange{}
|
||||||
f.filterMapCache = make(map[uint32]*filterMap)
|
f.filterMapCache = make(map[uint32]filterMap)
|
||||||
f.revertPoints = make(map[uint64]*revertPoint)
|
f.revertPoints = make(map[uint64]*revertPoint)
|
||||||
f.blockPtrCache.Purge()
|
f.blockPtrCache.Purge()
|
||||||
f.lvPointerCache.Purge()
|
f.lvPointerCache.Purge()
|
||||||
|
@ -242,21 +233,21 @@ func (f *FilterMaps) updateMapCache() {
|
||||||
f.filterMapLock.Lock()
|
f.filterMapLock.Lock()
|
||||||
defer f.filterMapLock.Unlock()
|
defer f.filterMapLock.Unlock()
|
||||||
|
|
||||||
newFilterMapCache := make(map[uint32]*filterMap)
|
newFilterMapCache := make(map[uint32]filterMap)
|
||||||
firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
|
firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>f.logValuesPerMap), uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap)
|
||||||
headCacheFirst := firstMap + 1
|
headCacheFirst := firstMap + 1
|
||||||
if afterLastMap > headCacheFirst+headCacheSize {
|
if afterLastMap > headCacheFirst+headCacheSize {
|
||||||
headCacheFirst = afterLastMap - headCacheSize
|
headCacheFirst = afterLastMap - headCacheSize
|
||||||
}
|
}
|
||||||
fm := f.filterMapCache[firstMap]
|
fm := f.filterMapCache[firstMap]
|
||||||
if fm == nil {
|
if fm == nil {
|
||||||
fm = new(filterMap)
|
fm = make(filterMap, f.mapHeight)
|
||||||
}
|
}
|
||||||
newFilterMapCache[firstMap] = fm
|
newFilterMapCache[firstMap] = fm
|
||||||
for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ {
|
for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ {
|
||||||
fm := f.filterMapCache[mapIndex]
|
fm := f.filterMapCache[mapIndex]
|
||||||
if fm == nil {
|
if fm == nil {
|
||||||
fm = new(filterMap)
|
fm = make(filterMap, f.mapHeight)
|
||||||
}
|
}
|
||||||
newFilterMapCache[mapIndex] = fm
|
newFilterMapCache[mapIndex] = fm
|
||||||
}
|
}
|
||||||
|
@ -275,7 +266,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
// find possible block range based on map to block pointers
|
// find possible block range based on map to block pointers
|
||||||
mapIndex := uint32(lvIndex >> logValuesPerMap)
|
mapIndex := uint32(lvIndex >> f.logValuesPerMap)
|
||||||
firstBlockNumber, err := f.getMapBlockPtr(mapIndex)
|
firstBlockNumber, err := f.getMapBlockPtr(mapIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -284,7 +275,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
||||||
firstBlockNumber = f.tailBlockNumber
|
firstBlockNumber = f.tailBlockNumber
|
||||||
}
|
}
|
||||||
var lastBlockNumber uint64
|
var lastBlockNumber uint64
|
||||||
if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) {
|
if mapIndex+1 < uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap) {
|
||||||
lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
|
lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -345,7 +336,7 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, erro
|
||||||
if fm != nil && fm[rowIndex] != nil {
|
if fm != nil && fm[rowIndex] != nil {
|
||||||
return fm[rowIndex], nil
|
return fm[rowIndex], nil
|
||||||
}
|
}
|
||||||
row, err := rawdb.ReadFilterMapRow(f.db, mapRowIndex(mapIndex, rowIndex))
|
row, err := rawdb.ReadFilterMapRow(f.db, f.mapRowIndex(mapIndex, rowIndex))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -364,9 +355,9 @@ func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uin
|
||||||
defer f.filterMapLock.Unlock()
|
defer f.filterMapLock.Unlock()
|
||||||
|
|
||||||
if fm := f.filterMapCache[mapIndex]; fm != nil {
|
if fm := f.filterMapCache[mapIndex]; fm != nil {
|
||||||
(*fm)[rowIndex] = row
|
fm[rowIndex] = row
|
||||||
}
|
}
|
||||||
rawdb.WriteFilterMapRow(batch, mapRowIndex(mapIndex, rowIndex), []uint32(row))
|
rawdb.WriteFilterMapRow(batch, f.mapRowIndex(mapIndex, rowIndex), []uint32(row))
|
||||||
}
|
}
|
||||||
|
|
||||||
// mapRowIndex calculates the unified storage index where the given row of the
|
// mapRowIndex calculates the unified storage index where the given row of the
|
||||||
|
@ -375,9 +366,9 @@ func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uin
|
||||||
// same data proximity reasons it is also suitable for database representation.
|
// same data proximity reasons it is also suitable for database representation.
|
||||||
// See also:
|
// See also:
|
||||||
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
|
// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
|
||||||
func mapRowIndex(mapIndex, rowIndex uint32) uint64 {
|
func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
|
||||||
epochIndex, mapSubIndex := mapIndex>>logMapsPerEpoch, mapIndex%mapsPerEpoch
|
epochIndex, mapSubIndex := mapIndex>>f.logMapsPerEpoch, mapIndex&(f.mapsPerEpoch-1)
|
||||||
return (uint64(epochIndex)<<logMapHeight+uint64(rowIndex))<<logMapsPerEpoch + uint64(mapSubIndex)
|
return (uint64(epochIndex)<<f.logMapHeight+uint64(rowIndex))<<f.logMapsPerEpoch + uint64(mapSubIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBlockLvPointer returns the starting log value index where the log values
|
// getBlockLvPointer returns the starting log value index where the log values
|
||||||
|
@ -440,152 +431,3 @@ func (f *FilterMaps) deleteMapBlockPtr(batch ethdb.Batch, mapIndex uint32) {
|
||||||
f.blockPtrCache.Remove(mapIndex)
|
f.blockPtrCache.Remove(mapIndex)
|
||||||
rawdb.DeleteFilterMapBlockPtr(batch, mapIndex)
|
rawdb.DeleteFilterMapBlockPtr(batch, mapIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
// addressValue returns the log value hash of a log emitting address.
|
|
||||||
func addressValue(address common.Address) common.Hash {
|
|
||||||
var result common.Hash
|
|
||||||
hasher := sha256.New()
|
|
||||||
hasher.Write(address[:])
|
|
||||||
hasher.Sum(result[:0])
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// topicValue returns the log value hash of a log topic.
|
|
||||||
func topicValue(topic common.Hash) common.Hash {
|
|
||||||
var result common.Hash
|
|
||||||
hasher := sha256.New()
|
|
||||||
hasher.Write(topic[:])
|
|
||||||
hasher.Sum(result[:0])
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
// rowIndex returns the row index in which the given log value should be marked
|
|
||||||
// during the given epoch. Note that row assignments are re-shuffled in every
|
|
||||||
// epoch in order to ensure that even though there are always a few more heavily
|
|
||||||
// used rows due to very popular addresses and topics, these will not make search
|
|
||||||
// for other log values very expensive. Even if certain values are occasionally
|
|
||||||
// sorted into these heavy rows, in most of the epochs they are placed in average
|
|
||||||
// length rows.
|
|
||||||
func rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
|
|
||||||
hasher := sha256.New()
|
|
||||||
hasher.Write(logValue[:])
|
|
||||||
var indexEnc [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
|
|
||||||
hasher.Write(indexEnc[:])
|
|
||||||
var hash common.Hash
|
|
||||||
hasher.Sum(hash[:0])
|
|
||||||
return binary.LittleEndian.Uint32(hash[:4]) % mapHeight
|
|
||||||
}
|
|
||||||
|
|
||||||
// columnIndex returns the column index that should be added to the appropriate
|
|
||||||
// row in order to place a mark for the next log value.
|
|
||||||
func columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
|
|
||||||
x := uint32(lvIndex % valuesPerMap) // log value sub-index
|
|
||||||
transformHash := transformHash(uint32(lvIndex/valuesPerMap), logValue)
|
|
||||||
// apply column index transformation function
|
|
||||||
x += binary.LittleEndian.Uint32(transformHash[0:4])
|
|
||||||
x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
|
|
||||||
x ^= binary.LittleEndian.Uint32(transformHash[8:12])
|
|
||||||
x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
|
|
||||||
x += binary.LittleEndian.Uint32(transformHash[16:20])
|
|
||||||
x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
|
|
||||||
x ^= binary.LittleEndian.Uint32(transformHash[24:28])
|
|
||||||
x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
|
|
||||||
return x
|
|
||||||
}
|
|
||||||
|
|
||||||
// transformHash calculates a hash specific to a given map and log value hash
|
|
||||||
// that defines a bijective function on the uint32 range. This function is used
|
|
||||||
// to transform the log value sub-index (distance from the first index of the map)
|
|
||||||
// into a 32 bit column index, then applied in reverse when searching for potential
|
|
||||||
// matches for a given log value.
|
|
||||||
func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
|
|
||||||
hasher := sha256.New()
|
|
||||||
hasher.Write(logValue[:])
|
|
||||||
var indexEnc [4]byte
|
|
||||||
binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
|
|
||||||
hasher.Write(indexEnc[:])
|
|
||||||
hasher.Sum(result[:0])
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// potentialMatches returns the list of log value indices potentially matching
|
|
||||||
// the given log value hash in the range of the filter map the row belongs to.
|
|
||||||
// Note that the list of indices is always sorted and potential duplicates are
|
|
||||||
// removed. Though the column indices are stored in the same order they were
|
|
||||||
// added and therefore the true matches are automatically reverse transformed
|
|
||||||
// in the right order, false positives can ruin this property. Since these can
|
|
||||||
// only be separated from true matches after the combined pattern matching of the
|
|
||||||
// outputs of individual log value matchers and this pattern matcher assumes a
|
|
||||||
// sorted and duplicate-free list of indices, we should ensure these properties
|
|
||||||
// here.
|
|
||||||
func (row FilterRow) potentialMatches(mapIndex uint32, logValue common.Hash) potentialMatches {
|
|
||||||
results := make(potentialMatches, 0, 8)
|
|
||||||
transformHash := transformHash(mapIndex, logValue)
|
|
||||||
sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
|
|
||||||
mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
|
|
||||||
xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
|
|
||||||
mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
|
|
||||||
sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
|
|
||||||
mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
|
|
||||||
xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
|
|
||||||
mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
|
|
||||||
// perform reverse column index transformation on all column indices of the row.
|
|
||||||
// if a column index was added by the searched log value then the reverse
|
|
||||||
// transform will yield a valid log value sub-index of the given map.
|
|
||||||
// Column index is 32 bits long while there are 2**16 valid log value indices
|
|
||||||
// in the map's range, so this can also happen by accident with 1 in 2**16
|
|
||||||
// chance, in which case we have a false positive.
|
|
||||||
for _, columnIndex := range row {
|
|
||||||
if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < valuesPerMap {
|
|
||||||
results = append(results, uint64(mapIndex)*valuesPerMap+uint64(potentialSubIndex))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Sort(results)
|
|
||||||
// remove duplicates
|
|
||||||
j := 0
|
|
||||||
for i, match := range results {
|
|
||||||
if i == 0 || match != results[i-1] {
|
|
||||||
results[j] = results[i]
|
|
||||||
j++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return results[:j]
|
|
||||||
}
|
|
||||||
|
|
||||||
// potentialMatches is a strictly monotonically increasing list of log value
|
|
||||||
// indices in the range of a filter map that are potential matches for certain
|
|
||||||
// filter criteria.
|
|
||||||
// Note that nil is used as a wildcard and therefore means that all log value
|
|
||||||
// indices in the filter map range are potential matches. If there are no
|
|
||||||
// potential matches in the given map's range then an empty slice should be used.
|
|
||||||
type potentialMatches []uint64
|
|
||||||
|
|
||||||
// noMatches means there are no potential matches in a given filter map's range.
|
|
||||||
var noMatches = potentialMatches{}
|
|
||||||
|
|
||||||
func (p potentialMatches) Len() int { return len(p) }
|
|
||||||
func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
|
|
||||||
func (p potentialMatches) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|
||||||
|
|
||||||
// uint32ModInverse takes an odd 32 bit number and returns its modular
|
|
||||||
// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y
|
|
||||||
// x * y * uint32ModInverse(y) == 1.
|
|
||||||
func uint32ModInverse(v uint32) uint32 {
|
|
||||||
if v&1 == 0 {
|
|
||||||
panic("uint32ModInverse called with even argument")
|
|
||||||
}
|
|
||||||
m := int64(1) << 32
|
|
||||||
m0 := m
|
|
||||||
a := int64(v)
|
|
||||||
x, y := int64(1), int64(0)
|
|
||||||
for a > 1 {
|
|
||||||
q := a / m
|
|
||||||
m, a = a%m, m
|
|
||||||
x, y = y, x-q*y
|
|
||||||
}
|
|
||||||
if x < 0 {
|
|
||||||
x += m0
|
|
||||||
}
|
|
||||||
return uint32(x)
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,10 +13,10 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
startLvPointer = valuesPerMap << 31 // log value index assigned to init block
|
startLvMap = 1 << 31 // map index assigned to init block
|
||||||
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
|
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
|
||||||
revertPointFrequency = 256 // frequency of revert points in database
|
revertPointFrequency = 256 // frequency of revert points in database
|
||||||
cachedRevertPoints = 64 // revert points for most recent blocks in memory
|
cachedRevertPoints = 64 // revert points for most recent blocks in memory
|
||||||
)
|
)
|
||||||
|
|
||||||
// updateLoop initializes and updates the log index structure according to the
|
// updateLoop initializes and updates the log index structure according to the
|
||||||
|
@ -36,7 +36,7 @@ func (f *FilterMaps) updateLoop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
headEventCh = make(chan core.ChainHeadEvent)
|
headEventCh = make(chan core.ChainHeadEvent, 10)
|
||||||
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
|
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
|
||||||
head *types.Header
|
head *types.Header
|
||||||
stop bool
|
stop bool
|
||||||
|
@ -231,7 +231,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
|
||||||
log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
|
log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if update.updatedRangeLength() >= mapsPerEpoch {
|
if update.updatedRangeLength() >= f.mapsPerEpoch {
|
||||||
// limit the amount of data updated in a single batch
|
// limit the amount of data updated in a single batch
|
||||||
f.applyUpdateBatch(update)
|
f.applyUpdateBatch(update)
|
||||||
update = f.newUpdateBatch()
|
update = f.newUpdateBatch()
|
||||||
|
@ -336,12 +336,12 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
|
||||||
// pointers from the database. This function also updates targetLvPointer.
|
// pointers from the database. This function also updates targetLvPointer.
|
||||||
func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
|
func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
|
||||||
fmr := f.getRange()
|
fmr := f.getRange()
|
||||||
tailMap := uint32(fmr.tailLvPointer >> logValuesPerMap)
|
tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap)
|
||||||
targetMap := uint32(fmr.tailBlockLvPointer >> logValuesPerMap)
|
targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap)
|
||||||
if tailMap >= targetMap {
|
if tailMap >= targetMap {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lastEpoch := (targetMap - 1) >> logMapsPerEpoch
|
lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch
|
||||||
removeLvPtr, err := f.getMapBlockPtr(tailMap)
|
removeLvPtr, err := f.getMapBlockPtr(tailMap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err)
|
log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err)
|
||||||
|
@ -352,12 +352,12 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
|
||||||
lastLogged time.Time
|
lastLogged time.Time
|
||||||
)
|
)
|
||||||
for tailMap < targetMap && !stopFn() {
|
for tailMap < targetMap && !stopFn() {
|
||||||
tailEpoch := tailMap >> logMapsPerEpoch
|
tailEpoch := tailMap >> f.logMapsPerEpoch
|
||||||
if tailEpoch == lastEpoch {
|
if tailEpoch == lastEpoch {
|
||||||
f.pruneMaps(tailMap, targetMap, &removeLvPtr)
|
f.pruneMaps(tailMap, targetMap, &removeLvPtr)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
nextTailMap := (tailEpoch + 1) << logMapsPerEpoch
|
nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch
|
||||||
f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
|
f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
|
||||||
tailMap = nextTailMap
|
tailMap = nextTailMap
|
||||||
if !logged || time.Since(lastLogged) >= time.Second*10 {
|
if !logged || time.Since(lastLogged) >= time.Second*10 {
|
||||||
|
@ -386,13 +386,13 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
||||||
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
|
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
|
||||||
f.deleteMapBlockPtr(batch, mapIndex)
|
f.deleteMapBlockPtr(batch, mapIndex)
|
||||||
}
|
}
|
||||||
for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
|
for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ {
|
||||||
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
|
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
|
||||||
f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
|
f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fmr := f.getRange()
|
fmr := f.getRange()
|
||||||
fmr.tailLvPointer = uint64(afterLast) << logValuesPerMap
|
fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap
|
||||||
if fmr.tailLvPointer > fmr.tailBlockLvPointer {
|
if fmr.tailLvPointer > fmr.tailBlockLvPointer {
|
||||||
log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
|
log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
|
||||||
return
|
return
|
||||||
|
@ -407,11 +407,11 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
||||||
// that can be written to the database in a single batch while the in-memory
|
// that can be written to the database in a single batch while the in-memory
|
||||||
// representations in FilterMaps are also updated.
|
// representations in FilterMaps are also updated.
|
||||||
type updateBatch struct {
|
type updateBatch struct {
|
||||||
|
f *FilterMaps
|
||||||
filterMapsRange
|
filterMapsRange
|
||||||
maps map[uint32]*filterMap // nil rows are unchanged
|
maps map[uint32]filterMap // nil rows are unchanged
|
||||||
getFilterMapRow func(mapIndex, rowIndex uint32) (FilterRow, error)
|
blockLvPointer map[uint64]uint64 // removedPointer means delete
|
||||||
blockLvPointer map[uint64]uint64 // removedPointer means delete
|
mapBlockPtr map[uint32]uint64 // removedPointer means delete
|
||||||
mapBlockPtr map[uint32]uint64 // removedPointer means delete
|
|
||||||
revertPoints map[uint64]*revertPoint
|
revertPoints map[uint64]*revertPoint
|
||||||
firstMap, afterLastMap uint32
|
firstMap, afterLastMap uint32
|
||||||
}
|
}
|
||||||
|
@ -422,9 +422,9 @@ func (f *FilterMaps) newUpdateBatch() *updateBatch {
|
||||||
defer f.lock.RUnlock()
|
defer f.lock.RUnlock()
|
||||||
|
|
||||||
return &updateBatch{
|
return &updateBatch{
|
||||||
|
f: f,
|
||||||
filterMapsRange: f.filterMapsRange,
|
filterMapsRange: f.filterMapsRange,
|
||||||
maps: make(map[uint32]*filterMap),
|
maps: make(map[uint32]filterMap),
|
||||||
getFilterMapRow: f.getFilterMapRow,
|
|
||||||
blockLvPointer: make(map[uint64]uint64),
|
blockLvPointer: make(map[uint64]uint64),
|
||||||
mapBlockPtr: make(map[uint32]uint64),
|
mapBlockPtr: make(map[uint32]uint64),
|
||||||
revertPoints: make(map[uint64]*revertPoint),
|
revertPoints: make(map[uint64]*revertPoint),
|
||||||
|
@ -455,10 +455,10 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// write filter map rows
|
// write filter map rows
|
||||||
for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
|
for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ {
|
||||||
for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ {
|
for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ {
|
||||||
if fm := u.maps[mapIndex]; fm != nil {
|
if fm := u.maps[mapIndex]; fm != nil {
|
||||||
if row := (*fm)[rowIndex]; row != nil {
|
if row := fm[rowIndex]; row != nil {
|
||||||
f.storeFilterMapRow(batch, mapIndex, rowIndex, row)
|
f.storeFilterMapRow(batch, mapIndex, rowIndex, row)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -488,7 +488,7 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
|
||||||
rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{
|
rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{
|
||||||
BlockHash: rp.blockHash,
|
BlockHash: rp.blockHash,
|
||||||
MapIndex: rp.mapIndex,
|
MapIndex: rp.mapIndex,
|
||||||
RowLength: rp.rowLength[:],
|
RowLength: rp.rowLength,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -507,7 +507,7 @@ func (u *updateBatch) updatedRangeLength() uint32 {
|
||||||
|
|
||||||
// tailEpoch returns the tail epoch index.
|
// tailEpoch returns the tail epoch index.
|
||||||
func (u *updateBatch) tailEpoch() uint32 {
|
func (u *updateBatch) tailEpoch() uint32 {
|
||||||
return uint32(u.tailBlockLvPointer >> (logValuesPerMap + logMapsPerEpoch))
|
return uint32(u.tailBlockLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch))
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
|
// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
|
||||||
|
@ -517,7 +517,7 @@ func (u *updateBatch) tailEpoch() uint32 {
|
||||||
func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
|
func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
|
||||||
fm := u.maps[mapIndex]
|
fm := u.maps[mapIndex]
|
||||||
if fm == nil {
|
if fm == nil {
|
||||||
fm = new(filterMap)
|
fm = make(filterMap, u.f.mapHeight)
|
||||||
u.maps[mapIndex] = fm
|
u.maps[mapIndex] = fm
|
||||||
if mapIndex < u.firstMap || u.afterLastMap == 0 {
|
if mapIndex < u.firstMap || u.afterLastMap == 0 {
|
||||||
u.firstMap = mapIndex
|
u.firstMap = mapIndex
|
||||||
|
@ -526,9 +526,9 @@ func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
|
||||||
u.afterLastMap = mapIndex + 1
|
u.afterLastMap = mapIndex + 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rowPtr := &(*fm)[rowIndex]
|
rowPtr := &fm[rowIndex]
|
||||||
if *rowPtr == nil {
|
if *rowPtr == nil {
|
||||||
if filterRow, err := u.getFilterMapRow(mapIndex, rowIndex); err == nil {
|
if filterRow, err := u.f.getFilterMapRow(mapIndex, rowIndex); err == nil {
|
||||||
// filterRow is read only, copy before write
|
// filterRow is read only, copy before write
|
||||||
*rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8)
|
*rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8)
|
||||||
copy(*rowPtr, filterRow)
|
copy(*rowPtr, filterRow)
|
||||||
|
@ -545,6 +545,7 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt
|
||||||
return errors.New("already initialized")
|
return errors.New("already initialized")
|
||||||
}
|
}
|
||||||
u.initialized = true
|
u.initialized = true
|
||||||
|
startLvPointer := uint64(startLvMap) << u.f.logValuesPerMap
|
||||||
u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
|
u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
|
||||||
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
|
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
|
||||||
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
|
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
|
||||||
|
@ -554,12 +555,12 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt
|
||||||
|
|
||||||
// addValueToHead adds a single log value to the head of the log index.
|
// addValueToHead adds a single log value to the head of the log index.
|
||||||
func (u *updateBatch) addValueToHead(logValue common.Hash) error {
|
func (u *updateBatch) addValueToHead(logValue common.Hash) error {
|
||||||
mapIndex := uint32(u.headLvPointer >> logValuesPerMap)
|
mapIndex := uint32(u.headLvPointer >> u.f.logValuesPerMap)
|
||||||
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
|
rowPtr, err := u.getRowPtr(mapIndex, u.f.rowIndex(mapIndex>>u.f.logMapsPerEpoch, logValue))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
column := columnIndex(u.headLvPointer, logValue)
|
column := u.f.columnIndex(u.headLvPointer, logValue)
|
||||||
*rowPtr = append(*rowPtr, column)
|
*rowPtr = append(*rowPtr, column)
|
||||||
u.headLvPointer++
|
u.headLvPointer++
|
||||||
return nil
|
return nil
|
||||||
|
@ -577,11 +578,11 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip
|
||||||
}
|
}
|
||||||
number := header.Number.Uint64()
|
number := header.Number.Uint64()
|
||||||
u.blockLvPointer[number] = u.headLvPointer
|
u.blockLvPointer[number] = u.headLvPointer
|
||||||
startMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
|
startMap := uint32((u.headLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap)
|
||||||
if err := iterateReceipts(receipts, u.addValueToHead); err != nil {
|
if err := iterateReceipts(receipts, u.addValueToHead); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
stopMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
|
stopMap := uint32((u.headLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap)
|
||||||
for m := startMap; m < stopMap; m++ {
|
for m := startMap; m < stopMap; m++ {
|
||||||
u.mapBlockPtr[m] = number
|
u.mapBlockPtr[m] = number
|
||||||
}
|
}
|
||||||
|
@ -610,12 +611,12 @@ func (u *updateBatch) addValueToTail(logValue common.Hash) error {
|
||||||
return nil // already added to the map
|
return nil // already added to the map
|
||||||
}
|
}
|
||||||
u.tailLvPointer--
|
u.tailLvPointer--
|
||||||
mapIndex := uint32(u.tailBlockLvPointer >> logValuesPerMap)
|
mapIndex := uint32(u.tailBlockLvPointer >> u.f.logValuesPerMap)
|
||||||
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
|
rowPtr, err := u.getRowPtr(mapIndex, u.f.rowIndex(mapIndex>>u.f.logMapsPerEpoch, logValue))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
column := columnIndex(u.tailBlockLvPointer, logValue)
|
column := u.f.columnIndex(u.tailBlockLvPointer, logValue)
|
||||||
*rowPtr = append(*rowPtr, 0)
|
*rowPtr = append(*rowPtr, 0)
|
||||||
copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
|
copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
|
||||||
(*rowPtr)[0] = column
|
(*rowPtr)[0] = column
|
||||||
|
@ -632,7 +633,7 @@ func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receip
|
||||||
return errors.New("addBlockToTail parent mismatch")
|
return errors.New("addBlockToTail parent mismatch")
|
||||||
}
|
}
|
||||||
number := header.Number.Uint64()
|
number := header.Number.Uint64()
|
||||||
stopMap := uint32((u.tailLvPointer + valuesPerMap - 1) >> logValuesPerMap)
|
stopMap := uint32((u.tailLvPointer + u.f.valuesPerMap - 1) >> u.f.logValuesPerMap)
|
||||||
var cnt int
|
var cnt int
|
||||||
if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error {
|
if err := iterateReceiptsReverse(receipts, func(lv common.Hash) error {
|
||||||
cnt++
|
cnt++
|
||||||
|
@ -640,7 +641,7 @@ func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receip
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
startMap := uint32(u.tailLvPointer >> logValuesPerMap)
|
startMap := uint32(u.tailLvPointer >> u.f.logValuesPerMap)
|
||||||
for m := startMap; m < stopMap; m++ {
|
for m := startMap; m < stopMap; m++ {
|
||||||
u.mapBlockPtr[m] = number
|
u.mapBlockPtr[m] = number
|
||||||
}
|
}
|
||||||
|
@ -693,7 +694,7 @@ type revertPoint struct {
|
||||||
blockNumber uint64
|
blockNumber uint64
|
||||||
blockHash common.Hash
|
blockHash common.Hash
|
||||||
mapIndex uint32
|
mapIndex uint32
|
||||||
rowLength [mapHeight]uint
|
rowLength []uint
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeRevertPoint creates a new revertPoint.
|
// makeRevertPoint creates a new revertPoint.
|
||||||
|
@ -701,19 +702,20 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
|
||||||
rp := &revertPoint{
|
rp := &revertPoint{
|
||||||
blockNumber: u.headBlockNumber,
|
blockNumber: u.headBlockNumber,
|
||||||
blockHash: u.headBlockHash,
|
blockHash: u.headBlockHash,
|
||||||
mapIndex: uint32(u.headLvPointer >> logValuesPerMap),
|
mapIndex: uint32(u.headLvPointer >> u.f.logValuesPerMap),
|
||||||
|
rowLength: make([]uint, u.f.mapHeight),
|
||||||
}
|
}
|
||||||
if u.tailLvPointer > uint64(rp.mapIndex)<<logValuesPerMap {
|
if u.tailLvPointer > uint64(rp.mapIndex)<<u.f.logValuesPerMap {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
for i := range rp.rowLength[:] {
|
for i := range rp.rowLength {
|
||||||
var row FilterRow
|
var row FilterRow
|
||||||
if m := u.maps[rp.mapIndex]; m != nil {
|
if m := u.maps[rp.mapIndex]; m != nil {
|
||||||
row = (*m)[i]
|
row = m[i]
|
||||||
}
|
}
|
||||||
if row == nil {
|
if row == nil {
|
||||||
var err error
|
var err error
|
||||||
row, err = u.getFilterMapRow(rp.mapIndex, uint32(i))
|
row, err = u.f.getFilterMapRow(rp.mapIndex, uint32(i))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -744,16 +746,15 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
|
||||||
if rps == nil {
|
if rps == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if len(rps.RowLength) != mapHeight {
|
if uint32(len(rps.RowLength)) != f.mapHeight {
|
||||||
return nil, errors.New("invalid number of rows in stored revert point")
|
return nil, errors.New("invalid number of rows in stored revert point")
|
||||||
}
|
}
|
||||||
rp := &revertPoint{
|
return &revertPoint{
|
||||||
blockNumber: blockNumber,
|
blockNumber: blockNumber,
|
||||||
blockHash: rps.BlockHash,
|
blockHash: rps.BlockHash,
|
||||||
mapIndex: rps.MapIndex,
|
mapIndex: rps.MapIndex,
|
||||||
}
|
rowLength: rps.RowLength,
|
||||||
copy(rp.rowLength[:], rps.RowLength)
|
}, nil
|
||||||
return rp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// revertTo reverts the log index to the given revert point.
|
// revertTo reverts the log index to the given revert point.
|
||||||
|
@ -762,12 +763,12 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
batch := f.db.NewBatch()
|
batch := f.db.NewBatch()
|
||||||
afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
|
afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap)
|
||||||
if rp.mapIndex >= afterLastMap {
|
if rp.mapIndex >= afterLastMap {
|
||||||
return errors.New("cannot revert (head map behind revert point)")
|
return errors.New("cannot revert (head map behind revert point)")
|
||||||
}
|
}
|
||||||
lvPointer := uint64(rp.mapIndex) << logValuesPerMap
|
lvPointer := uint64(rp.mapIndex) << f.logValuesPerMap
|
||||||
for rowIndex, rowLen := range rp.rowLength[:] {
|
for rowIndex, rowLen := range rp.rowLength {
|
||||||
rowIndex := uint32(rowIndex)
|
rowIndex := uint32(rowIndex)
|
||||||
row, err := f.getFilterMapRow(rp.mapIndex, rowIndex)
|
row, err := f.getFilterMapRow(rp.mapIndex, rowIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -21,6 +21,7 @@ var ErrMatchAll = errors.New("match all patterns not supported")
|
||||||
// once EIP-7745 is implemented and active, these functions can also be trustlessly
|
// once EIP-7745 is implemented and active, these functions can also be trustlessly
|
||||||
// served by a remote prover.
|
// served by a remote prover.
|
||||||
type MatcherBackend interface {
|
type MatcherBackend interface {
|
||||||
|
GetParams() *Params
|
||||||
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
|
GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
|
||||||
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
|
GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
|
||||||
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
|
GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
|
||||||
|
@ -139,6 +140,7 @@ func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
|
||||||
// to that block range might be missing or incorrect.
|
// to that block range might be missing or incorrect.
|
||||||
// Also note that the returned list may contain false positives.
|
// Also note that the returned list may contain false positives.
|
||||||
func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
|
func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
|
||||||
|
params := backend.GetParams()
|
||||||
// find the log value index range to search
|
// find the log value index range to search
|
||||||
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
|
firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -151,8 +153,8 @@ func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
|
||||||
if lastIndex > 0 {
|
if lastIndex > 0 {
|
||||||
lastIndex--
|
lastIndex--
|
||||||
}
|
}
|
||||||
firstMap, lastMap := uint32(firstIndex>>logValuesPerMap), uint32(lastIndex>>logValuesPerMap)
|
firstMap, lastMap := uint32(firstIndex>>params.logValuesPerMap), uint32(lastIndex>>params.logValuesPerMap)
|
||||||
firstEpoch, lastEpoch := firstMap>>logMapsPerEpoch, lastMap>>logMapsPerEpoch
|
firstEpoch, lastEpoch := firstMap>>params.logMapsPerEpoch, lastMap>>params.logMapsPerEpoch
|
||||||
|
|
||||||
// build matcher according to the given filter criteria
|
// build matcher according to the given filter criteria
|
||||||
matchers := make([]matcher, len(topics)+1)
|
matchers := make([]matcher, len(topics)+1)
|
||||||
|
@ -178,13 +180,13 @@ func getPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock
|
||||||
}
|
}
|
||||||
// matcher is the final sequence matcher that signals a match when all underlying
|
// matcher is the final sequence matcher that signals a match when all underlying
|
||||||
// matchers signal a match for consecutive log value indices.
|
// matchers signal a match for consecutive log value indices.
|
||||||
matcher := newMatchSequence(matchers)
|
matcher := newMatchSequence(params, matchers)
|
||||||
|
|
||||||
// processEpoch returns the potentially matching logs from the given epoch.
|
// processEpoch returns the potentially matching logs from the given epoch.
|
||||||
processEpoch := func(epochIndex uint32) ([]*types.Log, error) {
|
processEpoch := func(epochIndex uint32) ([]*types.Log, error) {
|
||||||
var logs []*types.Log
|
var logs []*types.Log
|
||||||
// create a list of map indices to process
|
// create a list of map indices to process
|
||||||
fm, lm := epochIndex<<logMapsPerEpoch, (epochIndex+1)<<logMapsPerEpoch-1
|
fm, lm := epochIndex<<params.logMapsPerEpoch, (epochIndex+1)<<params.logMapsPerEpoch-1
|
||||||
if fm < firstMap {
|
if fm < firstMap {
|
||||||
fm = firstMap
|
fm = firstMap
|
||||||
}
|
}
|
||||||
|
@ -318,13 +320,14 @@ type singleMatcher struct {
|
||||||
|
|
||||||
// getMatches implements matcher
|
// getMatches implements matcher
|
||||||
func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
|
func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
|
||||||
|
params := s.backend.GetParams()
|
||||||
results := make([]potentialMatches, len(mapIndices))
|
results := make([]potentialMatches, len(mapIndices))
|
||||||
for i, mapIndex := range mapIndices {
|
for i, mapIndex := range mapIndices {
|
||||||
filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, s.value))
|
filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, params.rowIndex(mapIndex>>params.logMapsPerEpoch, s.value))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
results[i] = filterRow.potentialMatches(mapIndex, s.value)
|
results[i] = params.potentialMatches(filterRow, mapIndex, s.value)
|
||||||
}
|
}
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
@ -403,6 +406,7 @@ func mergeResults(results []potentialMatches) potentialMatches {
|
||||||
// gives a match at X+offset. Note that matchSequence can be used recursively to
|
// gives a match at X+offset. Note that matchSequence can be used recursively to
|
||||||
// detect any log value sequence.
|
// detect any log value sequence.
|
||||||
type matchSequence struct {
|
type matchSequence struct {
|
||||||
|
params *Params
|
||||||
base, next matcher
|
base, next matcher
|
||||||
offset uint64
|
offset uint64
|
||||||
// *EmptyRate == totalCount << 32 + emptyCount (atomically accessed)
|
// *EmptyRate == totalCount << 32 + emptyCount (atomically accessed)
|
||||||
|
@ -412,7 +416,7 @@ type matchSequence struct {
|
||||||
// newMatchSequence creates a recursive sequence matcher from a list of underlying
|
// newMatchSequence creates a recursive sequence matcher from a list of underlying
|
||||||
// matchers. The resulting matcher signals a match at log value index X when each
|
// matchers. The resulting matcher signals a match at log value index X when each
|
||||||
// underlying matcher matchers[i] returns a match at X+i.
|
// underlying matcher matchers[i] returns a match at X+i.
|
||||||
func newMatchSequence(matchers []matcher) matcher {
|
func newMatchSequence(params *Params, matchers []matcher) matcher {
|
||||||
if len(matchers) == 0 {
|
if len(matchers) == 0 {
|
||||||
panic("zero length sequence matchers are not allowed")
|
panic("zero length sequence matchers are not allowed")
|
||||||
}
|
}
|
||||||
|
@ -420,7 +424,8 @@ func newMatchSequence(matchers []matcher) matcher {
|
||||||
return matchers[0]
|
return matchers[0]
|
||||||
}
|
}
|
||||||
return &matchSequence{
|
return &matchSequence{
|
||||||
base: newMatchSequence(matchers[:len(matchers)-1]),
|
params: params,
|
||||||
|
base: newMatchSequence(params, matchers[:len(matchers)-1]),
|
||||||
next: matchers[len(matchers)-1],
|
next: matchers[len(matchers)-1],
|
||||||
offset: uint64(len(matchers) - 1),
|
offset: uint64(len(matchers) - 1),
|
||||||
}
|
}
|
||||||
|
@ -461,7 +466,7 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]
|
||||||
nextIndices = append(nextIndices, mapIndex)
|
nextIndices = append(nextIndices, mapIndex)
|
||||||
lastAdded = mapIndex
|
lastAdded = mapIndex
|
||||||
}
|
}
|
||||||
if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<logValuesPerMap)-m.offset {
|
if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<m.params.logValuesPerMap)-m.offset {
|
||||||
nextIndices = append(nextIndices, mapIndex+1)
|
nextIndices = append(nextIndices, mapIndex+1)
|
||||||
lastAdded = mapIndex + 1
|
lastAdded = mapIndex + 1
|
||||||
}
|
}
|
||||||
|
@ -492,8 +497,8 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]
|
||||||
panic("invalid nextIndices")
|
panic("invalid nextIndices")
|
||||||
}
|
}
|
||||||
next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1]
|
next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1]
|
||||||
if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<logValuesPerMap)+m.offset) ||
|
if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<m.params.logValuesPerMap)+m.offset) ||
|
||||||
next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<logValuesPerMap)+m.offset) {
|
next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<m.params.logValuesPerMap)+m.offset) {
|
||||||
baseIndices = append(baseIndices, mapIndex)
|
baseIndices = append(baseIndices, mapIndex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -548,17 +553,17 @@ func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]
|
||||||
// match corresponding base and next matcher results
|
// match corresponding base and next matcher results
|
||||||
results := make([]potentialMatches, len(mapIndices))
|
results := make([]potentialMatches, len(mapIndices))
|
||||||
for i, mapIndex := range mapIndices {
|
for i, mapIndex := range mapIndices {
|
||||||
results[i] = matchSequenceResults(mapIndex, m.offset, baseResult(mapIndex), nextResult(mapIndex), nextResult(mapIndex+1))
|
results[i] = m.matchResults(mapIndex, m.offset, baseResult(mapIndex), nextResult(mapIndex), nextResult(mapIndex+1))
|
||||||
}
|
}
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// matchSequenceResults returns a list of sequence matches for the given mapIndex
|
// matchResults returns a list of sequence matches for the given mapIndex and
|
||||||
// and offset based on the base matcher's results at mapIndex and the next matcher's
|
// offset based on the base matcher's results at mapIndex and the next matcher's
|
||||||
// results at mapIndex and mapIndex+1. Note that acquiring nextNextRes may be
|
// results at mapIndex and mapIndex+1. Note that acquiring nextNextRes may be
|
||||||
// skipped and it can be substituted with an empty list if baseRes has no potential
|
// skipped and it can be substituted with an empty list if baseRes has no potential
|
||||||
// matches that could be sequence matched with anything that could be in nextNextRes.
|
// matches that could be sequence matched with anything that could be in nextNextRes.
|
||||||
func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
|
func (m *matchSequence) matchResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
|
||||||
if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
|
if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
|
||||||
// if nextRes is a wild card or baseRes is empty then the sequence matcher
|
// if nextRes is a wild card or baseRes is empty then the sequence matcher
|
||||||
// result equals baseRes.
|
// result equals baseRes.
|
||||||
|
@ -568,7 +573,7 @@ func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, next
|
||||||
// discard items from nextRes whose corresponding base matcher results
|
// discard items from nextRes whose corresponding base matcher results
|
||||||
// with the negative offset applied would be located at mapIndex-1.
|
// with the negative offset applied would be located at mapIndex-1.
|
||||||
start := 0
|
start := 0
|
||||||
for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<logValuesPerMap+offset {
|
for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<m.params.logValuesPerMap+offset {
|
||||||
start++
|
start++
|
||||||
}
|
}
|
||||||
nextRes = nextRes[start:]
|
nextRes = nextRes[start:]
|
||||||
|
@ -577,7 +582,7 @@ func matchSequenceResults(mapIndex uint32, offset uint64, baseRes, nextRes, next
|
||||||
// discard items from nextNextRes whose corresponding base matcher results
|
// discard items from nextNextRes whose corresponding base matcher results
|
||||||
// with the negative offset applied would still be located at mapIndex+1.
|
// with the negative offset applied would still be located at mapIndex+1.
|
||||||
stop := 0
|
stop := 0
|
||||||
for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<logValuesPerMap+offset {
|
for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<m.params.logValuesPerMap+offset {
|
||||||
stop++
|
stop++
|
||||||
}
|
}
|
||||||
nextNextRes = nextNextRes[:stop]
|
nextNextRes = nextNextRes[:stop]
|
||||||
|
|
|
@ -32,32 +32,15 @@ func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend {
|
||||||
return fm
|
return fm
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateMatchersValidRange iterates through active matchers and limits their
|
// GetParams returns the filtermaps parameters.
|
||||||
// valid range with the current indexed range. This function should be called
|
// GetParams implements MatcherBackend.
|
||||||
// whenever a part of the log index has been removed, before adding new blocks
|
func (fm *FilterMapsMatcherBackend) GetParams() *Params {
|
||||||
// to it.
|
return &fm.f.Params
|
||||||
func (f *FilterMaps) updateMatchersValidRange() {
|
|
||||||
for fm := range f.matchers {
|
|
||||||
if !f.initialized {
|
|
||||||
fm.valid = false
|
|
||||||
}
|
|
||||||
if !fm.valid {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if fm.firstValid < f.tailBlockNumber {
|
|
||||||
fm.firstValid = f.tailBlockNumber
|
|
||||||
}
|
|
||||||
if fm.lastValid > f.headBlockNumber {
|
|
||||||
fm.lastValid = f.headBlockNumber
|
|
||||||
}
|
|
||||||
if fm.firstValid > fm.lastValid {
|
|
||||||
fm.valid = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close removes the matcher from the set of active matchers and ensures that
|
// Close removes the matcher from the set of active matchers and ensures that
|
||||||
// any SyncLogIndex calls are cancelled.
|
// any SyncLogIndex calls are cancelled.
|
||||||
|
// Close implements MatcherBackend.
|
||||||
func (fm *FilterMapsMatcherBackend) Close() {
|
func (fm *FilterMapsMatcherBackend) Close() {
|
||||||
fm.f.lock.Lock()
|
fm.f.lock.Lock()
|
||||||
defer fm.f.lock.Unlock()
|
defer fm.f.lock.Unlock()
|
||||||
|
@ -156,3 +139,27 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
|
||||||
return SyncRange{}, ctx.Err()
|
return SyncRange{}, ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updateMatchersValidRange iterates through active matchers and limits their
|
||||||
|
// valid range with the current indexed range. This function should be called
|
||||||
|
// whenever a part of the log index has been removed, before adding new blocks
|
||||||
|
// to it.
|
||||||
|
func (f *FilterMaps) updateMatchersValidRange() {
|
||||||
|
for fm := range f.matchers {
|
||||||
|
if !f.initialized {
|
||||||
|
fm.valid = false
|
||||||
|
}
|
||||||
|
if !fm.valid {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if fm.firstValid < f.tailBlockNumber {
|
||||||
|
fm.firstValid = f.tailBlockNumber
|
||||||
|
}
|
||||||
|
if fm.lastValid > f.headBlockNumber {
|
||||||
|
fm.lastValid = f.headBlockNumber
|
||||||
|
}
|
||||||
|
if fm.firstValid > fm.lastValid {
|
||||||
|
fm.valid = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,180 @@
|
||||||
|
package filtermaps
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/binary"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Params struct {
|
||||||
|
logMapHeight uint // log2(mapHeight)
|
||||||
|
logMapsPerEpoch uint // log2(mmapsPerEpochapsPerEpoch)
|
||||||
|
logValuesPerMap uint // log2(logValuesPerMap)
|
||||||
|
// derived fields
|
||||||
|
mapHeight uint32 // filter map height (number of rows)
|
||||||
|
mapsPerEpoch uint32 // number of maps in an epoch
|
||||||
|
valuesPerMap uint64 // number of log values marked on each filter map
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultParams = Params{
|
||||||
|
logMapHeight: 12,
|
||||||
|
logMapsPerEpoch: 6,
|
||||||
|
logValuesPerMap: 16,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *Params) deriveFields() {
|
||||||
|
p.mapHeight = uint32(1) << p.logMapHeight
|
||||||
|
p.mapsPerEpoch = uint32(1) << p.logMapsPerEpoch
|
||||||
|
p.valuesPerMap = uint64(1) << p.logValuesPerMap
|
||||||
|
}
|
||||||
|
|
||||||
|
// addressValue returns the log value hash of a log emitting address.
|
||||||
|
func addressValue(address common.Address) common.Hash {
|
||||||
|
var result common.Hash
|
||||||
|
hasher := sha256.New()
|
||||||
|
hasher.Write(address[:])
|
||||||
|
hasher.Sum(result[:0])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// topicValue returns the log value hash of a log topic.
|
||||||
|
func topicValue(topic common.Hash) common.Hash {
|
||||||
|
var result common.Hash
|
||||||
|
hasher := sha256.New()
|
||||||
|
hasher.Write(topic[:])
|
||||||
|
hasher.Sum(result[:0])
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// rowIndex returns the row index in which the given log value should be marked
|
||||||
|
// during the given epoch. Note that row assignments are re-shuffled in every
|
||||||
|
// epoch in order to ensure that even though there are always a few more heavily
|
||||||
|
// used rows due to very popular addresses and topics, these will not make search
|
||||||
|
// for other log values very expensive. Even if certain values are occasionally
|
||||||
|
// sorted into these heavy rows, in most of the epochs they are placed in average
|
||||||
|
// length rows.
|
||||||
|
func (p *Params) rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
|
||||||
|
hasher := sha256.New()
|
||||||
|
hasher.Write(logValue[:])
|
||||||
|
var indexEnc [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
|
||||||
|
hasher.Write(indexEnc[:])
|
||||||
|
var hash common.Hash
|
||||||
|
hasher.Sum(hash[:0])
|
||||||
|
return binary.LittleEndian.Uint32(hash[:4]) % p.mapHeight
|
||||||
|
}
|
||||||
|
|
||||||
|
// columnIndex returns the column index that should be added to the appropriate
|
||||||
|
// row in order to place a mark for the next log value.
|
||||||
|
func (p *Params) columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
|
||||||
|
x := uint32(lvIndex % p.valuesPerMap) // log value sub-index
|
||||||
|
transformHash := transformHash(uint32(lvIndex/p.valuesPerMap), logValue)
|
||||||
|
// apply column index transformation function
|
||||||
|
x += binary.LittleEndian.Uint32(transformHash[0:4])
|
||||||
|
x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
|
||||||
|
x ^= binary.LittleEndian.Uint32(transformHash[8:12])
|
||||||
|
x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
|
||||||
|
x += binary.LittleEndian.Uint32(transformHash[16:20])
|
||||||
|
x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
|
||||||
|
x ^= binary.LittleEndian.Uint32(transformHash[24:28])
|
||||||
|
x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
|
||||||
|
// transformHash calculates a hash specific to a given map and log value hash
|
||||||
|
// that defines a bijective function on the uint32 range. This function is used
|
||||||
|
// to transform the log value sub-index (distance from the first index of the map)
|
||||||
|
// into a 32 bit column index, then applied in reverse when searching for potential
|
||||||
|
// matches for a given log value.
|
||||||
|
func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
|
||||||
|
hasher := sha256.New()
|
||||||
|
hasher.Write(logValue[:])
|
||||||
|
var indexEnc [4]byte
|
||||||
|
binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
|
||||||
|
hasher.Write(indexEnc[:])
|
||||||
|
hasher.Sum(result[:0])
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// potentialMatches returns the list of log value indices potentially matching
|
||||||
|
// the given log value hash in the range of the filter map the row belongs to.
|
||||||
|
// Note that the list of indices is always sorted and potential duplicates are
|
||||||
|
// removed. Though the column indices are stored in the same order they were
|
||||||
|
// added and therefore the true matches are automatically reverse transformed
|
||||||
|
// in the right order, false positives can ruin this property. Since these can
|
||||||
|
// only be separated from true matches after the combined pattern matching of the
|
||||||
|
// outputs of individual log value matchers and this pattern matcher assumes a
|
||||||
|
// sorted and duplicate-free list of indices, we should ensure these properties
|
||||||
|
// here.
|
||||||
|
func (p *Params) potentialMatches(row FilterRow, mapIndex uint32, logValue common.Hash) potentialMatches {
|
||||||
|
results := make(potentialMatches, 0, 8)
|
||||||
|
transformHash := transformHash(mapIndex, logValue)
|
||||||
|
sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
|
||||||
|
mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
|
||||||
|
xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
|
||||||
|
mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
|
||||||
|
sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
|
||||||
|
mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
|
||||||
|
xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
|
||||||
|
mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
|
||||||
|
// perform reverse column index transformation on all column indices of the row.
|
||||||
|
// if a column index was added by the searched log value then the reverse
|
||||||
|
// transform will yield a valid log value sub-index of the given map.
|
||||||
|
// Column index is 32 bits long while there are 2**16 valid log value indices
|
||||||
|
// in the map's range, so this can also happen by accident with 1 in 2**16
|
||||||
|
// chance, in which case we have a false positive.
|
||||||
|
for _, columnIndex := range row {
|
||||||
|
if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < uint32(p.valuesPerMap) {
|
||||||
|
results = append(results, uint64(mapIndex)<<p.logValuesPerMap+uint64(potentialSubIndex))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Sort(results)
|
||||||
|
// remove duplicates
|
||||||
|
j := 0
|
||||||
|
for i, match := range results {
|
||||||
|
if i == 0 || match != results[i-1] {
|
||||||
|
results[j] = results[i]
|
||||||
|
j++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results[:j]
|
||||||
|
}
|
||||||
|
|
||||||
|
// potentialMatches is a strictly monotonically increasing list of log value
|
||||||
|
// indices in the range of a filter map that are potential matches for certain
|
||||||
|
// filter criteria.
|
||||||
|
// Note that nil is used as a wildcard and therefore means that all log value
|
||||||
|
// indices in the filter map range are potential matches. If there are no
|
||||||
|
// potential matches in the given map's range then an empty slice should be used.
|
||||||
|
type potentialMatches []uint64
|
||||||
|
|
||||||
|
// noMatches means there are no potential matches in a given filter map's range.
|
||||||
|
var noMatches = potentialMatches{}
|
||||||
|
|
||||||
|
func (p potentialMatches) Len() int { return len(p) }
|
||||||
|
func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
|
||||||
|
func (p potentialMatches) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
||||||
|
|
||||||
|
// uint32ModInverse takes an odd 32 bit number and returns its modular
|
||||||
|
// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y
|
||||||
|
// x * y * uint32ModInverse(y) == 1.
|
||||||
|
func uint32ModInverse(v uint32) uint32 {
|
||||||
|
if v&1 == 0 {
|
||||||
|
panic("uint32ModInverse called with even argument")
|
||||||
|
}
|
||||||
|
m := int64(1) << 32
|
||||||
|
m0 := m
|
||||||
|
a := int64(v)
|
||||||
|
x, y := int64(1), int64(0)
|
||||||
|
for a > 1 {
|
||||||
|
q := a / m
|
||||||
|
m, a = a%m, m
|
||||||
|
x, y = y, x-q*y
|
||||||
|
}
|
||||||
|
if x < 0 {
|
||||||
|
x += m0
|
||||||
|
}
|
||||||
|
return uint32(x)
|
||||||
|
}
|
|
@ -8,14 +8,17 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestSingleMatch(t *testing.T) {
|
func TestSingleMatch(t *testing.T) {
|
||||||
|
params := DefaultParams
|
||||||
|
params.deriveFields()
|
||||||
|
|
||||||
for count := 0; count < 100000; count++ {
|
for count := 0; count < 100000; count++ {
|
||||||
// generate a row with a single random entry
|
// generate a row with a single random entry
|
||||||
mapIndex := rand.Uint32()
|
mapIndex := rand.Uint32()
|
||||||
lvIndex := uint64(mapIndex)<<logValuesPerMap + uint64(rand.Intn(valuesPerMap))
|
lvIndex := uint64(mapIndex)<<params.logValuesPerMap + uint64(rand.Intn(int(params.valuesPerMap)))
|
||||||
var lvHash common.Hash
|
var lvHash common.Hash
|
||||||
rand.Read(lvHash[:])
|
rand.Read(lvHash[:])
|
||||||
row := FilterRow{columnIndex(lvIndex, lvHash)}
|
row := FilterRow{params.columnIndex(lvIndex, lvHash)}
|
||||||
matches := row.potentialMatches(mapIndex, lvHash)
|
matches := params.potentialMatches(row, mapIndex, lvHash)
|
||||||
// check if it has been reverse transformed correctly
|
// check if it has been reverse transformed correctly
|
||||||
if len(matches) != 1 {
|
if len(matches) != 1 {
|
||||||
t.Fatalf("Invalid length of matches (got %d, expected 1)", len(matches))
|
t.Fatalf("Invalid length of matches (got %d, expected 1)", len(matches))
|
||||||
|
@ -34,23 +37,26 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestPotentialMatches(t *testing.T) {
|
func TestPotentialMatches(t *testing.T) {
|
||||||
|
params := DefaultParams
|
||||||
|
params.deriveFields()
|
||||||
|
|
||||||
var falsePositives int
|
var falsePositives int
|
||||||
for count := 0; count < testPmCount; count++ {
|
for count := 0; count < testPmCount; count++ {
|
||||||
mapIndex := rand.Uint32()
|
mapIndex := rand.Uint32()
|
||||||
lvStart := uint64(mapIndex) << logValuesPerMap
|
lvStart := uint64(mapIndex) << params.logValuesPerMap
|
||||||
var row FilterRow
|
var row FilterRow
|
||||||
lvIndices := make([]uint64, testPmLen)
|
lvIndices := make([]uint64, testPmLen)
|
||||||
lvHashes := make([]common.Hash, testPmLen+1)
|
lvHashes := make([]common.Hash, testPmLen+1)
|
||||||
for i := range lvIndices {
|
for i := range lvIndices {
|
||||||
// add testPmLen single entries with different log value hashes at different indices
|
// add testPmLen single entries with different log value hashes at different indices
|
||||||
lvIndices[i] = lvStart + uint64(rand.Intn(valuesPerMap))
|
lvIndices[i] = lvStart + uint64(rand.Intn(int(params.valuesPerMap)))
|
||||||
rand.Read(lvHashes[i][:])
|
rand.Read(lvHashes[i][:])
|
||||||
row = append(row, columnIndex(lvIndices[i], lvHashes[i]))
|
row = append(row, params.columnIndex(lvIndices[i], lvHashes[i]))
|
||||||
}
|
}
|
||||||
// add the same log value hash at the first testPmLen log value indices of the map's range
|
// add the same log value hash at the first testPmLen log value indices of the map's range
|
||||||
rand.Read(lvHashes[testPmLen][:])
|
rand.Read(lvHashes[testPmLen][:])
|
||||||
for lvIndex := lvStart; lvIndex < lvStart+testPmLen; lvIndex++ {
|
for lvIndex := lvStart; lvIndex < lvStart+testPmLen; lvIndex++ {
|
||||||
row = append(row, columnIndex(lvIndex, lvHashes[testPmLen]))
|
row = append(row, params.columnIndex(lvIndex, lvHashes[testPmLen]))
|
||||||
}
|
}
|
||||||
// randomly duplicate some entries
|
// randomly duplicate some entries
|
||||||
for i := 0; i < testPmLen; i++ {
|
for i := 0; i < testPmLen; i++ {
|
||||||
|
@ -63,7 +69,7 @@ func TestPotentialMatches(t *testing.T) {
|
||||||
}
|
}
|
||||||
// check retrieved matches while also counting false positives
|
// check retrieved matches while also counting false positives
|
||||||
for i, lvHash := range lvHashes {
|
for i, lvHash := range lvHashes {
|
||||||
matches := row.potentialMatches(mapIndex, lvHash)
|
matches := params.potentialMatches(row, mapIndex, lvHash)
|
||||||
if i < testPmLen {
|
if i < testPmLen {
|
||||||
// check single entry match
|
// check single entry match
|
||||||
if len(matches) < 1 {
|
if len(matches) < 1 {
|
||||||
|
@ -97,15 +103,17 @@ func TestPotentialMatches(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Whenever looking for a certain log value hash, each entry in the row that
|
// Whenever looking for a certain log value hash, each entry in the row that
|
||||||
// was generated by another log value hash (a "foreign entry") has an
|
// was generated by another log value hash (a "foreign entry") has a
|
||||||
// 1 / valuesPerMap chance of yielding a false positive.
|
// valuesPerMap // 2^32 chance of yielding a false positive if the reverse
|
||||||
|
// transformed 32 bit integer is by random chance less than valuesPerMap and
|
||||||
|
// is therefore considered a potentially valid match.
|
||||||
// We have testPmLen unique hash entries and a testPmLen long series of entries
|
// We have testPmLen unique hash entries and a testPmLen long series of entries
|
||||||
// for the same hash. For each of the testPmLen unique hash entries there are
|
// for the same hash. For each of the testPmLen unique hash entries there are
|
||||||
// testPmLen*2-1 foreign entries while for the long series there are testPmLen
|
// testPmLen*2-1 foreign entries while for the long series there are testPmLen
|
||||||
// foreign entries. This means that after performing all these filtering runs,
|
// foreign entries. This means that after performing all these filtering runs,
|
||||||
// we have processed 2*testPmLen^2 foreign entries, which given us an estimate
|
// we have processed 2*testPmLen^2 foreign entries, which given us an estimate
|
||||||
// of how many false positives to expect.
|
// of how many false positives to expect.
|
||||||
expFalse := testPmCount * testPmLen * testPmLen * 2 / valuesPerMap
|
expFalse := int(uint64(testPmCount*testPmLen*testPmLen*2) * params.valuesPerMap >> 32)
|
||||||
if falsePositives < expFalse/2 || falsePositives > expFalse*3/2 {
|
if falsePositives < expFalse/2 || falsePositives > expFalse*3/2 {
|
||||||
t.Fatalf("False positive rate out of expected range (got %d, expected %d +-50%%)", falsePositives, expFalse)
|
t.Fatalf("False positive rate out of expected range (got %d, expected %d +-50%%)", falsePositives, expFalse)
|
||||||
}
|
}
|
|
@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, config.LogHistory, config.LogNoHistory)
|
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, config.LogNoHistory)
|
||||||
|
|
||||||
if config.BlobPool.Datadir != "" {
|
if config.BlobPool.Datadir != "" {
|
||||||
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
|
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
|
||||||
|
|
Loading…
Reference in New Issue