Unbroken, yay.

This commit is contained in:
Andrey Petrov 2014-05-17 21:53:15 -07:00
parent 6d23e7fa9d
commit bbbfa3cf69
4 changed files with 16 additions and 27 deletions

View File

@ -5,8 +5,6 @@ Bitcoin node network crawler (written in golang).
This is a for-fun project to explore the Bitcoin protocol and network. This is a for-fun project to explore the Bitcoin protocol and network.
Current status: Current status:
* **XXX:*** Broken. In the process of a major refactor which has a bug that
keeps thinking it's done prematurely.
* JSON streaming is in place, and graceful shutdown. * JSON streaming is in place, and graceful shutdown.
* ~~It crawls with all kinds of nice parameters but stores everything in memory * ~~It crawls with all kinds of nice parameters but stores everything in memory
until dumping a giant JSON blob at the end.~~ until dumping a giant JSON blob at the end.~~
@ -36,7 +34,8 @@ known nodes, but usually only several have timestamps within the last hour.
(In approximate order of priority) (In approximate order of priority)
* Apply peer-age filter to results * Apply peer-age filter to results
* Add timeout option. * Output using some space-conscious format. Right now the output file grows
fairly quickly.
* Namespace useful sub-packages properly (outside of `main`) * Namespace useful sub-packages properly (outside of `main`)

6
cmd.go
View File

@ -102,12 +102,8 @@ func main() {
signal.Notify(sig, os.Interrupt) signal.Notify(sig, os.Interrupt)
go func() { go func() {
<-sig // Wait for ^C signal <-sig // Wait for ^C signal
logger.Warningf("Interrupt signal detected, shutting down gracefully by waiting for active workers to finish.") logger.Warningf("Interrupt signal detected, shutting down.")
crawler.Shutdown() crawler.Shutdown()
<-sig // Hurry up?
logger.Warningf("Urgent interrupt. Abandoning in-progress workers.")
crawler.Shutdown() // FIXME: Could this cause stuff to asplode?
}() }()
// Launch crawler // Launch crawler

View File

@ -21,7 +21,7 @@ type Crawler struct {
PeerAge time.Duration PeerAge time.Duration
ConnectTimeout time.Duration ConnectTimeout time.Duration
shutdown chan struct{} shutdown chan struct{}
wait sync.WaitGroup waitGroup *sync.WaitGroup
} }
type Result struct { type Result struct {
@ -30,16 +30,18 @@ type Result struct {
} }
func NewCrawler(client *Client, seeds []string) *Crawler { func NewCrawler(client *Client, seeds []string) *Crawler {
var wg sync.WaitGroup
c := Crawler{ c := Crawler{
client: client, client: client,
seenFilter: map[string]bool{}, seenFilter: map[string]bool{},
shutdown: make(chan struct{}, 1), shutdown: make(chan struct{}, 1),
wait: sync.WaitGroup{}, waitGroup: &wg,
} }
filter := func(address string) *string { filter := func(address string) *string {
return c.filter(address) return c.filter(address)
} }
c.queue = queue.NewQueue(filter, &c.wait) c.queue = queue.NewQueue(filter, &wg)
// Prefill the queue // Prefill the queue
for _, address := range seeds { for _, address := range seeds {
@ -140,7 +142,10 @@ func (c *Crawler) process(r *Result) *Result {
continue continue
} }
c.queue.Add(NetAddressKey(addr)) c.waitGroup.Add(1)
if !c.queue.Add(NetAddressKey(addr)) {
c.waitGroup.Done()
}
} }
if len(r.Peers) > 0 { if len(r.Peers) > 0 {
@ -167,9 +172,7 @@ func (c *Crawler) Run(numWorkers int) <-chan Result {
} }
// Start worker // Start worker
c.wait.Add(1)
go func() { go func() {
logger.Debugf("[%s] Work received.", address)
r := c.handleAddress(address) r := c.handleAddress(address)
// Process the result // Process the result
@ -179,22 +182,17 @@ func (c *Crawler) Run(numWorkers int) <-chan Result {
// Clear worker slot // Clear worker slot
<-workerChan <-workerChan
c.wait.Done() c.waitGroup.Done()
logger.Debugf("[%s] Work completed.", address)
}() }()
} }
logger.Infof("Done after %d queued items.", c.queue.Count()) logger.Infof("Stopping queue after %d added items.", c.queue.Count())
close(result)
}() }()
go func() { go func() {
<-c.shutdown <-c.shutdown
logger.Infof("Shutting down after workers finish.")
isDone = true isDone = true
// Urgent.
<-c.shutdown
close(result)
}() }()
return result return result

View File

@ -33,8 +33,6 @@ func (q *Queue) Add(item string) bool {
q.storage = append(q.storage, *r) q.storage = append(q.storage, *r)
q.count++ q.count++
q.Unlock() q.Unlock()
q.waitGroup.Add(1)
q.cond.Signal() q.cond.Signal()
return true return true
@ -51,7 +49,6 @@ func (q *Queue) Iter() <-chan string {
go func() { go func() {
for { for {
q.Lock() q.Lock()
if len(q.storage) == 0 { if len(q.storage) == 0 {
// Wait until next Add // Wait until next Add
q.cond.Wait() q.cond.Wait()
@ -59,14 +56,13 @@ func (q *Queue) Iter() <-chan string {
if len(q.storage) == 0 { if len(q.storage) == 0 {
// Queue is finished // Queue is finished
close(ch) close(ch)
q.Unlock()
return return
} }
} }
r := q.storage[0] r := q.storage[0]
q.storage = q.storage[1:] q.storage = q.storage[1:]
q.waitGroup.Done()
q.Unlock() q.Unlock()
ch <- r ch <- r