diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 1ea589c7b2..cff1a77e6c 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -106,9 +106,11 @@ type accountRequest struct { peer string // Peer to which this request is assigned id uint64 // Request ID of this request - cancel chan struct{} // Channel to track sync cancellation - timeout *time.Timer // Timer to track delivery timeout - stale chan struct{} // Channel to signal the request was dropped + deliver chan *accountResponse // Channel to deliver successful response on + revert chan *accountRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + timeout *time.Timer // Timer to track delivery timeout + stale chan struct{} // Channel to signal the request was dropped origin common.Hash // First account requested to allow continuation checks limit common.Hash // Last account requested to allow non-overlapping chunking @@ -147,9 +149,11 @@ type bytecodeRequest struct { peer string // Peer to which this request is assigned id uint64 // Request ID of this request - cancel chan struct{} // Channel to track sync cancellation - timeout *time.Timer // Timer to track delivery timeout - stale chan struct{} // Channel to signal the request was dropped + deliver chan *bytecodeResponse // Channel to deliver successful response on + revert chan *bytecodeRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + timeout *time.Timer // Timer to track delivery timeout + stale chan struct{} // Channel to signal the request was dropped hashes []common.Hash // Bytecode hashes to validate responses task *accountTask // Task which this request is filling (only access fields through the runloop!!) @@ -176,9 +180,11 @@ type storageRequest struct { peer string // Peer to which this request is assigned id uint64 // Request ID of this request - cancel chan struct{} // Channel to track sync cancellation - timeout *time.Timer // Timer to track delivery timeout - stale chan struct{} // Channel to signal the request was dropped + deliver chan *storageResponse // Channel to deliver successful response on + revert chan *storageRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + timeout *time.Timer // Timer to track delivery timeout + stale chan struct{} // Channel to signal the request was dropped accounts []common.Hash // Account hashes to validate responses roots []common.Hash // Storage roots to validate responses @@ -224,9 +230,11 @@ type trienodeHealRequest struct { peer string // Peer to which this request is assigned id uint64 // Request ID of this request - cancel chan struct{} // Channel to track sync cancellation - timeout *time.Timer // Timer to track delivery timeout - stale chan struct{} // Channel to signal the request was dropped + deliver chan *trienodeHealResponse // Channel to deliver successful response on + revert chan *trienodeHealRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + timeout *time.Timer // Timer to track delivery timeout + stale chan struct{} // Channel to signal the request was dropped hashes []common.Hash // Trie node hashes to validate responses paths []trie.SyncPath // Trie node paths requested for rescheduling @@ -256,9 +264,11 @@ type bytecodeHealRequest struct { peer string // Peer to which this request is assigned id uint64 // Request ID of this request - cancel chan struct{} // Channel to track sync cancellation - timeout *time.Timer // Timer to track delivery timeout - stale chan struct{} // Channel to signal the request was dropped + deliver chan *bytecodeHealResponse // Channel to deliver successful response on + revert chan *bytecodeHealRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + timeout *time.Timer // Timer to track delivery timeout + stale chan struct{} // Channel to signal the request was dropped hashes []common.Hash // Bytecode hashes to validate responses task *healTask // Task which this request is filling (only access fields through the runloop!!) @@ -399,14 +409,6 @@ type Syncer struct { bytecodeReqs map[uint64]*bytecodeRequest // Bytecode requests currently running storageReqs map[uint64]*storageRequest // Storage requests currently running - accountReqFails chan *accountRequest // Failed account range requests to revert - bytecodeReqFails chan *bytecodeRequest // Failed bytecode requests to revert - storageReqFails chan *storageRequest // Failed storage requests to revert - - accountResps chan *accountResponse // Account sub-tries to integrate into the database - bytecodeResps chan *bytecodeResponse // Bytecodes to integrate into the database - storageResps chan *storageResponse // Storage sub-tries to integrate into the database - accountSynced uint64 // Number of accounts downloaded accountBytes common.StorageSize // Number of account trie bytes persisted to disk bytecodeSynced uint64 // Number of bytecodes downloaded @@ -421,12 +423,6 @@ type Syncer struct { trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running - trienodeHealReqFails chan *trienodeHealRequest // Failed trienode requests to revert - bytecodeHealReqFails chan *bytecodeHealRequest // Failed bytecode requests to revert - - trienodeHealResps chan *trienodeHealResponse // Trie nodes to integrate into the database - bytecodeHealResps chan *bytecodeHealResponse // Bytecodes to integrate into the database - trienodeHealSynced uint64 // Number of state trie nodes downloaded trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk trienodeHealDups uint64 // Number of state trie nodes already processed @@ -464,26 +460,16 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer { storageIdlers: make(map[string]struct{}), bytecodeIdlers: make(map[string]struct{}), - accountReqs: make(map[uint64]*accountRequest), - storageReqs: make(map[uint64]*storageRequest), - bytecodeReqs: make(map[uint64]*bytecodeRequest), - accountReqFails: make(chan *accountRequest), - storageReqFails: make(chan *storageRequest), - bytecodeReqFails: make(chan *bytecodeRequest), - accountResps: make(chan *accountResponse), - storageResps: make(chan *storageResponse), - bytecodeResps: make(chan *bytecodeResponse), + accountReqs: make(map[uint64]*accountRequest), + storageReqs: make(map[uint64]*storageRequest), + bytecodeReqs: make(map[uint64]*bytecodeRequest), trienodeHealIdlers: make(map[string]struct{}), bytecodeHealIdlers: make(map[string]struct{}), - trienodeHealReqs: make(map[uint64]*trienodeHealRequest), - bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), - trienodeHealReqFails: make(chan *trienodeHealRequest), - bytecodeHealReqFails: make(chan *bytecodeHealRequest), - trienodeHealResps: make(chan *trienodeHealResponse), - bytecodeHealResps: make(chan *bytecodeHealResponse), - stateWriter: db.NewBatch(), + trienodeHealReqs: make(map[uint64]*trienodeHealRequest), + bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), + stateWriter: db.NewBatch(), } } @@ -611,6 +597,21 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { peerDropSub := s.peerDrop.Subscribe(peerDrop) defer peerDropSub.Unsubscribe() + // Create a set of unique channels for this sync cycle. We need these to be + // ephemeral so a data race doesn't accidentally deliver something stale on + // a persistent channel across syncs (yup, this happened) + var ( + accountReqFails = make(chan *accountRequest) + storageReqFails = make(chan *storageRequest) + bytecodeReqFails = make(chan *bytecodeRequest) + accountResps = make(chan *accountResponse) + storageResps = make(chan *storageResponse) + bytecodeResps = make(chan *bytecodeResponse) + trienodeHealReqFails = make(chan *trienodeHealRequest) + bytecodeHealReqFails = make(chan *bytecodeHealRequest) + trienodeHealResps = make(chan *trienodeHealResponse) + bytecodeHealResps = make(chan *bytecodeHealResponse) + ) for { // Remove all completed tasks and terminate sync if everything's done s.cleanStorageTasks() @@ -619,14 +620,14 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { return nil } // Assign all the data retrieval tasks to any free peers - s.assignAccountTasks(cancel) - s.assignBytecodeTasks(cancel) - s.assignStorageTasks(cancel) + s.assignAccountTasks(accountResps, accountReqFails, cancel) + s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel) + s.assignStorageTasks(storageResps, storageReqFails, cancel) if len(s.tasks) == 0 { // Sync phase done, run heal phase - s.assignTrienodeHealTasks(cancel) - s.assignBytecodeHealTasks(cancel) + s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel) + s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel) } // Wait for something to happen select { @@ -639,26 +640,26 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { case <-cancel: return ErrCancelled - case req := <-s.accountReqFails: + case req := <-accountReqFails: s.revertAccountRequest(req) - case req := <-s.bytecodeReqFails: + case req := <-bytecodeReqFails: s.revertBytecodeRequest(req) - case req := <-s.storageReqFails: + case req := <-storageReqFails: s.revertStorageRequest(req) - case req := <-s.trienodeHealReqFails: + case req := <-trienodeHealReqFails: s.revertTrienodeHealRequest(req) - case req := <-s.bytecodeHealReqFails: + case req := <-bytecodeHealReqFails: s.revertBytecodeHealRequest(req) - case res := <-s.accountResps: + case res := <-accountResps: s.processAccountResponse(res) - case res := <-s.bytecodeResps: + case res := <-bytecodeResps: s.processBytecodeResponse(res) - case res := <-s.storageResps: + case res := <-storageResps: s.processStorageResponse(res) - case res := <-s.trienodeHealResps: + case res := <-trienodeHealResps: s.processTrienodeHealResponse(res) - case res := <-s.bytecodeHealResps: + case res := <-bytecodeHealResps: s.processBytecodeHealResponse(res) } // Report stats if something meaningful happened @@ -801,7 +802,7 @@ func (s *Syncer) cleanStorageTasks() { // assignAccountTasks attempts to match idle peers to pending account range // retrievals. -func (s *Syncer) assignAccountTasks(cancel chan struct{}) { +func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *accountRequest, cancel chan struct{}) { s.lock.Lock() defer s.lock.Unlock() @@ -847,13 +848,15 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) { } // Generate the network query and send it to the peer req := &accountRequest{ - peer: idle, - id: reqid, - cancel: cancel, - stale: make(chan struct{}), - origin: task.Next, - limit: task.Last, - task: task, + peer: idle, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + origin: task.Next, + limit: task.Last, + task: task, } req.timeout = time.AfterFunc(requestTimeout, func() { peer.Log().Debug("Account range request timed out", "reqid", reqid) @@ -879,7 +882,7 @@ func (s *Syncer) assignAccountTasks(cancel chan struct{}) { } // assignBytecodeTasks attempts to match idle peers to pending code retrievals. -func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { +func (s *Syncer) assignBytecodeTasks(success chan *bytecodeResponse, fail chan *bytecodeRequest, cancel chan struct{}) { s.lock.Lock() defer s.lock.Unlock() @@ -937,12 +940,14 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { } } req := &bytecodeRequest{ - peer: idle, - id: reqid, - cancel: cancel, - stale: make(chan struct{}), - hashes: hashes, - task: task, + peer: idle, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + hashes: hashes, + task: task, } req.timeout = time.AfterFunc(requestTimeout, func() { peer.Log().Debug("Bytecode request timed out", "reqid", reqid) @@ -966,7 +971,7 @@ func (s *Syncer) assignBytecodeTasks(cancel chan struct{}) { // assignStorageTasks attempts to match idle peers to pending storage range // retrievals. -func (s *Syncer) assignStorageTasks(cancel chan struct{}) { +func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *storageRequest, cancel chan struct{}) { s.lock.Lock() defer s.lock.Unlock() @@ -1059,6 +1064,8 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) { req := &storageRequest{ peer: idle, id: reqid, + deliver: success, + revert: fail, cancel: cancel, stale: make(chan struct{}), accounts: accounts, @@ -1101,7 +1108,7 @@ func (s *Syncer) assignStorageTasks(cancel chan struct{}) { // assignTrienodeHealTasks attempts to match idle peers to trie node requests to // heal any trie errors caused by the snap sync's chunked retrieval model. -func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { +func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fail chan *trienodeHealRequest, cancel chan struct{}) { s.lock.Lock() defer s.lock.Unlock() @@ -1179,13 +1186,15 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { } } req := &trienodeHealRequest{ - peer: idle, - id: reqid, - cancel: cancel, - stale: make(chan struct{}), - hashes: hashes, - paths: paths, - task: s.healer, + peer: idle, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + hashes: hashes, + paths: paths, + task: s.healer, } req.timeout = time.AfterFunc(requestTimeout, func() { peer.Log().Debug("Trienode heal request timed out", "reqid", reqid) @@ -1209,7 +1218,7 @@ func (s *Syncer) assignTrienodeHealTasks(cancel chan struct{}) { // assignBytecodeHealTasks attempts to match idle peers to bytecode requests to // heal any trie errors caused by the snap sync's chunked retrieval model. -func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) { +func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fail chan *bytecodeHealRequest, cancel chan struct{}) { s.lock.Lock() defer s.lock.Unlock() @@ -1280,12 +1289,14 @@ func (s *Syncer) assignBytecodeHealTasks(cancel chan struct{}) { } } req := &bytecodeHealRequest{ - peer: idle, - id: reqid, - cancel: cancel, - stale: make(chan struct{}), - hashes: hashes, - task: s.healer, + peer: idle, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + hashes: hashes, + task: s.healer, } req.timeout = time.AfterFunc(requestTimeout, func() { peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid) @@ -1366,7 +1377,7 @@ func (s *Syncer) revertRequests(peer string) { // request and return all failed retrieval tasks to the scheduler for reassignment. func (s *Syncer) scheduleRevertAccountRequest(req *accountRequest) { select { - case s.accountReqFails <- req: + case req.revert <- req: // Sync event loop notified case <-req.cancel: // Sync cycle got cancelled @@ -1407,7 +1418,7 @@ func (s *Syncer) revertAccountRequest(req *accountRequest) { // and return all failed retrieval tasks to the scheduler for reassignment. func (s *Syncer) scheduleRevertBytecodeRequest(req *bytecodeRequest) { select { - case s.bytecodeReqFails <- req: + case req.revert <- req: // Sync event loop notified case <-req.cancel: // Sync cycle got cancelled @@ -1448,7 +1459,7 @@ func (s *Syncer) revertBytecodeRequest(req *bytecodeRequest) { // request and return all failed retrieval tasks to the scheduler for reassignment. func (s *Syncer) scheduleRevertStorageRequest(req *storageRequest) { select { - case s.storageReqFails <- req: + case req.revert <- req: // Sync event loop notified case <-req.cancel: // Sync cycle got cancelled @@ -1493,7 +1504,7 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) { // request and return all failed retrieval tasks to the scheduler for reassignment. func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) { select { - case s.trienodeHealReqFails <- req: + case req.revert <- req: // Sync event loop notified case <-req.cancel: // Sync cycle got cancelled @@ -1534,7 +1545,7 @@ func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) { // request and return all failed retrieval tasks to the scheduler for reassignment. func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) { select { - case s.bytecodeHealReqFails <- req: + case req.revert <- req: // Sync event loop notified case <-req.cancel: // Sync cycle got cancelled @@ -2147,7 +2158,7 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco cont: cont, } select { - case s.accountResps <- response: + case req.deliver <- response: case <-req.cancel: case <-req.stale: } @@ -2253,7 +2264,7 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error codes: codes, } select { - case s.bytecodeResps <- response: + case req.deliver <- response: case <-req.cancel: case <-req.stale: } @@ -2411,7 +2422,7 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo cont: cont, } select { - case s.storageResps <- response: + case req.deliver <- response: case <-req.cancel: case <-req.stale: } @@ -2505,7 +2516,7 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error nodes: nodes, } select { - case s.trienodeHealResps <- response: + case req.deliver <- response: case <-req.cancel: case <-req.stale: } @@ -2598,7 +2609,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e codes: codes, } select { - case s.bytecodeHealResps <- response: + case req.deliver <- response: case <-req.cancel: case <-req.stale: }