diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index 8f5e436d9f..3b19cc2cd2 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -6,6 +6,7 @@ import ( "errors" "sort" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" @@ -45,7 +46,8 @@ type blockchain interface { type FilterMaps struct { lock sync.RWMutex db ethdb.Database - closeCh chan chan struct{} + closeCh chan struct{} + closeWg sync.WaitGroup filterMapsRange chain blockchain matcherSyncCh chan *FilterMapsMatcherBackend @@ -102,7 +104,7 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps { fm := &FilterMaps{ db: db, chain: chain, - closeCh: make(chan chan struct{}), + closeCh: make(chan struct{}), filterMapsRange: filterMapsRange{ initialized: rs.Initialized, headLvPointer: rs.HeadLvPointer, @@ -119,64 +121,81 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps { lvPointerCache: lru.NewCache[uint64, uint64](1000), revertPoints: make(map[uint64]*revertPoint), } - if !fm.initialized { - fm.resetDb() - } - fm.updateMapCache() - if rp, err := fm.newUpdateBatch().makeRevertPoint(); err == nil { - fm.revertPoints[rp.blockNumber] = rp - } else { - log.Error("Error creating head revert point", "error", err) - } + fm.closeWg.Add(2) + go fm.removeBloomBits() go fm.updateLoop() return fm } // Close ensures that the indexer is fully stopped before returning. func (f *FilterMaps) Close() { - ch := make(chan struct{}) - f.closeCh <- ch - <-ch + close(f.closeCh) + f.closeWg.Wait() } // reset un-initializes the FilterMaps structure and removes all related data from -// the database. -// Note that this function assumes that the read/write lock is being held. -func (f *FilterMaps) reset() { - // deleting the range first ensures that resetDb will be called again at next - // startup and any leftover data will be removed even if it cannot finish now. - rawdb.DeleteFilterMapsRange(f.db) - f.resetDb() +// the database. The function returns true if everything was successfully removed. +func (f *FilterMaps) reset() bool { + f.lock.Lock() f.filterMapsRange = filterMapsRange{} f.filterMapCache = make(map[uint32]*filterMap) f.revertPoints = make(map[uint64]*revertPoint) f.blockPtrCache.Purge() f.lvPointerCache.Purge() + f.lock.Unlock() + // deleting the range first ensures that resetDb will be called again at next + // startup and any leftover data will be removed even if it cannot finish now. + rawdb.DeleteFilterMapsRange(f.db) + return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database") } -// resetDb removes all log index data from the database. -func (f *FilterMaps) resetDb() { - var logged bool +// removeBloomBits removes old bloom bits data from the database. +func (f *FilterMaps) removeBloomBits() { + f.removeDbWithPrefix(rawdb.BloomBitsPrefix, "Removing old bloom bits database") + f.removeDbWithPrefix(rawdb.BloomBitsIndexPrefix, "Removing old bloom bits chain index") + f.closeWg.Done() +} + +// removeDbWithPrefix removes data with the given prefix from the database and +// returns true if everything was successfully removed. +func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool { + var ( + logged bool + lastLogged time.Time + removed uint64 + ) for { - it := f.db.NewIterator(rawdb.FilterMapsPrefix, nil) + select { + case <-f.closeCh: + return false + default: + } + it := f.db.NewIterator(prefix, nil) batch := f.db.NewBatch() var count int for ; count < 10000 && it.Next(); count++ { batch.Delete(it.Key()) + removed++ } it.Release() if count == 0 { break } if !logged { - log.Info("Resetting log index database...") + log.Info(action + "...") logged = true + lastLogged = time.Now() + } + if time.Since(lastLogged) >= time.Second*10 { + log.Info(action+" in progress", "removed keys", removed) + lastLogged = time.Now() } batch.Write() } if logged { - log.Info("Resetting log index database finished") + log.Info(action + " finished") } + return true } // setRange updates the covered range and also adds the changes to the given batch. diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index a9b1b56136..9c2235a03f 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -22,6 +22,14 @@ const ( // updateLoop initializes and updates the log index structure according to the // canonical chain. func (f *FilterMaps) updateLoop() { + defer f.closeWg.Done() + f.updateMapCache() + if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil { + f.revertPoints[rp.blockNumber] = rp + } else { + log.Error("Error creating head revert point", "error", err) + } + var ( headEventCh = make(chan core.ChainHeadEvent) sub = f.chain.SubscribeChainHeadEvent(headEventCh) @@ -54,8 +62,7 @@ func (f *FilterMaps) updateLoop() { case <-time.After(time.Second * 20): // keep updating log index during syncing head = f.chain.CurrentBlock() - case ch := <-f.closeCh: - close(ch) + case <-f.closeCh: stop = true } } @@ -69,7 +76,10 @@ func (f *FilterMaps) updateLoop() { for !stop { if !fmr.initialized { - f.tryInit(head) + if !f.tryInit(head) { + return + } + if syncMatcher != nil { syncMatcher.synced(head) syncMatcher = nil @@ -82,7 +92,9 @@ func (f *FilterMaps) updateLoop() { } // log index is initialized if fmr.headBlockHash != head.Hash() { - f.tryUpdateHead(head) + if !f.tryUpdateHead(head) { + return + } fmr = f.getRange() if fmr.headBlockHash != head.Hash() { wait() @@ -101,8 +113,7 @@ func (f *FilterMaps) updateLoop() { head = ev.Block.Header() case syncMatcher = <-f.matcherSyncCh: head = f.chain.CurrentBlock() - case ch := <-f.closeCh: - close(ch) + case <-f.closeCh: stop = true return true default: @@ -128,24 +139,34 @@ func (f *FilterMaps) getRange() filterMapsRange { } // tryInit attempts to initialize the log index structure. -func (f *FilterMaps) tryInit(head *types.Header) { +// Returns false if indexer was stopped during a database reset. In this case the +// indexer should exit and remaining parts of the old database will be removed +// at next startup. +func (f *FilterMaps) tryInit(head *types.Header) bool { + if !f.reset() { + return false + } receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64()) if receipts == nil { log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash()) - return + return true } update := f.newUpdateBatch() if err := update.initWithBlock(head, receipts); err != nil { log.Error("Could not initialize log index", "error", err) } f.applyUpdateBatch(update) + return true } // tryUpdateHead attempts to update the log index with a new head. If necessary, // it reverts to a common ancestor with the old head before adding new block logs. // If no suitable revert point is available (probably a reorg just after init) // then it resets the index and tries to re-initialize with the new head. -func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { +// Returns false if indexer was stopped during a database reset. In this case the +// indexer should exit and remaining parts of the old database will be removed +// at next startup. +func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { // iterate back from new head until the log index head or a revert point and // collect headers of blocks to be added var ( @@ -159,14 +180,12 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { rp, err = f.getRevertPoint(chainPtr.Number.Uint64()) if err != nil { log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err) - return + return true } if rp == nil { // there are no more revert points available so we should reset and re-initialize log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64()) - f.reset() - f.tryInit(newHead) - return + return f.tryInit(newHead) } } if chainPtr.Hash() == rp.blockHash { @@ -178,7 +197,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { chainPtr = f.chain.GetHeader(chainPtr.ParentHash, chainPtr.Number.Uint64()-1) if chainPtr == nil { log.Error("Canonical header not found", "number", chainPtr.Number.Uint64()-1, "hash", chainPtr.ParentHash) - return + return true } } if rp.blockHash != f.headBlockHash { @@ -187,12 +206,12 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { } if err := f.revertTo(rp); err != nil { log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err) - return + return true } } if newHeaders == nil { - return + return true } // add logs of new blocks in reverse order update := f.newUpdateBatch() @@ -214,6 +233,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) { } } f.applyUpdateBatch(update) + return true } // tryExtendTail attempts to extend the log index backwards until it indexes the diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 60d6a58ade..29e9adb42b 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -106,7 +106,7 @@ var ( blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata - bloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits + BloomBitsPrefix = []byte("B") // bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -> bloom bits SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value CodePrefix = []byte("c") // CodePrefix + code hash -> account code