eth/protocols/snap: fix problems due to idle-but-busy peers

This commit is contained in:
Martin Holst Swende 2022-08-31 17:58:18 +02:00
parent 3d68bb03c3
commit dafa40e7a7
No known key found for this signature in database
GPG Key ID: 683B438C05A5DDF0
1 changed files with 55 additions and 35 deletions

View File

@ -2248,7 +2248,9 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{} s.accountIdlers[peer.ID()] = struct{}{}
} }
@ -2256,6 +2258,8 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.accountReqs[id] req, ok := s.accountReqs[id]
if !ok { if !ok {
@ -2360,7 +2364,9 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{} s.bytecodeIdlers[peer.ID()] = struct{}{}
} }
@ -2368,6 +2374,8 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.bytecodeReqs[id] req, ok := s.bytecodeReqs[id]
if !ok { if !ok {
@ -2469,7 +2477,9 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{} s.storageIdlers[peer.ID()] = struct{}{}
} }
@ -2477,6 +2487,8 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.storageReqs[id] req, ok := s.storageReqs[id]
if !ok { if !ok {
@ -2596,7 +2608,9 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{} s.trienodeHealIdlers[peer.ID()] = struct{}{}
} }
@ -2604,6 +2618,8 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.trienodeHealReqs[id] req, ok := s.trienodeHealReqs[id]
if !ok { if !ok {
@ -2691,7 +2707,9 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// Whether or not the response is valid, we can mark the peer as idle and // Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid, // notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit. // we'll drop the peer in a bit.
defer func() {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok { if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{} s.bytecodeHealIdlers[peer.ID()] = struct{}{}
} }
@ -2699,6 +2717,8 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
case s.update <- struct{}{}: case s.update <- struct{}{}:
default: default:
} }
}()
s.lock.Lock()
// Ensure the response is for a valid request // Ensure the response is for a valid request
req, ok := s.bytecodeHealReqs[id] req, ok := s.bytecodeHealReqs[id]
if !ok { if !ok {