From bbbfa3cf6957d5b1324440635d57a7a32ee37404 Mon Sep 17 00:00:00 2001 From: Andrey Petrov Date: Sat, 17 May 2014 21:53:15 -0700 Subject: [PATCH] Unbroken, yay. --- README.md | 5 ++--- cmd.go | 6 +----- crawler.go | 26 ++++++++++++-------------- queue/queue.go | 6 +----- 4 files changed, 16 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 863d30f..6d5ba07 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,6 @@ Bitcoin node network crawler (written in golang). This is a for-fun project to explore the Bitcoin protocol and network. Current status: -* **XXX:*** Broken. In the process of a major refactor which has a bug that - keeps thinking it's done prematurely. * JSON streaming is in place, and graceful shutdown. * ~~It crawls with all kinds of nice parameters but stores everything in memory until dumping a giant JSON blob at the end.~~ @@ -36,7 +34,8 @@ known nodes, but usually only several have timestamps within the last hour. (In approximate order of priority) * Apply peer-age filter to results -* Add timeout option. +* Output using some space-conscious format. Right now the output file grows + fairly quickly. * Namespace useful sub-packages properly (outside of `main`) diff --git a/cmd.go b/cmd.go index 3e3350a..1d22b80 100644 --- a/cmd.go +++ b/cmd.go @@ -102,12 +102,8 @@ func main() { signal.Notify(sig, os.Interrupt) go func() { <-sig // Wait for ^C signal - logger.Warningf("Interrupt signal detected, shutting down gracefully by waiting for active workers to finish.") + logger.Warningf("Interrupt signal detected, shutting down.") crawler.Shutdown() - - <-sig // Hurry up? - logger.Warningf("Urgent interrupt. Abandoning in-progress workers.") - crawler.Shutdown() // FIXME: Could this cause stuff to asplode? }() // Launch crawler diff --git a/crawler.go b/crawler.go index 8e0ba6e..00027ed 100644 --- a/crawler.go +++ b/crawler.go @@ -21,7 +21,7 @@ type Crawler struct { PeerAge time.Duration ConnectTimeout time.Duration shutdown chan struct{} - wait sync.WaitGroup + waitGroup *sync.WaitGroup } type Result struct { @@ -30,16 +30,18 @@ type Result struct { } func NewCrawler(client *Client, seeds []string) *Crawler { + var wg sync.WaitGroup + c := Crawler{ client: client, seenFilter: map[string]bool{}, shutdown: make(chan struct{}, 1), - wait: sync.WaitGroup{}, + waitGroup: &wg, } filter := func(address string) *string { return c.filter(address) } - c.queue = queue.NewQueue(filter, &c.wait) + c.queue = queue.NewQueue(filter, &wg) // Prefill the queue for _, address := range seeds { @@ -140,7 +142,10 @@ func (c *Crawler) process(r *Result) *Result { continue } - c.queue.Add(NetAddressKey(addr)) + c.waitGroup.Add(1) + if !c.queue.Add(NetAddressKey(addr)) { + c.waitGroup.Done() + } } if len(r.Peers) > 0 { @@ -167,9 +172,7 @@ func (c *Crawler) Run(numWorkers int) <-chan Result { } // Start worker - c.wait.Add(1) go func() { - logger.Debugf("[%s] Work received.", address) r := c.handleAddress(address) // Process the result @@ -179,22 +182,17 @@ func (c *Crawler) Run(numWorkers int) <-chan Result { // Clear worker slot <-workerChan - c.wait.Done() - logger.Debugf("[%s] Work completed.", address) + c.waitGroup.Done() }() } - logger.Infof("Done after %d queued items.", c.queue.Count()) + logger.Infof("Stopping queue after %d added items.", c.queue.Count()) + close(result) }() go func() { <-c.shutdown - logger.Infof("Shutting down after workers finish.") isDone = true - - // Urgent. - <-c.shutdown - close(result) }() return result diff --git a/queue/queue.go b/queue/queue.go index 61000c0..5c9b5ca 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -33,8 +33,6 @@ func (q *Queue) Add(item string) bool { q.storage = append(q.storage, *r) q.count++ q.Unlock() - - q.waitGroup.Add(1) q.cond.Signal() return true @@ -51,7 +49,6 @@ func (q *Queue) Iter() <-chan string { go func() { for { q.Lock() - if len(q.storage) == 0 { // Wait until next Add q.cond.Wait() @@ -59,14 +56,13 @@ func (q *Queue) Iter() <-chan string { if len(q.storage) == 0 { // Queue is finished close(ch) + q.Unlock() return } } r := q.storage[0] q.storage = q.storage[1:] - - q.waitGroup.Done() q.Unlock() ch <- r