diff --git a/eth/handler.go b/eth/handler.go index 9d6a8c4225..f0b8d65a12 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -124,7 +124,9 @@ type handler struct { chainSync *chainSyncer wg sync.WaitGroup - peerWG sync.WaitGroup + + handlerStartCh chan struct{} + handlerDoneCh chan struct{} } // newHandler returns a handler for all Ethereum chain management protocol. @@ -144,6 +146,8 @@ func newHandler(config *handlerConfig) (*handler, error) { merger: config.Merger, requiredBlocks: config.RequiredBlocks, quitSync: make(chan struct{}), + handlerDoneCh: make(chan struct{}), + handlerStartCh: make(chan struct{}), } if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap @@ -289,9 +293,50 @@ func newHandler(config *handlerConfig) (*handler, error) { return h, nil } +// protoTracker tracks the number of active protocol handlers. +func (h *handler) protoTracker() { + defer h.wg.Done() + var active int + for { + select { + case <-h.handlerStartCh: + active++ + case <-h.handlerDoneCh: + active-- + case <-h.quitSync: + // Wait for all active handlers to finish. + for ; active > 0; active-- { + <-h.handlerDoneCh + } + return + } + } +} + +// incHandlers signals to increment the number of active handlers if not +// quitting. +func (h *handler) incHandlers() bool { + select { + case h.handlerStartCh <- struct{}{}: + return true + case <-h.quitSync: + return false + } +} + +// decHandlers signals to decrement the number of active handlers. +func (h *handler) decHandlers() { + h.handlerDoneCh <- struct{}{} +} + // runEthPeer registers an eth peer into the joint eth/snap peerset, adds it to // various subsystems and starts handling messages. func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() + // If the peer has a `snap` extension, wait for it to connect so we can have // a uniform initialization/teardown mechanism snap, err := h.peers.waitSnapExtension(peer) @@ -299,12 +344,6 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Error("Snapshot extension barrier failed", "err", err) return err } - // TODO(karalabe): Not sure why this is needed - if !h.chainSync.handlePeerEvent(peer) { - return p2p.DiscQuitting - } - h.peerWG.Add(1) - defer h.peerWG.Done() // Execute the Ethereum handshake var ( @@ -360,7 +399,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { return err } } - h.chainSync.handlePeerEvent(peer) + h.chainSync.handlePeerEvent() // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. @@ -421,8 +460,10 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // `eth`, all subsystem registrations and lifecycle management will be done by // the main `eth` handler to prevent strange races. func (h *handler) runSnapExtension(peer *snap.Peer, handler snap.Handler) error { - h.peerWG.Add(1) - defer h.peerWG.Done() + if !h.incHandlers() { + return p2p.DiscQuitting + } + defer h.decHandlers() if err := h.peers.registerSnapExtension(peer); err != nil { if metrics.Enabled { @@ -494,6 +535,10 @@ func (h *handler) Start(maxPeers int) { // start sync handlers h.wg.Add(1) go h.chainSync.loop() + + // start peer handler tracker + h.wg.Add(1) + go h.protoTracker() } func (h *handler) Stop() { @@ -503,14 +548,13 @@ func (h *handler) Stop() { // Quit chainSync and txsync64. // After this is done, no new peers will be accepted. close(h.quitSync) - h.wg.Wait() // Disconnect existing sessions. // This also closes the gate for any new registrations on the peer set. // sessions which are already established but not added to h.peers yet // will exit when they try to register. h.peers.close() - h.peerWG.Wait() + h.wg.Wait() log.Info("Ethereum protocol stopped") } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 00be022d9e..3a5e6608bb 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -134,7 +134,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td // Update the peer's total difficulty if better than the previous if _, td := peer.Head(); trueTD.Cmp(td) > 0 { peer.SetHead(trueHead, trueTD) - h.chainSync.handlePeerEvent(peer) + h.chainSync.handlePeerEvent() } return nil } diff --git a/eth/sync.go b/eth/sync.go index ace1071143..89bffbe653 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -89,7 +89,7 @@ func newChainSyncer(handler *handler) *chainSyncer { // handlePeerEvent notifies the syncer about a change in the peer set. // This is called for new peers and every time a peer announces a new // chain head. -func (cs *chainSyncer) handlePeerEvent(peer *eth.Peer) bool { +func (cs *chainSyncer) handlePeerEvent() bool { select { case cs.peerEventCh <- struct{}{}: return true