From 00f80c1ffeff6e221edfa71d6921b956a44bd971 Mon Sep 17 00:00:00 2001
From: Andrey Petrov
Date: Wed, 14 May 2014 15:47:07 -0700
Subject: [PATCH] Streaming json action.

---
 cmd.go     |  90 ++++++++++++++++++++++++++++++++++-------
 crawler.go | 115 +++++++++++++++++++----------------------------------
 2 files changed, 117 insertions(+), 88 deletions(-)

diff --git a/cmd.go b/cmd.go
index 19c4875..13417d0 100644
--- a/cmd.go
+++ b/cmd.go
@@ -1,10 +1,11 @@
 package main
 
 import (
+	"bufio"
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
 	"os"
+	"os/signal"
 	"time"
 
 	"github.com/alexcesaro/log"
@@ -29,7 +30,7 @@ type Options struct {
 	Concurrency int           `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"`
 	UserAgent   string        `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"`
 	PeerAge     time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"`
-	StopAfter   int           `long:"stop-after" description:"Stop crawling after this many results." default:"-1"`
+	StopAfter   int           `long:"stop-after" description:"Stop crawling after this many results." default:"0"`
 }
 
 var logLevels = []log.Level{
@@ -65,26 +66,87 @@ func main() {
 		seedNodes = GetSeedsFromDNS(defaultDnsSeeds)
 	}
 
+	// Create client and crawler
 	client := NewClient(options.UserAgent)
 	crawler := NewCrawler(client, seedNodes, options.PeerAge)
-	results := crawler.Run(options.Concurrency, options.StopAfter)
 
-	b, err := json.Marshal(results)
+	// Configure output
+	var w *bufio.Writer
+	if options.Output == "-" || options.Output == "" {
+		w = bufio.NewWriter(os.Stdout)
+		defer w.Flush()
+	} else {
+		fp, err := os.Create(options.Output)
+		if err != nil {
+			logger.Errorf("Failed to create file: %v", err)
+			return
+		}
+
+		w = bufio.NewWriter(fp)
+		defer w.Flush()
+		defer fp.Close()
+	}
+
+	// Make the first write, make sure everything is cool
+	_, err = w.Write([]byte("["))
 	if err != nil {
-		logger.Errorf("Failed to export JSON: %v", err)
+		logger.Errorf("Failed to write result, aborting immediately: %v", err)
 		return
 	}
 
-	if options.Output == "-" {
-		os.Stdout.Write(b)
-		return
+	isActive := true
+
+	// Construct interrupt handler
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt)
+	go func() {
+		<-sig // Wait for ^C signal
+		logger.Warningf("Interrupt signal detected, shutting down gracefully by waiting for active workers to finish.")
+		crawler.Shutdown()
+
+		// FIXME: This isn't working?
+		<-sig // Hurry up?
+		logger.Warningf("Super-interrupt. Abandoning in-progress workers.")
+		isActive = false
+	}()
+
+	// Launch crawler
+	resultChan := make(chan Result)
+	go crawler.Run(resultChan, options.Concurrency)
+	logger.Infof("Crawler started with %d concurrency limit.", options.Concurrency)
+
+	// Start processing results
+	count := 0
+	for result := range resultChan {
+		b, err := json.Marshal(result)
+		if err != nil {
+			logger.Warningf("Failed to export JSON, skipping: %v", err)
+		}
+
+		if count > 0 {
+			b = append([]byte(","), b...)
+		}
+
+		_, err = w.Write(b)
+		if err != nil {
+			logger.Errorf("Failed to write result, aborting gracefully: %v", err)
+			crawler.Shutdown()
+			break
+		}
+
+		count++
+		if options.StopAfter > 0 && count > options.StopAfter {
+			logger.Infof("StopAfter count reached, shutting down gracefully.")
+			crawler.Shutdown()
+		}
+
+		if !isActive {
+			// No time to wait, finish writing and quit.
+			break
+		}
 	}
 
-	err = ioutil.WriteFile(options.Output, b, 0644)
-	if err != nil {
-		logger.Errorf("Failed to write to %s: %v", options.Output, err)
-		return
-	}
+	w.Write([]byte("]")) // No error checking here because it's too late to care.
 
-	logger.Infof("Written %d results after %s: %s", len(*results), time.Now().Sub(now), options.Output)
+	logger.Infof("Written %d results after %s: %s", count, time.Now().Sub(now), options.Output)
 }
diff --git a/crawler.go b/crawler.go
index 115261c..1380081 100644
--- a/crawler.go
+++ b/crawler.go
@@ -18,6 +18,7 @@ type Crawler struct {
 	numAttempted int
 	seenFilter   map[string]bool // TODO: Replace with bloom filter?
 	peerAge      time.Duration
+	shutdown     chan struct{}
 }
 
 type Result struct {
@@ -46,6 +47,10 @@ func NewCrawler(client *Client, seeds []string, peerAge time.Duration) *Crawler
 	return &c
 }
 
+func (c *Crawler) Shutdown() {
+	c.shutdown <- struct{}{}
+}
+
 func (c *Crawler) handleAddress(address string) *Result {
 	c.numAttempted++
 
@@ -124,103 +129,65 @@ func (c *Crawler) filter(address string) *string {
 	return &address
 }
 
-/*
-func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
-	workChan := make(chan string, numWorkers)
-	queueChan := make(chan string)
-	tempResult := make(chan Result)
+func (c *Crawler) process(r *Result) *Result {
+	timestampSince := time.Now().Add(-c.peerAge)
 
-	go func(queueChan <-chan string) {
-		// Single thread to safely manage the queue
-		c.addAddress(<-queueChan)
-		nextAddress, _ := c.popAddress()
-
-		for {
-			select {
-			case address := <-queueChan:
-				// Enque address
-				c.addAddress(address)
-			case workChan <- nextAddress:
-				nextAddress, err := c.popAddress()
-				if err != nil {
-					// Block until we get more work
-					c.addAddress(<-queueChan)
-					nextAddress, _ = c.popAddress()
-				}
-			}
+	for _, addr := range r.Peers {
+		if !addr.Timestamp.After(timestampSince) {
+			continue
 		}
-	}(queueChan)
 
-	go func(tempResult <-chan Result, workChan chan<- string) {
-		// Convert from result to queue.
-		for {
-			select {
-			case r := <-tempResult:
-
-			}
-		}
-	}(tempResult, workChan)
-
-	for address := range workChan {
-		// Spawn more workers as we get buffered work
-		go func() {
-			logger.Debugf("[%s] Worker started.", address)
-			tempResult <- *c.handleAddress(address)
-		}()
+		c.queue.Input <- NetAddressKey(addr)
 	}
+
+	if len(r.Peers) > 0 {
+		logger.Infof("[%s] Returned %d peers. Total %d unique peers via %d connected (of %d attempted).", r.Node.Address, len(r.Peers), c.numUnique, c.numConnected, c.numAttempted)
+		return r
+	}
+
+	return nil
 }
-*/
 
-func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result {
-	numActive := 0
-
-	resultChan := make(chan Result)
+func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
 	workerChan := make(chan struct{}, numWorkers)
+	tempResult := make(chan Result)
+	numActive := 0
+	isActive := true
 
-	results := []Result{}
-
-	if stopAfter == 0 {
-		// No stopping.
-		stopAfter = -1
-	}
-
-	// This is the main "event loop". Feels like there may be a better way to
-	// manage the number of concurrent workers but I can't think of it right now.
+	// This is the main "event loop".
+	// FIXME: Feels like there should be a better way to manage the number of
+	// concurrent workers without limiting slots with workerChan and without
+	// using a numActive counter.
 	for {
 		select {
 		case workerChan <- struct{}{}:
+			if !isActive {
+				// Don't start any new workers, leave the slot filled.
+				break
+			}
+
+			numActive++
 			go func() {
 				address := <-c.queue.Output
 				logger.Debugf("[%s] Worker started.", address)
-				resultChan <- *c.handleAddress(address)
+				tempResult <- *c.handleAddress(address)
 			}()
-		case r := <-resultChan:
-			timestampSince := time.Now().Add(-c.peerAge)
-
-			for _, addr := range r.Peers {
-				if !addr.Timestamp.After(timestampSince) {
-					continue
-				}
-
-				c.queue.Input <- NetAddressKey(addr)
+		case r := <-tempResult:
+			if c.process(&r) != nil {
+				resultChan <- r
 			}
 
 			numActive--
-
-			if len(r.Peers) > 0 {
-				stopAfter--
-				results = append(results, r)
-
-				logger.Infof("[%s] Returned %d peers. Total %d unique peers via %d connected (of %d attempted).", r.Node.Address, len(r.Peers), c.numUnique, c.numConnected, c.numAttempted)
-			}
-
-			if stopAfter == 0 || (c.queue.IsEmpty() && numActive == 0) {
+			if (!isActive || c.queue.IsEmpty()) && numActive == 0 {
 				logger.Infof("Done.")
-				return &results
+				close(resultChan)
 			}
 
 			<-workerChan
+
+		case <-c.shutdown:
+			isActive = false
 		}
 	}
 }
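
Review note (commentary on the patch, not part of the diff): the heart of this
change is that results are now streamed to the output as a JSON array, one
element per result, instead of being collected and marshaled in a single
json.Marshal call at the end. A minimal, standalone sketch of that pattern
follows; the Result fields and the producer goroutine here are hypothetical
stand-ins, not the crawler's actual types:

	package main

	import (
		"bufio"
		"encoding/json"
		"os"
	)

	// Result is a stand-in for the crawler's Result type.
	type Result struct {
		Address string `json:"address"`
		Peers   int    `json:"peers"`
	}

	func main() {
		results := make(chan Result)
		go func() {
			// Hypothetical producer; the real code feeds this
			// channel from crawler.Run.
			results <- Result{Address: "127.0.0.1:8333", Peers: 42}
			results <- Result{Address: "10.0.0.1:8333", Peers: 7}
			close(results)
		}()

		w := bufio.NewWriter(os.Stdout)
		defer w.Flush()

		// Stream a JSON array: "[", then ","-joined elements, then "]".
		// Each element is written as soon as it arrives, so output
		// accumulates incrementally instead of after the whole crawl.
		w.WriteString("[")
		count := 0
		for r := range results {
			b, err := json.Marshal(r)
			if err != nil {
				continue // skip results that fail to marshal
			}
			if count > 0 {
				w.WriteString(",")
			}
			w.Write(b)
			count++
		}
		w.WriteString("]")
	}

One caveat worth checking before applying: the patch adds the
shutdown chan struct{} field and a Shutdown() method that sends on it, but no
hunk shown here initializes the channel. In Go, a send on a nil channel blocks
forever, so unless NewCrawler make()s the channel somewhere outside this diff,
Shutdown() would hang; that would also explain why the first interrupt never
returns and the second <-sig ("FIXME: This isn't working?") is never reached.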