eth/downloader: improve deliverNodeData (#3588)
Commit d3b751e
accidentally deleted a crucial 'return' statement,
leading to a crash in case of an issue with node data. This change
improves the fix in PR #3591 by removing the lock entirely.
This commit is contained in:
parent
2718b42828
commit
f1069a30b9
|
@ -23,7 +23,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
@ -101,10 +100,9 @@ type queue struct {
|
|||
stateTaskQueue *prque.Prque // [eth/63] Priority queue of the hashes to fetch the node data for
|
||||
statePendPool map[string]*fetchRequest // [eth/63] Currently pending node data retrieval operations
|
||||
|
||||
stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
|
||||
stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
|
||||
stateProcessors int32 // [eth/63] Number of currently running state processors
|
||||
stateSchedLock sync.RWMutex // [eth/63] Lock serialising access to the state scheduler
|
||||
stateDatabase ethdb.Database // [eth/63] Trie database to populate during state reassembly
|
||||
stateScheduler *state.StateSync // [eth/63] State trie synchronisation scheduler and integrator
|
||||
stateWriters int // [eth/63] Number of running state DB writer goroutines
|
||||
|
||||
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
|
||||
resultOffset uint64 // Offset of the first cached fetch result in the block chain
|
||||
|
@ -143,9 +141,6 @@ func (q *queue) Reset() {
|
|||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
q.stateSchedLock.Lock()
|
||||
defer q.stateSchedLock.Unlock()
|
||||
|
||||
q.closed = false
|
||||
q.mode = FullSync
|
||||
q.fastSyncPivot = 0
|
||||
|
@ -209,13 +204,24 @@ func (q *queue) PendingReceipts() int {
|
|||
|
||||
// PendingNodeData retrieves the number of node data entries pending for retrieval.
|
||||
func (q *queue) PendingNodeData() int {
|
||||
q.stateSchedLock.RLock()
|
||||
defer q.stateSchedLock.RUnlock()
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
return q.pendingNodeDataLocked()
|
||||
}
|
||||
|
||||
// pendingNodeDataLocked retrieves the number of node data entries pending for retrieval.
|
||||
// The caller must hold q.lock.
|
||||
func (q *queue) pendingNodeDataLocked() int {
|
||||
var n int
|
||||
if q.stateScheduler != nil {
|
||||
return q.stateScheduler.Pending()
|
||||
n = q.stateScheduler.Pending()
|
||||
}
|
||||
return 0
|
||||
// Ensure that PendingNodeData doesn't return 0 until all state is written.
|
||||
if q.stateWriters > 0 {
|
||||
n++
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// InFlightHeaders retrieves whether there are header fetch requests currently
|
||||
|
@ -251,7 +257,7 @@ func (q *queue) InFlightNodeData() bool {
|
|||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
return len(q.statePendPool)+int(atomic.LoadInt32(&q.stateProcessors)) > 0
|
||||
return len(q.statePendPool)+q.stateWriters > 0
|
||||
}
|
||||
|
||||
// Idle returns if the queue is fully idle or has some data still inside. This
|
||||
|
@ -264,12 +270,9 @@ func (q *queue) Idle() bool {
|
|||
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
|
||||
cached := len(q.blockDonePool) + len(q.receiptDonePool)
|
||||
|
||||
q.stateSchedLock.RLock()
|
||||
if q.stateScheduler != nil {
|
||||
queued += q.stateScheduler.Pending()
|
||||
}
|
||||
q.stateSchedLock.RUnlock()
|
||||
|
||||
return (queued + pending + cached) == 0
|
||||
}
|
||||
|
||||
|
@ -398,9 +401,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
|
|||
req.Hashes = make(map[common.Hash]int) // Make sure executing requests fail, but don't disappear
|
||||
}
|
||||
|
||||
q.stateSchedLock.Lock()
|
||||
q.stateScheduler = state.NewStateSync(header.Root, q.stateDatabase)
|
||||
q.stateSchedLock.Unlock()
|
||||
}
|
||||
inserts = append(inserts, header)
|
||||
q.headerHead = hash
|
||||
|
@ -459,7 +460,7 @@ func (q *queue) countProcessableItems() int {
|
|||
// resultCache has space for fsHeaderForceVerify items. Not
|
||||
// doing this could leave us unable to download the required
|
||||
// amount of headers.
|
||||
if i > 0 || len(q.stateTaskPool) > 0 || q.PendingNodeData() > 0 {
|
||||
if i > 0 || len(q.stateTaskPool) > 0 || q.pendingNodeDataLocked() > 0 {
|
||||
return i
|
||||
}
|
||||
for j := 0; j < fsHeaderForceVerify; j++ {
|
||||
|
@ -524,9 +525,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
|
|||
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
|
||||
// Create a task generator to fetch status-fetch tasks if all schedules ones are done
|
||||
generator := func(max int) {
|
||||
q.stateSchedLock.Lock()
|
||||
defer q.stateSchedLock.Unlock()
|
||||
|
||||
if q.stateScheduler != nil {
|
||||
for _, hash := range q.stateScheduler.Missing(max) {
|
||||
q.stateTaskPool[hash] = q.stateTaskIndex
|
||||
|
@ -1068,7 +1066,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, boo
|
|||
}
|
||||
}
|
||||
// Iterate over the downloaded data and verify each of them
|
||||
accepted, errs := 0, make([]error, 0)
|
||||
errs := make([]error, 0)
|
||||
process := []trie.SyncResult{}
|
||||
for _, blob := range data {
|
||||
// Skip any state trie entries that were not requested
|
||||
|
@ -1079,70 +1077,52 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(int, boo
|
|||
}
|
||||
// Inject the next state trie item into the processing queue
|
||||
process = append(process, trie.SyncResult{Hash: hash, Data: blob})
|
||||
accepted++
|
||||
|
||||
delete(request.Hashes, hash)
|
||||
delete(q.stateTaskPool, hash)
|
||||
}
|
||||
// Start the asynchronous node state data injection
|
||||
atomic.AddInt32(&q.stateProcessors, 1)
|
||||
go func() {
|
||||
defer atomic.AddInt32(&q.stateProcessors, -1)
|
||||
q.deliverNodeData(process, callback)
|
||||
}()
|
||||
// Return all failed or missing fetches to the queue
|
||||
for hash, index := range request.Hashes {
|
||||
q.stateTaskQueue.Push(hash, float32(index))
|
||||
}
|
||||
if q.stateScheduler == nil {
|
||||
return 0, errNoFetchesPending
|
||||
}
|
||||
|
||||
// Run valid nodes through the trie download scheduler. It writes completed nodes to a
|
||||
// batch, which is committed asynchronously. This may lead to over-fetches because the
|
||||
// scheduler treats everything as written after Process has returned, but it's
|
||||
// unlikely to be an issue in practice.
|
||||
batch := q.stateDatabase.NewBatch()
|
||||
progressed, nproc, procerr := q.stateScheduler.Process(process, batch)
|
||||
q.stateWriters += 1
|
||||
go func() {
|
||||
if procerr == nil {
|
||||
nproc = len(process)
|
||||
procerr = batch.Write()
|
||||
}
|
||||
// Return processing errors through the callback so the sync gets canceled. The
|
||||
// number of writers is decremented prior to the call so PendingNodeData will
|
||||
// return zero when the callback runs.
|
||||
q.lock.Lock()
|
||||
q.stateWriters -= 1
|
||||
q.lock.Unlock()
|
||||
callback(nproc, progressed, procerr)
|
||||
// Wake up WaitResults after the state has been written because it might be
|
||||
// waiting for completion of the pivot block's state download.
|
||||
q.active.Signal()
|
||||
}()
|
||||
|
||||
// If none of the data items were good, it's a stale delivery
|
||||
switch {
|
||||
case len(errs) == 0:
|
||||
return accepted, nil
|
||||
return len(process), nil
|
||||
case len(errs) == len(request.Hashes):
|
||||
return accepted, errStaleDelivery
|
||||
return len(process), errStaleDelivery
|
||||
default:
|
||||
return accepted, fmt.Errorf("multiple failures: %v", errs)
|
||||
return len(process), fmt.Errorf("multiple failures: %v", errs)
|
||||
}
|
||||
}
|
||||
|
||||
// deliverNodeData is the asynchronous node data processor that injects a batch
|
||||
// of sync results into the state scheduler.
|
||||
func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bool, error)) {
|
||||
// Wake up WaitResults after the state has been written because it
|
||||
// might be waiting for the pivot block state to get completed.
|
||||
defer q.active.Signal()
|
||||
|
||||
// Process results one by one to permit task fetches in between
|
||||
progressed := false
|
||||
for i, result := range results {
|
||||
q.stateSchedLock.Lock()
|
||||
|
||||
if q.stateScheduler == nil {
|
||||
// Syncing aborted since this async delivery started, bail out
|
||||
q.stateSchedLock.Unlock()
|
||||
callback(i, progressed, errNoFetchesPending)
|
||||
return
|
||||
}
|
||||
|
||||
batch := q.stateDatabase.NewBatch()
|
||||
prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}, batch)
|
||||
if err != nil {
|
||||
q.stateSchedLock.Unlock()
|
||||
callback(i, progressed, err)
|
||||
return
|
||||
}
|
||||
if err = batch.Write(); err != nil {
|
||||
q.stateSchedLock.Unlock()
|
||||
callback(i, progressed, err)
|
||||
return // TODO(karalabe): If a DB write fails (disk full), we ought to cancel the sync
|
||||
}
|
||||
// Item processing succeeded, release the lock (temporarily)
|
||||
progressed = progressed || prog
|
||||
q.stateSchedLock.Unlock()
|
||||
}
|
||||
callback(len(results), progressed, nil)
|
||||
}
|
||||
|
||||
// Prepare configures the result cache to allow accepting and caching inbound
|
||||
// fetch results.
|
||||
func (q *queue) Prepare(offset uint64, mode SyncMode, pivot uint64, head *types.Header) {
|
||||
|
|
Loading…
Reference in New Issue