diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 82a5e374b0..e4d367dd65 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -169,10 +169,12 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist lvPointerCache: lru.NewCache[uint64, uint64](1000), revertPoints: make(map[uint64]*revertPoint), } - fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber) - if err != nil { - log.Error("Error fetching tail block pointer, resetting log index", "error", err) - fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database + if fm.initialized { + fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber) + if err != nil { + log.Error("Error fetching tail block pointer, resetting log index", "error", err) + fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database + } } return fm } diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 4909e32b0a..e844b5013c 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -115,7 +115,6 @@ func (f *FilterMaps) updateLoop() { if !f.tryInit(head) { return } - if !f.initialized { wait() continue @@ -123,16 +122,41 @@ func (f *FilterMaps) updateLoop() { } // log index is initialized if f.headBlockHash != head.Hash() { - if !f.tryUpdateHead(head) { + // log index head need to be updated + f.tryUpdateHead(func() *types.Header { + // return nil if head processing needs to be stopped + select { + case ev := <-headEventCh: + head = ev.Block.Header() + case syncMatcher = <-f.matcherSyncCh: + head = f.chain.CurrentBlock() + case <-f.closeCh: + stop = true + return nil + default: + head = f.chain.CurrentBlock() + } + return head + }) + if stop { return } + if !f.initialized { + continue + } if f.headBlockHash != head.Hash() { + // if head processing stopped without reaching current head then + // something went wrong; tryUpdateHead prints an error log in + // this case and there is nothing better to do here than retry + // later. Wait for an event though in order to avoid the retry + // loop spinning at full power. wait() continue } } + // log index is synced to the latest known chain head matcherSync() - // log index head is at latest chain head; process tail blocks if possible + // process tail blocks if possible if f.tryUpdateTail(head, func() bool { // return true if tail processing needs to be stopped select { @@ -201,9 +225,14 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { // Returns false if indexer was stopped during a database reset. In this case the // indexer should exit and remaining parts of the old database will be removed // at next startup. -func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { +func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) { + head := headFn() + if head == nil { + return + } + defer func() { - if newHead.Hash() == f.headBlockHash { + if head.Hash() == f.headBlockHash { if f.loggedHeadUpdate { log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate, "elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate))) @@ -212,7 +241,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { } else { if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate { log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate, - "remaining", newHead.Number.Uint64()-f.headBlockNumber, + "remaining", head.Number.Uint64()-f.headBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate))) f.loggedHeadUpdate = true f.lastLogHeadUpdate = time.Now() @@ -220,79 +249,99 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { } }() + hc := newHeaderChain(f.chain, head.Number.Uint64(), head.Hash()) + f.revertToCommonAncestor(head.Number.Uint64(), hc) + if !f.initialized { + return + } + if f.headBlockHash == head.Hash() { + return + } + if !f.startHeadUpdate { f.lastLogHeadUpdate = time.Now() f.startedHeadUpdate = f.lastLogHeadUpdate f.startHeadUpdate = true f.ptrHeadUpdate = f.headBlockNumber } - // iterate back from new head until the log index head or a revert point and - // collect headers of blocks to be added - var ( - newHeaders []*types.Header - chainPtr = newHead - rp *revertPoint - ) - for { - if rp == nil || chainPtr.Number.Uint64() < rp.blockNumber { - var err error - rp, err = f.getRevertPoint(chainPtr.Number.Uint64()) - if err != nil { - log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err) - return true - } - if rp == nil { - // there are no more revert points available so we should reset and re-initialize - log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64()) - return f.tryInit(newHead) - } - } - if chainPtr.Hash() == rp.blockHash { - // revert point found at an ancestor of the new head - break - } - // keep iterating backwards and collecting headers - newHeaders = append(newHeaders, chainPtr) - chainPtr = f.chain.GetHeader(chainPtr.ParentHash, chainPtr.Number.Uint64()-1) - if chainPtr == nil { - log.Error("Canonical header not found", "number", chainPtr.Number.Uint64()-1, "hash", chainPtr.ParentHash) - return true - } - } - if rp.blockHash != f.headBlockHash { - if rp.blockNumber+128 <= f.headBlockNumber { - log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", chainPtr.Number.Uint64()) - } - if err := f.revertTo(rp); err != nil { - log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err) - return true - } - } - if newHeaders == nil { - return true - } - // add logs of new blocks in reverse order + // add new blocks update := f.newUpdateBatch() - for i := len(newHeaders) - 1; i >= 0; i-- { - newHeader := newHeaders[i] - receipts := f.chain.GetReceiptsByHash(newHeader.Hash()) + for update.headBlockNumber < head.Number.Uint64() { + header := hc.getHeader(update.headBlockNumber + 1) + if header == nil { + log.Error("Header not found", "number", update.headBlockNumber+1) + return + } + receipts := f.chain.GetReceiptsByHash(header.Hash()) if receipts == nil { - log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash()) + log.Error("Could not retrieve block receipts for new block", "number", header.Number, "hash", header.Hash()) break } - if err := update.addBlockToHead(newHeader, receipts); err != nil { - log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err) + if err := update.addBlockToHead(header, receipts); err != nil { + log.Error("Error adding new block", "number", header.Number, "hash", header.Hash(), "error", err) break } if update.updatedRangeLength() >= f.mapsPerEpoch { // limit the amount of data updated in a single batch f.applyUpdateBatch(update) + newHead := headFn() + if newHead == nil { + return + } + if newHead.Hash() != head.Hash() { + head = newHead + hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash()) + if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash { + f.revertToCommonAncestor(head.Number.Uint64(), hc) + if !f.initialized { + return + } + } + } update = f.newUpdateBatch() } } f.applyUpdateBatch(update) - return true +} + +// find the latest revert point that is the ancestor of the new head +func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) { + var ( + number = headNum + rp *revertPoint + ) + for { + var err error + if rp, err = f.getRevertPoint(number); err == nil { + if rp == nil || hc.getBlockHash(rp.blockNumber) == rp.blockHash { + break + } + } else { + log.Error("Error fetching revert point", "block number", number, "error", err) + } + if rp.blockNumber == 0 { + rp = nil + break + } + number = rp.blockNumber - 1 + } + if rp == nil { + // there are no more revert points available so we should reset and re-initialize + log.Warn("No suitable revert point exists; re-initializing log index", "block number", headNum) + f.setRange(f.db, filterMapsRange{}) + return + } + if rp.blockHash == f.headBlockHash { + return // found the head revert point, nothing to do + } + // revert to the common ancestor if necessary + if rp.blockNumber+128 <= f.headBlockNumber { + log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", rp.blockNumber) + } + if err := f.revertTo(rp); err != nil { + log.Error("Error applying revert point", "block number", rp.blockNumber, "error", err) + } } // tryUpdateTail attempts to extend or shorten the log index according to the @@ -494,18 +543,74 @@ func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, chang } // obtain tail target's parent hash if newRange.tailBlockNumber > 0 { - if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash { - return 0, false // if a reorg is happening right now then try again later - } - newRange.tailParentHash = f.chain.GetCanonicalHash(newRange.tailBlockNumber - 1) - if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash { - return 0, false // check again to make sure that tailParentHash is consistent with the indexed chain - } + newRange.tailParentHash = newHeaderChain(f.chain, f.headBlockNumber, f.headBlockHash).getBlockHash(newRange.tailBlockNumber - 1) } f.setRange(f.db, newRange) return newTailMap, true } +type headerChain struct { + chain blockchain + nonCanonical []*types.Header + number uint64 + hash common.Hash +} + +func newHeaderChain(chain blockchain, number uint64, hash common.Hash) *headerChain { + hc := &headerChain{ + chain: chain, + number: number, + hash: hash, + } + hc.extendNonCanonical() + return hc +} + +func (hc *headerChain) extendNonCanonical() bool { + for hc.hash != hc.chain.GetCanonicalHash(hc.number) { + header := hc.chain.GetHeader(hc.hash, hc.number) + if header == nil { + log.Error("Header not found", "number", hc.number, "hash", hc.hash) + return false + } + hc.nonCanonical = append(hc.nonCanonical, header) + hc.number, hc.hash = hc.number-1, header.ParentHash + } + return true +} + +func (hc *headerChain) getBlockHash(number uint64) common.Hash { + if number <= hc.number { + hash := hc.chain.GetCanonicalHash(number) + if !hc.extendNonCanonical() { + return common.Hash{} + } + if number <= hc.number { + return hash + } + } + if number-hc.number > uint64(len(hc.nonCanonical)) { + return common.Hash{} + } + return hc.nonCanonical[len(hc.nonCanonical)+1-int(number-hc.number)].Hash() +} + +func (hc *headerChain) getHeader(number uint64) *types.Header { + if number <= hc.number { + hash := hc.chain.GetCanonicalHash(number) + if !hc.extendNonCanonical() { + return nil + } + if number <= hc.number { + return hc.chain.GetHeader(hash, number) + } + } + if number-hc.number > uint64(len(hc.nonCanonical)) { + return nil + } + return hc.nonCanonical[len(hc.nonCanonical)+1-int(number-hc.number)] +} + // updateBatch is a memory overlay collecting changes to the index log structure // that can be written to the database in a single batch while the in-memory // representations in FilterMaps are also updated.