From 1017be0fe039d84fe3e0553f794a832524c46e83 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Thu, 3 Oct 2024 23:08:47 +0200 Subject: [PATCH] core/filtermaps: improved unindexer --- core/filtermaps/indexer.go | 195 +++++++++++++++++++------------------ 1 file changed, 98 insertions(+), 97 deletions(-) diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 6a2bae02d4..0b1db011e8 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -278,10 +278,11 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { return true } -// tryUpdateTail attempts to extend or prune the log index according to the +// tryUpdateTail attempts to extend or shorten the log index according to the // current head block number and the log history settings. // stopFn is called regularly during the process, and if it returns true, the // latest batch is written and the function returns. +// tryUpdateTail returns true if it has reached the desired history length. func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool { var tailTarget uint64 if f.history > 0 { @@ -296,13 +297,13 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool } } if tailNum+f.unindexLimit <= tailTarget { - f.unindexTailPtr(tailTarget) + return f.tryUnindexTail(tailTarget, stopFn) } - return f.tryUnindexTailMaps(tailTarget, stopFn) + return true } -// tryExtendTail attempts to extend the log index backwards until it indexes the -// tail target block or cannot find more block receipts. +// tryExtendTail attempts to extend the log index backwards until the desired +// indexed history length is achieved. Returns true if finished. func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { fmr := f.getRange() number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash @@ -335,116 +336,116 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { return number <= tailTarget } -// unindexTailPtr updates the tail block number and hash and the corresponding -// tailBlockLvPointer according to the given tail target block number. -// Note that this function does not remove old index data, only marks it unused -// by updating the tail pointers, except for targetLvPointer which is unchanged -// as it marks the tail of the log index data stored in the database. -func (f *FilterMaps) unindexTailPtr(tailTarget uint64) { +// tryUnindexTail attempts to prune the log index tail until the desired indexed +// history length is achieved. Returns true if finished. +func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool { + for { + if f.unindexTailEpoch(tailTarget) { + return true + } + if stopFn() { + return false + } + } +} + +// unindexTailEpoch unindexes at most an epoch of tail log index data until the +// desired tail target is reached. 
+func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) { f.lock.Lock() - defer f.lock.Unlock() + oldRange := f.filterMapsRange + newTailMap, changed := f.unindexTailPtr(tailTarget) + newRange := f.filterMapsRange + f.lock.Unlock() - // obtain target log value pointer - if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber { - return // nothing to do + if !changed { + return true // nothing more to do } - targetLvPointer, err := f.getBlockLvPointer(tailTarget) - fmr := f.filterMapsRange + finished = newRange.tailBlockNumber == tailTarget - if err != nil { - log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err) - } - - // obtain tail target's parent hash - var tailParentHash common.Hash - if tailTarget > 0 { - if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash { - return // if a reorg is happening right now then try again later - } - tailParentHash = f.chain.GetCanonicalHash(tailTarget - 1) - if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash { - return // check again to make sure that tailParentHash is consistent with the indexed chain - } - } - - fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash - fmr.tailBlockLvPointer = targetLvPointer - f.setRange(f.db, fmr) -} - -// tryUnindexTailMaps removes unused filter maps and corresponding log index -// pointers from the database. This function also updates targetLvPointer. -func (f *FilterMaps) tryUnindexTailMaps(tailTarget uint64, stopFn func() bool) bool { - fmr := f.getRange() - tailMap := uint32(fmr.tailLvPointer >> f.logValuesPerMap) - targetMap := uint32(fmr.tailBlockLvPointer >> f.logValuesPerMap) - if tailMap >= targetMap { - return true - } - lastEpoch := (targetMap - 1) >> f.logMapsPerEpoch - removeLvPtr, err := f.getMapBlockPtr(tailMap) - if err != nil { - log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err) - removeLvPtr = math.MaxUint64 // do not remove anything - } - var ( - logged bool - lastLogged time.Time - ) - for tailMap < targetMap && !stopFn() { - tailEpoch := tailMap >> f.logMapsPerEpoch - if tailEpoch == lastEpoch { - f.unindexMaps(tailMap, targetMap, &removeLvPtr) - break - } - nextTailMap := (tailEpoch + 1) << f.logMapsPerEpoch - f.unindexMaps(tailMap, nextTailMap, &removeLvPtr) - tailMap = nextTailMap - if !logged || time.Since(lastLogged) >= time.Second*10 { - log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap) - logged, lastLogged = true, time.Now() - } - } - if logged { - log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap) - } - return tailMap >= targetMap -} - -// unindexMaps removes filter maps and corresponding log index pointers in the -// specified range in a single batch. 
-func (f *FilterMaps) unindexMaps(first, afterLast uint32, removeLvPtr *uint64) { - nextBlockNumber, err := f.getMapBlockPtr(afterLast) - if err != nil { - log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err) - nextBlockNumber = 0 // do not remove anything - } + oldTailMap := uint32(oldRange.tailLvPointer >> f.logValuesPerMap) + // remove map data [oldTailMap, newTailMap) and block data + // [oldRange.tailBlockNumber, newRange.tailBlockNumber) batch := f.db.NewBatch() - for *removeLvPtr < nextBlockNumber { - f.deleteBlockLvPointer(batch, *removeLvPtr) - if (*removeLvPtr)%revertPointFrequency == 0 { - rawdb.DeleteRevertPoint(batch, *removeLvPtr) + for blockNumber := oldRange.tailBlockNumber; blockNumber < newRange.tailBlockNumber; blockNumber++ { + f.deleteBlockLvPointer(batch, blockNumber) + if blockNumber%revertPointFrequency == 0 { + rawdb.DeleteRevertPoint(batch, blockNumber) } - (*removeLvPtr)++ } - for mapIndex := first; mapIndex < afterLast; mapIndex++ { + for mapIndex := oldTailMap; mapIndex < newTailMap; mapIndex++ { f.deleteMapBlockPtr(batch, mapIndex) } for rowIndex := uint32(0); rowIndex < f.mapHeight; rowIndex++ { - for mapIndex := first; mapIndex < afterLast; mapIndex++ { + for mapIndex := oldTailMap; mapIndex < newTailMap; mapIndex++ { f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow) } } - fmr := f.getRange() - fmr.tailLvPointer = uint64(afterLast) << f.logValuesPerMap - if fmr.tailLvPointer > fmr.tailBlockLvPointer { - log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer) + newRange.tailLvPointer = uint64(newTailMap) << f.logValuesPerMap + if newRange.tailLvPointer > newRange.tailBlockLvPointer { + log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", newRange.tailLvPointer, "tailBlockLvPointer", newRange.tailBlockLvPointer) return } - f.setRange(batch, fmr) + f.lock.Lock() + f.setRange(batch, newRange) if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } + f.lock.Unlock() + return +} + +// unindexTailPtr determines the range of tail maps to be removed in the next +// batch and updates the tail block number and hash and the corresponding +// tailBlockLvPointer accordingly. +// Note that this function does not remove old index data, only marks it unused +// by updating the tail pointers, except for targetLvPointer which is not changed +// yet as it marks the tail of the log index data stored in the database and +// therefore should be updated when map data is actually removed. 
+func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) { + // obtain target log value pointer + if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber { + return 0, false // nothing to do + } + targetLvPointer, err := f.getBlockLvPointer(tailTarget) + if err != nil { + log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err) + return 0, false + } + newRange := f.filterMapsRange + tailMap := uint32(f.tailBlockLvPointer >> f.logValuesPerMap) + nextEpochFirstMap := ((tailMap >> f.logMapsPerEpoch) + 1) << f.logMapsPerEpoch + targetMap := uint32(targetLvPointer >> f.logValuesPerMap) + if targetMap <= nextEpochFirstMap { + // unindexed range is within a single epoch, do it in a single batch + newRange.tailBlockNumber, newRange.tailBlockLvPointer, newTailMap = tailTarget, targetLvPointer, targetMap + } else { + // new tail map should be nextEpochFirstMap, determine new tail block + tailBlockNumber, err := f.getMapBlockPtr(nextEpochFirstMap) + if err != nil { + log.Error("Error fetching tail map block pointer", "map index", nextEpochFirstMap, "error", err) + return 0, false + } + tailBlockNumber++ + tailBlockLvPointer, err := f.getBlockLvPointer(tailBlockNumber) + if err != nil { + log.Error("Error fetching tail block log value pointer", "block number", tailBlockNumber, "error", err) + return 0, false + } + newRange.tailBlockNumber, newRange.tailBlockLvPointer, newTailMap = tailBlockNumber, tailBlockLvPointer, uint32(tailBlockLvPointer>>f.logValuesPerMap) + } + // obtain tail target's parent hash + if newRange.tailBlockNumber > 0 { + if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash { + return 0, false // if a reorg is happening right now then try again later + } + newRange.tailParentHash = f.chain.GetCanonicalHash(newRange.tailBlockNumber - 1) + if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash { + return 0, false // check again to make sure that tailParentHash is consistent with the indexed chain + } + } + f.setRange(f.db, newRange) + return newTailMap, true } // updateBatch is a memory overlay collecting changes to the index log structure
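
Note (illustrative, not part of the patch): the refactor above replaces the old single-pass tail pruning with tryUnindexTail, a loop that prunes at most one epoch per unindexTailEpoch call and consults stopFn between batches, so a shutdown or a new head can interrupt the work cleanly at an epoch boundary. The standalone Go sketch below shows only that driver-loop shape; pruneOneEpoch, pruneTail and the literal epoch size are hypothetical stand-ins for unindexTailEpoch and the real indexer state, not the patch's API.

package main

import "fmt"

// pruneOneEpoch advances the tail by at most one epoch towards target and
// reports whether the target has been reached (analogous to unindexTailEpoch).
func pruneOneEpoch(tail *uint64, target, epochSize uint64) bool {
    if *tail >= target {
        return true
    }
    next := (*tail/epochSize + 1) * epochSize // first entry of the next epoch
    if next > target {
        next = target
    }
    *tail = next
    return *tail >= target
}

// pruneTail keeps pruning one epoch per iteration, checking stopFn between
// batches so the caller can interrupt with a consistent, resumable state.
func pruneTail(tail *uint64, target, epochSize uint64, stopFn func() bool) bool {
    for {
        if pruneOneEpoch(tail, target, epochSize) {
            return true
        }
        if stopFn() {
            return false
        }
    }
}

func main() {
    tail := uint64(3)
    finished := pruneTail(&tail, 100, 32, func() bool { return false })
    fmt.Println(tail, finished) // 100 true
}

The point of the split is that each iteration commits a bounded amount of work, so returning false after stopFn still leaves the index usable and the next run can resume from the stored tail.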
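
Note (illustrative, not the patch's API): within one epoch, unindexTailEpoch stages all deletions — block-to-log-value pointers, revert points, map-to-block pointers and the cleared filter map rows — in a single database batch and commits it together with the updated range via setRange. The sketch below mimics only that stage-then-commit pattern; kvBatch and the key strings are invented stand-ins, not ethdb or rawdb types.

package main

import "fmt"

// kvBatch is a toy stand-in for a database batch: it only records staged keys.
type kvBatch struct{ deletes []string }

func (b *kvBatch) delete(key string) { b.deletes = append(b.deletes, key) }

func (b *kvBatch) write() error {
    fmt.Println("commit", len(b.deletes), "deletions")
    return nil
}

func main() {
    var (
        oldTailBlock, newTailBlock uint64 = 10, 42 // blocks leaving the index
        oldTailMap, newTailMap     uint32 = 2, 5   // maps leaving the index
    )
    batch := &kvBatch{}
    // stage block -> log value pointer deletions for the pruned blocks
    for b := oldTailBlock; b < newTailBlock; b++ {
        batch.delete(fmt.Sprintf("blockLvPointer-%d", b))
    }
    // stage map -> block pointer deletions for the pruned maps
    for m := oldTailMap; m < newTailMap; m++ {
        batch.delete(fmt.Sprintf("mapBlockPtr-%d", m))
    }
    // one commit point keeps the stored data consistent with the new range
    if err := batch.write(); err != nil {
        panic(err)
    }
}

Committing the deletions and the new tailLvPointer in the same batch is what keeps the on-disk tail pointer consistent with the map data that actually remains.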
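
Note (illustrative, parameter values assumed): unindexTailPtr caps each batch at the first map of the next epoch, computed as ((tailMap >> logMapsPerEpoch) + 1) << logMapsPerEpoch, and only moves the tail straight to the target when targetMap <= nextEpochFirstMap. The sketch below exercises just that arithmetic; the logValuesPerMap and logMapsPerEpoch values are placeholders, not the ones geth uses.

package main

import "fmt"

func main() {
    const (
        logValuesPerMap = 16 // assumed: log2 of log values per filter map
        logMapsPerEpoch = 6  // assumed: log2 of filter maps per epoch
    )
    var (
        tailBlockLvPointer uint64 = 5 << logValuesPerMap   // current tail block pointer
        targetLvPointer    uint64 = 200 << logValuesPerMap // pointer of the tail target block
    )
    tailMap := uint32(tailBlockLvPointer >> logValuesPerMap)
    targetMap := uint32(targetLvPointer >> logValuesPerMap)
    // First map of the epoch after the one containing tailMap: this bounds how
    // far a single unindexing batch may advance the tail.
    nextEpochFirstMap := ((tailMap >> logMapsPerEpoch) + 1) << logMapsPerEpoch
    if targetMap <= nextEpochFirstMap {
        fmt.Println("target is within the current epoch, prune up to map", targetMap)
    } else {
        fmt.Println("batch stops at the epoch boundary, map", nextEpochFirstMap)
    }
}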