core/filtermaps: added history.logs parameter
This commit is contained in:
parent
189705f3af
commit
39ab872d17
|
@ -100,6 +100,8 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
|
|||
utils.VMTraceFlag,
|
||||
utils.VMTraceJsonConfigFlag,
|
||||
utils.TransactionHistoryFlag,
|
||||
utils.LogHistoryFlag,
|
||||
utils.LogNoHistoryFlag,
|
||||
utils.StateHistoryFlag,
|
||||
}, utils.DatabaseFlags),
|
||||
Description: `
|
||||
|
|
|
@ -86,6 +86,8 @@ var (
|
|||
utils.SnapshotFlag,
|
||||
utils.TxLookupLimitFlag, // deprecated
|
||||
utils.TransactionHistoryFlag,
|
||||
utils.LogHistoryFlag,
|
||||
utils.LogNoHistoryFlag,
|
||||
utils.StateHistoryFlag,
|
||||
utils.LightServeFlag, // deprecated
|
||||
utils.LightIngressFlag, // deprecated
|
||||
|
|
|
@ -272,6 +272,17 @@ var (
|
|||
Value: ethconfig.Defaults.TransactionHistory,
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
LogHistoryFlag = &cli.Uint64Flag{
|
||||
Name: "history.logs",
|
||||
Usage: "Number of recent blocks to maintain log search index for (default = about one year, 0 = entire chain)",
|
||||
Value: ethconfig.Defaults.LogHistory,
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
LogNoHistoryFlag = &cli.BoolFlag{
|
||||
Name: "history.logs.disable",
|
||||
Usage: "Do not maintain log search index",
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
// Beacon client light sync settings
|
||||
BeaconApiFlag = &cli.StringSliceFlag{
|
||||
Name: "beacon.api",
|
||||
|
@ -1662,6 +1673,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
|
|||
cfg.StateScheme = rawdb.HashScheme
|
||||
log.Warn("Forcing hash state-scheme for archive mode")
|
||||
}
|
||||
if ctx.IsSet(LogHistoryFlag.Name) {
|
||||
cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
|
||||
}
|
||||
if ctx.IsSet(LogNoHistoryFlag.Name) {
|
||||
cfg.LogNoHistory = true
|
||||
}
|
||||
if ctx.IsSet(CacheFlag.Name) || ctx.IsSet(CacheTrieFlag.Name) {
|
||||
cfg.TrieCleanCache = ctx.Int(CacheFlag.Name) * ctx.Int(CacheTrieFlag.Name) / 100
|
||||
}
|
||||
|
|
|
@ -48,6 +48,9 @@ type FilterMaps struct {
|
|||
db ethdb.Database
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
history uint64
|
||||
noHistory bool
|
||||
|
||||
filterMapsRange
|
||||
chain blockchain
|
||||
matcherSyncCh chan *FilterMapsMatcherBackend
|
||||
|
@ -87,16 +90,22 @@ var emptyRow = FilterRow{}
|
|||
|
||||
// filterMapsRange describes the block range that has been indexed and the log
|
||||
// value index range it has been mapped to.
|
||||
// Note that tailBlockLvPointer points to the earliest log value index belonging
|
||||
// to the tail block while tailLvPointer points to the earliest log value index
|
||||
// added to the corresponding filter map. The latter might point to an earlier
|
||||
// index after tail blocks have been pruned because we do not remove tail values
|
||||
// one by one, rather delete entire maps when all blocks that had log values in
|
||||
// those maps are unindexed.
|
||||
type filterMapsRange struct {
|
||||
initialized bool
|
||||
headLvPointer, tailLvPointer uint64
|
||||
headLvPointer, tailLvPointer, tailBlockLvPointer uint64
|
||||
headBlockNumber, tailBlockNumber uint64
|
||||
headBlockHash, tailParentHash common.Hash
|
||||
}
|
||||
|
||||
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
||||
// the structure in sync with the given blockchain.
|
||||
func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
|
||||
func NewFilterMaps(db ethdb.Database, chain blockchain, history uint64, noHistory bool) *FilterMaps {
|
||||
rs, err := rawdb.ReadFilterMapsRange(db)
|
||||
if err != nil {
|
||||
log.Error("Error reading log index range", "error", err)
|
||||
|
@ -105,6 +114,8 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
|
|||
db: db,
|
||||
chain: chain,
|
||||
closeCh: make(chan struct{}),
|
||||
history: history,
|
||||
noHistory: noHistory,
|
||||
filterMapsRange: filterMapsRange{
|
||||
initialized: rs.Initialized,
|
||||
headLvPointer: rs.HeadLvPointer,
|
||||
|
@ -121,6 +132,11 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
|
|||
lvPointerCache: lru.NewCache[uint64, uint64](1000),
|
||||
revertPoints: make(map[uint64]*revertPoint),
|
||||
}
|
||||
fm.tailBlockLvPointer, err = fm.getBlockLvPointer(fm.tailBlockNumber)
|
||||
if err != nil {
|
||||
log.Error("Error fetching tail block pointer, resetting log index", "error", err)
|
||||
fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
|
||||
}
|
||||
fm.closeWg.Add(2)
|
||||
go fm.removeBloomBits()
|
||||
go fm.updateLoop()
|
||||
|
@ -200,7 +216,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
|
|||
|
||||
// setRange updates the covered range and also adds the changes to the given batch.
|
||||
// Note that this function assumes that the read/write lock is being held.
|
||||
func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
|
||||
func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRange) {
|
||||
f.filterMapsRange = newRange
|
||||
rs := rawdb.FilterMapsRange{
|
||||
Initialized: newRange.initialized,
|
||||
|
@ -227,7 +243,7 @@ func (f *FilterMaps) updateMapCache() {
|
|||
defer f.filterMapLock.Unlock()
|
||||
|
||||
newFilterMapCache := make(map[uint32]*filterMap)
|
||||
firstMap, afterLastMap := uint32(f.tailLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
|
||||
firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
|
||||
headCacheFirst := firstMap + 1
|
||||
if afterLastMap > headCacheFirst+headCacheSize {
|
||||
headCacheFirst = afterLastMap - headCacheSize
|
||||
|
@ -255,7 +271,7 @@ func (f *FilterMaps) updateMapCache() {
|
|||
// If this is not the case then an invalid result or an error may be returned.
|
||||
// Note that this function assumes that the read lock is being held.
|
||||
func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
||||
if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
|
||||
if lvIndex < f.tailBlockLvPointer || lvIndex > f.headLvPointer {
|
||||
return nil, nil
|
||||
}
|
||||
// find possible block range based on map to block pointers
|
||||
|
@ -264,6 +280,9 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if firstBlockNumber < f.tailBlockNumber {
|
||||
firstBlockNumber = f.tailBlockNumber
|
||||
}
|
||||
var lastBlockNumber uint64
|
||||
if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) {
|
||||
lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
|
||||
|
|
|
@ -23,6 +23,11 @@ const (
|
|||
// canonical chain.
|
||||
func (f *FilterMaps) updateLoop() {
|
||||
defer f.closeWg.Done()
|
||||
|
||||
if f.noHistory {
|
||||
f.reset()
|
||||
return
|
||||
}
|
||||
f.updateMapCache()
|
||||
if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
|
||||
f.revertPoints[rp.blockNumber] = rp
|
||||
|
@ -106,7 +111,7 @@ func (f *FilterMaps) updateLoop() {
|
|||
syncMatcher = nil
|
||||
}
|
||||
// log index head is at latest chain head; process tail blocks if possible
|
||||
f.tryExtendTail(func() bool {
|
||||
f.tryUpdateTail(head, func() bool {
|
||||
// return true if tail processing needs to be stopped
|
||||
select {
|
||||
case ev := <-headEventCh:
|
||||
|
@ -236,19 +241,35 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
// tryExtendTail attempts to extend the log index backwards until it indexes the
|
||||
// genesis block or cannot find more block receipts. Since this is a long process,
|
||||
// stopFn is called after adding each tail block and if it returns true, the
|
||||
// tryUpdateTail attempts to extend or prune the log index according to the
|
||||
// current head block number and the log history settings.
|
||||
// stopFn is called regularly during the process, and if it returns true, the
|
||||
// latest batch is written and the function returns.
|
||||
func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
|
||||
func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) {
|
||||
var tailTarget uint64
|
||||
if f.history > 0 {
|
||||
if headNum := head.Number.Uint64(); headNum >= f.history {
|
||||
tailTarget = headNum + 1 - f.history
|
||||
}
|
||||
}
|
||||
tailNum := f.getRange().tailBlockNumber
|
||||
if tailNum > tailTarget {
|
||||
f.tryExtendTail(tailTarget, stopFn)
|
||||
}
|
||||
if tailNum < tailTarget {
|
||||
f.pruneTailPtr(tailTarget)
|
||||
f.tryPruneTailMaps(tailTarget, stopFn)
|
||||
}
|
||||
}
|
||||
|
||||
// tryExtendTail attempts to extend the log index backwards until it indexes the
|
||||
// tail target block or cannot find more block receipts.
|
||||
func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
|
||||
fmr := f.getRange()
|
||||
number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
|
||||
if number == 0 {
|
||||
return
|
||||
}
|
||||
update := f.newUpdateBatch()
|
||||
lastTailEpoch := update.tailEpoch()
|
||||
for number > 0 && !stopFn() {
|
||||
for number > tailTarget && !stopFn() {
|
||||
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
|
||||
// limit the amount of data updated in a single batch
|
||||
f.applyUpdateBatch(update)
|
||||
|
@ -274,6 +295,114 @@ func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
|
|||
f.applyUpdateBatch(update)
|
||||
}
|
||||
|
||||
// pruneTailPtr updates the tail block number and hash and the corresponding
|
||||
// tailBlockLvPointer according to the given tail target block number.
|
||||
// Note that this function does not remove old index data, only marks it unused
|
||||
// by updating the tail pointers, except for targetLvPointer which is unchanged
|
||||
// as it marks the tail of the log index data stored in the database.
|
||||
func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
// obtain target log value pointer
|
||||
if tailTarget <= f.tailBlockNumber || tailTarget > f.headBlockNumber {
|
||||
return // nothing to do
|
||||
}
|
||||
targetLvPointer, err := f.getBlockLvPointer(tailTarget)
|
||||
fmr := f.filterMapsRange
|
||||
|
||||
if err != nil {
|
||||
log.Error("Error fetching tail target log value pointer", "block number", tailTarget, "error", err)
|
||||
}
|
||||
|
||||
// obtain tail target's parent hash
|
||||
var tailParentHash common.Hash
|
||||
if tailTarget > 0 {
|
||||
if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash {
|
||||
return // if a reorg is happening right now then try again later
|
||||
}
|
||||
tailParentHash = f.chain.GetCanonicalHash(tailTarget - 1)
|
||||
if f.chain.GetCanonicalHash(fmr.headBlockNumber) != fmr.headBlockHash {
|
||||
return // check again to make sure that tailParentHash is consistent with the indexed chain
|
||||
}
|
||||
}
|
||||
|
||||
fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
|
||||
fmr.tailBlockLvPointer = targetLvPointer
|
||||
f.setRange(f.db, fmr)
|
||||
}
|
||||
|
||||
// tryPruneTailMaps removes unused filter maps and corresponding log index
|
||||
// pointers from the database. This function also updates targetLvPointer.
|
||||
func (f *FilterMaps) tryPruneTailMaps(tailTarget uint64, stopFn func() bool) {
|
||||
fmr := f.getRange()
|
||||
tailMap := uint32(fmr.tailLvPointer >> logValuesPerMap)
|
||||
targetMap := uint32(fmr.tailBlockLvPointer >> logValuesPerMap)
|
||||
if tailMap >= targetMap {
|
||||
return
|
||||
}
|
||||
lastEpoch := (targetMap - 1) >> logMapsPerEpoch
|
||||
removeLvPtr, err := f.getMapBlockPtr(tailMap)
|
||||
if err != nil {
|
||||
log.Error("Error fetching tail map block pointer", "map index", tailMap, "error", err)
|
||||
removeLvPtr = math.MaxUint64 // do not remove anything
|
||||
}
|
||||
var (
|
||||
logged bool
|
||||
lastLogged time.Time
|
||||
)
|
||||
for tailMap < targetMap && !stopFn() {
|
||||
tailEpoch := tailMap >> logMapsPerEpoch
|
||||
if tailEpoch == lastEpoch {
|
||||
f.pruneMaps(tailMap, targetMap, &removeLvPtr)
|
||||
break
|
||||
}
|
||||
nextTailMap := (tailEpoch + 1) << logMapsPerEpoch
|
||||
f.pruneMaps(tailMap, nextTailMap, &removeLvPtr)
|
||||
tailMap = nextTailMap
|
||||
if !logged || time.Since(lastLogged) >= time.Second*10 {
|
||||
log.Info("Pruning log index tail...", "filter maps left", targetMap-tailMap)
|
||||
logged, lastLogged = true, time.Now()
|
||||
}
|
||||
}
|
||||
if logged {
|
||||
log.Info("Finished pruning log index tail", "filter maps left", targetMap-tailMap)
|
||||
}
|
||||
}
|
||||
|
||||
// pruneMaps removes filter maps and corresponding log index pointers in the
|
||||
// specified range in a single batch.
|
||||
func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
||||
nextBlockNumber, err := f.getMapBlockPtr(afterLast)
|
||||
if err != nil {
|
||||
log.Error("Error fetching next map block pointer", "map index", afterLast, "error", err)
|
||||
nextBlockNumber = 0 // do not remove anything
|
||||
}
|
||||
batch := f.db.NewBatch()
|
||||
for *removeLvPtr < nextBlockNumber {
|
||||
f.deleteBlockLvPointer(batch, *removeLvPtr)
|
||||
(*removeLvPtr)++
|
||||
}
|
||||
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
|
||||
f.deleteMapBlockPtr(batch, mapIndex)
|
||||
}
|
||||
for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
|
||||
for mapIndex := first; mapIndex < afterLast; mapIndex++ {
|
||||
f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
|
||||
}
|
||||
}
|
||||
fmr := f.getRange()
|
||||
fmr.tailLvPointer = uint64(afterLast) << logValuesPerMap
|
||||
if fmr.tailLvPointer > fmr.tailBlockLvPointer {
|
||||
log.Error("Cannot prune filter maps beyond tail block log value pointer", "tailLvPointer", fmr.tailLvPointer, "tailBlockLvPointer", fmr.tailBlockLvPointer)
|
||||
return
|
||||
}
|
||||
f.setRange(batch, fmr)
|
||||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Could not write update batch", "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
// updateBatch is a memory overlay collecting changes to the index log structure
|
||||
// that can be written to the database in a single batch while the in-memory
|
||||
// representations in FilterMaps are also updated.
|
||||
|
@ -368,7 +497,7 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
|
|||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Could not write update batch", "error", err)
|
||||
}
|
||||
log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailLvPointer)
|
||||
log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailBlockLvPointer)
|
||||
}
|
||||
|
||||
// updatedRangeLength returns the lenght of the updated filter map range.
|
||||
|
@ -378,7 +507,7 @@ func (u *updateBatch) updatedRangeLength() uint32 {
|
|||
|
||||
// tailEpoch returns the tail epoch index.
|
||||
func (u *updateBatch) tailEpoch() uint32 {
|
||||
return uint32(u.tailLvPointer >> (logValuesPerMap + logMapsPerEpoch))
|
||||
return uint32(u.tailBlockLvPointer >> (logValuesPerMap + logMapsPerEpoch))
|
||||
}
|
||||
|
||||
// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
|
||||
|
@ -416,8 +545,8 @@ func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipt
|
|||
return errors.New("already initialized")
|
||||
}
|
||||
u.initialized = true
|
||||
u.headLvPointer, u.tailLvPointer = startLvPointer, startLvPointer
|
||||
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() //TODO genesis?
|
||||
u.headLvPointer, u.tailLvPointer, u.tailBlockLvPointer = startLvPointer, startLvPointer, startLvPointer
|
||||
u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64()
|
||||
u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
|
||||
u.addBlockToHead(header, receipts)
|
||||
return nil
|
||||
|
@ -470,16 +599,23 @@ func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receip
|
|||
|
||||
// addValueToTail adds a single log value to the tail of the log index.
|
||||
func (u *updateBatch) addValueToTail(logValue common.Hash) error {
|
||||
if u.tailLvPointer == 0 {
|
||||
if u.tailBlockLvPointer == 0 {
|
||||
return errors.New("tail log value pointer underflow")
|
||||
}
|
||||
if u.tailBlockLvPointer < u.tailLvPointer {
|
||||
panic("tailBlockLvPointer < tailLvPointer")
|
||||
}
|
||||
u.tailBlockLvPointer--
|
||||
if u.tailBlockLvPointer >= u.tailLvPointer {
|
||||
return nil // already added to the map
|
||||
}
|
||||
u.tailLvPointer--
|
||||
mapIndex := uint32(u.tailLvPointer >> logValuesPerMap)
|
||||
mapIndex := uint32(u.tailBlockLvPointer >> logValuesPerMap)
|
||||
rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
column := columnIndex(u.tailLvPointer, logValue)
|
||||
column := columnIndex(u.tailBlockLvPointer, logValue)
|
||||
*rowPtr = append(*rowPtr, 0)
|
||||
copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
|
||||
(*rowPtr)[0] = column
|
||||
|
|
|
@ -145,7 +145,7 @@ var (
|
|||
FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash
|
||||
SyncCommitteeKey = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee
|
||||
|
||||
FilterMapsPrefix = []byte("fT5-") //TODO fm-
|
||||
FilterMapsPrefix = []byte("fm-")
|
||||
filterMapsRangeKey = append(FilterMapsPrefix, byte('R'))
|
||||
filterMapRowPrefix = append(FilterMapsPrefix, byte('r')) // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row
|
||||
filterMapBlockPtrPrefix = append(FilterMapsPrefix, byte('b')) // filterMapBlockPtrPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian)
|
||||
|
|
|
@ -216,7 +216,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)
|
||||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain, config.LogHistory, config.LogNoHistory)
|
||||
|
||||
if config.BlobPool.Datadir != "" {
|
||||
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
|
||||
|
|
|
@ -52,6 +52,7 @@ var Defaults = Config{
|
|||
NetworkId: 0, // enable auto configuration of networkID == chainID
|
||||
TxLookupLimit: 2350000,
|
||||
TransactionHistory: 2350000,
|
||||
LogHistory: 2350000,
|
||||
StateHistory: params.FullImmutabilityThreshold,
|
||||
DatabaseCache: 512,
|
||||
TrieCleanCache: 154,
|
||||
|
@ -94,6 +95,8 @@ type Config struct {
|
|||
TxLookupLimit uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
|
||||
|
||||
TransactionHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose tx indices are reserved.
|
||||
LogHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head where a log search index is maintained.
|
||||
LogNoHistory bool `toml:",omitempty"` // No log search index is maintained.
|
||||
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
|
||||
|
||||
// State scheme represents the scheme used to store ethereum states and trie
|
||||
|
|
Loading…
Reference in New Issue