core/filtermaps: improved unindexer

Zsolt Felfoldi 2024-10-03 23:08:47 +02:00
parent edb6f77b11
commit 1017be0fe0
1 changed file with 98 additions and 97 deletions


@@ -278,10 +278,11 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
return true
}
- // tryUpdateTail attempts to extend or prune the log index according to the
+ // tryUpdateTail attempts to extend or shorten the log index according to the
// current head block number and the log history settings.
// stopFn is called regularly during the process, and if it returns true, the
// latest batch is written and the function returns.
+ // tryUpdateTail returns true if it has reached the desired history length.
func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool {
var tailTarget uint64
if f.history > 0 {
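
The stopFn contract noted in the comment above (it is polled regularly, and returning true makes the routine write its latest batch and return) can be illustrated with a minimal, package-internal sketch; the wrapper name runTailUpdate and the quit channel are hypothetical and not part of this change:

// Hypothetical caller: abort tail maintenance on a quit signal without
// losing the batches that have already been written.
func runTailUpdate(f *FilterMaps, head *types.Header, quit chan struct{}) bool {
	stopFn := func() bool {
		select {
		case <-quit:
			return true // tryUpdateTail flushes the latest batch and returns
		default:
			return false
		}
	}
	return f.tryUpdateTail(head, stopFn)
}
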
@@ -296,13 +297,13 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
}
}
if tailNum+f.unindexLimit <= tailTarget {
- f.unindexTailPtr(tailTarget)
+ return f.tryUnindexTail(tailTarget, stopFn)
}
- return f.tryUnindexTailMaps(tailTarget, stopFn)
+ return true
}
- // tryExtendTail attempts to extend the log index backwards until it indexes the
- // tail target block or cannot find more block receipts.
+ // tryExtendTail attempts to extend the log index backwards until the desired
+ // indexed history length is achieved. Returns true if finished.
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
fmr := f.getRange()
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
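
For intuition on the pruning threshold used by tryUpdateTail above: the tail is only shortened once it lags the target by at least unindexLimit blocks, so a small excess of history does not trigger a prune on every new head. A sketch with assumed example numbers (the real unindexLimit value may differ):

// Illustration only; exampleUnindexLimit is an assumed value.
const exampleUnindexLimit = 1000

// shouldUnindex mirrors the condition `tailNum+f.unindexLimit <= tailTarget`.
func shouldUnindex(tailNum, tailTarget uint64) bool {
	return tailNum+exampleUnindexLimit <= tailTarget
}

// shouldUnindex(500_000, 501_500) == true  (tail lags by 1500 blocks: prune)
// shouldUnindex(500_800, 501_500) == false (lag is only 700 blocks: wait)
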
@@ -335,116 +336,116 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
return number <= tailTarget
}
- // unindexTailPtr updates the tail block number and hash and the corresponding
- // tailBlockLvPointer according to the given tail target block number.
- // Note that this function does not remove old index data, only marks it unused
- // by updating the tail pointers, except for targetLvPointer which is unchanged
- // as it marks the tail of the log index data stored in the database.
- func (f *FilterMaps) unindexTailPtr(tailTarget uint64) {
+ // tryUnindexTail attempts to prune the log index tail until the desired indexed
+ // history length is achieved. Returns true if finished.
+ func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool {
+ for {
+ if f.unindexTailEpoch(tailTarget) {
+ return true
+ }
+ if stopFn() {
+ return false
+ }
+ }
+ }
+ // unindexTailEpoch unindexes at most an epoch of tail log index data until the
+ // desired tail target is reached.
+ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
f.lock.Lock()
- defer f.lock.Unlock()
+ oldRange := f.filterMapsRange
+ newTailMap, changed := f.unindexTailPtr(tailTarget)
+ newRange := f.filterMapsRange
+ f.lock.Unlock()
- // obtain target log value pointer
- if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
- return // nothing to do
+ if !changed {
+ return true // nothing more to do
}
- targetLvPointer, err := f.getBlockLvPointer(tailTarget)
- fmr := f.filterMapsRange
+ finished = newRange.tailBlockNumber == tailTarget
- if err != nil {
- log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err)
- }
- // obtain tail target's parent hash
- var tailParentHash common.Hash
- if tailTarget > 0 {
- if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash {
- return // if a reorg is happening right now then try again later
- }
- tailParentHash = f.chain.GetCanonicalHash(tailTarget - 1)
- if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash {
- return // check again to make sure that tailParentHash is consistent with the indexed chain
- }
- }
- fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
- fmr.tailBlockLvPointer = targetLvPointer
- f.setRange(f.db, fmr)
- }
- // tryUnindexTailMaps removes unused filter maps and corresponding log index
- // pointers from the database. This function also updates targetLvPointer.
- func (f *FilterMaps) tryUnindexTailMaps(tailTarget uint64, stopFn func() bool) bool {
- fmr := f.getRange()
- tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap)
- targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap)
- if tailMap >= targetMap {
- return true
- }
- lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch
- removeLvPtr, err := f.getMapBlockPtr(tailMap)
- if err != nil {
- log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err)
- removeLvPtr = math.MaxUint64 // do not remove anything
- }
- var (
- logged bool
- lastLogged time.Time
- )
- for tailMap < targetMap && !stopFn() {
- tailEpoch := tailMap >> f.logMapsPerEpoch
- if tailEpoch == lastEpoch {
- f.unindexMaps(tailMap, targetMap, &removeLvPtr)
- break
- }
- nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch
- f.unindexMaps(tailMap, nextTailMap, &removeLvPtr)
- tailMap = nextTailMap
- if !logged || time.Since(lastLogged) >= time.Second*10 {
- log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap)
- logged, lastLogged = true, time.Now()
- }
- }
- if logged {
- log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap)
- }
- return tailMap >= targetMap
- }
- // unindexMaps removes filter maps and corresponding log index pointers in the
- // specified range in a single batch.
- func (f *FilterMaps) unindexMaps(first, afterLast uint32, removeLvPtr *uint64) {
- nextBlockNumber, err := f.getMapBlockPtr(afterLast)
- if err != nil {
- log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err)
- nextBlockNumber = 0 // do not remove anything
- }
+ oldTailMap := uint32(oldRange.tailLvPointer >> f.logValuesPerMap)
+ // remove map data [oldTailMap, newTailMap) and block data
+ // [oldRange.tailBlockNumber, newRange.tailBlockNumber)
batch := f.db.NewBatch()
- for *removeLvPtr < nextBlockNumber {
- f.deleteBlockLvPointer(batch, *removeLvPtr)
- if (*removeLvPtr)%revertPointFrequency == 0 {
- rawdb.DeleteRevertPoint(batch, *removeLvPtr)
+ for blockNumber := oldRange.tailBlockNumber; blockNumber < newRange.tailBlockNumber; blockNumber++ {
+ f.deleteBlockLvPointer(batch, blockNumber)
+ if blockNumber%revertPointFrequency == 0 {
+ rawdb.DeleteRevertPoint(batch, blockNumber)
}
- (*removeLvPtr)++
}
- for mapIndex := first; mapIndex < afterLast; mapIndex++ {
+ for mapIndex := oldTailMap; mapIndex < newTailMap; mapIndex++ {
f.deleteMapBlockPtr(batch, mapIndex)
}
for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ {
- for mapIndex := first; mapIndex < afterLast; mapIndex++ {
+ for mapIndex := oldTailMap; mapIndex < newTailMap; mapIndex++ {
f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
}
}
- fmr := f.getRange()
- fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap
- if fmr.tailLvPointer > fmr.tailBlockLvPointer {
- log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
+ newRange.tailLvPointer = uint64(newTailMap) << f.logValuesPerMap
+ if newRange.tailLvPointer > newRange.tailBlockLvPointer {
+ log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", newRange.tailLvPointer, "tailBlockLvPointer", newRange.tailBlockLvPointer)
return
}
- f.setRange(batch, fmr)
+ f.lock.Lock()
+ f.setRange(batch, newRange)
if err := batch.Write(); err != nil {
log.Crit("Could not write update batch", "error", err)
}
+ f.lock.Unlock()
+ return
}
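
The half-open ranges deleted in unindexTailEpoch above follow from the mapping between log value pointers and filter map indices. A worked example with an assumed logValuesPerMap of 16, i.e. 2^16 log values per map (the real constant may differ):

// Illustration only; exampleLogValuesPerMap is an assumed value.
const exampleLogValuesPerMap = 16

// mapOfLvPointer mirrors `uint32(lvPointer >> f.logValuesPerMap)`.
func mapOfLvPointer(lvPtr uint64) uint32 {
	return uint32(lvPtr >> exampleLogValuesPerMap)
}

// If oldRange.tailLvPointer is 0 and the new tail block's log values start at
// pointer 200_000, then oldTailMap == 0 and newTailMap == 3 (200000>>16 == 3),
// so maps 0..2 plus the pruned blocks' pointers are deleted in one batch.
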
+ // unindexTailPtr determines the range of tail maps to be removed in the next
+ // batch and updates the tail block number and hash and the corresponding
+ // tailBlockLvPointer accordingly.
+ // Note that this function does not remove old index data, only marks it unused
+ // by updating the tail pointers, except for targetLvPointer which is not changed
+ // yet as it marks the tail of the log index data stored in the database and
+ // therefore should be updated when map data is actually removed.
+ func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) {
+ // obtain target log value pointer
+ if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
+ return 0, false // nothing to do
+ }
+ targetLvPointer, err := f.getBlockLvPointer(tailTarget)
+ if err != nil {
+ log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err)
+ return 0, false
+ }
+ newRange := f.filterMapsRange
+ tailMap := uint32(f.tailBlockLvPointer >> f.logValuesPerMap)
+ nextEpochFirstMap := ((tailMap >> f.logMapsPerEpoch) + 1) << f.logMapsPerEpoch
+ targetMap := uint32(targetLvPointer >> f.logValuesPerMap)
+ if targetMap <= nextEpochFirstMap {
+ // unindexed range is within a single epoch, do it in a single batch
+ newRange.tailBlockNumber, newRange.tailBlockLvPointer, newTailMap = tailTarget, targetLvPointer, targetMap
+ } else {
+ // new tail map should be nextEpochFirstMap, determine new tail block
+ tailBlockNumber, err := f.getMapBlockPtr(nextEpochFirstMap)
+ if err != nil {
+ log.Error("Error fetching tail map block pointer", "map index", nextEpochFirstMap, "error", err)
+ return 0, false
+ }
+ tailBlockNumber++
+ tailBlockLvPointer, err := f.getBlockLvPointer(tailBlockNumber)
+ if err != nil {
+ log.Error("Error fetching tail block log value pointer", "block number", tailBlockNumber, "error", err)
+ return 0, false
+ }
+ newRange.tailBlockNumber, newRange.tailBlockLvPointer, newTailMap = tailBlockNumber, tailBlockLvPointer, uint32(tailBlockLvPointer>>f.logValuesPerMap)
+ }
+ // obtain tail target's parent hash
+ if newRange.tailBlockNumber > 0 {
+ if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash {
+ return 0, false // if a reorg is happening right now then try again later
+ }
+ newRange.tailParentHash = f.chain.GetCanonicalHash(newRange.tailBlockNumber - 1)
+ if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash {
+ return 0, false // check again to make sure that tailParentHash is consistent with the indexed chain
+ }
+ }
+ f.setRange(f.db, newRange)
+ return newTailMap, true
+ }
// updateBatch is a memory overlay collecting changes to the index log structure
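
Finally, on the batch sizing in the new unindexTailPtr: a batch never crosses an epoch boundary, so each call marks at most one epoch of maps for removal. A worked example with an assumed logMapsPerEpoch of 10, i.e. 1024 maps per epoch (the real constant may differ):

// Illustration only; exampleLogMapsPerEpoch is an assumed value.
const exampleLogMapsPerEpoch = 10

// nextEpochFirstMap mirrors `((tailMap >> f.logMapsPerEpoch) + 1) << f.logMapsPerEpoch`.
func nextEpochFirstMap(tailMap uint32) uint32 {
	return ((tailMap >> exampleLogMapsPerEpoch) + 1) << exampleLogMapsPerEpoch
}

// nextEpochFirstMap(0)    == 1024: at most the rest of epoch 0 goes into one batch.
// nextEpochFirstMap(1500) == 2048: maps 1500..2047 form the largest possible batch;
// if the target map is at or below 2048, the batch simply stops at the target.
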