core/filtermaps: use DeleteRange, checkpoint init of log value pointer
This commit is contained in:
parent
7db1987af3
commit
38194a0fbe
|
@ -17,6 +17,7 @@
|
||||||
package filtermaps
|
package filtermaps
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"errors"
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -27,10 +28,37 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb/leveldb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// checkpoint allows the log indexer to start indexing from the given block
|
||||||
|
// instead of genesis at the correct absolute log value index.
|
||||||
|
type checkpoint struct {
|
||||||
|
blockNumber uint64
|
||||||
|
blockHash common.Hash
|
||||||
|
nextLvIndex uint64 // next log value index after the given block
|
||||||
|
}
|
||||||
|
|
||||||
|
var checkpoints = []checkpoint{
|
||||||
|
{ // Mainnet
|
||||||
|
blockNumber: 21019982,
|
||||||
|
blockHash: common.HexToHash("0xc684e4db692fe347e740082665acf91e27c0d9ad2a118822abdd7bb06c2a9250"),
|
||||||
|
nextLvIndex: 15878969230,
|
||||||
|
},
|
||||||
|
{ // Sepolia
|
||||||
|
blockNumber: 6939193,
|
||||||
|
blockHash: common.HexToHash("0x659b6e8a711efe8184368ac286f1f4aee74be50d38bb7fe4b24f53e73dfa58b8"),
|
||||||
|
nextLvIndex: 3392298216,
|
||||||
|
},
|
||||||
|
{ // Holesky
|
||||||
|
blockNumber: 2607449,
|
||||||
|
blockHash: common.HexToHash("0xa48c4e1ff3857ba44346bc25346d9947cd12c08f5ce8c10e8acaf40e2d6c7dc4"),
|
||||||
|
nextLvIndex: 966700355,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
const headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
const headCacheSize = 8 // maximum number of recent filter maps cached in memory
|
||||||
|
|
||||||
// blockchain defines functions required by the FilterMaps log indexer.
|
// blockchain defines functions required by the FilterMaps log indexer.
|
||||||
|
@ -175,6 +203,9 @@ func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, hist
|
||||||
log.Error("Error fetching tail block pointer, resetting log index", "error", err)
|
log.Error("Error fetching tail block pointer, resetting log index", "error", err)
|
||||||
fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
|
fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
|
||||||
}
|
}
|
||||||
|
headBlockPtr, _ := fm.getBlockLvPointer(fm.headBlockNumber)
|
||||||
|
log.Trace("Log index head", "number", fm.headBlockNumber, "hash", fm.headBlockHash.String(), "log value pointer", fm.headLvPointer)
|
||||||
|
log.Trace("Log index tail", "number", fm.tailBlockNumber, "parentHash", fm.tailParentHash.String(), "log value pointer", fm.tailBlockLvPointer)
|
||||||
}
|
}
|
||||||
return fm
|
return fm
|
||||||
}
|
}
|
||||||
|
@ -218,43 +249,37 @@ func (f *FilterMaps) removeBloomBits() {
|
||||||
// removeDbWithPrefix removes data with the given prefix from the database and
|
// removeDbWithPrefix removes data with the given prefix from the database and
|
||||||
// returns true if everything was successfully removed.
|
// returns true if everything was successfully removed.
|
||||||
func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
|
func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
|
||||||
var (
|
it := f.db.NewIterator(prefix, nil)
|
||||||
logged bool
|
hasData := it.Next()
|
||||||
lastLogged time.Time
|
it.Release()
|
||||||
removed uint64
|
if !hasData {
|
||||||
)
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
end := bytes.Clone(prefix)
|
||||||
|
end[len(end)-1]++
|
||||||
|
start := time.Now()
|
||||||
|
var retry bool
|
||||||
for {
|
for {
|
||||||
|
err := f.db.DeleteRange(prefix, end)
|
||||||
|
if err == nil {
|
||||||
|
log.Info(action+" finished", "elapsed", time.Since(start))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if err != leveldb.ErrTooManyKeys {
|
||||||
|
log.Error(action+" failed", "error", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-f.closeCh:
|
case <-f.closeCh:
|
||||||
return false
|
return false
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
it := f.db.NewIterator(prefix, nil)
|
if !retry {
|
||||||
batch := f.db.NewBatch()
|
log.Info(action + " in progress...")
|
||||||
var count int
|
retry = true
|
||||||
for ; count < 250000 && it.Next(); count++ {
|
|
||||||
batch.Delete(it.Key())
|
|
||||||
removed++
|
|
||||||
}
|
}
|
||||||
it.Release()
|
|
||||||
if count == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if !logged {
|
|
||||||
log.Info(action + "...")
|
|
||||||
logged = true
|
|
||||||
lastLogged = time.Now()
|
|
||||||
}
|
|
||||||
if time.Since(lastLogged) >= time.Second*10 {
|
|
||||||
log.Info(action+" in progress", "removed keys", removed)
|
|
||||||
lastLogged = time.Now()
|
|
||||||
}
|
|
||||||
batch.Write()
|
|
||||||
}
|
}
|
||||||
if logged {
|
|
||||||
log.Info(action + " finished")
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setRange updates the covered range and also adds the changes to the given batch.
|
// setRange updates the covered range and also adds the changes to the given batch.
|
||||||
|
|
|
@ -203,18 +203,37 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
|
||||||
if !f.reset() {
|
if !f.reset() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
head = f.chain.GetHeader(f.chain.GetCanonicalHash(0), 0)
|
|
||||||
receipts := f.chain.GetReceiptsByHash(head.Hash())
|
hc := newHeaderChain(f.chain, head.Number.Uint64(), head.Hash())
|
||||||
|
var (
|
||||||
|
initHeader *types.Header
|
||||||
|
nextLvPtr uint64
|
||||||
|
)
|
||||||
|
for _, cp := range checkpoints {
|
||||||
|
if initHeader == nil || cp.blockNumber >= initHeader.Number.Uint64() {
|
||||||
|
if h := f.chain.GetHeader(hc.getBlockHash(cp.blockNumber+1), cp.blockNumber+1); h != nil && h.ParentHash == cp.blockHash {
|
||||||
|
initHeader, nextLvPtr = h, cp.nextLvIndex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if initHeader == nil {
|
||||||
|
initHeader = f.chain.GetHeader(f.chain.GetCanonicalHash(0), 0)
|
||||||
|
}
|
||||||
|
if initHeader == nil {
|
||||||
|
log.Error("Could not retrieve init header")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
receipts := f.chain.GetReceiptsByHash(initHeader.Hash())
|
||||||
if receipts == nil {
|
if receipts == nil {
|
||||||
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
|
log.Error("Could not retrieve block receipts for init block", "number", initHeader.Number, "hash", initHeader.Hash())
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
update := f.newUpdateBatch()
|
update := f.newUpdateBatch()
|
||||||
if err := update.initWithBlock(head, receipts, 0); err != nil {
|
if err := update.initWithBlock(initHeader, receipts, nextLvPtr); err != nil {
|
||||||
log.Error("Could not initialize log index", "error", err)
|
log.Error("Could not initialize log index", "error", err)
|
||||||
}
|
}
|
||||||
f.applyUpdateBatch(update)
|
f.applyUpdateBatch(update)
|
||||||
log.Info("Initialized log index", "head", head.Number.Uint64())
|
log.Info("Initialized log index", "head", initHeader.Number.Uint64(), "log value pointer", nextLvPtr)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue