core/filtermaps: nice log info during indexing/unindexing
This commit is contained in:
parent
1017be0fe0
commit
95cbcdf81b
|
@ -49,18 +49,22 @@ type blockchain interface {
|
|||
// without the tree hashing and consensus changes:
|
||||
// https://eips.ethereum.org/EIPS/eip-7745
|
||||
type FilterMaps struct {
|
||||
lock sync.RWMutex
|
||||
db ethdb.KeyValueStore
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
history, unindexLimit uint64
|
||||
noHistory bool
|
||||
|
||||
Params
|
||||
filterMapsRange
|
||||
chain blockchain
|
||||
matcherSyncCh chan *FilterMapsMatcherBackend
|
||||
matchers map[*FilterMapsMatcherBackend]struct{}
|
||||
|
||||
// db and range are only modified by indexer under write lock; indexer can
|
||||
// read them without a lock while matchers can access them under read lock
|
||||
lock sync.RWMutex
|
||||
db ethdb.KeyValueStore
|
||||
filterMapsRange
|
||||
|
||||
matchers map[*FilterMapsMatcherBackend]struct{}
|
||||
|
||||
// filterMapCache caches certain filter maps (headCacheSize most recent maps
|
||||
// and one tail map) that are expected to be frequently accessed and modified
|
||||
// while updating the structure. Note that the set of cached maps depends
|
||||
|
@ -71,6 +75,11 @@ type FilterMaps struct {
|
|||
lvPointerCache *lru.Cache[uint64, uint64]
|
||||
revertPoints map[uint64]*revertPoint
|
||||
|
||||
startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool
|
||||
startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time
|
||||
lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time
|
||||
ptrHeadUpdate, ptrTailExtend, ptrTailUnindex uint64
|
||||
|
||||
waitIdleCh chan chan bool
|
||||
}
|
||||
|
||||
|
@ -120,13 +129,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
|
|||
}
|
||||
params.deriveFields()
|
||||
fm := &FilterMaps{
|
||||
db: db,
|
||||
chain: chain,
|
||||
closeCh: make(chan struct{}),
|
||||
waitIdleCh: make(chan chan bool),
|
||||
history: history,
|
||||
noHistory: noHistory,
|
||||
Params: params,
|
||||
db: db,
|
||||
chain: chain,
|
||||
closeCh: make(chan struct{}),
|
||||
waitIdleCh: make(chan chan bool),
|
||||
history: history,
|
||||
noHistory: noHistory,
|
||||
unindexLimit: unindexLimit,
|
||||
Params: params,
|
||||
filterMapsRange: filterMapsRange{
|
||||
initialized: rs.Initialized,
|
||||
headLvPointer: rs.HeadLvPointer,
|
||||
|
@ -151,13 +161,14 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
|
|||
return fm
|
||||
}
|
||||
|
||||
// Start starts the indexer.
|
||||
func (f *FilterMaps) Start() {
|
||||
f.closeWg.Add(2)
|
||||
go f.removeBloomBits()
|
||||
go f.updateLoop()
|
||||
}
|
||||
|
||||
// Close ensures that the indexer is fully stopped before returning.
|
||||
// Stop ensures that the indexer is fully stopped before returning.
|
||||
func (f *FilterMaps) Stop() {
|
||||
close(f.closeCh)
|
||||
f.closeWg.Wait()
|
||||
|
@ -172,10 +183,10 @@ func (f *FilterMaps) reset() bool {
|
|||
f.revertPoints = make(map[uint64]*revertPoint)
|
||||
f.blockPtrCache.Purge()
|
||||
f.lvPointerCache.Purge()
|
||||
f.lock.Unlock()
|
||||
// deleting the range first ensures that resetDb will be called again at next
|
||||
// startup and any leftover data will be removed even if it cannot finish now.
|
||||
rawdb.DeleteFilterMapsRange(f.db)
|
||||
f.lock.Unlock()
|
||||
return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database")
|
||||
}
|
||||
|
||||
|
|
|
@ -29,10 +29,11 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
startLvMap = 1 << 31 // map index assigned to init block
|
||||
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
|
||||
revertPointFrequency = 256 // frequency of revert points in database
|
||||
cachedRevertPoints = 64 // revert points for most recent blocks in memory
|
||||
startLvMap = 1 << 31 // map index assigned to init block
|
||||
removedPointer = math.MaxUint64 // used in updateBatch to signal removed items
|
||||
revertPointFrequency = 256 // frequency of revert points in database
|
||||
cachedRevertPoints = 64 // revert points for most recent blocks in memory
|
||||
logFrequency = time.Second * 8 // log info frequency during long indexing/unindexing process
|
||||
)
|
||||
|
||||
// updateLoop initializes and updates the log index structure according to the
|
||||
|
@ -44,7 +45,10 @@ func (f *FilterMaps) updateLoop() {
|
|||
f.reset()
|
||||
return
|
||||
}
|
||||
|
||||
f.lock.Lock()
|
||||
f.updateMapCache()
|
||||
f.lock.Unlock()
|
||||
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
|
||||
f.revertPoints[rp.blockNumber] = rp
|
||||
} else {
|
||||
|
@ -198,6 +202,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
|
|||
log.Error("Could not initialize log index", "error", err)
|
||||
}
|
||||
f.applyUpdateBatch(update)
|
||||
log.Info("Initialized log index", "head", head.Number.Uint64())
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -209,6 +214,32 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
|
|||
// indexer should exit and remaining parts of the old database will be removed
|
||||
// at next startup.
|
||||
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
|
||||
defer func() {
|
||||
fmr := f.getRange()
|
||||
if newHead.Hash() == fmr.headBlockHash {
|
||||
if f.loggedHeadUpdate {
|
||||
log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
|
||||
f.loggedHeadUpdate, f.startHeadUpdate = false, false
|
||||
}
|
||||
} else {
|
||||
if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
|
||||
log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
|
||||
"remaining", newHead.Number.Uint64()-fmr.headBlockNumber,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
|
||||
f.loggedHeadUpdate = true
|
||||
f.lastLogHeadUpdate = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
if !f.startHeadUpdate {
|
||||
f.lastLogHeadUpdate = time.Now()
|
||||
f.startedHeadUpdate = f.lastLogHeadUpdate
|
||||
f.startHeadUpdate = true
|
||||
f.ptrHeadUpdate = f.getRange().headBlockNumber
|
||||
}
|
||||
// iterate back from new head until the log index head or a revert point and
|
||||
// collect headers of blocks to be added
|
||||
var (
|
||||
|
@ -305,14 +336,41 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
|
|||
// tryExtendTail attempts to extend the log index backwards until the desired
|
||||
// indexed history length is achieved. Returns true if finished.
|
||||
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
|
||||
defer func() {
|
||||
fmr := f.getRange()
|
||||
if fmr.tailBlockNumber <= tailTarget {
|
||||
if f.loggedTailExtend {
|
||||
log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
|
||||
"processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
|
||||
f.loggedTailExtend = false
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
fmr := f.getRange()
|
||||
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
|
||||
|
||||
if !f.loggedTailExtend {
|
||||
f.lastLogTailExtend = time.Now()
|
||||
f.startedTailExtend = f.lastLogTailExtend
|
||||
f.ptrTailExtend = fmr.tailBlockNumber
|
||||
}
|
||||
|
||||
update := f.newUpdateBatch()
|
||||
lastTailEpoch := update.tailEpoch()
|
||||
for number > tailTarget && !stopFn() {
|
||||
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
|
||||
// limit the amount of data updated in a single batch
|
||||
f.applyUpdateBatch(update)
|
||||
|
||||
if time.Since(f.lastLogTailExtend) > logFrequency || !f.loggedTailExtend {
|
||||
log.Info("Reverse log indexing in progress", "history", update.headBlockNumber+1-update.tailBlockNumber,
|
||||
"processed", f.ptrTailExtend-update.tailBlockNumber, "remaining", update.tailBlockNumber-tailTarget,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedTailExtend)))
|
||||
f.loggedTailExtend = true
|
||||
f.lastLogTailExtend = time.Now()
|
||||
}
|
||||
|
||||
update = f.newUpdateBatch()
|
||||
lastTailEpoch = tailEpoch
|
||||
}
|
||||
|
@ -339,10 +397,27 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
|
|||
// tryUnindexTail attempts to prune the log index tail until the desired indexed
|
||||
// history length is achieved. Returns true if finished.
|
||||
func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool {
|
||||
if !f.loggedTailUnindex {
|
||||
f.lastLogTailUnindex = time.Now()
|
||||
f.startedTailUnindex = f.lastLogTailUnindex
|
||||
f.ptrTailUnindex = f.getRange().tailBlockNumber
|
||||
}
|
||||
for {
|
||||
if f.unindexTailEpoch(tailTarget) {
|
||||
fmr := f.getRange()
|
||||
log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
|
||||
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
|
||||
f.loggedTailUnindex = false
|
||||
return true
|
||||
}
|
||||
if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex {
|
||||
fmr := f.getRange()
|
||||
log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
|
||||
"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber,
|
||||
"elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex)))
|
||||
f.loggedTailUnindex = true
|
||||
f.lastLogTailUnindex = time.Now()
|
||||
}
|
||||
if stopFn() {
|
||||
return false
|
||||
}
|
||||
|
@ -402,6 +477,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
|
|||
// by updating the tail pointers, except for targetLvPointer which is not changed
|
||||
// yet as it marks the tail of the log index data stored in the database and
|
||||
// therefore should be updated when map data is actually removed.
|
||||
// Note that this function assumes that the read/write lock is being held.
|
||||
func (f *FilterMaps) unindexTailPtr(tailTarget uint64) (newTailMap uint32, changed bool) {
|
||||
// obtain target log value pointer
|
||||
if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
|
||||
|
@ -542,7 +618,6 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
|
|||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Could not write update batch", "error", err)
|
||||
}
|
||||
log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer)
|
||||
}
|
||||
|
||||
// updatedRangeLength returns the lenght of the updated filter map range.
|
||||
|
|
|
@ -167,6 +167,7 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
|
|||
// valid range with the current indexed range. This function should be called
|
||||
// whenever a part of the log index has been removed, before adding new blocks
|
||||
// to it.
|
||||
// Note that this function assumes that the read lock is being held.
|
||||
func (f *FilterMaps) updateMatchersValidRange() {
|
||||
for fm := range f.matchers {
|
||||
if !f.initialized {
|
||||
|
|
Loading…
Reference in New Issue