core/filtermaps: add indexer test
This commit is contained in:
parent
950ca52de8
commit
0d676dcaae
|
@ -23,6 +23,7 @@ type blockchain interface {
|
|||
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
|
||||
GetHeader(hash common.Hash, number uint64) *types.Header
|
||||
GetCanonicalHash(number uint64) common.Hash
|
||||
GetReceiptsByHash(hash common.Hash) types.Receipts
|
||||
}
|
||||
|
||||
// FilterMaps is the in-memory representation of the log index structure that is
|
||||
|
@ -33,7 +34,7 @@ type blockchain interface {
|
|||
// https://eips.ethereum.org/EIPS/eip-7745
|
||||
type FilterMaps struct {
|
||||
lock sync.RWMutex
|
||||
db ethdb.Database
|
||||
db ethdb.KeyValueStore
|
||||
closeCh chan struct{}
|
||||
closeWg sync.WaitGroup
|
||||
history uint64
|
||||
|
@ -53,6 +54,8 @@ type FilterMaps struct {
|
|||
blockPtrCache *lru.Cache[uint32, uint64]
|
||||
lvPointerCache *lru.Cache[uint64, uint64]
|
||||
revertPoints map[uint64]*revertPoint
|
||||
|
||||
testHook func(int)
|
||||
}
|
||||
|
||||
// filterMap is a full or partial in-memory representation of a filter map where
|
||||
|
@ -94,7 +97,7 @@ type filterMapsRange struct {
|
|||
|
||||
// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
|
||||
// the structure in sync with the given blockchain.
|
||||
func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
|
||||
func NewFilterMaps(db ethdb.KeyValueStore, chain blockchain, params Params, history uint64, noHistory bool) *FilterMaps {
|
||||
rs, err := rawdb.ReadFilterMapsRange(db)
|
||||
if err != nil {
|
||||
log.Error("Error reading log index range", "error", err)
|
||||
|
@ -128,14 +131,17 @@ func NewFilterMaps(db ethdb.Database, chain blockchain, params Params, history u
|
|||
log.Error("Error fetching tail block pointer, resetting log index", "error", err)
|
||||
fm.filterMapsRange = filterMapsRange{} // updateLoop resets the database
|
||||
}
|
||||
fm.closeWg.Add(2)
|
||||
go fm.removeBloomBits()
|
||||
go fm.updateLoop()
|
||||
return fm
|
||||
}
|
||||
|
||||
func (f *FilterMaps) Start() {
|
||||
f.closeWg.Add(2)
|
||||
go f.removeBloomBits()
|
||||
go f.updateLoop()
|
||||
}
|
||||
|
||||
// Close ensures that the indexer is fully stopped before returning.
|
||||
func (f *FilterMaps) Close() {
|
||||
func (f *FilterMaps) Stop() {
|
||||
close(f.closeCh)
|
||||
f.closeWg.Wait()
|
||||
}
|
||||
|
@ -297,8 +303,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
|
|||
}
|
||||
}
|
||||
// get block receipts
|
||||
hash := f.chain.GetCanonicalHash(firstBlockNumber)
|
||||
receipts := rawdb.ReadRawReceipts(f.db, hash, firstBlockNumber) //TODO small cache
|
||||
receipts := f.chain.GetReceiptsByHash(f.chain.GetCanonicalHash(firstBlockNumber))
|
||||
if receipts == nil {
|
||||
return nil, errors.New("receipts not found")
|
||||
}
|
||||
|
|
|
@ -22,7 +22,12 @@ const (
|
|||
// updateLoop initializes and updates the log index structure according to the
|
||||
// canonical chain.
|
||||
func (f *FilterMaps) updateLoop() {
|
||||
defer f.closeWg.Done()
|
||||
defer func() {
|
||||
f.closeWg.Done()
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookStop)
|
||||
}
|
||||
}()
|
||||
|
||||
if f.noHistory {
|
||||
f.reset()
|
||||
|
@ -38,7 +43,7 @@ func (f *FilterMaps) updateLoop() {
|
|||
var (
|
||||
headEventCh = make(chan core.ChainHeadEvent, 10)
|
||||
sub = f.chain.SubscribeChainHeadEvent(headEventCh)
|
||||
head *types.Header
|
||||
head = f.chain.CurrentBlock()
|
||||
stop bool
|
||||
syncMatcher *FilterMapsMatcherBackend
|
||||
)
|
||||
|
@ -59,16 +64,21 @@ func (f *FilterMaps) updateLoop() {
|
|||
if stop {
|
||||
return
|
||||
}
|
||||
delay := time.Second * 20
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookWait)
|
||||
delay = 0
|
||||
}
|
||||
select {
|
||||
case ev := <-headEventCh:
|
||||
head = ev.Block.Header()
|
||||
case syncMatcher = <-f.matcherSyncCh:
|
||||
head = f.chain.CurrentBlock()
|
||||
case <-time.After(time.Second * 20):
|
||||
// keep updating log index during syncing
|
||||
head = f.chain.CurrentBlock()
|
||||
case <-f.closeCh:
|
||||
stop = true
|
||||
case <-time.After(delay):
|
||||
// keep updating log index during syncing
|
||||
head = f.chain.CurrentBlock()
|
||||
}
|
||||
}
|
||||
for head == nil {
|
||||
|
@ -151,7 +161,7 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
|
|||
if !f.reset() {
|
||||
return false
|
||||
}
|
||||
receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64())
|
||||
receipts := f.chain.GetReceiptsByHash(head.Hash())
|
||||
if receipts == nil {
|
||||
log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
|
||||
return true
|
||||
|
@ -161,6 +171,9 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
|
|||
log.Error("Could not initialize log index", "error", err)
|
||||
}
|
||||
f.applyUpdateBatch(update)
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookInit)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -222,7 +235,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
|
|||
update := f.newUpdateBatch()
|
||||
for i := len(newHeaders) - 1; i >= 0; i-- {
|
||||
newHeader := newHeaders[i]
|
||||
receipts := rawdb.ReadRawReceipts(f.db, newHeader.Hash(), newHeader.Number.Uint64())
|
||||
receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
|
||||
if receipts == nil {
|
||||
log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
|
||||
break
|
||||
|
@ -234,10 +247,16 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
|
|||
if update.updatedRangeLength() >= f.mapsPerEpoch {
|
||||
// limit the amount of data updated in a single batch
|
||||
f.applyUpdateBatch(update)
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookUpdateHeadEpoch)
|
||||
}
|
||||
update = f.newUpdateBatch()
|
||||
}
|
||||
}
|
||||
f.applyUpdateBatch(update)
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookUpdateHead)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -273,6 +292,9 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
|
|||
if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
|
||||
// limit the amount of data updated in a single batch
|
||||
f.applyUpdateBatch(update)
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookExtendTailEpoch)
|
||||
}
|
||||
update = f.newUpdateBatch()
|
||||
lastTailEpoch = tailEpoch
|
||||
}
|
||||
|
@ -281,7 +303,7 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
|
|||
log.Error("Tail header not found", "number", number-1, "hash", parentHash)
|
||||
break
|
||||
}
|
||||
receipts := rawdb.ReadRawReceipts(f.db, newTail.Hash(), newTail.Number.Uint64())
|
||||
receipts := f.chain.GetReceiptsByHash(newTail.Hash())
|
||||
if receipts == nil {
|
||||
log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash())
|
||||
break
|
||||
|
@ -293,6 +315,9 @@ func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) {
|
|||
number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
|
||||
}
|
||||
f.applyUpdateBatch(update)
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookExtendTail)
|
||||
}
|
||||
}
|
||||
|
||||
// pruneTailPtr updates the tail block number and hash and the corresponding
|
||||
|
@ -330,6 +355,9 @@ func (f *FilterMaps) pruneTailPtr(tailTarget uint64) {
|
|||
fmr.tailBlockNumber, fmr.tailParentHash = tailTarget, tailParentHash
|
||||
fmr.tailBlockLvPointer = targetLvPointer
|
||||
f.setRange(f.db, fmr)
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookPruneTail)
|
||||
}
|
||||
}
|
||||
|
||||
// tryPruneTailMaps removes unused filter maps and corresponding log index
|
||||
|
@ -401,6 +429,9 @@ func (f *FilterMaps) pruneMaps(first, afterLast uint32, removeLvPtr *uint64) {
|
|||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Could not write update batch", "error", err)
|
||||
}
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookPruneTailMaps)
|
||||
}
|
||||
}
|
||||
|
||||
// updateBatch is a memory overlay collecting changes to the index log structure
|
||||
|
@ -799,5 +830,8 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
|
|||
if err := batch.Write(); err != nil {
|
||||
log.Crit("Could not write update batch", "error", err)
|
||||
}
|
||||
if f.testHook != nil {
|
||||
f.testHook(testHookRevert)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,280 @@
|
|||
package filtermaps
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/consensus/ethash"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
)
|
||||
|
||||
const (
|
||||
testHookInit = iota
|
||||
testHookUpdateHeadEpoch
|
||||
testHookUpdateHead
|
||||
testHookExtendTailEpoch
|
||||
testHookExtendTail
|
||||
testHookPruneTail
|
||||
testHookPruneTailMaps
|
||||
testHookRevert
|
||||
testHookWait
|
||||
testHookStop
|
||||
)
|
||||
|
||||
var testParams = Params{
|
||||
logMapHeight: 2,
|
||||
logMapsPerEpoch: 4,
|
||||
logValuesPerMap: 4,
|
||||
}
|
||||
|
||||
func TestIndexerSetHistory(t *testing.T) {
|
||||
ts := newTestSetup(t)
|
||||
ts.setHistory(0, false)
|
||||
ts.chain.addBlocks(1000, 5, 2, 4, false) // 50 log values per block
|
||||
ts.runUntilWait()
|
||||
ts.setHistory(100, false)
|
||||
ts.runUntil(func() bool {
|
||||
l := ts.lastRange.headLvPointer - ts.lastRange.tailLvPointer
|
||||
return l > 44000 && l < 45000
|
||||
})
|
||||
ts.setHistory(200, false)
|
||||
ts.runUntilWait()
|
||||
ts.setHistory(0, false)
|
||||
ts.runUntilWait()
|
||||
if ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer != 50000 {
|
||||
t.Fatalf("Invalid number of log values in the final state (expected %d, got %d)", 50000, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexerRandomSetHistory(t *testing.T) {
|
||||
ts := newTestSetup(t)
|
||||
ts.chain.addBlocks(100, 5, 2, 4, false) // 50 log values per block
|
||||
for i := 0; i < 3000; i++ {
|
||||
ts.setHistory(uint64(rand.Intn(1001)), false)
|
||||
ts.nextEvent()
|
||||
for rand.Intn(20) != 0 && ts.lastEvent != testHookWait {
|
||||
ts.nextEvent()
|
||||
}
|
||||
}
|
||||
ts.setHistory(0, false)
|
||||
ts.runUntilWait()
|
||||
if ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer != 5000 {
|
||||
t.Fatalf("Invalid number of log values in the final state (expected %d, got %d)", 5000, ts.lastRange.headLvPointer-ts.lastRange.tailLvPointer)
|
||||
}
|
||||
}
|
||||
|
||||
type testSetup struct {
|
||||
t *testing.T
|
||||
fm *FilterMaps
|
||||
db ethdb.Database
|
||||
chain *testChain
|
||||
eventCh chan int
|
||||
resumeCh chan struct{}
|
||||
lastEvent int
|
||||
lastRange filterMapsRange
|
||||
}
|
||||
|
||||
func newTestSetup(t *testing.T) *testSetup {
|
||||
return &testSetup{
|
||||
t: t,
|
||||
chain: newTestChain(),
|
||||
db: rawdb.NewMemoryDatabase(),
|
||||
eventCh: make(chan int),
|
||||
resumeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *testSetup) runUntil(stop func() bool) {
|
||||
for !stop() {
|
||||
ts.nextEvent()
|
||||
for ts.lastEvent == testHookWait {
|
||||
ts.t.Fatalf("Indexer in waiting state before runUntil condition is met")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *testSetup) runUntilWait() {
|
||||
ts.nextEvent()
|
||||
for ts.lastEvent != testHookWait {
|
||||
ts.nextEvent()
|
||||
}
|
||||
}
|
||||
|
||||
func (ts *testSetup) setHistory(history uint64, noHistory bool) {
|
||||
if ts.fm != nil {
|
||||
ts.stopFm()
|
||||
}
|
||||
ts.fm = NewFilterMaps(ts.db, ts.chain, testParams, history, noHistory)
|
||||
ts.fm.testHook = ts.testHook
|
||||
ts.fm.Start()
|
||||
ts.lastEvent = <-ts.eventCh
|
||||
}
|
||||
|
||||
func (ts *testSetup) testHook(event int) {
|
||||
ts.eventCh <- event
|
||||
<-ts.resumeCh
|
||||
}
|
||||
|
||||
func (ts *testSetup) nextEvent() {
|
||||
ts.resumeCh <- struct{}{}
|
||||
ts.lastEvent = <-ts.eventCh
|
||||
ts.lastRange = ts.fm.getRange()
|
||||
}
|
||||
|
||||
func (ts *testSetup) stopFm() {
|
||||
close(ts.fm.closeCh)
|
||||
for {
|
||||
ts.nextEvent()
|
||||
if ts.lastEvent == testHookStop {
|
||||
break
|
||||
}
|
||||
}
|
||||
ts.resumeCh <- struct{}{}
|
||||
ts.fm.closeWg.Wait()
|
||||
}
|
||||
|
||||
func (ts *testSetup) close() {
|
||||
ts.stopFm()
|
||||
ts.db.Close()
|
||||
ts.chain.db.Close()
|
||||
}
|
||||
|
||||
type testChain struct {
|
||||
db ethdb.Database
|
||||
lock sync.RWMutex
|
||||
canonical []common.Hash
|
||||
chainHeadFeed event.Feed
|
||||
blocks map[common.Hash]*types.Block
|
||||
receipts map[common.Hash]types.Receipts
|
||||
}
|
||||
|
||||
func newTestChain() *testChain {
|
||||
return &testChain{
|
||||
blocks: make(map[common.Hash]*types.Block),
|
||||
receipts: make(map[common.Hash]types.Receipts),
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *testChain) CurrentBlock() *types.Header {
|
||||
tc.lock.RLock()
|
||||
defer tc.lock.RUnlock()
|
||||
|
||||
if len(tc.canonical) == 0 {
|
||||
return nil
|
||||
}
|
||||
return tc.blocks[tc.canonical[len(tc.canonical)-1]].Header()
|
||||
}
|
||||
|
||||
func (tc *testChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
|
||||
return tc.chainHeadFeed.Subscribe(ch)
|
||||
}
|
||||
|
||||
func (tc *testChain) GetHeader(hash common.Hash, number uint64) *types.Header {
|
||||
tc.lock.RLock()
|
||||
defer tc.lock.RUnlock()
|
||||
|
||||
return tc.blocks[hash].Header()
|
||||
}
|
||||
|
||||
func (tc *testChain) GetCanonicalHash(number uint64) common.Hash {
|
||||
tc.lock.RLock()
|
||||
defer tc.lock.RUnlock()
|
||||
|
||||
if uint64(len(tc.canonical)) <= number {
|
||||
return common.Hash{}
|
||||
}
|
||||
return tc.canonical[number]
|
||||
}
|
||||
|
||||
func (tc *testChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
|
||||
tc.lock.RLock()
|
||||
defer tc.lock.RUnlock()
|
||||
|
||||
return tc.receipts[hash]
|
||||
}
|
||||
|
||||
func (tc *testChain) addBlocks(count, maxTxPerBlock, maxLogsPerReceipt, maxTopicsPerLog int, random bool) {
|
||||
tc.lock.Lock()
|
||||
defer tc.lock.Unlock()
|
||||
|
||||
blockGen := func(i int, gen *core.BlockGen) {
|
||||
var txCount int
|
||||
if random {
|
||||
txCount = rand.Intn(maxTxPerBlock + 1)
|
||||
} else {
|
||||
txCount = maxTxPerBlock
|
||||
}
|
||||
for k := txCount; k > 0; k-- {
|
||||
receipt := types.NewReceipt(nil, false, 0)
|
||||
var logCount int
|
||||
if random {
|
||||
logCount = rand.Intn(maxLogsPerReceipt + 1)
|
||||
} else {
|
||||
logCount = maxLogsPerReceipt
|
||||
}
|
||||
receipt.Logs = make([]*types.Log, logCount)
|
||||
for i := range receipt.Logs {
|
||||
log := &types.Log{}
|
||||
receipt.Logs[i] = log
|
||||
rand.Read(log.Address[:])
|
||||
var topicCount int
|
||||
if random {
|
||||
topicCount = rand.Intn(maxTopicsPerLog + 1)
|
||||
} else {
|
||||
topicCount = maxTopicsPerLog
|
||||
}
|
||||
log.Topics = make([]common.Hash, topicCount)
|
||||
for j := range log.Topics {
|
||||
rand.Read(log.Topics[j][:])
|
||||
}
|
||||
}
|
||||
gen.AddUncheckedReceipt(receipt)
|
||||
gen.AddUncheckedTx(types.NewTransaction(999, common.HexToAddress("0x999"), big.NewInt(999), 999, gen.BaseFee(), nil))
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
blocks []*types.Block
|
||||
receipts []types.Receipts
|
||||
engine = ethash.NewFaker()
|
||||
)
|
||||
|
||||
if len(tc.canonical) == 0 {
|
||||
gspec := &core.Genesis{
|
||||
Alloc: types.GenesisAlloc{},
|
||||
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||
Config: params.TestChainConfig,
|
||||
}
|
||||
tc.db, blocks, receipts = core.GenerateChainWithGenesis(gspec, engine, count, blockGen)
|
||||
gblock := gspec.ToBlock()
|
||||
ghash := gblock.Hash()
|
||||
tc.canonical = []common.Hash{ghash}
|
||||
tc.blocks[ghash] = gblock
|
||||
tc.receipts[ghash] = types.Receipts{}
|
||||
} else {
|
||||
blocks, receipts = core.GenerateChain(params.TestChainConfig, tc.blocks[tc.canonical[len(tc.canonical)-1]], engine, tc.db, count, blockGen)
|
||||
}
|
||||
|
||||
for i, block := range blocks {
|
||||
num, hash := int(block.NumberU64()), block.Hash()
|
||||
if len(tc.canonical) != num {
|
||||
panic(nil)
|
||||
}
|
||||
tc.canonical = append(tc.canonical, hash)
|
||||
tc.blocks[hash] = block
|
||||
if receipts[i] != nil {
|
||||
tc.receipts[hash] = receipts[i]
|
||||
} else {
|
||||
tc.receipts[hash] = types.Receipts{}
|
||||
}
|
||||
}
|
||||
tc.chainHeadFeed.Send(core.ChainHeadEvent{Block: tc.blocks[tc.canonical[len(tc.canonical)-1]]})
|
||||
}
|
|
@ -360,6 +360,9 @@ func (s *Ethereum) Start() error {
|
|||
|
||||
// Start the networking layer
|
||||
s.handler.Start(s.p2pServer.MaxPeers)
|
||||
|
||||
// start log indexer
|
||||
s.filterMaps.Start()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -403,7 +406,7 @@ func (s *Ethereum) Stop() error {
|
|||
s.handler.Stop()
|
||||
|
||||
// Then stop everything else.
|
||||
s.filterMaps.Close()
|
||||
s.filterMaps.Stop()
|
||||
s.txPool.Close()
|
||||
s.blockchain.Stop()
|
||||
s.engine.Close()
|
||||
|
|
Loading…
Reference in New Issue