From 5c128f64bb328a3b3f49df334bd584303844c641 Mon Sep 17 00:00:00 2001 From: Andrey Petrov <andrey.petrov@shazow.net> Date: Thu, 15 May 2014 17:04:09 -0700 Subject: [PATCH] Properly block on first item. --- Makefile | 2 +- cmd.go | 22 ++++++++++++---------- crawler.go | 42 ++++++++++++++++++++++++------------------ peer.go | 11 +++++++---- queue/queue.go | 26 +++++++++++++++++++++++++- 5 files changed, 69 insertions(+), 34 deletions(-) diff --git a/Makefile b/Makefile index f0d95ae..51928b5 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -btc-crawl: *.go +btc-crawl: **.go go build . build: btc-crawl diff --git a/cmd.go b/cmd.go index 6c1ad56..be36524 100644 --- a/cmd.go +++ b/cmd.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "os/signal" - "syscall" "time" "github.com/alexcesaro/log" @@ -25,13 +24,14 @@ var defaultDnsSeeds = []string{ } type Options struct { - Verbose []bool `short:"v" long:"verbose" description:"Show verbose logging."` - Output string `short:"o" long:"output" description:"File to write result to." default:"btc-crawl.json"` - Seed []string `short:"s" long:"seed" description:"Override which seeds to use." default-mask:"<bitcoin-core DNS seeds>"` - Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"` - UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"` - PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"` - StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"0"` + Verbose []bool `short:"v" long:"verbose" description:"Show verbose logging."` + Output string `short:"o" long:"output" description:"File to write result to." default:"btc-crawl.json"` + Seed []string `short:"s" long:"seed" description:"Override which seeds to use." default-mask:"<bitcoin-core DNS seeds>"` + Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"` + ConnectTimeout time.Duration `short:"t" long:"connect-timeout" description:"Maximum time to wait to connect before giving up." default:"10s"` + UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"` + PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"` + StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"0"` } var logLevels = []log.Level{ @@ -69,7 +69,9 @@ func main() { // Create client and crawler client := NewClient(options.UserAgent) - crawler := NewCrawler(client, seedNodes, options.PeerAge) + crawler := NewCrawler(client, seedNodes) + crawler.PeerAge = options.PeerAge + crawler.ConnectTimeout = options.ConnectTimeout // Configure output var w *bufio.Writer @@ -99,7 +101,7 @@ func main() { // Construct interrupt handler sig := make(chan os.Signal, 1) - signal.Notify(sig, os.Interrupt, syscall.SIGTERM) + signal.Notify(sig, os.Interrupt) go func() { <-sig // Wait for ^C signal logger.Warningf("Interrupt signal detected, shutting down gracefully by waiting for active workers to finish.") diff --git a/crawler.go b/crawler.go index 4215a77..6927c0c 100644 --- a/crawler.go +++ b/crawler.go @@ -10,15 +10,16 @@ import ( // TODO: Break Client/Peer/Crawler into separate modules. 
type Crawler struct { - client *Client - queue *queue.Queue - numSeen int - numUnique int - numConnected int - numAttempted int - seenFilter map[string]bool // TODO: Replace with bloom filter? - peerAge time.Duration - shutdown chan struct{} + client *Client + queue *queue.Queue + numSeen int + numUnique int + numConnected int + numAttempted int + seenFilter map[string]bool // TODO: Replace with bloom filter? + PeerAge time.Duration + ConnectTimeout time.Duration + shutdown chan struct{} } type Result struct { @@ -26,11 +27,10 @@ type Result struct { Peers []*btcwire.NetAddress } -func NewCrawler(client *Client, seeds []string, peerAge time.Duration) *Crawler { +func NewCrawler(client *Client, seeds []string) *Crawler { c := Crawler{ client: client, seenFilter: map[string]bool{}, - peerAge: peerAge, shutdown: make(chan struct{}, 1), } filter := func(address string) *string { @@ -57,6 +57,7 @@ func (c *Crawler) handleAddress(address string) *Result { client := c.client peer := NewPeer(client, address) + peer.ConnectTimeout = c.ConnectTimeout r := Result{Node: peer} err := peer.Connect() @@ -131,7 +132,7 @@ func (c *Crawler) filter(address string) *string { } func (c *Crawler) process(r *Result) *Result { - timestampSince := time.Now().Add(-c.peerAge) + timestampSince := time.Now().Add(-c.PeerAge) for _, addr := range r.Peers { if !addr.Timestamp.After(timestampSince) { @@ -155,6 +156,9 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) { numActive := 0 isActive := true + // Block until we get the first item + c.queue.Wait() + // This is the main "event loop". // FIXME: Feels like there should be a better way to manage the number of // concurrent workers without limiting slots with workerChan and without @@ -165,6 +169,14 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) { if !isActive { // Don't start any new workers, leave the slot filled. break + } else if c.queue.IsEmpty() { + <-workerChan + + if numActive == 0 { + logger.Infof("Done.") + close(resultChan) + return + } } numActive++ @@ -178,13 +190,7 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) { if c.process(&r) != nil { resultChan <- r } - numActive-- - if (!isActive || c.queue.IsEmpty()) && numActive == 0 { - logger.Infof("Done.") - close(resultChan) - } - <-workerChan case <-c.shutdown: diff --git a/peer.go b/peer.go index 8243d28..9c200b7 100644 --- a/peer.go +++ b/peer.go @@ -3,6 +3,7 @@ package main import ( "fmt" "net" + "time" "github.com/conformal/btcwire" ) @@ -15,13 +16,15 @@ type Peer struct { Address string UserAgent string ProtocolVersion int32 + ConnectTimeout time.Duration // For the connect phase (can be overridden) } func NewPeer(client *Client, address string) *Peer { p := Peer{ - client: client, - pver: client.pver, - Address: address, + client: client, + pver: client.pver, + Address: address, + ConnectTimeout: time.Duration(20 * time.Second), } return &p } @@ -30,7 +33,7 @@ func (p *Peer) Connect() error { if p.conn != nil { return fmt.Errorf("Peer already connected, can't connect again.") } - conn, err := net.Dial("tcp", p.Address) + conn, err := net.DialTimeout("tcp", p.Address, p.ConnectTimeout) if err != nil { return err } diff --git a/queue/queue.go b/queue/queue.go index 5d829f7..3b7e827 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -61,5 +61,29 @@ func (q *Queue) next() string { } func (q *Queue) IsEmpty() bool { - return len(q.overflow) == 0 + // FIXME: This effectively cycles the order of the output buffer. Kinda sad. 
+
+	if len(q.overflow) > 0 {
+		return false
+	}
+
+	select {
+	case r := <-q.Output:
+		go func() {
+			// Put it back
+			q.Output <- r
+		}()
+		return false
+	default:
+		return true
+	}
+}
+
+func (q *Queue) Wait() {
+	// Wait until there is an Output. Useful for blocking until the queue is
+	// ramped up.
+	r := <-q.Output
+	go func() {
+		q.Output <- r
+	}()
}
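
For context, the non-blocking "receive, then put it back" peek that the new IsEmpty and Wait rely on can be sketched in isolation. The snippet below is a minimal, self-contained illustration only: the plain `out chan string` and the helpers isEmpty/waitFirst stand in for queue.Queue's Output channel and the methods above, and are not part of this patch. Re-sending the peeked item from a goroutine keeps it available to the real consumer, but it may land behind other pending sends, which is the reordering the FIXME comment laments.

package main

import "fmt"

// isEmpty reports whether nothing is immediately available on out.
// If an item is available, it is consumed and re-sent from a goroutine,
// so the real consumer still receives it (possibly out of order).
func isEmpty(out chan string) bool {
	select {
	case r := <-out:
		go func() {
			out <- r // put it back
		}()
		return false
	default:
		return true
	}
}

// waitFirst blocks until out yields its first item, then re-sends it,
// mirroring how Crawler.Run uses queue.Wait() before its event loop.
func waitFirst(out chan string) {
	r := <-out
	go func() {
		out <- r
	}()
}

func main() {
	out := make(chan string, 2)
	fmt.Println(isEmpty(out)) // true: nothing queued yet

	out <- "1.2.3.4:8333"
	fmt.Println(isEmpty(out)) // false: the item was peeked and re-sent

	waitFirst(out)     // returns once the re-sent item is visible, then puts it back
	fmt.Println(<-out) // still consumable: "1.2.3.4:8333"
}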