core/filtermaps: fixed map pruning

This commit is contained in:
Zsolt Felfoldi 2024-09-30 02:58:08 +02:00
parent 9a5332ba1f
commit ce7cf98094
2 changed files with 25 additions and 24 deletions

View File

@ -17,6 +17,17 @@ const (
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
revertPointFrequency = 256 // frequency of revert points in database revertPointFrequency = 256 // frequency of revert points in database
cachedRevertPoints = 64 // revert points for most recent blocks in memory cachedRevertPoints = 64 // revert points for most recent blocks in memory
testHookInit = iota
testHookUpdateHeadEpoch
testHookUpdateHead
testHookExtendTailEpoch
testHookExtendTail
testHookPruneTail
testHookPruneTailMaps
testHookRevert
testHookWait
testHookStop
) )
// updateLoop initializes and updates the log index structure according to the // updateLoop initializes and updates the log index structure according to the
@ -121,7 +132,7 @@ func (f *FilterMaps) updateLoop() {
syncMatcher = nil syncMatcher = nil
} }
// log index head is at latest chain head; process tail blocks if possible // log index head is at latest chain head; process tail blocks if possible
f.tryUpdateTail(head, func() bool { if f.tryUpdateTail(head, func() bool {
// return true if tail processing needs to be stopped // return true if tail processing needs to be stopped
select { select {
case ev := <-headEventCh: case ev := <-headEventCh:
@ -136,10 +147,9 @@ func (f *FilterMaps) updateLoop() {
} }
// stop if there is a new chain head (always prioritize head updates) // stop if there is a new chain head (always prioritize head updates)
return fmr.headBlockHash != head.Hash() return fmr.headBlockHash != head.Hash()
}) }) && fmr.headBlockHash == head.Hash() {
if fmr.headBlockHash == head.Hash() { // if tail processing reached its final state and there is no new
// if tail processing exited while there is no new head then no more // head then wait for more events
// tail blocks can be processed
wait() wait()
} }
} }
@ -264,7 +274,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
// current head block number and the log history settings. // current head block number and the log history settings.
// stopFn is called regularly during the process, and if it returns true, the // stopFn is called regularly during the process, and if it returns true, the
// latest batch is written and the function returns. // latest batch is written and the function returns.
func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) { func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool {
var tailTarget uint64 var tailTarget uint64
if f.history > 0 { if f.history > 0 {
if headNum := head.Number.Uint64(); headNum >= f.history { if headNum := head.Number.Uint64(); headNum >= f.history {
@ -273,17 +283,19 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) {
} }
tailNum := f.getRange().tailBlockNumber tailNum := f.getRange().tailBlockNumber
if tailNum > tailTarget { if tailNum > tailTarget {
f.tryExtendTail(tailTarget, stopFn) if !f.tryExtendTail(tailTarget, stopFn) {
return false
}
} }
if tailNum < tailTarget { if tailNum < tailTarget {
f.pruneTailPtr(tailTarget) f.pruneTailPtr(tailTarget)
f.tryPruneTailMaps(tailTarget, stopFn)
} }
return f.tryPruneTailMaps(tailTarget, stopFn)
} }
// tryExtendTail attempts to extend the log index backwards until it indexes the // tryExtendTail attempts to extend the log index backwards until it indexes the
// tail target block or cannot find more block receipts. // tail target block or cannot find more block receipts.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) { func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
fmr := f.getRange() fmr := f.getRange()
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
update := f.newUpdateBatch() update := f.newUpdateBatch()
@ -318,6 +330,7 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
if f.testHook != nil { if f.testHook != nil {
f.testHook(testHookExtendTail) f.testHook(testHookExtendTail)
} }
return number <= tailTarget
} }
// pruneTailPtr updates the tail block number and hash and the corresponding // pruneTailPtr updates the tail block number and hash and the corresponding
@ -362,12 +375,12 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
// tryPruneTailMaps removes unused filter maps and corresponding log index // tryPruneTailMaps removes unused filter maps and corresponding log index
// pointers from the database. This function also updates targetLvPointer. // pointers from the database. This function also updates targetLvPointer.
func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) { func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) bool {
fmr := f.getRange() fmr := f.getRange()
tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap) tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap)
targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap) targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap)
if tailMap >= targetMap { if tailMap >= targetMap {
return return true
} }
lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch
removeLvPtr, err := f.getMapBlockPtr(tailMap) removeLvPtr, err := f.getMapBlockPtr(tailMap)
@ -396,6 +409,7 @@ func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
if logged { if logged {
log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap) log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap)
} }
return tailMap >= targetMap
} }
// pruneMaps removes filter maps and corresponding log index pointers in the // pruneMaps removes filter maps and corresponding log index pointers in the

View File

@ -16,19 +16,6 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
) )
const (
testHookInit = iota
testHookUpdateHeadEpoch
testHookUpdateHead
testHookExtendTailEpoch
testHookExtendTail
testHookPruneTail
testHookPruneTailMaps
testHookRevert
testHookWait
testHookStop
)
var testParams = Params{ var testParams = Params{
logMapHeight: 2, logMapHeight: 2,
logMapsPerEpoch: 4, logMapsPerEpoch: 4,