From 5f3903c8699b4f7528671ad9fa3285c0a4178795 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Sun, 6 Oct 2024 18:33:22 +0200 Subject: [PATCH] core/filtermaps: simplified locking scheme --- core/filtermaps/filtermaps.go | 66 +++++++++++++-------- core/filtermaps/indexer.go | 94 ++++++++++++------------------ core/filtermaps/indexer_test.go | 21 ++++--- core/filtermaps/matcher_backend.go | 42 ++++++++----- core/filtermaps/matcher_test.go | 4 +- 5 files changed, 117 insertions(+), 110 deletions(-) diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index ed6a00c965..908ee250c8 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -57,24 +57,32 @@ type FilterMaps struct { chain blockchain matcherSyncCh chan *FilterMapsMatcherBackend - // db and range are only modified by indexer under write lock; indexer can - // read them without a lock while matchers can access them under read lock - lock sync.RWMutex - db ethdb.KeyValueStore + db ethdb.KeyValueStore + + // fields written by the indexer and read by matcher backend. Indexer can + // read them without a lock and write them under indexLock write lock. + // Matcher backend can read them under indexLock read lock. + indexLock sync.RWMutex filterMapsRange - - matchers map[*FilterMapsMatcherBackend]struct{} - // filterMapCache caches certain filter maps (headCacheSize most recent maps // and one tail map) that are expected to be frequently accessed and modified // while updating the structure. Note that the set of cached maps depends // only on filterMapsRange and rows of other maps are not cached here. - filterMapLock sync.Mutex filterMapCache map[uint32]filterMap + + // also accessed by indexer and matcher backend but no locking needed. blockPtrCache *lru.Cache[uint32, uint64] lvPointerCache *lru.Cache[uint64, uint64] - revertPoints map[uint64]*revertPoint + // the matchers set and the fields of FilterMapsMatcherBackend instances are + // read and written both by exported functions and the indexer. + // Note that if both indexLock and matchersLock needs to be locked then + // indexLock should be locked first. + matchersLock sync.Mutex + matchers map[*FilterMapsMatcherBackend]struct{} + + // fields only accessed by the indexer (no mutex required). + revertPoints map[uint64]*revertPoint startHeadUpdate, loggedHeadUpdate, loggedTailExtend, loggedTailUnindex bool startedHeadUpdate, startedTailExtend, startedTailUnindex time.Time lastLogHeadUpdate, lastLogTailExtend, lastLogTailUnindex time.Time @@ -177,16 +185,16 @@ func (f *FilterMaps) Stop() { // reset un-initializes the FilterMaps structure and removes all related data from // the database. The function returns true if everything was successfully removed. func (f *FilterMaps) reset() bool { - f.lock.Lock() + f.indexLock.Lock() f.filterMapsRange = filterMapsRange{} f.filterMapCache = make(map[uint32]filterMap) f.revertPoints = make(map[uint64]*revertPoint) f.blockPtrCache.Purge() f.lvPointerCache.Purge() + f.indexLock.Unlock() // deleting the range first ensures that resetDb will be called again at next // startup and any leftover data will be removed even if it cannot finish now. rawdb.DeleteFilterMapsRange(f.db) - f.lock.Unlock() return f.removeDbWithPrefix(rawdb.FilterMapsPrefix, "Resetting log index database") } @@ -240,7 +248,7 @@ func (f *FilterMaps) removeDbWithPrefix(prefix []byte, action string) bool { } // setRange updates the covered range and also adds the changes to the given batch. -// Note that this function assumes that the read/write lock is being held. +// Note that this function assumes that the index write lock is being held. func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRange) { f.filterMapsRange = newRange rs := rawdb.FilterMapsRange{ @@ -259,14 +267,11 @@ func (f *FilterMaps) setRange(batch ethdb.KeyValueWriter, newRange filterMapsRan // updateMapCache updates the maps covered by the filterMapCache according to the // covered range. -// Note that this function assumes that the read lock is being held. +// Note that this function assumes that the index write lock is being held. func (f *FilterMaps) updateMapCache() { if !f.initialized { return } - f.filterMapLock.Lock() - defer f.filterMapLock.Unlock() - newFilterMapCache := make(map[uint32]filterMap) firstMap, afterLastMap := uint32(f.tailBlockLvPointer>>f.logValuesPerMap), uint32((f.headLvPointer+f.valuesPerMap-1)>>f.logValuesPerMap) headCacheFirst := firstMap + 1 @@ -294,7 +299,8 @@ func (f *FilterMaps) updateMapCache() { // Note that this function assumes that the log index structure is consistent // with the canonical chain at the point where the given log value index points. // If this is not the case then an invalid result or an error may be returned. -// Note that this function assumes that the read lock is being held. +// Note that this function assumes that the indexer read lock is being held when +// called from outside the updateLoop goroutine. func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { if lvIndex < f.tailBlockLvPointer || lvIndex > f.headLvPointer { return nil, nil @@ -361,10 +367,9 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { // then a non-nil zero length row is returned. // Note that the returned slices should not be modified, they should be copied // on write. +// Note that the function assumes that the indexLock is not being held (should +// only be called from the updateLoop goroutine). func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, error) { - f.filterMapLock.Lock() - defer f.filterMapLock.Unlock() - fm := f.filterMapCache[mapIndex] if fm != nil && fm[rowIndex] != nil { return fm[rowIndex], nil @@ -374,19 +379,31 @@ func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, erro return nil, err } if fm != nil { + f.indexLock.Lock() fm[rowIndex] = FilterRow(row) + f.indexLock.Unlock() } return FilterRow(row), nil } +// getFilterMapRowUncached returns the given row of the given map. If the row is +// empty then a non-nil zero length row is returned. +// This function bypasses the memory cache which is mostly useful for processing +// the head and tail maps during the indexing process and should be used by the +// matcher backend which rarely accesses the same row twice and therefore does +// not really benefit from caching anyways. +// The function is unaffected by the indexLock mutex. +func (f *FilterMaps) getFilterMapRowUncached(mapIndex, rowIndex uint32) (FilterRow, error) { + row, err := rawdb.ReadFilterMapRow(f.db, f.mapRowIndex(mapIndex, rowIndex)) + return FilterRow(row), err +} + // storeFilterMapRow stores a row at the given row index of the given map and also // caches it in filterMapCache if the given map is cached. // Note that empty rows are not stored in the database and therefore there is no // separate delete function; deleting a row is the same as storing an empty row. +// Note that this function assumes that the indexer write lock is being held. func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uint32, row FilterRow) { - f.filterMapLock.Lock() - defer f.filterMapLock.Unlock() - if fm := f.filterMapCache[mapIndex]; fm != nil { fm[rowIndex] = row } @@ -407,7 +424,8 @@ func (f *FilterMaps) mapRowIndex(mapIndex, rowIndex uint32) uint64 { // getBlockLvPointer returns the starting log value index where the log values // generated by the given block are located. If blockNumber is beyond the current // head then the first unoccupied log value index is returned. -// Note that this function assumes that the read lock is being held. +// Note that this function assumes that the indexer read lock is being held when +// called from outside the updateLoop goroutine. func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) { if blockNumber > f.headBlockNumber { return f.headLvPointer, nil diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 1703c858fe..9bf705cf83 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -46,9 +46,9 @@ func (f *FilterMaps) updateLoop() { return } - f.lock.Lock() + f.indexLock.Lock() f.updateMapCache() - f.lock.Unlock() + f.indexLock.Unlock() if rp, err := f.newUpdateBatch().makeRevertPoint(); err == nil { f.revertPoints[rp.blockNumber] = rp } else { @@ -61,11 +61,10 @@ func (f *FilterMaps) updateLoop() { head = f.chain.CurrentBlock() stop bool syncMatcher *FilterMapsMatcherBackend - fmr = f.getRange() ) matcherSync := func() { - if syncMatcher != nil && fmr.initialized && fmr.headBlockHash == head.Hash() { + if syncMatcher != nil && f.initialized && f.headBlockHash == head.Hash() { syncMatcher.synced(head) syncMatcher = nil } @@ -92,7 +91,7 @@ func (f *FilterMaps) updateLoop() { stop = true case ch := <-f.waitIdleCh: head = f.chain.CurrentBlock() - if head.Hash() == f.getRange().headBlockHash { + if head.Hash() == f.headBlockHash { ch <- true continue loop } @@ -110,27 +109,24 @@ func (f *FilterMaps) updateLoop() { return } } - fmr = f.getRange() for !stop { - if !fmr.initialized { + if !f.initialized { if !f.tryInit(head) { return } - fmr = f.getRange() - if !fmr.initialized { + if !f.initialized { wait() continue } } // log index is initialized - if fmr.headBlockHash != head.Hash() { + if f.headBlockHash != head.Hash() { if !f.tryUpdateHead(head) { return } - fmr = f.getRange() - if fmr.headBlockHash != head.Hash() { + if f.headBlockHash != head.Hash() { wait() continue } @@ -151,8 +147,8 @@ func (f *FilterMaps) updateLoop() { head = f.chain.CurrentBlock() } // stop if there is a new chain head (always prioritize head updates) - return fmr.headBlockHash != head.Hash() - }) && fmr.headBlockHash == head.Hash() { + return f.headBlockHash != head.Hash() || syncMatcher != nil + }) && f.headBlockHash == head.Hash() { // if tail processing reached its final state and there is no new // head then wait for more events wait() @@ -176,14 +172,6 @@ func (f *FilterMaps) WaitIdle() { } } -// getRange returns the current filterMapsRange. -func (f *FilterMaps) getRange() filterMapsRange { - f.lock.RLock() - defer f.lock.RUnlock() - - return f.filterMapsRange -} - // tryInit attempts to initialize the log index structure. // Returns false if indexer was stopped during a database reset. In this case the // indexer should exit and remaining parts of the old database will be removed @@ -215,17 +203,16 @@ func (f *FilterMaps) tryInit(head *types.Header) bool { // at next startup. func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { defer func() { - fmr := f.getRange() - if newHead.Hash() == fmr.headBlockHash { + if newHead.Hash() == f.headBlockHash { if f.loggedHeadUpdate { - log.Info("Forward log indexing finished", "processed", fmr.headBlockNumber-f.ptrHeadUpdate, + log.Info("Forward log indexing finished", "processed", f.headBlockNumber-f.ptrHeadUpdate, "elapsed", common.PrettyDuration(time.Since(f.lastLogHeadUpdate))) f.loggedHeadUpdate, f.startHeadUpdate = false, false } } else { if time.Since(f.lastLogHeadUpdate) > logFrequency || !f.loggedHeadUpdate { - log.Info("Forward log indexing in progress", "processed", fmr.headBlockNumber-f.ptrHeadUpdate, - "remaining", newHead.Number.Uint64()-fmr.headBlockNumber, + log.Info("Forward log indexing in progress", "processed", f.headBlockNumber-f.ptrHeadUpdate, + "remaining", newHead.Number.Uint64()-f.headBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedHeadUpdate))) f.loggedHeadUpdate = true f.lastLogHeadUpdate = time.Now() @@ -238,7 +225,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) bool { f.lastLogHeadUpdate = time.Now() f.startedHeadUpdate = f.lastLogHeadUpdate f.startHeadUpdate = true - f.ptrHeadUpdate = f.getRange().headBlockNumber + f.ptrHeadUpdate = f.headBlockNumber } // iterate back from new head until the log index head or a revert point and // collect headers of blocks to be added @@ -321,7 +308,7 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool tailTarget = headNum + 1 - f.history } } - tailNum := f.getRange().tailBlockNumber + tailNum := f.tailBlockNumber if tailNum > tailTarget { if !f.tryExtendTail(tailTarget, stopFn) { return false @@ -337,23 +324,21 @@ func (f *FilterMaps) tryUpdateTail(head *types.Header, stopFn func() bool) bool // indexed history length is achieved. Returns true if finished. func (f *FilterMaps) tryExtendTail(tailTarget uint64, stopFn func() bool) bool { defer func() { - fmr := f.getRange() - if fmr.tailBlockNumber <= tailTarget { + if f.tailBlockNumber <= tailTarget { if f.loggedTailExtend { - log.Info("Reverse log indexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber, - "processed", f.ptrTailExtend-fmr.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend))) + log.Info("Reverse log indexing finished", "history", f.headBlockNumber+1-f.tailBlockNumber, + "processed", f.ptrTailExtend-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailExtend))) f.loggedTailExtend = false } } }() - fmr := f.getRange() - number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash + number, parentHash := f.tailBlockNumber, f.tailParentHash if !f.loggedTailExtend { f.lastLogTailExtend = time.Now() f.startedTailExtend = f.lastLogTailExtend - f.ptrTailExtend = fmr.tailBlockNumber + f.ptrTailExtend = f.tailBlockNumber } update := f.newUpdateBatch() @@ -400,20 +385,18 @@ func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool if !f.loggedTailUnindex { f.lastLogTailUnindex = time.Now() f.startedTailUnindex = f.lastLogTailUnindex - f.ptrTailUnindex = f.getRange().tailBlockNumber + f.ptrTailUnindex = f.tailBlockNumber } for { if f.unindexTailEpoch(tailTarget) { - fmr := f.getRange() - log.Info("Log unindexing finished", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber, - "removed", fmr.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex))) + log.Info("Log unindexing finished", "history", f.headBlockNumber+1-f.tailBlockNumber, + "removed", f.tailBlockNumber-f.ptrTailUnindex, "elapsed", common.PrettyDuration(time.Since(f.lastLogTailUnindex))) f.loggedTailUnindex = false return true } if time.Since(f.lastLogTailUnindex) > logFrequency || !f.loggedTailUnindex { - fmr := f.getRange() - log.Info("Log unindexing in progress", "history", fmr.headBlockNumber+1-fmr.tailBlockNumber, - "removed", fmr.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-fmr.tailBlockNumber, + log.Info("Log unindexing in progress", "history", f.headBlockNumber+1-f.tailBlockNumber, + "removed", f.tailBlockNumber-f.ptrTailUnindex, "remaining", tailTarget-f.tailBlockNumber, "elapsed", common.PrettyDuration(time.Since(f.startedTailUnindex))) f.loggedTailUnindex = true f.lastLogTailUnindex = time.Now() @@ -427,11 +410,9 @@ func (f *FilterMaps) tryUnindexTail(tailTarget uint64, stopFn func() bool) bool // unindexTailEpoch unindexes at most an epoch of tail log index data until the // desired tail target is reached. func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) { - f.lock.Lock() oldRange := f.filterMapsRange newTailMap, changed := f.unindexTailPtr(tailTarget) newRange := f.filterMapsRange - f.lock.Unlock() if !changed { return true // nothing more to do @@ -441,6 +422,7 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) { oldTailMap := uint32(oldRange.tailLvPointer >> f.logValuesPerMap) // remove map data [oldTailMap, newTailMap) and block data // [oldRange.tailBlockNumber, newRange.tailBlockNumber) + f.indexLock.Lock() batch := f.db.NewBatch() for blockNumber := oldRange.tailBlockNumber; blockNumber < newRange.tailBlockNumber; blockNumber++ { f.deleteBlockLvPointer(batch, blockNumber) @@ -459,14 +441,15 @@ func (f *FilterMaps) unindexTailEpoch(tailTarget uint64) (finished bool) { newRange.tailLvPointer = uint64(newTailMap) << f.logValuesPerMap if newRange.tailLvPointer > newRange.tailBlockLvPointer { log.Error("Cannot unindex filter maps beyond tail block log value pointer", "tailLvPointer", newRange.tailLvPointer, "tailBlockLvPointer", newRange.tailBlockLvPointer) + f.indexLock.Unlock() return } - f.lock.Lock() f.setRange(batch, newRange) + f.indexLock.Unlock() + if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } - f.lock.Unlock() return } @@ -539,9 +522,6 @@ type updateBatch struct { // newUpdateBatch creates a new updateBatch. func (f *FilterMaps) newUpdateBatch() *updateBatch { - f.lock.RLock() - defer f.lock.RUnlock() - return &updateBatch{ f: f, filterMapsRange: f.filterMapsRange, @@ -555,8 +535,7 @@ func (f *FilterMaps) newUpdateBatch() *updateBatch { // applyUpdateBatch writes creates a batch and writes all changes to the database // and also updates the in-memory representations of log index data. func (f *FilterMaps) applyUpdateBatch(u *updateBatch) { - f.lock.Lock() - defer f.lock.Unlock() + f.indexLock.Lock() batch := f.db.NewBatch() // write or remove block to log value index pointers @@ -615,6 +594,8 @@ func (f *FilterMaps) applyUpdateBatch(u *updateBatch) { } // update filterMapsRange f.setRange(batch, u.filterMapsRange) + f.indexLock.Unlock() + if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } @@ -849,9 +830,6 @@ func (u *updateBatch) makeRevertPoint() (*revertPoint, error) { // number from memory cache or from the database if available. If no such revert // point is available then it returns no result and no error. func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) { - f.lock.RLock() - defer f.lock.RUnlock() - if blockNumber > f.headBlockNumber { blockNumber = f.headBlockNumber } @@ -879,9 +857,6 @@ func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) { // revertTo reverts the log index to the given revert point. func (f *FilterMaps) revertTo(rp *revertPoint) error { - f.lock.Lock() - defer f.lock.Unlock() - batch := f.db.NewBatch() afterLastMap := uint32((f.headLvPointer + f.valuesPerMap - 1) >> f.logValuesPerMap) if rp.mapIndex > afterLastMap { @@ -918,7 +893,10 @@ func (f *FilterMaps) revertTo(rp *revertPoint) error { newRange.headLvPointer = lvPointer newRange.headBlockNumber = rp.blockNumber newRange.headBlockHash = rp.blockHash + f.indexLock.Lock() f.setRange(batch, newRange) + f.indexLock.Unlock() + if err := batch.Write(); err != nil { log.Crit("Could not write update batch", "error", err) } diff --git a/core/filtermaps/indexer_test.go b/core/filtermaps/indexer_test.go index be65aa5cf8..a20b062e40 100644 --- a/core/filtermaps/indexer_test.go +++ b/core/filtermaps/indexer_test.go @@ -79,14 +79,13 @@ func TestIndexerRandomRange(t *testing.T) { ts.chain.setCanonicalChain(forks[fork][:head+1]) } ts.fm.WaitIdle() - fmr := ts.fm.getRange() if noHistory { - if fmr.initialized { + if ts.fm.initialized { t.Fatalf("filterMapsRange initialized while indexing is disabled") } continue } - if !fmr.initialized { + if !ts.fm.initialized { t.Fatalf("filterMapsRange not initialized while indexing is enabled") } var ( @@ -99,21 +98,21 @@ func TestIndexerRandomRange(t *testing.T) { if tail > 0 { tpHash = forks[fork][tail-1] } - if fmr.headBlockNumber != uint64(head) || fmr.headBlockHash != forks[fork][head] { - ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], fmr.headBlockNumber, fmr.headBlockHash) + if ts.fm.headBlockNumber != uint64(head) || ts.fm.headBlockHash != forks[fork][head] { + ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", head, forks[fork][head], ts.fm.headBlockNumber, ts.fm.headBlockHash) } - if fmr.tailBlockNumber != uint64(tail) || fmr.tailParentHash != tpHash { - ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, fmr.tailBlockNumber, fmr.tailParentHash) + if ts.fm.tailBlockNumber != uint64(tail) || ts.fm.tailParentHash != tpHash { + ts.t.Fatalf("Invalid index head (expected #%d %v, got #%d %v)", tail, tpHash, ts.fm.tailBlockNumber, ts.fm.tailParentHash) } expLvCount := uint64(head+1-tail) * 50 if tail == 0 { expLvCount -= 50 // no logs in genesis block } - if fmr.headLvPointer-fmr.tailBlockLvPointer != expLvCount { - ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, fmr.headLvPointer-fmr.tailBlockLvPointer) + if ts.fm.headLvPointer-ts.fm.tailBlockLvPointer != expLvCount { + ts.t.Fatalf("Invalid number of log values (expected %d, got %d)", expLvCount, ts.fm.headLvPointer-ts.fm.tailBlockLvPointer) } - if fmr.tailBlockLvPointer-fmr.tailLvPointer >= ts.params.valuesPerMap { - ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, fmr.tailBlockLvPointer-fmr.tailLvPointer) + if ts.fm.tailBlockLvPointer-ts.fm.tailLvPointer >= ts.params.valuesPerMap { + ts.t.Fatalf("Invalid number of leftover tail log values (expected < %d, got %d)", ts.params.valuesPerMap, ts.fm.tailBlockLvPointer-ts.fm.tailLvPointer) } } } diff --git a/core/filtermaps/matcher_backend.go b/core/filtermaps/matcher_backend.go index 37a51eac48..f5acfa0d3e 100644 --- a/core/filtermaps/matcher_backend.go +++ b/core/filtermaps/matcher_backend.go @@ -25,7 +25,8 @@ import ( // FilterMapsMatcherBackend implements MatcherBackend. type FilterMapsMatcherBackend struct { - f *FilterMaps + f *FilterMaps + // these fields should be accessed under f.matchersLock mutex. valid bool firstValid, lastValid uint64 syncCh chan SyncRange @@ -35,8 +36,12 @@ type FilterMapsMatcherBackend struct { // the active matcher set. // Note that Close should always be called when the matcher is no longer used. func (f *FilterMaps) NewMatcherBackend() *FilterMapsMatcherBackend { - f.lock.Lock() - defer f.lock.Unlock() + f.indexLock.RLock() + f.matchersLock.Lock() + defer func() { + f.matchersLock.Unlock() + f.indexLock.RUnlock() + }() fm := &FilterMapsMatcherBackend{ f: f, @@ -58,8 +63,8 @@ func (fm *FilterMapsMatcherBackend) GetParams() *Params { // any SyncLogIndex calls are cancelled. // Close implements MatcherBackend. func (fm *FilterMapsMatcherBackend) Close() { - fm.f.lock.Lock() - defer fm.f.lock.Unlock() + fm.f.matchersLock.Lock() + defer fm.f.matchersLock.Unlock() delete(fm.f.matchers, fm) } @@ -70,7 +75,7 @@ func (fm *FilterMapsMatcherBackend) Close() { // on write. // GetFilterMapRow implements MatcherBackend. func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) { - return fm.f.getFilterMapRow(mapIndex, rowIndex) + return fm.f.getFilterMapRowUncached(mapIndex, rowIndex) } // GetBlockLvPointer returns the starting log value index where the log values @@ -78,8 +83,8 @@ func (fm *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapInde // head then the first unoccupied log value index is returned. // GetBlockLvPointer implements MatcherBackend. func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) { - fm.f.lock.RLock() - defer fm.f.lock.RUnlock() + fm.f.indexLock.RLock() + defer fm.f.indexLock.RUnlock() return fm.f.getBlockLvPointer(blockNumber) } @@ -94,8 +99,8 @@ func (fm *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, block // using SyncLogIndex and re-process certain blocks if necessary. // GetLogByLvIndex implements MatcherBackend. func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) { - fm.f.lock.RLock() - defer fm.f.lock.RUnlock() + fm.f.indexLock.RLock() + defer fm.f.indexLock.RUnlock() return fm.f.getLogByLvIndex(lvIndex) } @@ -108,8 +113,12 @@ func (fm *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex // should be passed as a parameter and the existing log index should be consistent // with that chain. func (fm *FilterMapsMatcherBackend) synced(head *types.Header) { - fm.f.lock.Lock() - defer fm.f.lock.Unlock() + fm.f.indexLock.RLock() + fm.f.matchersLock.Lock() + defer func() { + fm.f.matchersLock.Unlock() + fm.f.indexLock.RUnlock() + }() fm.syncCh <- SyncRange{ Head: head, @@ -143,9 +152,9 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange } // add SyncRange return channel, ensuring that syncCh := make(chan SyncRange, 1) - fm.f.lock.Lock() + fm.f.matchersLock.Lock() fm.syncCh = syncCh - fm.f.lock.Unlock() + fm.f.matchersLock.Unlock() select { case fm.f.matcherSyncCh <- fm: @@ -167,8 +176,11 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange // valid range with the current indexed range. This function should be called // whenever a part of the log index has been removed, before adding new blocks // to it. -// Note that this function assumes that the read lock is being held. +// Note that this function assumes that the index read lock is being held. func (f *FilterMaps) updateMatchersValidRange() { + f.matchersLock.Lock() + defer f.matchersLock.Unlock() + for fm := range f.matchers { if !f.initialized { fm.valid = false diff --git a/core/filtermaps/matcher_test.go b/core/filtermaps/matcher_test.go index 21265d5f0e..7754057d4b 100644 --- a/core/filtermaps/matcher_test.go +++ b/core/filtermaps/matcher_test.go @@ -28,11 +28,11 @@ func TestMatcher(t *testing.T) { ts := newTestSetup(t) defer ts.close() - ts.chain.addBlocks(1000, 10, 10, 4, true) + ts.chain.addBlocks(100, 10, 10, 4, true) ts.setHistory(0, false) ts.fm.WaitIdle() - for i := 0; i < 500; i++ { + for i := 0; i < 5000; i++ { bhash := ts.chain.canonical[rand.Intn(len(ts.chain.canonical))] receipts := ts.chain.receipts[bhash] if len(receipts) == 0 {