Fixing bugs, but broken.

This commit is contained in:
Andrey Petrov 2014-05-15 17:59:01 -07:00
parent 5c128f64bb
commit 8ec2703f3c
3 changed files with 19 additions and 10 deletions

View File

@ -1,5 +1,5 @@
btc-crawl: **.go btc-crawl: **.go
go build . go build ./...
build: btc-crawl build: btc-crawl

View File

@ -170,13 +170,14 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
// Don't start any new workers, leave the slot filled. // Don't start any new workers, leave the slot filled.
break break
} else if c.queue.IsEmpty() { } else if c.queue.IsEmpty() {
<-workerChan
if numActive == 0 { if numActive == 0 {
logger.Infof("Done.") logger.Infof("Done after %d queued items.", c.queue.Count())
close(resultChan) close(resultChan)
return return
} }
<-workerChan
break
} }
numActive++ numActive++
@ -194,7 +195,7 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
<-workerChan <-workerChan
case <-c.shutdown: case <-c.shutdown:
logger.Infof("Shutting down.") logger.Infof("Shutting down after %d workers finish.", numActive)
isActive = false isActive = false
} }
} }

View File

@ -7,6 +7,7 @@ type Queue struct {
Output chan string Output chan string
overflow []string overflow []string
filter func(string) *string filter func(string) *string
count int
} }
func NewQueue(filter func(string) *string, bufferSize int) *Queue { func NewQueue(filter func(string) *string, bufferSize int) *Queue {
@ -23,12 +24,13 @@ func NewQueue(filter func(string) *string, bufferSize int) *Queue {
for { for {
select { select {
case input := <-q.Input: case item := <-input:
// New input // New input
r := q.filter(input) r := q.filter(item)
if r != nil { if r != nil {
// Store in the overflow // Store in the overflow
q.overflow = append(q.overflow, *r) q.overflow = append(q.overflow, *r)
q.count++
} }
case output <- nextItem: case output <- nextItem:
// Block until we have more inputs // Block until we have more inputs
@ -55,6 +57,7 @@ func (q *Queue) next() string {
r := q.filter(<-q.Input) r := q.filter(<-q.Input)
if r != nil { if r != nil {
q.count++
return *r return *r
} }
} }
@ -64,7 +67,7 @@ func (q *Queue) IsEmpty() bool {
// FIXME: This effectively cycles the order of the output buffer. Kinda sad. // FIXME: This effectively cycles the order of the output buffer. Kinda sad.
if len(q.overflow) > 0 { if len(q.overflow) > 0 {
return true return false
} }
select { select {
@ -73,9 +76,9 @@ func (q *Queue) IsEmpty() bool {
// Put it back // Put it back
q.Output <- r q.Output <- r
}() }()
return true
default:
return false return false
default:
return true
} }
} }
@ -87,3 +90,8 @@ func (q *Queue) Wait() {
q.Output <- r q.Output <- r
}() }()
} }
func (q *Queue) Count() int {
// Number of outputs produced.
return q.count
}