Refactored the parallelized bits, still broken though.

This commit is contained in:
Andrey Petrov 2014-05-17 19:44:33 -07:00
parent e73de7985e
commit c8de36ad5e
4 changed files with 113 additions and 126 deletions

View File

@@ -1,6 +1,11 @@
btc-crawl: **.go all: btc-crawl
**/*.go:
go build ./... go build ./...
btc-crawl: **/*.go *.go
go build .
build: btc-crawl build: btc-crawl
clean: clean:

6
cmd.go
View File

@@ -97,8 +97,6 @@ func main() {
return return
} }
resultChan := make(chan Result)
// Construct interrupt handler // Construct interrupt handler
sig := make(chan os.Signal, 1) sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt) signal.Notify(sig, os.Interrupt)
@@ -109,11 +107,11 @@ func main() {
<-sig // Hurry up? <-sig // Hurry up?
logger.Warningf("Urgent interrupt. Abandoning in-progress workers.") logger.Warningf("Urgent interrupt. Abandoning in-progress workers.")
close(resultChan) // FIXME: Could this cause stuff to asplode? crawler.Shutdown() // FIXME: Could this cause stuff to asplode?
}() }()
// Launch crawler // Launch crawler
go crawler.Run(resultChan, options.Concurrency) resultChan := crawler.Run(options.Concurrency)
logger.Infof("Crawler started with %d concurrency limit.", options.Concurrency) logger.Infof("Crawler started with %d concurrency limit.", options.Concurrency)
// Start processing results // Start processing results

View File

@@ -1,6 +1,7 @@
package main package main
import ( import (
"sync"
"time" "time"
"./queue" "./queue"
@@ -20,6 +21,7 @@ type Crawler struct {
PeerAge time.Duration PeerAge time.Duration
ConnectTimeout time.Duration ConnectTimeout time.Duration
shutdown chan struct{} shutdown chan struct{}
wait sync.WaitGroup
} }
type Result struct { type Result struct {
@@ -32,18 +34,17 @@ func NewCrawler(client *Client, seeds []string) *Crawler {
client: client, client: client,
seenFilter: map[string]bool{}, seenFilter: map[string]bool{},
shutdown: make(chan struct{}, 1), shutdown: make(chan struct{}, 1),
wait: sync.WaitGroup{},
} }
filter := func(address string) *string { filter := func(address string) *string {
return c.filter(address) return c.filter(address)
} }
c.queue = queue.NewQueue(filter, 10) c.queue = queue.NewQueue(filter, &c.wait)
go func() { // Prefill the queue
// Prefill the queue for _, address := range seeds {
for _, address := range seeds { c.queue.Add(address)
c.queue.Input <- address }
}
}()
return &c return &c
} }
@@ -139,7 +140,7 @@ func (c *Crawler) process(r *Result) *Result {
continue continue
} }
c.queue.Input <- NetAddressKey(addr) c.queue.Add(NetAddressKey(addr))
} }
if len(r.Peers) > 0 { if len(r.Peers) > 0 {
@@ -150,53 +151,51 @@ func (c *Crawler) process(r *Result) *Result {
return nil return nil
} }
func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) { func (c *Crawler) Run(numWorkers int) <-chan Result {
result := make(chan Result, 100)
workerChan := make(chan struct{}, numWorkers) workerChan := make(chan struct{}, numWorkers)
tempResult := make(chan Result) isDone := false
numActive := 0
isActive := true
// Block until we get the first item go func() {
c.queue.Wait() // Queue handler
for address := range c.queue.Iter() {
// Reserve worker slot (block)
workerChan <- struct{}{}
// This is the main "event loop". if isDone {
// FIXME: Feels like there should be a better way to manage the number of
// concurrent workers without limiting slots with workerChan and without
// using a numActive counter.
for {
select {
case workerChan <- struct{}{}:
if !isActive {
// Don't start any new workers, leave the slot filled.
break break
} else if c.queue.IsEmpty() { }
if numActive == 0 {
logger.Infof("Done after %d queued items.", c.queue.Count()) // Start worker
close(resultChan) c.wait.Add(1)
return go func() {
logger.Debugf("[%s] Work received.", address)
r := c.handleAddress(address)
// Process the result
if c.process(r) != nil {
result <- *r
} }
// Clear worker slot
<-workerChan <-workerChan
break c.wait.Done()
} logger.Debugf("[%s] Work completed.", address)
numActive++
go func() {
address := <-c.queue.Output
logger.Debugf("[%s] Work received.", address)
tempResult <- *c.handleAddress(address)
}() }()
case r := <-tempResult:
if c.process(&r) != nil {
resultChan <- r
}
numActive--
<-workerChan
case <-c.shutdown:
logger.Infof("Shutting down after %d workers finish.", numActive)
isActive = false
} }
}
logger.Infof("Done after %d queued items.", c.queue.Count())
}()
go func() {
<-c.shutdown
logger.Infof("Shutting down after workers finish.")
isDone = true
// Urgent.
<-c.shutdown
close(result)
}()
return result
} }

View File

@@ -1,94 +1,79 @@
package queue package queue
// A single goroutine manages the overflow queue for thread-safety, funneling import "sync"
// data between the Input and Output channels through a specified filter.
// TODO: Make this an interface and multiple implementations (Redis etc?)
type Queue struct { type Queue struct {
Input chan string sync.Mutex
Output chan string storage []string
overflow []string filter func(string) *string
filter func(string) *string count int
count int cond *sync.Cond
waitGroup *sync.WaitGroup
} }
func NewQueue(filter func(string) *string, bufferSize int) *Queue { func NewQueue(filter func(string) *string, waitGroup *sync.WaitGroup) *Queue {
q := Queue{ q := Queue{
Input: make(chan string, bufferSize), storage: []string{},
Output: make(chan string, bufferSize), filter: filter,
overflow: []string{}, waitGroup: waitGroup,
filter: filter,
} }
q.cond = sync.NewCond(&q)
go func() {
// Block until we have a next item
nextItem := q.next()
for {
select {
case item := <-q.Input:
// New input
r := q.filter(item)
if r != nil {
// Store in the overflow
q.overflow = append(q.overflow, *r)
q.count++
}
case q.Output <- nextItem:
// Block until we have more inputs
nextItem = q.next()
}
}
}()
return &q return &q
} }
func (q *Queue) next() string { func (q *Queue) Add(item string) bool {
// Block until a next item is available. r := q.filter(item)
if r == nil {
if len(q.overflow) > 0 {
// Pop off the overflow queue.
r := q.overflow[0]
q.overflow = q.overflow[1:]
return r
}
for {
// Block until we have a viable output
r := q.filter(<-q.Input)
if r != nil {
q.count++
return *r
}
}
}
func (q *Queue) IsEmpty() bool {
// FIXME: This breaks everything, get rid of it.
if len(q.overflow) > 0 {
return false return false
} }
select { q.Lock()
case r := <-q.Output: q.storage = append(q.storage, *r)
go func() { q.count++
// Put it back q.Unlock()
q.Output <- r
}() q.waitGroup.Add(1)
return false q.cond.Signal()
default:
return true return true
}
} }
func (q *Queue) Wait() { func (q *Queue) Iter() <-chan string {
// Wait until there is an Output. Useful for blocking until the queue is ch := make(chan string)
// ramped up.
r := <-q.Output
go func() { go func() {
q.Output <- r q.waitGroup.Wait()
q.cond.Signal() // Wake up to close the channel.
}() }()
go func() {
for {
q.Lock()
if len(q.storage) == 0 {
// Wait until next Add
q.cond.Wait()
if len(q.storage) == 0 {
// Queue is finished
close(ch)
return
}
}
r := q.storage[0]
q.storage = q.storage[1:]
q.waitGroup.Done()
q.Unlock()
ch <- r
}
}()
return ch
} }
func (q *Queue) Count() int { func (q *Queue) Count() int {