core/filtermaps: remove bloombits database

Zsolt Felfoldi 2024-09-27 12:38:07 +02:00
parent 66ab6aab4a
commit 189705f3af
3 changed files with 83 additions and 44 deletions

View File

@ -6,6 +6,7 @@ import (
"errors"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
@ -45,7 +46,8 @@ type blockchain interface {
type FilterMaps struct {
lock sync.RWMutex
db ethdb.Database
closeCh chan chan struct{}
closeCh chan struct{}
closeWg sync.WaitGroup
filterMapsRange
chain blockchain
matcherSyncCh chan *FilterMapsMatcherBackend
@ -102,7 +104,7 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
fm := &FilterMaps{
db: db,
chain: chain,
closeCh: make(chan chan struct{}),
closeCh: make(chan struct{}),
filterMapsRange: filterMapsRange{
initialized: rs.Initialized,
headLvPointer: rs.HeadLvPointer,
@ -119,64 +121,81 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
lvPointerCache: lru.NewCache[uint64, uint64](1000),
revertPoints: make(map[uint64]*revertPoint),
}
if !fm.initialized {
fm.resetDb()
}
fm.updateMapCache()
if rp, err := fm.newUpdateBatch().makeRevertPoint(); err == nil {
fm.revertPoints[rp.blockNumber] = rp
} else {
log.Error("Error creating head revert point", "error", err)
}
fm.closeWg.Add(2)
go fm.removeBloomBits()
go fm.updateLoop()
return fm
}
// Close ensures that the indexer is fully stopped before returning.
func (f *FilterMaps) Close() {
ch := make(chan struct{})
f.closeCh <- ch
<-ch
close(f.closeCh)
f.closeWg.Wait()
}
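This hunk swaps the old shutdown handshake (a chan chan struct{} that only the update loop could answer) for the common Go idiom of a broadcast close plus a sync.WaitGroup counted up before each goroutine starts, which is what lets removeBloomBits and updateLoop share one stop signal. A minimal sketch of the idiom with hypothetical names (Service, worker), not the actual FilterMaps code:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Service stands in for FilterMaps: it owns background goroutines that all
// watch the same close channel.
type Service struct {
	closeCh chan struct{}
	closeWg sync.WaitGroup
}

func NewService() *Service {
	s := &Service{closeCh: make(chan struct{})}
	s.closeWg.Add(2) // count the goroutines before spawning them
	go s.worker("indexer")
	go s.worker("cleaner")
	return s
}

func (s *Service) worker(name string) {
	defer s.closeWg.Done()
	for {
		select {
		case <-time.After(50 * time.Millisecond):
			fmt.Println(name, "tick")
		case <-s.closeCh:
			return // a closed channel is readable by any number of receivers
		}
	}
}

// Close signals every worker at once and blocks until all have stopped.
func (s *Service) Close() {
	close(s.closeCh)
	s.closeWg.Wait()
}

func main() {
	s := NewService()
	time.Sleep(120 * time.Millisecond)
	s.Close()
}

Unlike the old request/acknowledge channel, which only one receiver could answer per request, the closed channel wakes both goroutines, and the WaitGroup gives Close() the same "fully stopped" guarantee as before.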
// reset un-initializes the FilterMaps structure and removes all related data from
// the database.
// Note that this function assumes that the read/write lock is being held.
func (f *FilterMaps) reset() {
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
f.resetDb()
// the database. The function returns true if everything was successfully removed.
func (f *FilterMaps) reset() bool {
f.lock.Lock()
f.filterMapsRange = filterMapsRange{}
f.filterMapCache = make(map[uint32]*filterMap)
f.revertPoints = make(map[uint64]*revertPoint)
f.blockPtrCache.Purge()
f.lvPointerCache.Purge()
f.lock.Unlock()
// deleting the range first ensures that resetDb will be called again at next
// startup and any leftover data will be removed even if it cannot finish now.
rawdb.DeleteFilterMapsRange(f.db)
return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database")
}
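Note the lock scoping in the new reset(): the write lock now covers only the clearing of in-memory state, and the potentially long-running prefix deletion runs unlocked, so readers are not blocked for its duration; deleting the range marker first guarantees that an interrupted cleanup resumes at the next startup. A small sketch of the lock-scoping pattern with made-up names (index, purgeDisk), not the real FilterMaps code:

package main

import "sync"

type index struct {
	mu    sync.RWMutex
	cache map[uint32][]byte
}

// reset clears the cheap in-memory state under the lock, then performs the
// slow disk cleanup outside of it; returns false if interrupted.
func (ix *index) reset(stop <-chan struct{}) bool {
	ix.mu.Lock()
	ix.cache = make(map[uint32][]byte)
	ix.mu.Unlock()
	return purgeDisk(stop) // may take minutes on a large database
}

// purgeDisk is a placeholder for the batched, interruptible deletion.
func purgeDisk(stop <-chan struct{}) bool {
	select {
	case <-stop:
		return false
	default:
		return true
	}
}

func main() {
	ix := &index{cache: map[uint32][]byte{1: {0xff}}}
	ix.reset(make(chan struct{}))
}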
// resetDb removes all log index data from the database.
func (f *FilterMaps) resetDb() {
var logged bool
// removeBloomBits removes old bloom bits data from the database.
func (f *FilterMaps) removeBloomBits() {
f.removeDbWithPrefix(rawdb.BloomBitsPrefix, "Removing old bloom bits database")
f.removeDbWithPrefix(rawdb.BloomBitsIndexPrefix, "Removing old bloom bits chain index")
f.closeWg.Done()
}
// removeDbWithPrefix removes data with the given prefix from the database and
// returns true if everything was successfully removed.
func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
var (
logged bool
lastLogged time.Time
removed uint64
)
for {
it := f.db.NewIterator(rawdb.FilterMapsPrefix, nil)
select {
case <-f.closeCh:
return false
default:
}
it := f.db.NewIterator(prefix, nil)
batch := f.db.NewBatch()
var count int
for ; count < 10000 && it.Next(); count++ {
batch.Delete(it.Key())
removed++
}
it.Release()
if count == 0 {
break
}
if !logged {
log.Info("Resetting log index database...")
log.Info(action + "...")
logged = true
lastLogged = time.Now()
}
if time.Since(lastLogged) >= time.Second*10 {
log.Info(action+" in progress", "removed keys", removed)
lastLogged = time.Now()
}
batch.Write()
}
if logged {
log.Info("Resetting log index database finished")
log.Info(action + " finished")
}
return true
}
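removeDbWithPrefix deletes in rounds of at most 10000 keys, re-opening the iterator after each batch write and polling closeCh between rounds so that shutdown stays prompt. A runnable approximation against an in-memory database (rawdb.NewMemoryDatabase is the real helper; the prefix, key layout, and smaller batch size here are arbitrary):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// removeWithPrefix mirrors the shape of FilterMaps.removeDbWithPrefix:
// bounded batches, with a stop-channel check between rounds.
func removeWithPrefix(db ethdb.Database, prefix []byte, stop <-chan struct{}) bool {
	var removed uint64
	for {
		select {
		case <-stop:
			return false // interrupted; leftovers are handled at next startup
		default:
		}
		it := db.NewIterator(prefix, nil)
		batch := db.NewBatch()
		var count int
		for ; count < 1000 && it.Next(); count++ {
			batch.Delete(it.Key())
			removed++
		}
		it.Release()
		if count == 0 {
			break // nothing left under the prefix
		}
		batch.Write() // the iterator must be re-opened after this
	}
	fmt.Println("removed", removed, "keys")
	return true
}

func main() {
	db := rawdb.NewMemoryDatabase()
	prefix := []byte("x") // arbitrary test prefix
	for i := 0; i < 2500; i++ {
		key := append(append([]byte{}, prefix...), byte(i>>8), byte(i))
		db.Put(key, []byte{1})
	}
	fmt.Println("done:", removeWithPrefix(db, prefix, make(chan struct{})))
}

The bounded batch keeps memory use flat, and taking a fresh iterator each round avoids reading through a view that the previous batch write may have invalidated.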
// setRange updates the covered range and also adds the changes to the given batch.

View File

@ -22,6 +22,14 @@ const (
// updateLoop initializes and updates the log index structure according to the
// canonical chain.
func (f *FilterMaps) updateLoop() {
defer f.closeWg.Done()
f.updateMapCache()
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
f.revertPoints[rp.blockNumber] = rp
} else {
log.Error("Error creating head revert point", "error", err)
}
var (
headEventCh = make(chan core.ChainHeadEvent)
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
@ -54,8 +62,7 @@ func (f *FilterMaps) updateLoop() {
case <-time.After(time.Second * 20):
// keep updating log index during syncing
head = f.chain.CurrentBlock()
case ch := <-f.closeCh:
close(ch)
case <-f.closeCh:
stop = true
}
}
@ -69,7 +76,10 @@ func (f *FilterMaps) updateLoop() {
for !stop {
if !fmr.initialized {
f.tryInit(head)
if !f.tryInit(head) {
return
}
if syncMatcher != nil {
syncMatcher.synced(head)
syncMatcher = nil
@ -82,7 +92,9 @@ func (f *FilterMaps) updateLoop() {
}
// log index is initialized
if fmr.headBlockHash != head.Hash() {
f.tryUpdateHead(head)
if !f.tryUpdateHead(head) {
return
}
fmr = f.getRange()
if fmr.headBlockHash != head.Hash() {
wait()
@ -101,8 +113,7 @@ func (f *FilterMaps) updateLoop() {
head = ev.Block.Header()
case syncMatcher = <-f.matcherSyncCh:
head = f.chain.CurrentBlock()
case ch := <-f.closeCh:
close(ch)
case <-f.closeCh:
stop = true
return true
default:
@ -128,24 +139,34 @@ func (f *FilterMaps) getRange() filterMapsRange {
}
// tryInit attempts to initialize the log index structure.
func (f *FilterMaps) tryInit(head *types.Header) {
// Returns false if the indexer was stopped during a database reset. In this case
// the indexer should exit and the remaining parts of the old database will be
// removed at next startup.
func (f *FilterMaps) tryInit(head *types.Header) bool {
if !f.reset() {
return false
}
receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64())
if receipts == nil {
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
return
return true
}
update := f.newUpdateBatch()
if err := update.initWithBlock(head, receipts); err != nil {
log.Error("Could not initialize log index", "error", err)
}
f.applyUpdateBatch(update)
return true
}
// tryUpdateHead attempts to update the log index with a new head. If necessary,
// it reverts to a common ancestor with the old head before adding new block logs.
// If no suitable revert point is available (probably a reorg just after init)
// then it resets the index and tries to re-initialize with the new head.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
// Returns false if the indexer was stopped during a database reset. In this case
// the indexer should exit and the remaining parts of the old database will be
// removed at next startup.
func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
// iterate back from new head until the log index head or a revert point and
// collect headers of blocks to be added
var (
@ -159,14 +180,12 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
rp, err = f.getRevertPoint(chainPtr.Number.Uint64())
if err != nil {
log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err)
return
return true
}
if rp == nil {
// there are no more revert points available so we should reset and re-initialize
log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64())
f.reset()
f.tryInit(newHead)
return
return f.tryInit(newHead)
}
}
if chainPtr.Hash() == rp.blockHash {
@ -178,7 +197,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
parentHash, parentNumber := chainPtr.ParentHash, chainPtr.Number.Uint64()-1
chainPtr = f.chain.GetHeader(parentHash, parentNumber)
if chainPtr == nil {
log.Error("Canonical header not found", "number", parentNumber, "hash", parentHash)
return
return true
}
}
if rp.blockHash != f.headBlockHash {
@ -187,12 +206,12 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
}
if err := f.revertTo(rp); err != nil {
log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err)
return
return true
}
}
if newHeaders == nil {
return
return true
}
// add logs of new blocks in reverse order
update := f.newUpdateBatch()
@ -214,6 +233,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
}
}
f.applyUpdateBatch(update)
return true
}
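Beyond the new bool return (false only means "stopped mid-reset, exit now"), the core of tryUpdateHead is the walk back from the new head, collecting headers until it reaches a state the index already knows, after which the collected headers are replayed oldest-first. A heavily simplified sketch with hypothetical types (header, byHash), leaving out revert points and log replay:

package main

import "fmt"

// header is a simplified stand-in for types.Header.
type header struct {
	number uint64
	hash   string
	parent string
}

// collectNewHeaders walks back from newHead until it reaches a hash the index
// already knows, returning the headers to add, newest first. ok=false means
// the ancestor could not be reached (a chain gap); the caller logs and gives up.
func collectNewHeaders(byHash map[string]*header, newHead *header, knownHash string) ([]*header, bool) {
	var newHeaders []*header
	ptr := newHead
	for ptr.hash != knownHash {
		newHeaders = append(newHeaders, ptr)
		parent, found := byHash[ptr.parent]
		if !found {
			return nil, false
		}
		ptr = parent
	}
	return newHeaders, true
}

func main() {
	g := &header{0, "g", ""}
	a := &header{1, "a", "g"}
	b := &header{2, "b", "a"}
	byHash := map[string]*header{"g": g, "a": a, "b": b}
	hs, ok := collectNewHeaders(byHash, b, "g")
	fmt.Println(ok, len(hs)) // true 2: blocks a and b then get indexed in reverse order
}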
// tryExtendTail attempts to extend the log index backwards until it indexes the

View File

@ -106,7 +106,7 @@ var (
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata
bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits
BloomBitsPrefix = []byte("B") // BloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits
SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value
SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
CodePrefix = []byte("c") // CodePrefix + code hash -> account code
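The only schema change is exporting the prefix, so that core/filtermaps, a separate package, can iterate over and delete the legacy bloom bits data. The updated comment documents the key layout; a sketch that composes such a key by hand (this helper is illustrative, rawdb builds these keys internally):

package main

import (
	"encoding/binary"
	"fmt"
)

// bloomBitsKey builds BloomBitsPrefix + bit (uint16 BE) + section (uint64 BE) + hash,
// following the layout documented in the schema comment above.
func bloomBitsKey(prefix []byte, bit uint16, section uint64, hash [32]byte) []byte {
	key := make([]byte, len(prefix)+2+8+32)
	n := copy(key, prefix)
	binary.BigEndian.PutUint16(key[n:], bit)
	binary.BigEndian.PutUint64(key[n+2:], section)
	copy(key[n+10:], hash[:])
	return key
}

func main() {
	var sectionHead [32]byte
	key := bloomBitsKey([]byte("B"), 3, 17, sectionHead)
	fmt.Printf("%x\n", key) // 43-byte key: 1 prefix + 2 bit + 8 section + 32 hash
}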