Merge pull request #2711 from hdiedrich/1.4.7-filter-races-cleanup
Fix #2710 Filter race: concurrent map read and map write
This commit is contained in:
commit
74ec95e7f6
|
@ -68,8 +68,6 @@ type PublicFilterAPI struct {
|
|||
|
||||
transactionMu sync.RWMutex
|
||||
transactionQueue map[int]*hashQueue
|
||||
|
||||
transactMu sync.Mutex
|
||||
}
|
||||
|
||||
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
|
||||
|
@ -100,6 +98,7 @@ done:
|
|||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
s.filterManager.Lock() // lock order like filterLoop()
|
||||
s.logMu.Lock()
|
||||
for id, filter := range s.logQueue {
|
||||
if time.Since(filter.timeout) > filterTickerTime {
|
||||
|
@ -126,6 +125,7 @@ done:
|
|||
}
|
||||
}
|
||||
s.transactionMu.Unlock()
|
||||
s.filterManager.Unlock()
|
||||
case <-s.quit:
|
||||
break done
|
||||
}
|
||||
|
@ -135,19 +135,24 @@ done:
|
|||
|
||||
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
|
||||
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
|
||||
// protect filterManager.Add() and setting of filter fields
|
||||
s.filterManager.Lock()
|
||||
defer s.filterManager.Unlock()
|
||||
|
||||
externalId, err := newFilterId()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
s.blockMu.Lock()
|
||||
filter := New(s.chainDb)
|
||||
id, err := s.filterManager.Add(filter, ChainFilter)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
s.blockMu.Lock()
|
||||
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
|
||||
s.blockMu.Unlock()
|
||||
|
||||
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
|
||||
s.blockMu.Lock()
|
||||
|
@ -158,8 +163,6 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
|
|||
}
|
||||
}
|
||||
|
||||
defer s.blockMu.Unlock()
|
||||
|
||||
s.filterMapMu.Lock()
|
||||
s.filterMapping[externalId] = id
|
||||
s.filterMapMu.Unlock()
|
||||
|
@ -169,21 +172,24 @@ func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
|
|||
|
||||
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
|
||||
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
|
||||
// protect filterManager.Add() and setting of filter fields
|
||||
s.filterManager.Lock()
|
||||
defer s.filterManager.Unlock()
|
||||
|
||||
externalId, err := newFilterId()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
s.transactionMu.Lock()
|
||||
defer s.transactionMu.Unlock()
|
||||
|
||||
filter := New(s.chainDb)
|
||||
id, err := s.filterManager.Add(filter, PendingTxFilter)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
s.transactionMu.Lock()
|
||||
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
|
||||
s.transactionMu.Unlock()
|
||||
|
||||
filter.TransactionCallback = func(tx *types.Transaction) {
|
||||
s.transactionMu.Lock()
|
||||
|
@ -203,8 +209,9 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
|
|||
|
||||
// newLogFilter creates a new log filter.
|
||||
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
|
||||
s.logMu.Lock()
|
||||
defer s.logMu.Unlock()
|
||||
// protect filterManager.Add() and setting of filter fields
|
||||
s.filterManager.Lock()
|
||||
defer s.filterManager.Unlock()
|
||||
|
||||
filter := New(s.chainDb)
|
||||
id, err := s.filterManager.Add(filter, LogFilter)
|
||||
|
@ -212,7 +219,9 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
|
|||
return 0, err
|
||||
}
|
||||
|
||||
s.logMu.Lock()
|
||||
s.logQueue[id] = &logQueue{timeout: time.Now()}
|
||||
s.logMu.Unlock()
|
||||
|
||||
filter.SetBeginBlock(earliest)
|
||||
filter.SetEndBlock(latest)
|
||||
|
@ -443,35 +452,43 @@ func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
|
|||
|
||||
// UninstallFilter removes the filter with the given filter id.
|
||||
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
|
||||
s.filterMapMu.Lock()
|
||||
defer s.filterMapMu.Unlock()
|
||||
s.filterManager.Lock()
|
||||
defer s.filterManager.Unlock()
|
||||
|
||||
s.filterMapMu.Lock()
|
||||
id, ok := s.filterMapping[filterId]
|
||||
if !ok {
|
||||
s.filterMapMu.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
defer s.filterManager.Remove(id)
|
||||
delete(s.filterMapping, filterId)
|
||||
s.filterMapMu.Unlock()
|
||||
|
||||
s.filterManager.Remove(id)
|
||||
|
||||
s.logMu.Lock()
|
||||
if _, ok := s.logQueue[id]; ok {
|
||||
s.logMu.Lock()
|
||||
defer s.logMu.Unlock()
|
||||
delete(s.logQueue, id)
|
||||
s.logMu.Unlock()
|
||||
return true
|
||||
}
|
||||
s.logMu.Unlock()
|
||||
|
||||
s.blockMu.Lock()
|
||||
if _, ok := s.blockQueue[id]; ok {
|
||||
s.blockMu.Lock()
|
||||
defer s.blockMu.Unlock()
|
||||
delete(s.blockQueue, id)
|
||||
s.blockMu.Unlock()
|
||||
return true
|
||||
}
|
||||
s.blockMu.Unlock()
|
||||
|
||||
s.transactionMu.Lock()
|
||||
if _, ok := s.transactionQueue[id]; ok {
|
||||
s.transactionMu.Lock()
|
||||
defer s.transactionMu.Unlock()
|
||||
delete(s.transactionQueue, id)
|
||||
s.transactionMu.Unlock()
|
||||
return true
|
||||
}
|
||||
s.transactionMu.Unlock()
|
||||
|
||||
return false
|
||||
}
|
||||
|
@ -525,7 +542,9 @@ func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
|
|||
|
||||
// GetFilterLogs returns the logs for the filter with the given id.
|
||||
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
|
||||
s.filterMapMu.RLock()
|
||||
id, ok := s.filterMapping[filterId]
|
||||
s.filterMapMu.RUnlock()
|
||||
if !ok {
|
||||
return toRPCLogs(nil, false)
|
||||
}
|
||||
|
@ -540,9 +559,9 @@ func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
|
|||
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
|
||||
// This can be used for polling.
|
||||
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
|
||||
s.filterMapMu.Lock()
|
||||
s.filterMapMu.RLock()
|
||||
id, ok := s.filterMapping[filterId]
|
||||
s.filterMapMu.Unlock()
|
||||
s.filterMapMu.RUnlock()
|
||||
|
||||
if !ok { // filter not found
|
||||
return []interface{}{}
|
||||
|
|
|
@ -82,11 +82,20 @@ func (fs *FilterSystem) Stop() {
|
|||
fs.sub.Unsubscribe()
|
||||
}
|
||||
|
||||
// Add adds a filter to the filter manager
|
||||
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
|
||||
// Acquire filter system maps lock, required to force lock acquisition
|
||||
// sequence with filterMu acquired first to avoid deadlocks by callbacks
|
||||
func (fs *FilterSystem) Lock() {
|
||||
fs.filterMu.Lock()
|
||||
defer fs.filterMu.Unlock()
|
||||
}
|
||||
|
||||
// Release filter system maps lock
|
||||
func (fs *FilterSystem) Unlock() {
|
||||
fs.filterMu.Unlock()
|
||||
}
|
||||
|
||||
// Add adds a filter to the filter manager
|
||||
// Expects filterMu to be locked.
|
||||
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
|
||||
id := fs.filterId
|
||||
filter.created = time.Now()
|
||||
|
||||
|
@ -110,10 +119,8 @@ func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error)
|
|||
}
|
||||
|
||||
// Remove removes a filter by filter id
|
||||
// Expects filterMu to be locked.
|
||||
func (fs *FilterSystem) Remove(id int) {
|
||||
fs.filterMu.Lock()
|
||||
defer fs.filterMu.Unlock()
|
||||
|
||||
delete(fs.chainFilters, id)
|
||||
delete(fs.pendingTxFilters, id)
|
||||
delete(fs.logFilters, id)
|
||||
|
|
Loading…
Reference in New Issue