core/filtermaps: trigger undindexing after 1000 blocks
This commit is contained in:
parent
1901c7d3d9
commit
edb6f77b11
|
@ -53,7 +53,7 @@ type FilterMaps struct {
|
|||
db ethdb.KeyValueStore
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
history uint64
|
||||
history, unindexLimit uint64
|
||||
noHistory bool
|
||||
|
||||
Params
|
||||
|
@ -101,9 +101,9 @@ var emptyRow = FilterRow{}
|
|||
// Note that tailBlockLvPointer points to the earliest log value index belonging
|
||||
// to the tail block while tailLvPointer points to the earliest log value index
|
||||
// added to the corresponding filter map. The latter might point to an earlier
|
||||
// index after tail blocks have been pruned because we do not remove tail values
|
||||
// one by one, rather delete entire maps when all blocks that had log values in
|
||||
// those maps are unindexed.
|
||||
// index after tail blocks have been unindexed because we do not remove tail
|
||||
// values one by one, rather delete entire maps when all blocks that had log
|
||||
// values in those maps are unindexed.
|
||||
type filterMapsRange struct {
|
||||
initialized bool
|
||||
headLvPointer, tailLvPointer, tailBlockLvPointer uint64
|
||||
|
@ -113,7 +113,7 @@ type filterMapsRange struct {
|
|||
|
||||
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
||||
// the structure in sync with the given blockchain.
|
||||
func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
|
||||
func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history, unindexLimit uint64, noHistory bool) *FilterMaps {
|
||||
rs, err := rawdb.ReadFilterMapsRange(db)
|
||||
if err != nil {
|
||||
log.Error("Error reading log index range", "error", err)
|
||||
|
|
|
@ -295,10 +295,10 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
|
|||
return false
|
||||
}
|
||||
}
|
||||
if tailNum < tailTarget {
|
||||
f.pruneTailPtr(tailTarget)
|
||||
if tailNum+f.unindexLimit <= tailTarget {
|
||||
f.unindexTailPtr(tailTarget)
|
||||
}
|
||||
return f.tryPruneTailMaps(tailTarget, stopFn)
|
||||
return f.tryUnindexTailMaps(tailTarget, stopFn)
|
||||
}
|
||||
|
||||
// tryExtendTail attempts to extend the log index backwards until it indexes the
|
||||
|
@ -335,12 +335,12 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
|
|||
return number <= tailTarget
|
||||
}
|
||||
|
||||
// pruneTailPtr updates the tail block number and hash and the corresponding
|
||||
// unindexTailPtr updates the tail block number and hash and the corresponding
|
||||
// tailBlockLvPointer according to the given tail target block number.
|
||||
// Note that this function does not remove old index data, only marks it unused
|
||||
// by updating the tail pointers, except for targetLvPointer which is unchanged
|
||||
// as it marks the tail of the log index data stored in the database.
|
||||
func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
|
||||
func (f *FilterMaps) unindexTailPtr(tailTarget uint64) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
|
@ -372,9 +372,9 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
|
|||
f.setRange(f.db, fmr)
|
||||
}
|
||||
|
||||
// tryPruneTailMaps removes unused filter maps and corresponding log index
|
||||
// tryUnindexTailMaps removes unused filter maps and corresponding log index
|
||||
// pointers from the database. This function also updates targetLvPointer.
|
||||
func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) bool {
|
||||
func (f *FilterMaps) tryUnindexTailMaps(tailTarget uint64, stopFn func() bool) bool {
|
||||
fmr := f.getRange()
|
||||
tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap)
|
||||
targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap)
|
||||
|
@ -394,11 +394,11 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) boo
|
|||
for tailMap < targetMap && !stopFn() {
|
||||
tailEpoch := tailMap >> f.logMapsPerEpoch
|
||||
if tailEpoch == lastEpoch {
|
||||
f.pruneMaps(tailMap, targetMap, &removeLvPtr)
|
||||
f.unindexMaps(tailMap, targetMap, &removeLvPtr)
|
||||
break
|
||||
}
|
||||
nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch
|
||||
f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
|
||||
f.unindexMaps(tailMap, nextTailMap, &removeLvPtr)
|
||||
tailMap = nextTailMap
|
||||
if !logged || time.Since(lastLogged) >= time.Second*10 {
|
||||
log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap)
|
||||
|
@ -411,9 +411,9 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) boo
|
|||
return tailMap >= targetMap
|
||||
}
|
||||
|
||||
// pruneMaps removes filter maps and corresponding log index pointers in the
|
||||
// unindexMaps removes filter maps and corresponding log index pointers in the
|
||||
// specified range in a single batch.
|
||||
func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
||||
func (f *FilterMaps) unindexMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
||||
nextBlockNumber, err := f.getMapBlockPtr(afterLast)
|
||||
if err != nil {
|
||||
log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err)
|
||||
|
@ -438,7 +438,7 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
|||
fmr := f.getRange()
|
||||
fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap
|
||||
if fmr.tailLvPointer > fmr.tailBlockLvPointer {
|
||||
log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
|
||||
log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
|
||||
return
|
||||
}
|
||||
f.setRange(batch, fmr)
|
||||
|
|
|
@ -207,7 +207,7 @@ func (ts *testSetup) setHistory(history uint64, noHistory bool) {
|
|||
if ts.fm != nil {
|
||||
ts.fm.Stop()
|
||||
}
|
||||
ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, noHistory)
|
||||
ts.fm = NewFilterMaps(ts.db, ts.chain, ts.params, history, 1, noHistory)
|
||||
ts.fm.Start()
|
||||
}
|
||||
|
||||
|
|
|
@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, config.LogNoHistory)
|
||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, filtermaps.DefaultParams, config.LogHistory, 1000, config.LogNoHistory)
|
||||
|
||||
if config.BlobPool.Datadir != "" {
|
||||
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
|
||||
|
|
|
@ -160,7 +160,7 @@ func (b *testBackend) NewMatcherBackend() filtermaps.MatcherBackend {
|
|||
}
|
||||
|
||||
func (b *testBackend) startFilterMaps(history uint64, noHistory bool) {
|
||||
b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, noHistory)
|
||||
b.fm = filtermaps.NewFilterMaps(b.db, b, filtermaps.DefaultParams, history, 1, noHistory)
|
||||
b.fm.Start()
|
||||
b.fm.WaitIdle()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue