Some race condition fixes.

This commit is contained in:
Andrey Petrov 2014-05-17 22:16:05 -07:00
parent bbbfa3cf69
commit 6ab35205f6
3 changed files with 32 additions and 22 deletions

2
cmd.go
View File

@ -119,7 +119,7 @@ func main() {
}
if count > 0 {
b = append([]byte(","), b...)
b = append([]byte(",\n"), b...)
}
_, err = w.Write(b)

View File

@ -21,7 +21,7 @@ type Crawler struct {
PeerAge time.Duration
ConnectTimeout time.Duration
shutdown chan struct{}
waitGroup *sync.WaitGroup
waitGroup sync.WaitGroup
}
type Result struct {
@ -30,24 +30,29 @@ type Result struct {
}
// NewCrawler builds a Crawler around client, creates its queue with a filter
// callback that forwards to the crawler's own filter method, offers every
// address in seeds to that queue, and returns a pointer to the new Crawler.
//
// NOTE(review): this span is a rendered diff, so what appear to be the
// pre-change and post-change lines are both present (two waitGroup
// initializers, two NewQueue calls, two seeding calls). It is not compilable
// as written; read each pair as "before / after".
func NewCrawler(client *Client, seeds []string) *Crawler {
var wg sync.WaitGroup
c := Crawler{
client: client,
seenFilter: map[string]bool{},
shutdown: make(chan struct{}, 1),
waitGroup: &wg,
waitGroup: sync.WaitGroup{},
}
// Closure adapter: lets the queue invoke c.filter without knowing about Crawler.
filter := func(address string) *string {
return c.filter(address)
}
c.queue = queue.NewQueue(filter, &wg)
// done is handed to the queue; the goroutine at the end of this function
// sends on it once c.waitGroup.Wait() returns.
done := make(chan struct{})
c.queue = queue.NewQueue(filter, done)
// Prefill the queue with the seed addresses.
for _, address := range seeds {
c.queue.Add(address)
c.addSeed(address)
}
// NOTE(review): done is unbuffered, so this send blocks until something
// receives from it. If no receiver (or more than one) is ever waiting,
// close(done) would be the non-blocking, broadcast alternative — confirm
// that exactly one receiver is intended.
go func() {
c.waitGroup.Wait()
done <- struct{}{}
}()
return &c
}
@ -120,6 +125,13 @@ func (c *Crawler) handleAddress(address string) *Result {
}
}
// addSeed offers address to the crawl queue and keeps the crawler's wait
// group in step with the outcome.
//
// The wait group is incremented before the queue's Add call and decremented
// again right away when Add reports false (the address was not enqueued), so
// the counter only stays raised for addresses the queue actually accepted.
// Presumably the increment comes first so that a concurrent Wait cannot
// return while an address is still being enqueued — confirm against the
// callers of Wait.
func (c *Crawler) addSeed(address string) {
	c.waitGroup.Add(1)
	if !c.queue.Add(address) {
		// Rejected by the queue: undo the reservation made above.
		c.waitGroup.Done()
	}
}
func (c *Crawler) filter(address string) *string {
// Returns true if not seen before, otherwise false
c.numSeen++
@ -142,10 +154,7 @@ func (c *Crawler) process(r *Result) *Result {
continue
}
c.waitGroup.Add(1)
if !c.queue.Add(NetAddressKey(addr)) {
c.waitGroup.Done()
}
c.addSeed(NetAddressKey(addr))
}
if len(r.Peers) > 0 {

View File

@ -5,18 +5,18 @@ import "sync"
// TODO: Make this an interface and multiple implementations (Redis etc?)
//
// Queue holds a slice of strings guarded by the embedded sync.Mutex, together
// with a filter callback, a counter, and a condition variable.
//
// NOTE(review): this span is a rendered diff, so each field is listed twice —
// what appears to be the pre-change group (ending in waitGroup) followed by
// the post-change group (ending in done). It is not compilable as written.
type Queue struct {
sync.Mutex
storage []string
filter func(string) *string
count int
cond *sync.Cond
waitGroup *sync.WaitGroup // apparently the pre-change field; Iter called Wait on it
storage []string // items appended by Add
filter func(string) *string // called by Add; a nil result makes Add return false
count int // incremented by Add for each stored item
cond *sync.Cond // built in NewQueue from the queue's own mutex
done <-chan struct{} // Iter receives from this before signalling cond
}
func NewQueue(filter func(string) *string, waitGroup *sync.WaitGroup) *Queue {
func NewQueue(filter func(string) *string, done <-chan struct{}) *Queue {
q := Queue{
storage: []string{},
filter: filter,
waitGroup: waitGroup,
storage: []string{},
filter: filter,
done: done,
}
q.cond = sync.NewCond(&q)
@ -24,12 +24,13 @@ func NewQueue(filter func(string) *string, waitGroup *sync.WaitGroup) *Queue {
}
func (q *Queue) Add(item string) bool {
q.Lock()
r := q.filter(item)
if r == nil {
q.Unlock()
return false
}
q.Lock()
q.storage = append(q.storage, *r)
q.count++
q.Unlock()
@ -42,7 +43,7 @@ func (q *Queue) Iter() <-chan string {
ch := make(chan string)
go func() {
q.waitGroup.Wait()
<-q.done
q.cond.Signal() // Wake up to close the channel.
}()