diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f0d95ae --- /dev/null +++ b/Makefile @@ -0,0 +1,10 @@ +btc-crawl: *.go + go build . + +build: btc-crawl + +clean: + rm btc-crawl + +run: btc-crawl + ./btc-crawl -v diff --git a/cmd.go b/cmd.go index 9156342..19c4875 100644 --- a/cmd.go +++ b/cmd.go @@ -29,7 +29,7 @@ type Options struct { Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"` UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"` PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"` - StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"0"` + StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"-1"` } var logLevels = []log.Level{ diff --git a/crawler.go b/crawler.go index 7995826..115261c 100644 --- a/crawler.go +++ b/crawler.go @@ -3,16 +3,21 @@ package main import ( "time" + "./queue" + "github.com/conformal/btcwire" ) // TODO: Break Client/Peer/Crawler into separate modules. type Crawler struct { - client *Client - count int - seenFilter map[string]bool // TODO: Replace with bloom filter? - queue []string - peerAge time.Duration + client *Client + queue *queue.Queue + numSeen int + numUnique int + numConnected int + numAttempted int + seenFilter map[string]bool // TODO: Replace with bloom filter? + peerAge time.Duration } type Result struct { @@ -20,24 +25,30 @@ type Result struct { Peers []*btcwire.NetAddress } -func NewCrawler(client *Client, queue []string, peerAge time.Duration) *Crawler { +func NewCrawler(client *Client, seeds []string, peerAge time.Duration) *Crawler { c := Crawler{ client: client, - count: 0, seenFilter: map[string]bool{}, - queue: []string{}, peerAge: peerAge, } - - // Prefill the queue - for _, address := range queue { - c.addAddress(address) + filter := func(address string) *string { + return c.filter(address) } + c.queue = queue.NewQueue(filter, 10) + + go func() { + // Prefill the queue + for _, address := range seeds { + c.queue.Input <- address + } + }() return &c } func (c *Crawler) handleAddress(address string) *Result { + c.numAttempted++ + client := c.client peer := NewPeer(client, address) r := Result{Node: peer} @@ -62,6 +73,8 @@ func (c *Crawler) handleAddress(address string) *Result { return &r } + c.numConnected++ + // Listen for tx inv messages. firstReceived := -1 tolerateMessages := 3 @@ -97,23 +110,69 @@ func (c *Crawler) handleAddress(address string) *Result { } } -func (c *Crawler) addAddress(address string) bool { +func (c *Crawler) filter(address string) *string { // Returns true if not seen before, otherwise false + c.numSeen++ + state, ok := c.seenFilter[address] if ok == true && state == true { - return false + return nil } c.seenFilter[address] = true - c.count += 1 - c.queue = append(c.queue, address) - - return true + c.numUnique++ + return &address } +/* +func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) { + workChan := make(chan string, numWorkers) + queueChan := make(chan string) + tempResult := make(chan Result) + + go func(queueChan <-chan string) { + // Single thread to safely manage the queue + c.addAddress(<-queueChan) + nextAddress, _ := c.popAddress() + + for { + select { + case address := <-queueChan: + // Enque address + c.addAddress(address) + case workChan <- nextAddress: + nextAddress, err := c.popAddress() + if err != nil { + // Block until we get more work + c.addAddress(<-queueChan) + nextAddress, _ = c.popAddress() + } + } + } + }(queueChan) + + go func(tempResult <-chan Result, workChan chan<- string) { + // Convert from result to queue. + for { + select { + case r := <-tempResult: + + } + } + }(tempResult, workChan) + + for address := range workChan { + // Spawn more workers as we get buffered work + go func() { + logger.Debugf("[%s] Worker started.", address) + tempResult <- *c.handleAddress(address) + }() + } +} +*/ + func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result { numActive := 0 - numGood := 0 resultChan := make(chan Result) workerChan := make(chan struct{}, numWorkers) @@ -130,24 +189,13 @@ func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result { for { select { case workerChan <- struct{}{}: - if len(c.queue) == 0 { - // No work yet. - <-workerChan - continue - } - - // Pop from the queue - address := c.queue[0] - c.queue = c.queue[1:] - numActive += 1 - go func() { + address := <-c.queue.Output logger.Debugf("[%s] Worker started.", address) resultChan <- *c.handleAddress(address) }() case r := <-resultChan: - newAdded := 0 timestampSince := time.Now().Add(-c.peerAge) for _, addr := range r.Peers { @@ -155,24 +203,19 @@ func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result { continue } - if c.addAddress(NetAddressKey(addr)) { - newAdded += 1 - } + c.queue.Input <- NetAddressKey(addr) } - if newAdded > 0 { - numGood += 1 - } - numActive -= 1 + numActive-- if len(r.Peers) > 0 { stopAfter-- results = append(results, r) - logger.Infof("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r.Peers), c.count, numGood) + logger.Infof("[%s] Returned %d peers. Total %d unique peers via %d connected (of %d attempted).", r.Node.Address, len(r.Peers), c.numUnique, c.numConnected, c.numAttempted) } - if stopAfter == 0 || (len(c.queue) == 0 && numActive == 0) { + if stopAfter == 0 || (c.queue.IsEmpty() && numActive == 0) { logger.Infof("Done.") return &results } diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..5d829f7 --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,65 @@ +package queue + +// A single goroutine manages the overflow queue for thread-safety, funneling +// data between the Input and Output channels through a specified filter. +type Queue struct { + Input chan string + Output chan string + overflow []string + filter func(string) *string +} + +func NewQueue(filter func(string) *string, bufferSize int) *Queue { + q := Queue{ + Input: make(chan string, bufferSize), + Output: make(chan string, bufferSize), + overflow: []string{}, + filter: filter, + } + + go func(input <-chan string, output chan<- string) { + // Block until we have a next item + nextItem := q.next() + + for { + select { + case input := <-q.Input: + // New input + r := q.filter(input) + if r != nil { + // Store in the overflow + q.overflow = append(q.overflow, *r) + } + case output <- nextItem: + // Block until we have more inputs + nextItem = q.next() + } + } + }(q.Input, q.Output) + + return &q +} + +func (q *Queue) next() string { + // Block until a next item is available. + + if len(q.overflow) > 0 { + // Pop off the overflow queue. + r := q.overflow[0] + q.overflow = q.overflow[1:] + return r + } + + for { + // Block until we have a viable output + r := q.filter(<-q.Input) + + if r != nil { + return *r + } + } +} + +func (q *Queue) IsEmpty() bool { + return len(q.overflow) == 0 +}