core/filtermaps: improved head indexing

Zsolt Felfoldi 2024-10-15 15:16:23 +02:00
parent 3cace2a13d
commit d164b762c0
2 changed files with 177 additions and 70 deletions


@@ -169,10 +169,12 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
         lvPointerCache: lru.NewCache[uint64, uint64](1000),
         revertPoints:   make(map[uint64]*revertPoint),
     }
-    fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber)
-    if err != nil {
-        log.Error("Error fetching tail block pointer, resetting log index", "error", err)
-        fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
+    if fm.initialized {
+        fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber)
+        if err != nil {
+            log.Error("Error fetching tail block pointer, resetting log index", "error", err)
+            fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
+        }
     }
     return fm
 }


@@ -115,7 +115,6 @@ func (f *FilterMaps) updateLoop() {
         if !f.tryInit(head) {
             return
         }
-
         if !f.initialized {
             wait()
             continue
@@ -123,16 +122,41 @@ func (f *FilterMaps) updateLoop() {
         }
         // log index is initialized
         if f.headBlockHash != head.Hash() {
-            if !f.tryUpdateHead(head) {
+            // log index head needs to be updated
+            f.tryUpdateHead(func() *types.Header {
+                // return nil if head processing needs to be stopped
+                select {
+                case ev := <-headEventCh:
+                    head = ev.Block.Header()
+                case syncMatcher = <-f.matcherSyncCh:
+                    head = f.chain.CurrentBlock()
+                case <-f.closeCh:
+                    stop = true
+                    return nil
+                default:
+                    head = f.chain.CurrentBlock()
+                }
+                return head
+            })
+            if stop {
                 return
             }
+            if !f.initialized {
+                continue
+            }
             if f.headBlockHash != head.Hash() {
+                // if head processing stopped without reaching the current head
+                // then something went wrong; tryUpdateHead has already logged an
+                // error in this case and there is nothing better to do here than
+                // to retry later. Wait for an event though, in order to avoid
+                // the retry loop spinning at full power.
                 wait()
                 continue
             }
         }
+        // log index is synced to the latest known chain head
         matcherSync()
-        // log index head is at latest chain head; process tail blocks if possible
+        // process tail blocks if possible
         if f.tryUpdateTail(head, func() bool {
             // return true if tail processing needs to be stopped
             select {
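
The structural change above is that tryUpdateHead no longer receives a single target header: after every bounded batch it pulls the current target through a callback, so a long forward-indexing run can follow the chain head, or stop on shutdown, instead of indexing toward a stale block. Below is a minimal, self-contained sketch of that control flow; the names (indexTo, newHeadCh, closeCh) and the batch size are illustrative stand-ins, not the geth implementation.

package main

import "fmt"

// indexTo mimics the new tryUpdateHead control flow: after every bounded
// batch it asks headFn for the current target; a nil answer means stop.
func indexTo(start int, headFn func() *int) int {
    pos := start
    for {
        head := headFn()
        if head == nil {
            return pos // shutdown requested mid-run
        }
        for pos < *head { // index one bounded batch
            pos++
            if pos%100 == 0 {
                break
            }
        }
        if pos == *head {
            return pos // caught up with the latest known head
        }
    }
}

func main() {
    newHeadCh := make(chan int, 1)
    closeCh := make(chan struct{})
    head := 250

    // headFn is the analogue of the closure passed to tryUpdateHead: it
    // drains pending events without blocking and reports the newest head.
    headFn := func() *int {
        select {
        case h := <-newHeadCh: // the head moved while we were indexing
            head = h
        case <-closeCh: // shutdown: tell the indexer to stop
            return nil
        default: // nothing new: keep indexing toward the known head
        }
        return &head
    }

    newHeadCh <- 300 // a head update is already queued when the run starts
    fmt.Println(indexTo(0, headFn)) // 300: the run followed the new head
}

The default case is what keeps the callback non-blocking: if nothing has happened, indexing simply continues toward the last known head rather than waiting for an event.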
@@ -201,9 +225,14 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
 // Returns false if indexer was stopped during a database reset. In this case the
 // indexer should exit and remaining parts of the old database will be removed
 // at next startup.
-func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
+func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
+    head := headFn()
+    if head == nil {
+        return
+    }
+
     defer func() {
-        if newHead.Hash() == f.headBlockHash {
+        if head.Hash() == f.headBlockHash {
             if f.loggedHeadUpdate {
                 log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate,
                     "elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
@@ -212,7 +241,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
         } else {
             if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
                 log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate,
-                    "remaining", newHead.Number.Uint64()-f.headBlockNumber,
+                    "remaining", head.Number.Uint64()-f.headBlockNumber,
                     "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
                 f.loggedHeadUpdate = true
                 f.lastLogHeadUpdate = time.Now()
@@ -220,79 +249,99 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
         }
     }()
+    hc := newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
+    f.revertToCommonAncestor(head.Number.Uint64(), hc)
+    if !f.initialized {
+        return
+    }
+    if f.headBlockHash == head.Hash() {
+        return
+    }
     if !f.startHeadUpdate {
         f.lastLogHeadUpdate = time.Now()
         f.startedHeadUpdate = f.lastLogHeadUpdate
         f.startHeadUpdate = true
         f.ptrHeadUpdate = f.headBlockNumber
     }
-    // iterate back from new head until the log index head or a revert point and
-    // collect headers of blocks to be added
-    var (
-        newHeaders []*types.Header
-        chainPtr   = newHead
-        rp         *revertPoint
-    )
-    for {
-        if rp == nil || chainPtr.Number.Uint64() < rp.blockNumber {
-            var err error
-            rp, err = f.getRevertPoint(chainPtr.Number.Uint64())
-            if err != nil {
-                log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err)
-                return true
-            }
-            if rp == nil {
-                // there are no more revert points available so we should reset and re-initialize
-                log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64())
-                return f.tryInit(newHead)
-            }
-        }
-        if chainPtr.Hash() == rp.blockHash {
-            // revert point found at an ancestor of the new head
-            break
-        }
-        // keep iterating backwards and collecting headers
-        newHeaders = append(newHeaders, chainPtr)
-        chainPtr = f.chain.GetHeader(chainPtr.ParentHash, chainPtr.Number.Uint64()-1)
-        if chainPtr == nil {
-            log.Error("Canonical header not found", "number", chainPtr.Number.Uint64()-1, "hash", chainPtr.ParentHash)
-            return true
-        }
-    }
-    if rp.blockHash != f.headBlockHash {
-        if rp.blockNumber+128 <= f.headBlockNumber {
-            log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", chainPtr.Number.Uint64())
-        }
-        if err := f.revertTo(rp); err != nil {
-            log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err)
-            return true
-        }
-    }
-    if newHeaders == nil {
-        return true
-    }
-    // add logs of new blocks in reverse order
+    // add new blocks
     update := f.newUpdateBatch()
-    for i := len(newHeaders) - 1; i >= 0; i-- {
-        newHeader := newHeaders[i]
-        receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
+    for update.headBlockNumber < head.Number.Uint64() {
+        header := hc.getHeader(update.headBlockNumber + 1)
+        if header == nil {
+            log.Error("Header not found", "number", update.headBlockNumber+1)
+            return
+        }
+        receipts := f.chain.GetReceiptsByHash(header.Hash())
         if receipts == nil {
-            log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
+            log.Error("Could not retrieve block receipts for new block", "number", header.Number, "hash", header.Hash())
             break
         }
-        if err := update.addBlockToHead(newHeader, receipts); err != nil {
-            log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
+        if err := update.addBlockToHead(header, receipts); err != nil {
+            log.Error("Error adding new block", "number", header.Number, "hash", header.Hash(), "error", err)
             break
         }
         if update.updatedRangeLength() >= f.mapsPerEpoch {
             // limit the amount of data updated in a single batch
             f.applyUpdateBatch(update)
+            newHead := headFn()
+            if newHead == nil {
+                return
+            }
+            if newHead.Hash() != head.Hash() {
+                head = newHead
+                hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
+                if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash {
+                    f.revertToCommonAncestor(head.Number.Uint64(), hc)
+                    if !f.initialized {
+                        return
+                    }
+                }
+            }
             update = f.newUpdateBatch()
         }
     }
     f.applyUpdateBatch(update)
-    return true
 }
+
+// revertToCommonAncestor reverts the log index to the latest revert point that
+// is an ancestor of the new head; if no such revert point exists, the log
+// index is reset and re-initialized.
+func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
+    var (
+        number = headNum
+        rp     *revertPoint
+    )
+    for {
+        var err error
+        if rp, err = f.getRevertPoint(number); err == nil {
+            if rp == nil || hc.getBlockHash(rp.blockNumber) == rp.blockHash {
+                break
+            }
+        } else {
+            log.Error("Error fetching revert point", "block number", number, "error", err)
+        }
+        if rp == nil || rp.blockNumber == 0 {
+            rp = nil
+            break
+        }
+        number = rp.blockNumber - 1
+    }
+    if rp == nil {
+        // there are no more revert points available so we should reset and re-initialize
+        log.Warn("No suitable revert point exists; re-initializing log index", "block number", headNum)
+        f.setRange(f.db, filterMapsRange{})
+        return
+    }
+    if rp.blockHash == f.headBlockHash {
+        return // already at the head revert point, nothing to do
+    }
+    // revert to the common ancestor if necessary
+    if rp.blockNumber+128 <= f.headBlockNumber {
+        log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", rp.blockNumber)
+    }
+    if err := f.revertTo(rp); err != nil {
+        log.Error("Error applying revert point", "block number", rp.blockNumber, "error", err)
+    }
+}
 
 // tryUpdateTail attempts to extend or shorten the log index according to the
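
revertToCommonAncestor walks the stored revert points backwards from the head number until it finds one whose recorded hash still lies on the chain viewed through hc, and only resets the index when no such point exists. The toy sketch below shows the same search on a reorged chain; revertPoint mirrors the struct fields in the diff, while findCommonAncestor, chainHash and the checkpoint data are hypothetical stand-ins for getRevertPoint and headerChain.

package main

import "fmt"

// revertPoint mirrors the fields used by the search; everything else in this
// file is illustrative only.
type revertPoint struct {
    blockNumber uint64
    blockHash   string
}

// findCommonAncestor scans checkpoints from headNum downwards and returns the
// newest one whose hash still matches the viewed chain, or nil if none does
// (the case where the indexer resets and re-initializes).
func findCommonAncestor(points map[uint64]*revertPoint, chainHash func(uint64) string, headNum uint64) *revertPoint {
    number := headNum
    for {
        // pick the newest stored revert point at or below number
        var rp *revertPoint
        for n := number; ; n-- {
            if p, ok := points[n]; ok {
                rp = p
                break
            }
            if n == 0 {
                return nil
            }
        }
        if chainHash(rp.blockNumber) == rp.blockHash {
            return rp // still an ancestor of the new head
        }
        if rp.blockNumber == 0 {
            return nil
        }
        number = rp.blockNumber - 1 // continue the search below the mismatch
    }
}

func main() {
    // a chain that reorged at block 8: blocks 8 and above were replaced
    chainHash := func(n uint64) string {
        if n >= 8 {
            return fmt.Sprintf("B%d", n)
        }
        return fmt.Sprintf("A%d", n)
    }
    points := map[uint64]*revertPoint{
        4: {blockNumber: 4, blockHash: "A4"},
        8: {blockNumber: 8, blockHash: "A8"}, // taken on the old branch
    }
    rp := findCommonAncestor(points, chainHash, 10)
    fmt.Println(rp.blockNumber, rp.blockHash) // 4 A4: revert below the reorg
}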
@@ -494,18 +543,74 @@ func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, chang
     }
     // obtain tail target's parent hash
     if newRange.tailBlockNumber > 0 {
-        if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash {
-            return 0, false // if a reorg is happening right now then try again later
-        }
-        newRange.tailParentHash = f.chain.GetCanonicalHash(newRange.tailBlockNumber - 1)
-        if f.chain.GetCanonicalHash(f.headBlockNumber) != f.headBlockHash {
-            return 0, false // check again to make sure that tailParentHash is consistent with the indexed chain
-        }
+        newRange.tailParentHash = newHeaderChain(f.chain, f.headBlockNumber, f.headBlockHash).getBlockHash(newRange.tailBlockNumber - 1)
     }
     f.setRange(f.db, newRange)
     return newTailMap, true
 }
+
+// headerChain provides a fixed view of a chain ending at the given head, even
+// if the canonical chain changes concurrently. Headers that are no longer on
+// the canonical chain are cached in the nonCanonical list, ordered head-first.
+type headerChain struct {
+    chain        blockchain
+    nonCanonical []*types.Header
+    number       uint64
+    hash         common.Hash
+}
+
+func newHeaderChain(chain blockchain, number uint64, hash common.Hash) *headerChain {
+    hc := &headerChain{
+        chain:  chain,
+        number: number,
+        hash:   hash,
+    }
+    hc.extendNonCanonical()
+    return hc
+}
+
+// extendNonCanonical walks backwards from the current cut-off until it reaches
+// a block that is still canonical, caching the headers passed on the way.
+func (hc *headerChain) extendNonCanonical() bool {
+    for hc.hash != hc.chain.GetCanonicalHash(hc.number) {
+        header := hc.chain.GetHeader(hc.hash, hc.number)
+        if header == nil {
+            log.Error("Header not found", "number", hc.number, "hash", hc.hash)
+            return false
+        }
+        hc.nonCanonical = append(hc.nonCanonical, header)
+        hc.number, hc.hash = hc.number-1, header.ParentHash
+    }
+    return true
+}
+
+// getBlockHash returns the block hash at the given number on the viewed chain.
+func (hc *headerChain) getBlockHash(number uint64) common.Hash {
+    if number <= hc.number {
+        hash := hc.chain.GetCanonicalHash(number)
+        if !hc.extendNonCanonical() {
+            return common.Hash{}
+        }
+        if number <= hc.number {
+            return hash
+        }
+    }
+    if number-hc.number > uint64(len(hc.nonCanonical)) {
+        return common.Hash{}
+    }
+    return hc.nonCanonical[len(hc.nonCanonical)-int(number-hc.number)].Hash()
+}
+
+// getHeader returns the header at the given number on the viewed chain.
+func (hc *headerChain) getHeader(number uint64) *types.Header {
+    if number <= hc.number {
+        hash := hc.chain.GetCanonicalHash(number)
+        if !hc.extendNonCanonical() {
+            return nil
+        }
+        if number <= hc.number {
+            return hc.chain.GetHeader(hash, number)
+        }
+    }
+    if number-hc.number > uint64(len(hc.nonCanonical)) {
+        return nil
+    }
+    return hc.nonCanonical[len(hc.nonCanonical)-int(number-hc.number)]
+}
 
 // updateBatch is a memory overlay collecting changes to the index log structure
 // that can be written to the database in a single batch while the in-memory
 // representations in FilterMaps are also updated.
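
headerChain pins a view of the chain at a fixed, possibly no-longer-canonical head: numbers at or below the canonical cut-off hc.number resolve through the canonical index, while higher ones resolve through the head-first nonCanonical cache, whose entry for block number n sits at index len(nonCanonical)-(n-hc.number). The following sketch demonstrates that arithmetic with mock data; all names and values here are illustrative, not the geth types.

package main

import "fmt"

func main() {
    const (
        cutoff = 7  // hc.number: highest block that is still canonical
        head   = 10 // number of the non-canonical head
    )
    // nonCanonical is ordered head-first: index 0 holds the head (10),
    // the last element holds cutoff+1 (8).
    nonCanonical := []string{"hdr10", "hdr9", "hdr8"}

    get := func(number uint64) string {
        if number <= cutoff {
            // at or below the cut-off the canonical index answers
            return fmt.Sprintf("canonical hdr%d", number)
        }
        if number-cutoff > uint64(len(nonCanonical)) {
            return "unknown" // beyond the viewed head
        }
        return nonCanonical[len(nonCanonical)-int(number-cutoff)]
    }
    for n := uint64(6); n <= head+1; n++ {
        fmt.Println(n, "->", get(n))
    }
    // 6 and 7 resolve canonically, 8..10 hit the cache, 11 is unknown
}

Keeping the cache head-first means extendNonCanonical can simply append as it walks backwards, at the cost of the reversed index expression in the two lookup methods.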