core/filtermaps: simplified locking scheme

Zsolt Felfoldi 2024-10-06 18:33:22 +02:00
parent 95cbcdf81b
commit 1c9e0d800c
5 changed files with 117 additions and 110 deletions

View File

@@ -57,24 +57,32 @@ type FilterMaps struct {
 	chain         blockchain
 	matcherSyncCh chan *FilterMapsMatcherBackend
-	// db and range are only modified by indexer under write lock; indexer can
-	// read them without a lock while matchers can access them under read lock
-	lock sync.RWMutex
 	db ethdb.KeyValueStore
+
+	// fields written by the indexer and read by matcher backend. Indexer can
+	// read them without a lock and write them under indexLock write lock.
+	// Matcher backend can read them under indexLock read lock.
+	indexLock sync.RWMutex
 	filterMapsRange
-	matchers map[*FilterMapsMatcherBackend]struct{}
 	// filterMapCache caches certain filter maps (headCacheSize most recent maps
 	// and one tail map) that are expected to be frequently accessed and modified
 	// while updating the structure. Note that the set of cached maps depends
 	// only on filterMapsRange and rows of other maps are not cached here.
-	filterMapLock  sync.Mutex
 	filterMapCache map[uint32]filterMap
+	// also accessed by indexer and matcher backend but no locking needed.
 	blockPtrCache  *lru.Cache[uint32, uint64]
 	lvPointerCache *lru.Cache[uint64, uint64]
-	revertPoints map[uint64]*revertPoint
+
+	// the matchers set and the fields of FilterMapsMatcherBackend instances are
+	// read and written both by exported functions and the indexer.
+	// Note that if both indexLock and matchersLock needs to be locked then
+	// indexLock should be locked first.
+	matchersLock sync.Mutex
+	matchers     map[*FilterMapsMatcherBackend]struct{}
+
+	// fields only accessed by the indexer (no mutex required).
+	revertPoints map[uint64]*revertPoint
 	startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool
 	startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time
 	lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time
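
The struct comments above spell out the new discipline: indexLock guards the range and cache fields shared with matchers, matchersLock guards the matcher set, and the indexer-only fields need no mutex at all. A minimal standalone sketch of the ordering rule, with toy types and fields rather than the real FilterMaps (hypothetical, not code from this commit):

    package main

    import (
        "fmt"
        "sync"
    )

    // index is a toy stand-in for FilterMaps, keeping only the lock layout.
    type index struct {
        indexLock sync.RWMutex // guards head (shared with readers)
        head      uint64

        matchersLock sync.Mutex // guards matchers (the registered-readers set)
        matchers     map[int]bool
    }

    // setHead touches fields behind both mutexes; per the struct comment the
    // rule is: when both are needed, acquire indexLock first.
    func (ix *index) setHead(h uint64) {
        ix.indexLock.Lock()
        ix.head = h
        ix.matchersLock.Lock() // taken second, released first
        for id := range ix.matchers {
            ix.matchers[id] = false // e.g. mark matchers as needing a re-sync
        }
        ix.matchersLock.Unlock()
        ix.indexLock.Unlock()
    }

    func main() {
        ix := &index{matchers: map[int]bool{1: true}}
        ix.setHead(42)
        fmt.Println(ix.head, ix.matchers[1])
    }
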
@@ -177,16 +185,16 @@ func (f *FilterMaps) Stop() {
 // reset un-initializes the FilterMaps structure and removes all related data from
 // the database. The function returns true if everything was successfully removed.
 func (f *FilterMaps) reset() bool {
-	f.lock.Lock()
+	f.indexLock.Lock()
 	f.filterMapsRange = filterMapsRange{}
 	f.filterMapCache = make(map[uint32]filterMap)
 	f.revertPoints = make(map[uint64]*revertPoint)
 	f.blockPtrCache.Purge()
 	f.lvPointerCache.Purge()
+	f.indexLock.Unlock()
+
 	// deleting the range first ensures that resetDb will be called again at next
 	// startup and any leftover data will be removed even if it cannot finish now.
 	rawdb.DeleteFilterMapsRange(f.db)
-	f.lock.Unlock()
 	return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database")
 }
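
The reordered reset() also shows the pattern the commit applies throughout: the write lock now covers only the in-memory wipe, and the database deletion runs after the unlock, so matchers waiting on the read lock are not also made to wait for disk I/O. Roughly, with a hypothetical toy type of the same shape:

    package main

    import "sync"

    type index struct {
        indexLock sync.RWMutex
        cache     map[uint32][]byte // stands in for filterMapsRange and the caches
    }

    func (ix *index) reset(deleteDB func()) {
        ix.indexLock.Lock()
        ix.cache = make(map[uint32][]byte) // in-memory state: under the write lock
        ix.indexLock.Unlock()

        deleteDB() // persistent state: deleted outside the lock
    }

    func main() {
        ix := &index{cache: map[uint32][]byte{}}
        ix.reset(func() {})
    }
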
@@ -240,7 +248,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool {
 }
 
 // setRange updates the covered range and also adds the changes to the given batch.
-// Note that this function assumes that the read/write lock is being held.
+// Note that this function assumes that the index write lock is being held.
 func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRange) {
 	f.filterMapsRange = newRange
 	rs := rawdb.FilterMapsRange{
@@ -259,14 +267,11 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRan
 // updateMapCache updates the maps covered by the filterMapCache according to the
 // covered range.
-// Note that this function assumes that the read lock is being held.
+// Note that this function assumes that the index write lock is being held.
 func (f *FilterMaps) updateMapCache() {
 	if !f.initialized {
 		return
 	}
-	f.filterMapLock.Lock()
-	defer f.filterMapLock.Unlock()
-
 	newFilterMapCache := make(map[uint32]filterMap)
 	firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>f.logValuesPerMap), uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap)
 	headCacheFirst := firstMap + 1
@@ -294,7 +299,8 @@
 // Note that this function assumes that the log index structure is consistent
 // with the canonical chain at the point where the given log value index points.
 // If this is not the case then an invalid result or an error may be returned.
-// Note that this function assumes that the read lock is being held.
+// Note that this function assumes that the indexer read lock is being held when
+// called from outside the updateLoop goroutine.
 func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
 	if lvIndex < f.tailBlockLvPointer || lvIndex > f.headLvPointer {
 		return nil, nil
@@ -361,10 +367,9 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
 // then a non-nil zero length row is returned.
 // Note that the returned slices should not be modified, they should be copied
 // on write.
+// Note that the function assumes that the indexLock is not being held (should
+// only be called from the updateLoop goroutine).
 func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, error) {
-	f.filterMapLock.Lock()
-	defer f.filterMapLock.Unlock()
-
 	fm := f.filterMapCache[mapIndex]
 	if fm != nil && fm[rowIndex] != nil {
 		return fm[rowIndex], nil
@@ -374,19 +379,31 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, erro
 		return nil, err
 	}
 	if fm != nil {
+		f.indexLock.Lock()
 		fm[rowIndex] = FilterRow(row)
+		f.indexLock.Unlock()
 	}
 	return FilterRow(row), nil
 }
 
+// getFilterMapRowUncached returns the given row of the given map. If the row is
+// empty then a non-nil zero length row is returned.
+// This function bypasses the memory cache which is mostly useful for processing
+// the head and tail maps during the indexing process and should be used by the
+// matcher backend which rarely accesses the same row twice and therefore does
+// not really benefit from caching anyways.
+// The function is unaffected by the indexLock mutex.
+func (f *FilterMaps) getFilterMapRowUncached(mapIndex, rowIndex uint32) (FilterRow, error) {
+	row, err := rawdb.ReadFilterMapRow(f.db, f.mapRowIndex(mapIndex, rowIndex))
+	return FilterRow(row), err
+}
+
 // storeFilterMapRow stores a row at the given row index of the given map and also
 // caches it in filterMapCache if the given map is cached.
 // Note that empty rows are not stored in the database and therefore there is no
 // separate delete function; deleting a row is the same as storing an empty row.
+// Note that this function assumes that the indexer write lock is being held.
 func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uint32, row FilterRow) {
-	f.filterMapLock.Lock()
-	defer f.filterMapLock.Unlock()
-
 	if fm := f.filterMapCache[mapIndex]; fm != nil {
 		fm[rowIndex] = row
 	}
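
The split above gives the two callers different read paths: the indexer keeps the caching getFilterMapRow (taking the write lock only to publish a row into the shared cache), while the matcher backend switches to the new uncached accessor that goes straight to the database. The asymmetry in a nutshell, using toy maps instead of ethdb (hypothetical sketch):

    package main

    import "sync"

    // Two read paths over the same row store, mirroring the split above.
    type rows struct {
        indexLock sync.RWMutex
        cache     map[uint64][]byte
        db        map[uint64][]byte // stand-in for the key-value database
    }

    func (r *rows) getCached(key uint64) []byte {
        if row, ok := r.cache[key]; ok { // indexer-only path: reads need no lock
            return row
        }
        row := r.db[key]
        r.indexLock.Lock() // publishing into the shared cache needs the write lock
        r.cache[key] = row
        r.indexLock.Unlock()
        return row
    }

    func (r *rows) getUncached(key uint64) []byte {
        return r.db[key] // matcher path: no cache, no lock
    }

    func main() {
        r := &rows{cache: map[uint64][]byte{}, db: map[uint64][]byte{7: {1}}}
        _ = r.getCached(7)
        _ = r.getUncached(7)
    }
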
@@ -407,7 +424,8 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 {
 // getBlockLvPointer returns the starting log value index where the log values
 // generated by the given block are located. If blockNumber is beyond the current
 // head then the first unoccupied log value index is returned.
-// Note that this function assumes that the read lock is being held.
+// Note that this function assumes that the indexer read lock is being held when
+// called from outside the updateLoop goroutine.
 func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
 	if blockNumber > f.headBlockNumber {
 		return f.headLvPointer, nil

View File

@@ -46,9 +46,9 @@ func (f *FilterMaps) updateLoop() {
 		return
 	}
-	f.lock.Lock()
+	f.indexLock.Lock()
 	f.updateMapCache()
-	f.lock.Unlock()
+	f.indexLock.Unlock()
 	if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil {
 		f.revertPoints[rp.blockNumber] = rp
 	} else {
@@ -61,11 +61,10 @@ func (f *FilterMaps) updateLoop() {
 		head        = f.chain.CurrentBlock()
 		stop        bool
 		syncMatcher *FilterMapsMatcherBackend
-		fmr         = f.getRange()
 	)
 
 	matcherSync := func() {
-		if syncMatcher != nil && fmr.initialized && fmr.headBlockHash == head.Hash() {
+		if syncMatcher != nil && f.initialized && f.headBlockHash == head.Hash() {
 			syncMatcher.synced(head)
 			syncMatcher = nil
 		}
@@ -92,7 +91,7 @@ func (f *FilterMaps) updateLoop() {
 			stop = true
 		case ch := <-f.waitIdleCh:
 			head = f.chain.CurrentBlock()
-			if head.Hash() == f.getRange().headBlockHash {
+			if head.Hash() == f.headBlockHash {
 				ch <- true
 				continue loop
 			}
@@ -110,27 +109,24 @@ func (f *FilterMaps) updateLoop() {
 			return
 		}
 	}
-	fmr = f.getRange()
 
 	for !stop {
-		if !fmr.initialized {
+		if !f.initialized {
 			if !f.tryInit(head) {
 				return
 			}
-			fmr = f.getRange()
-			if !fmr.initialized {
+			if !f.initialized {
 				wait()
 				continue
 			}
 		}
 		// log index is initialized
-		if fmr.headBlockHash != head.Hash() {
+		if f.headBlockHash != head.Hash() {
 			if !f.tryUpdateHead(head) {
 				return
 			}
-			fmr = f.getRange()
-			if fmr.headBlockHash != head.Hash() {
+			if f.headBlockHash != head.Hash() {
 				wait()
 				continue
 			}
 		}
@@ -151,8 +147,8 @@ func (f *FilterMaps) updateLoop() {
 				head = f.chain.CurrentBlock()
 			}
 			// stop if there is a new chain head (always prioritize head updates)
-			return fmr.headBlockHash != head.Hash()
-		}) && fmr.headBlockHash == head.Hash() {
+			return f.headBlockHash != head.Hash() || syncMatcher != nil
+		}) && f.headBlockHash == head.Hash() {
 			// if tail processing reached its final state and there is no new
 			// head then wait for more events
 			wait()
@@ -176,14 +172,6 @@ func (f *FilterMaps) WaitIdle() {
 	}
 }
 
-// getRange returns the current filterMapsRange.
-func (f *FilterMaps) getRange() filterMapsRange {
-	f.lock.RLock()
-	defer f.lock.RUnlock()
-
-	return f.filterMapsRange
-}
-
 // tryInit attempts to initialize the log index structure.
 // Returns false if indexer was stopped during a database reset. In this case the
 // indexer should exit and remaining parts of the old database will be removed
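
With getRange() deleted, the update loop reads headBlockHash, initialized and the other range fields directly, with no lock at all. Per the struct comment, that is sound only because the updateLoop goroutine is the sole writer of those fields; everyone else still takes the read lock. In miniature (toy type, not the real indexer):

    package main

    import (
        "fmt"
        "sync"
    )

    // Toy single-writer pattern: the indexer goroutine may read its own
    // fields without locking because nobody else writes them; all other
    // goroutines must take the read lock.
    type index struct {
        indexLock sync.RWMutex
        head      uint64
    }

    // indexerStep is called only from the indexer goroutine.
    func (ix *index) indexerStep() {
        next := ix.head + 1 // lock-free read: we are the only writer
        ix.indexLock.Lock()
        ix.head = next // writes still go under the write lock
        ix.indexLock.Unlock()
    }

    // snapshotHead is callable from any other goroutine.
    func (ix *index) snapshotHead() uint64 {
        ix.indexLock.RLock()
        defer ix.indexLock.RUnlock()
        return ix.head
    }

    func main() {
        ix := &index{}
        ix.indexerStep()
        fmt.Println(ix.snapshotHead())
    }
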
@@ -215,17 +203,16 @@ func (f *FilterMaps) tryInit(head *types.Header) bool {
 // at next startup.
 func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
 	defer func() {
-		fmr := f.getRange()
-		if newHead.Hash() == fmr.headBlockHash {
+		if newHead.Hash() == f.headBlockHash {
 			if f.loggedHeadUpdate {
-				log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
+				log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate,
 					"elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate)))
 				f.loggedHeadUpdate, f.startHeadUpdate = false, false
 			}
 		} else {
 			if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate {
-				log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate,
-					"remaining", newHead.Number.Uint64()-fmr.headBlockNumber,
+				log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate,
+					"remaining", newHead.Number.Uint64()-f.headBlockNumber,
 					"elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate)))
 				f.loggedHeadUpdate = true
 				f.lastLogHeadUpdate = time.Now()
@@ -238,7 +225,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool {
 		f.lastLogHeadUpdate = time.Now()
 		f.startedHeadUpdate = f.lastLogHeadUpdate
 		f.startHeadUpdate = true
-		f.ptrHeadUpdate = f.getRange().headBlockNumber
+		f.ptrHeadUpdate = f.headBlockNumber
 	}
 	// iterate back from new head until the log index head or a revert point and
 	// collect headers of blocks to be added
@@ -321,7 +308,7 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool
 			tailTarget = headNum + 1 - f.history
 		}
 	}
-	tailNum := f.getRange().tailBlockNumber
+	tailNum := f.tailBlockNumber
 	if tailNum > tailTarget {
 		if !f.tryExtendTail(tailTarget, stopFn) {
 			return false
@@ -337,23 +324,21 @@
 // indexed history length is achieved. Returns true if finished.
 func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool {
 	defer func() {
-		fmr := f.getRange()
-		if fmr.tailBlockNumber <= tailTarget {
+		if f.tailBlockNumber <= tailTarget {
 			if f.loggedTailExtend {
-				log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
-					"processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
+				log.Info("Reverse log indexing finished", "history", f.headBlockNumber+1-f.tailBlockNumber,
+					"processed", f.ptrTailExtend-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend)))
 				f.loggedTailExtend = false
 			}
 		}
 	}()
 
-	fmr := f.getRange()
-	number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
+	number, parentHash := f.tailBlockNumber, f.tailParentHash
 
 	if !f.loggedTailExtend {
 		f.lastLogTailExtend = time.Now()
 		f.startedTailExtend = f.lastLogTailExtend
-		f.ptrTailExtend = fmr.tailBlockNumber
+		f.ptrTailExtend = f.tailBlockNumber
 	}
 
 	update := f.newUpdateBatch()
@@ -400,20 +385,18 @@ func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool
 	if !f.loggedTailUnindex {
 		f.lastLogTailUnindex = time.Now()
 		f.startedTailUnindex = f.lastLogTailUnindex
-		f.ptrTailUnindex = f.getRange().tailBlockNumber
+		f.ptrTailUnindex = f.tailBlockNumber
 	}
 	for {
 		if f.unindexTailEpoch(tailTarget) {
-			fmr := f.getRange()
-			log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
-				"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
+			log.Info("Log unindexing finished", "history", f.headBlockNumber+1-f.tailBlockNumber,
+				"removed", f.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex)))
 			f.loggedTailUnindex = false
 			return true
 		}
 		if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex {
-			fmr := f.getRange()
-			log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber,
-				"removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber,
+			log.Info("Log unindexing in progress", "history", f.headBlockNumber+1-f.tailBlockNumber,
+				"removed", f.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-f.tailBlockNumber,
 				"elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex)))
 			f.loggedTailUnindex = true
 			f.lastLogTailUnindex = time.Now()
@@ -427,11 +410,9 @@
 // unindexTailEpoch unindexes at most an epoch of tail log index data until the
 // desired tail target is reached.
 func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
-	f.lock.Lock()
 	oldRange := f.filterMapsRange
 	newTailMap, changed := f.unindexTailPtr(tailTarget)
 	newRange := f.filterMapsRange
-	f.lock.Unlock()
 
 	if !changed {
 		return true // nothing more to do
@@ -441,6 +422,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
 	oldTailMap := uint32(oldRange.tailLvPointer >> f.logValuesPerMap)
 	// remove map data [oldTailMap, newTailMap) and block data
 	// [oldRange.tailBlockNumber, newRange.tailBlockNumber)
+	f.indexLock.Lock()
 	batch := f.db.NewBatch()
 	for blockNumber := oldRange.tailBlockNumber; blockNumber < newRange.tailBlockNumber; blockNumber++ {
 		f.deleteBlockLvPointer(batch, blockNumber)
@@ -459,14 +441,15 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) {
 	newRange.tailLvPointer = uint64(newTailMap) << f.logValuesPerMap
 	if newRange.tailLvPointer > newRange.tailBlockLvPointer {
 		log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", newRange.tailLvPointer, "tailBlockLvPointer", newRange.tailBlockLvPointer)
+		f.indexLock.Unlock()
 		return
 	}
-	f.lock.Lock()
 	f.setRange(batch, newRange)
+	f.indexLock.Unlock()
+
 	if err := batch.Write(); err != nil {
 		log.Crit("Could not write update batch", "error", err)
 	}
-	f.lock.Unlock()
 	return
 }
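
unindexTailEpoch and applyUpdateBatch below now end the same way: the new range is published via setRange while the write lock is held, the lock is dropped, and only then is the batch flushed to disk. The tail end of that shape, compressed into a toy type (hypothetical sketch, not ethdb):

    package main

    import "sync"

    type index struct {
        indexLock sync.RWMutex
        tail      uint64
    }

    type batch struct{ pending [][]byte } // stand-in for an ethdb write batch

    func (b *batch) write() error { return nil }

    func (ix *index) publish(newTail uint64, b *batch) error {
        ix.indexLock.Lock()
        ix.tail = newTail // in-memory range update, visible to readers at once
        ix.indexLock.Unlock()

        // the slow disk flush happens after the lock is released, so readers
        // are not blocked on I/O while the batch commits
        return b.write()
    }

    func main() {
        ix := &index{}
        _ = ix.publish(128, new(batch))
    }
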
@@ -539,9 +522,6 @@ type updateBatch struct {
 // newUpdateBatch creates a new updateBatch.
 func (f *FilterMaps) newUpdateBatch() *updateBatch {
-	f.lock.RLock()
-	defer f.lock.RUnlock()
-
 	return &updateBatch{
 		f:               f,
 		filterMapsRange: f.filterMapsRange,
@@ -555,8 +535,7 @@ func (f *FilterMaps) newUpdateBatch() *updateBatch {
 // applyUpdateBatch writes creates a batch and writes all changes to the database
 // and also updates the in-memory representations of log index data.
 func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
-	f.lock.Lock()
-	defer f.lock.Unlock()
+	f.indexLock.Lock()
 
 	batch := f.db.NewBatch()
 	// write or remove block to log value index pointers
@@ -615,6 +594,8 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
 	}
 	// update filterMapsRange
 	f.setRange(batch, u.filterMapsRange)
+	f.indexLock.Unlock()
+
 	if err := batch.Write(); err != nil {
 		log.Crit("Could not write update batch", "error", err)
 	}
@@ -849,9 +830,6 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
 // number from memory cache or from the database if available. If no such revert
 // point is available then it returns no result and no error.
 func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
-	f.lock.RLock()
-	defer f.lock.RUnlock()
-
 	if blockNumber > f.headBlockNumber {
 		blockNumber = f.headBlockNumber
 	}
@@ -879,9 +857,6 @@
 // revertTo reverts the log index to the given revert point.
 func (f *FilterMaps) revertTo(rp *revertPoint) error {
-	f.lock.Lock()
-	defer f.lock.Unlock()
-
 	batch := f.db.NewBatch()
 	afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap)
 	if rp.mapIndex > afterLastMap {
@@ -918,7 +893,10 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error {
 	newRange.headLvPointer = lvPointer
 	newRange.headBlockNumber = rp.blockNumber
 	newRange.headBlockHash = rp.blockHash
+	f.indexLock.Lock()
 	f.setRange(batch, newRange)
+	f.indexLock.Unlock()
+
 	if err := batch.Write(); err != nil {
 		log.Crit("Could not write update batch", "error", err)
 	}

View File

@@ -79,14 +79,13 @@ func TestIndexerRandomRange(t *testing.T) {
 			ts.chain.setCanonicalChain(forks[fork][:head+1])
 		}
 		ts.fm.WaitIdle()
-		fmr := ts.fm.getRange()
 		if noHistory {
-			if fmr.initialized {
+			if ts.fm.initialized {
 				t.Fatalf("filterMapsRange initialized while indexing is disabled")
 			}
 			continue
 		}
-		if !fmr.initialized {
+		if !ts.fm.initialized {
 			t.Fatalf("filterMapsRange not initialized while indexing is enabled")
 		}
 		var (
@@ -99,21 +98,21 @@ func TestIndexerRandomRange(t *testing.T) {
 		if tail > 0 {
 			tpHash = forks[fork][tail-1]
 		}
-		if fmr.headBlockNumber != uint64(head) || fmr.headBlockHash != forks[fork][head] {
-			ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], fmr.headBlockNumber, fmr.headBlockHash)
+		if ts.fm.headBlockNumber != uint64(head) || ts.fm.headBlockHash != forks[fork][head] {
+			ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], ts.fm.headBlockNumber, ts.fm.headBlockHash)
 		}
-		if fmr.tailBlockNumber != uint64(tail) || fmr.tailParentHash != tpHash {
-			ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, fmr.tailBlockNumber, fmr.tailParentHash)
+		if ts.fm.tailBlockNumber != uint64(tail) || ts.fm.tailParentHash != tpHash {
+			ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, ts.fm.tailBlockNumber, ts.fm.tailParentHash)
 		}
 		expLvCount := uint64(head+1-tail) * 50
 		if tail == 0 {
 			expLvCount -= 50 // no logs in genesis block
 		}
-		if fmr.headLvPointer-fmr.tailBlockLvPointer != expLvCount {
-			ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, fmr.headLvPointer-fmr.tailBlockLvPointer)
+		if ts.fm.headLvPointer-ts.fm.tailBlockLvPointer != expLvCount {
+			ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, ts.fm.headLvPointer-ts.fm.tailBlockLvPointer)
 		}
-		if fmr.tailBlockLvPointer-fmr.tailLvPointer >= ts.params.valuesPerMap {
-			ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, fmr.tailBlockLvPointer-fmr.tailLvPointer)
+		if ts.fm.tailBlockLvPointer-ts.fm.tailLvPointer >= ts.params.valuesPerMap {
+			ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, ts.fm.tailBlockLvPointer-ts.fm.tailLvPointer)
 		}
 	}
 }

View File

@@ -26,6 +26,7 @@ import (
 // FilterMapsMatcherBackend implements MatcherBackend.
 type FilterMapsMatcherBackend struct {
 	f *FilterMaps
+	// these fields should be accessed under f.matchersLock mutex.
 	valid                 bool
 	firstValid, lastValid uint64
 	syncCh                chan SyncRange
@@ -35,8 +36,12 @@
 // the active matcher set.
 // Note that Close should always be called when the matcher is no longer used.
 func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend {
-	f.lock.Lock()
-	defer f.lock.Unlock()
+	f.indexLock.RLock()
+	f.matchersLock.Lock()
+	defer func() {
+		f.matchersLock.Unlock()
+		f.indexLock.RUnlock()
+	}()
 
 	fm := &FilterMapsMatcherBackend{
 		f: f,
@@ -58,8 +63,8 @@ func (fm *FilterMapsMatcherBackend) GetParams() *Params {
 // any SyncLogIndex calls are cancelled.
 // Close implements MatcherBackend.
 func (fm *FilterMapsMatcherBackend) Close() {
-	fm.f.lock.Lock()
-	defer fm.f.lock.Unlock()
+	fm.f.matchersLock.Lock()
+	defer fm.f.matchersLock.Unlock()
 
 	delete(fm.f.matchers, fm)
 }
@@ -70,7 +75,7 @@ func (fm *FilterMapsMatcherBackend) Close() {
 // on write.
 // GetFilterMapRow implements MatcherBackend.
 func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
-	return fm.f.getFilterMapRow(mapIndex, rowIndex)
+	return fm.f.getFilterMapRowUncached(mapIndex, rowIndex)
 }
 
@@ -78,8 +83,8 @@ func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapInde
 // GetBlockLvPointer returns the starting log value index where the log values
 // generated by the given block are located. If blockNumber is beyond the current
 // head then the first unoccupied log value index is returned.
 // GetBlockLvPointer implements MatcherBackend.
 func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
-	fm.f.lock.RLock()
-	defer fm.f.lock.RUnlock()
+	fm.f.indexLock.RLock()
+	defer fm.f.indexLock.RUnlock()
 
 	return fm.f.getBlockLvPointer(blockNumber)
 }
@@ -94,8 +99,8 @@ func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, block
 // using SyncLogIndex and re-process certain blocks if necessary.
 // GetLogByLvIndex implements MatcherBackend.
 func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
-	fm.f.lock.RLock()
-	defer fm.f.lock.RUnlock()
+	fm.f.indexLock.RLock()
+	defer fm.f.indexLock.RUnlock()
 
 	return fm.f.getLogByLvIndex(lvIndex)
 }
@@ -108,8 +113,12 @@ func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex
 // should be passed as a parameter and the existing log index should be consistent
 // with that chain.
 func (fm *FilterMapsMatcherBackend) synced(head *types.Header) {
-	fm.f.lock.Lock()
-	defer fm.f.lock.Unlock()
+	fm.f.indexLock.RLock()
+	fm.f.matchersLock.Lock()
+	defer func() {
+		fm.f.matchersLock.Unlock()
+		fm.f.indexLock.RUnlock()
+	}()
 
 	fm.syncCh <- SyncRange{
 		Head: head,
@@ -143,9 +152,9 @@
 	}
 	// add SyncRange return channel, ensuring that
 	syncCh := make(chan SyncRange, 1)
-	fm.f.lock.Lock()
+	fm.f.matchersLock.Lock()
 	fm.syncCh = syncCh
-	fm.f.lock.Unlock()
+	fm.f.matchersLock.Unlock()
 
 	select {
 	case fm.f.matcherSyncCh <- fm:
@@ -167,8 +176,11 @@
 // valid range with the current indexed range. This function should be called
 // whenever a part of the log index has been removed, before adding new blocks
 // to it.
-// Note that this function assumes that the read lock is being held.
+// Note that this function assumes that the index read lock is being held.
 func (f *FilterMaps) updateMatchersValidRange() {
+	f.matchersLock.Lock()
+	defer f.matchersLock.Unlock()
+
 	for fm := range f.matchers {
 		if !f.initialized {
 			fm.valid = false
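
NewMatcherBackend and synced take both locks in the prescribed order and release them in reverse through a single deferred closure, while Close and SyncLogIndex get by with matchersLock alone. The double-lock pattern condensed into a toy type (hypothetical sketch):

    package main

    import "sync"

    type index struct {
        indexLock    sync.RWMutex
        matchersLock sync.Mutex
        matchers     map[*struct{}]struct{}
    }

    // withBothLocks read-locks the index state, then locks the matcher set,
    // and releases in reverse order, mirroring NewMatcherBackend/synced above.
    func (ix *index) withBothLocks(fn func()) {
        ix.indexLock.RLock()
        ix.matchersLock.Lock()
        defer func() {
            ix.matchersLock.Unlock()
            ix.indexLock.RUnlock()
        }()
        fn() // may consistently read range fields and mutate the matcher set
    }

    func main() {
        ix := &index{matchers: map[*struct{}]struct{}{}}
        ix.withBothLocks(func() { ix.matchers[&struct{}{}] = struct{}{} })
    }
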

View File

@@ -28,11 +28,11 @@ func TestMatcher(t *testing.T) {
 	ts := newTestSetup(t)
 	defer ts.close()
 
-	ts.chain.addBlocks(1000, 10, 10, 4, true)
+	ts.chain.addBlocks(100, 10, 10, 4, true)
 	ts.setHistory(0, false)
 	ts.fm.WaitIdle()
-	for i := 0; i < 500; i++ {
+	for i := 0; i < 5000; i++ {
 		bhash := ts.chain.canonical[rand.Intn(len(ts.chain.canonical))]
 		receipts := ts.chain.receipts[bhash]
 		if len(receipts) == 0 {