diff --git a/p2p/server.go b/p2p/server.go index 52d1be6775..3b2f2b0786 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -398,12 +398,11 @@ type dialer interface { func (srv *Server) run(dialstate dialer) { defer srv.loopWG.Done() var ( - peers = make(map[discover.NodeID]*Peer) - trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) - - tasks []task - pendingTasks []task + peers = make(map[discover.NodeID]*Peer) + trusted = make(map[discover.NodeID]bool, len(srv.TrustedNodes)) taskdone = make(chan task, maxActiveDialTasks) + runningTasks []task + queuedTasks []task // tasks that can't run yet ) // Put trusted nodes into a map to speed up checks. // Trusted peers are loaded on startup and cannot be @@ -412,39 +411,39 @@ func (srv *Server) run(dialstate dialer) { trusted[n.ID] = true } - // Some task list helpers. + // removes t from runningTasks delTask := func(t task) { - for i := range tasks { - if tasks[i] == t { - tasks = append(tasks[:i], tasks[i+1:]...) + for i := range runningTasks { + if runningTasks[i] == t { + runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) break } } } - scheduleTasks := func(new []task) { - pt := append(pendingTasks, new...) - start := maxActiveDialTasks - len(tasks) - if len(pt) < start { - start = len(pt) + // starts until max number of active tasks is satisfied + startTasks := func(ts []task) (rest []task) { + i := 0 + for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { + t := ts[i] + glog.V(logger.Detail).Infoln("new task:", t) + go func() { t.Do(srv); taskdone <- t }() + runningTasks = append(runningTasks, t) } - if start > 0 { - tasks = append(tasks, pt[:start]...) - for _, t := range pt[:start] { - t := t - glog.V(logger.Detail).Infoln("new task:", t) - go func() { t.Do(srv); taskdone <- t }() - } - copy(pt, pt[start:]) - pendingTasks = pt[:len(pt)-start] + return ts[i:] + } + scheduleTasks := func() { + // Start from queue first. + queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) + // Query dialer for new tasks and start as many as possible now. + if len(runningTasks) < maxActiveDialTasks { + nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) + queuedTasks = append(queuedTasks, startTasks(nt)...) } } running: for { - // Query the dialer for new tasks and launch them. - now := time.Now() - nt := dialstate.newTasks(len(pendingTasks)+len(tasks), peers, now) - scheduleTasks(nt) + scheduleTasks() select { case <-srv.quit: @@ -466,7 +465,7 @@ running: // can update its state and remove it from the active // tasks list. glog.V(logger.Detail).Infoln("<-taskdone:", t) - dialstate.taskDone(t, now) + dialstate.taskDone(t, time.Now()) delTask(t) case c := <-srv.posthandshake: // A connection has passed the encryption handshake so @@ -513,7 +512,7 @@ running: // Wait for peers to shut down. Pending connections and tasks are // not handled here and will terminate soon-ish because srv.quit // is closed. - glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(tasks)) + glog.V(logger.Detail).Infof("ignoring %d pending tasks at spindown", len(runningTasks)) for len(peers) > 0 { p := <-srv.delpeer glog.V(logger.Detail).Infoln("<-delpeer (spindown):", p) diff --git a/p2p/server_test.go b/p2p/server_test.go index 02d1c8e010..b437ac3676 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -235,6 +235,56 @@ func TestServerTaskScheduling(t *testing.T) { } } +// This test checks that Server doesn't drop tasks, +// even if newTasks returns more than the maximum number of tasks. +func TestServerManyTasks(t *testing.T) { + alltasks := make([]task, 300) + for i := range alltasks { + alltasks[i] = &testTask{index: i} + } + + var ( + srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true} + done = make(chan *testTask) + start, end = 0, 0 + ) + defer srv.Stop() + srv.loopWG.Add(1) + go srv.run(taskgen{ + newFunc: func(running int, peers map[discover.NodeID]*Peer) []task { + start, end = end, end+maxActiveDialTasks+10 + if end > len(alltasks) { + end = len(alltasks) + } + return alltasks[start:end] + }, + doneFunc: func(tt task) { + done <- tt.(*testTask) + }, + }) + + doneset := make(map[int]bool) + timeout := time.After(2 * time.Second) + for len(doneset) < len(alltasks) { + select { + case tt := <-done: + if doneset[tt.index] { + t.Errorf("task %d got done more than once", tt.index) + } else { + doneset[tt.index] = true + } + case <-timeout: + t.Errorf("%d of %d tasks got done within 2s", len(doneset), len(alltasks)) + for i := 0; i < len(alltasks); i++ { + if !doneset[i] { + t.Logf("task %d not done", i) + } + } + return + } + } +} + type taskgen struct { newFunc func(running int, peers map[discover.NodeID]*Peer) []task doneFunc func(task)