diff --git a/cmd.go b/cmd.go index 1d22b80..35045c5 100644 --- a/cmd.go +++ b/cmd.go @@ -119,7 +119,7 @@ func main() { } if count > 0 { - b = append([]byte(","), b...) + b = append([]byte(",\n"), b...) } _, err = w.Write(b) diff --git a/crawler.go b/crawler.go index 00027ed..59a69ee 100644 --- a/crawler.go +++ b/crawler.go @@ -21,7 +21,7 @@ type Crawler struct { PeerAge time.Duration ConnectTimeout time.Duration shutdown chan struct{} - waitGroup *sync.WaitGroup + waitGroup sync.WaitGroup } type Result struct { @@ -30,24 +30,29 @@ type Result struct { } func NewCrawler(client *Client, seeds []string) *Crawler { - var wg sync.WaitGroup - c := Crawler{ client: client, seenFilter: map[string]bool{}, shutdown: make(chan struct{}, 1), - waitGroup: &wg, + waitGroup: sync.WaitGroup{}, } filter := func(address string) *string { return c.filter(address) } - c.queue = queue.NewQueue(filter, &wg) + + done := make(chan struct{}) + c.queue = queue.NewQueue(filter, done) // Prefill the queue for _, address := range seeds { - c.queue.Add(address) + c.addSeed(address) } + go func() { + c.waitGroup.Wait() + done <- struct{}{} + }() + return &c } @@ -120,6 +125,13 @@ func (c *Crawler) handleAddress(address string) *Result { } } +func (c *Crawler) addSeed(address string) { + c.waitGroup.Add(1) + if c.queue.Add(address) == false { + c.waitGroup.Done() + } +} + func (c *Crawler) filter(address string) *string { // Returns true if not seen before, otherwise false c.numSeen++ @@ -142,10 +154,7 @@ func (c *Crawler) process(r *Result) *Result { continue } - c.waitGroup.Add(1) - if !c.queue.Add(NetAddressKey(addr)) { - c.waitGroup.Done() - } + c.addSeed(NetAddressKey(addr)) } if len(r.Peers) > 0 { diff --git a/queue/queue.go b/queue/queue.go index 5c9b5ca..2c9d44d 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -5,18 +5,18 @@ import "sync" // TODO: Make this an interface and multiple implementations (Redis etc?) type Queue struct { sync.Mutex - storage []string - filter func(string) *string - count int - cond *sync.Cond - waitGroup *sync.WaitGroup + storage []string + filter func(string) *string + count int + cond *sync.Cond + done <-chan struct{} } -func NewQueue(filter func(string) *string, waitGroup *sync.WaitGroup) *Queue { +func NewQueue(filter func(string) *string, done <-chan struct{}) *Queue { q := Queue{ - storage: []string{}, - filter: filter, - waitGroup: waitGroup, + storage: []string{}, + filter: filter, + done: done, } q.cond = sync.NewCond(&q) @@ -24,12 +24,13 @@ func NewQueue(filter func(string) *string, waitGroup *sync.WaitGroup) *Queue { } func (q *Queue) Add(item string) bool { + q.Lock() r := q.filter(item) if r == nil { + q.Unlock() return false } - q.Lock() q.storage = append(q.storage, *r) q.count++ q.Unlock() @@ -42,7 +43,7 @@ func (q *Queue) Iter() <-chan string { ch := make(chan string) go func() { - q.waitGroup.Wait() + <-q.done q.cond.Signal() // Wake up to close the channel. }()