core/filtermaps: always use correct absolute log index
This commit is contained in:
parent
f431db1e27
commit
077ab869f3
|
@ -29,7 +29,6 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
startLvMap = 1 << 31 // map index assigned to init block
|
||||
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
|
||||
revertPointFrequency = 256 // frequency of revert points in database
|
||||
cachedRevertPoints = 64 // revert points for most recent blocks in memory
|
||||
|
@ -157,7 +156,7 @@ func (f *FilterMaps) updateLoop() {
|
|||
// log index is synced to the latest known chain head
|
||||
matcherSync()
|
||||
// process tail blocks if possible
|
||||
if f.tryUpdateTail(head, func() bool {
|
||||
if f.tryUpdateTail(func() bool {
|
||||
// return true if tail processing needs to be stopped
|
||||
select {
|
||||
case ev := <-headEventCh:
|
||||
|
@ -204,13 +203,14 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
|
|||
if !f.reset() {
|
||||
return false
|
||||
}
|
||||
head = f.chain.GetHeader(f.chain.GetCanonicalHash(0), 0)
|
||||
receipts := f.chain.GetReceiptsByHash(head.Hash())
|
||||
if receipts == nil {
|
||||
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
|
||||
return true
|
||||
}
|
||||
update := f.newUpdateBatch()
|
||||
if err := update.initWithBlock(head, receipts); err != nil {
|
||||
if err := update.initWithBlock(head, receipts, 0); err != nil {
|
||||
log.Error("Could not initialize log index", "error", err)
|
||||
}
|
||||
f.applyUpdateBatch(update)
|
||||
|
@ -234,21 +234,24 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
|
|||
defer func() {
|
||||
if head.Hash() == f.headBlockHash {
|
||||
if f.loggedHeadUpdate {
|
||||
log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
|
||||
f.loggedHeadUpdate, f.startHeadUpdate = false, false
|
||||
}
|
||||
} else {
|
||||
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
|
||||
log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate,
|
||||
"remaining", head.Number.Uint64()-f.headBlockNumber,
|
||||
log.Info("Forward log indexing finished", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
|
||||
"last block", f.headBlockNumber, "processed", f.headBlockNumber-f.ptrHeadUpdate,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
|
||||
f.loggedHeadUpdate = true
|
||||
f.lastLogHeadUpdate = time.Now()
|
||||
f.loggedHeadUpdate, f.startHeadUpdate = false, false
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
printProgressLog := func() {
|
||||
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
|
||||
log.Info("Forward log indexing in progress", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
|
||||
"last block", f.headBlockNumber, "processed", f.headBlockNumber-f.ptrHeadUpdate, "remaining", head.Number.Uint64()-f.headBlockNumber,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
|
||||
f.loggedHeadUpdate = true
|
||||
f.lastLogHeadUpdate = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
hc := newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
|
||||
f.revertToCommonAncestor(head.Number.Uint64(), hc)
|
||||
if !f.initialized {
|
||||
|
@ -267,6 +270,7 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
|
|||
|
||||
// add new blocks
|
||||
update := f.newUpdateBatch()
|
||||
lastHeadEpoch := update.headEpoch()
|
||||
for update.headBlockNumber < head.Number.Uint64() {
|
||||
header := hc.getHeader(update.headBlockNumber + 1)
|
||||
if header == nil {
|
||||
|
@ -282,24 +286,48 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
|
|||
log.Error("Error adding new block", "number", header.Number, "hash", header.Hash(), "error", err)
|
||||
break
|
||||
}
|
||||
if update.updatedRangeLength() >= f.mapsPerEpoch {
|
||||
if update.headBlockNumber+cachedRevertPoints > head.Number.Uint64() ||
|
||||
update.headBlockNumber%revertPointFrequency == 0 {
|
||||
if rp, err := update.makeRevertPoint(); err != nil {
|
||||
log.Error("Error creating revert point", "block number", update.headBlockNumber, "error", err)
|
||||
} else if rp != nil {
|
||||
update.revertPoints[update.headBlockNumber] = rp
|
||||
}
|
||||
}
|
||||
newHead := headFn()
|
||||
if newHead == nil {
|
||||
f.applyUpdateBatch(update)
|
||||
printProgressLog()
|
||||
return
|
||||
}
|
||||
if newHead.Hash() != head.Hash() {
|
||||
head = newHead
|
||||
hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
|
||||
if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash {
|
||||
f.applyUpdateBatch(update)
|
||||
printProgressLog()
|
||||
f.revertToCommonAncestor(head.Number.Uint64(), hc)
|
||||
if !f.initialized {
|
||||
return
|
||||
}
|
||||
update = f.newUpdateBatch()
|
||||
}
|
||||
}
|
||||
if headEpoch := update.headEpoch(); headEpoch > lastHeadEpoch {
|
||||
// limit the amount of data updated in a single batch
|
||||
f.applyUpdateBatch(update)
|
||||
newHead := headFn()
|
||||
if newHead == nil {
|
||||
return
|
||||
// after adding 1 epoch of new log data remove at most 2 epochs of
|
||||
// unwanted tail data if necessary
|
||||
tailTarget := f.tailTarget()
|
||||
if f.tailBlockNumber < tailTarget {
|
||||
f.unindexTailEpoch(tailTarget)
|
||||
}
|
||||
if newHead.Hash() != head.Hash() {
|
||||
head = newHead
|
||||
hc = newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
|
||||
if hc.getBlockHash(f.headBlockNumber) != f.headBlockHash {
|
||||
f.revertToCommonAncestor(head.Number.Uint64(), hc)
|
||||
if !f.initialized {
|
||||
return
|
||||
}
|
||||
}
|
||||
if f.tailBlockNumber < tailTarget {
|
||||
f.unindexTailEpoch(tailTarget)
|
||||
}
|
||||
printProgressLog()
|
||||
update = f.newUpdateBatch()
|
||||
lastHeadEpoch = headEpoch
|
||||
}
|
||||
}
|
||||
f.applyUpdateBatch(update)
|
||||
|
@ -307,6 +335,9 @@ func (f *FilterMaps) tryUpdateHead(headFn func() *types.Header) {
|
|||
|
||||
// find the latest revert point that is the ancestor of the new head
|
||||
func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
|
||||
if hc.getBlockHash(f.headBlockNumber) == f.headBlockHash {
|
||||
return
|
||||
}
|
||||
var (
|
||||
number = headNum
|
||||
rp *revertPoint
|
||||
|
@ -332,9 +363,6 @@ func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
|
|||
f.setRange(f.db, filterMapsRange{})
|
||||
return
|
||||
}
|
||||
if rp.blockHash == f.headBlockHash {
|
||||
return // found the head revert point, nothing to do
|
||||
}
|
||||
// revert to the common ancestor if necessary
|
||||
if rp.blockNumber+128 <= f.headBlockNumber {
|
||||
log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", rp.blockNumber)
|
||||
|
@ -349,13 +377,8 @@ func (f *FilterMaps) revertToCommonAncestor(headNum uint64, hc *headerChain) {
|
|||
// stopFn is called regularly during the process, and if it returns true, the
|
||||
// latest batch is written and the function returns.
|
||||
// tryUpdateTail returns true if it has reached the desired history length.
|
||||
func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool {
|
||||
var tailTarget uint64
|
||||
if f.history > 0 {
|
||||
if headNum := head.Number.Uint64(); headNum >= f.history {
|
||||
tailTarget = headNum + 1 - f.history
|
||||
}
|
||||
}
|
||||
func (f *FilterMaps) tryUpdateTail(stopFn func() bool) bool {
|
||||
tailTarget := f.tailTarget()
|
||||
tailNum := f.tailBlockNumber
|
||||
if tailNum > tailTarget {
|
||||
if !f.tryExtendTail(tailTarget, stopFn) {
|
||||
|
@ -368,14 +391,24 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
|
|||
return true
|
||||
}
|
||||
|
||||
// tailTarget returns the target value for the tail block number according to the
|
||||
// log history parameter and the current index head.
|
||||
func (f *FilterMaps) tailTarget() uint64 {
|
||||
if f.history == 0 || f.headBlockNumber < f.history {
|
||||
return 0
|
||||
}
|
||||
return f.headBlockNumber + 1 - f.history
|
||||
}
|
||||
|
||||
// tryExtendTail attempts to extend the log index backwards until the desired
|
||||
// indexed history length is achieved. Returns true if finished.
|
||||
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
|
||||
defer func() {
|
||||
if f.tailBlockNumber <= tailTarget {
|
||||
if f.loggedTailExtend {
|
||||
log.Info("Reverse log indexing finished", "maps", f.mapCount(f.logValuesPerMap), "history", f.headBlockNumber+1-f.tailBlockNumber,
|
||||
"processed", f.ptrTailExtend-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
|
||||
log.Info("Reverse log indexing finished", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
|
||||
"last block", f.headBlockNumber, "processed", f.ptrTailExtend-f.tailBlockNumber,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
|
||||
f.loggedTailExtend = false
|
||||
}
|
||||
}
|
||||
|
@ -397,8 +430,8 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
|
|||
f.applyUpdateBatch(update)
|
||||
|
||||
if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend {
|
||||
log.Info("Reverse log indexing in progress", "maps", update.mapCount(f.logValuesPerMap), "history", update.headBlockNumber+1-update.tailBlockNumber,
|
||||
"processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget,
|
||||
log.Info("Reverse log indexing in progress", "filter maps", f.mapCount(f.logValuesPerMap), "first block", f.tailBlockNumber,
|
||||
"last block", f.headBlockNumber, "processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", f.tailBlockNumber-tailTarget,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
|
||||
f.loggedTailExtend = true
|
||||
f.lastLogTailExtend = time.Now()
|
||||
|
@ -710,6 +743,11 @@ func (u *updateBatch) updatedRangeLength() uint32 {
|
|||
return u.afterLastMap - u.firstMap
|
||||
}
|
||||
|
||||
// headEpoch returns the head epoch index.
|
||||
func (u *updateBatch) headEpoch() uint32 {
|
||||
return uint32(u.headLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch))
|
||||
}
|
||||
|
||||
// tailEpoch returns the tail epoch index.
|
||||
func (u *updateBatch) tailEpoch() uint32 {
|
||||
return uint32(u.tailBlockLvPointer >> (u.f.logValuesPerMap + u.f.logMapsPerEpoch))
|
||||
|
@ -745,12 +783,11 @@ func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
|
|||
}
|
||||
|
||||
// initWithBlock initializes the log index with the given block as head.
|
||||
func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts) error {
|
||||
func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts, startLvPointer uint64) error {
|
||||
if u.initialized {
|
||||
return errors.New("already initialized")
|
||||
}
|
||||
u.initialized = true
|
||||
startLvPointer := uint64(startLvMap) << u.f.logValuesPerMap
|
||||
u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
|
||||
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
|
||||
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
|
||||
|
@ -795,11 +832,6 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip
|
|||
if (u.headBlockNumber-cachedRevertPoints)%revertPointFrequency != 0 {
|
||||
delete(u.revertPoints, u.headBlockNumber-cachedRevertPoints)
|
||||
}
|
||||
if rp, err := u.makeRevertPoint(); err != nil {
|
||||
return err
|
||||
} else if rp != nil {
|
||||
u.revertPoints[u.headBlockNumber] = rp
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue