Properly block on first item.

Andrey Petrov 2014-05-15 17:04:09 -07:00
parent 81e7762d1d
commit 5c128f64bb
5 changed files with 69 additions and 34 deletions
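
In short: the crawler now blocks until the queue yields its first address (the new queue.Wait call) before entering its dispatch loop, and the completion check moves from the result-handling side to the dispatch side. Without that initial block, a loop that treats "queue empty and no active workers" as completion can declare itself done before the seed addresses have even been queued. Below is a minimal, self-contained illustration of that race and the fix, using hypothetical names rather than the crawler's actual API:

package main

import (
    "fmt"
    "time"
)

// consume drains queue until it looks empty. If it does not block for the
// first item, a slow producer makes it return immediately with nothing done.
func consume(queue chan string, blockOnFirst bool) int {
    handled := 0
    if blockOnFirst {
        <-queue // block until the producer has delivered the first item
        handled++
    }
    for {
        select {
        case <-queue:
            handled++
        default:
            return handled // queue looks empty: declare the crawl done
        }
    }
}

func main() {
    queue := make(chan string, 10)
    go func() { // slow producer, e.g. resolving DNS seeds
        time.Sleep(50 * time.Millisecond)
        for _, a := range []string{"a", "b", "c"} {
            queue <- a
        }
    }()
    fmt.Println(consume(queue, true) > 0) // true: we waited for the first item
    // With blockOnFirst=false this almost always prints false, because the
    // "is it empty?" check runs before the producer has delivered anything.
}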

Makefile

@@ -1,4 +1,4 @@
-btc-crawl: *.go
+btc-crawl: **.go
 	go build .
 
 build: btc-crawl

cmd.go

@@ -6,7 +6,6 @@ import (
     "fmt"
     "os"
     "os/signal"
-    "syscall"
     "time"
 
     "github.com/alexcesaro/log"
@@ -25,13 +24,14 @@ var defaultDnsSeeds = []string{
 }
 
 type Options struct {
     Verbose []bool `short:"v" long:"verbose" description:"Show verbose logging."`
     Output string `short:"o" long:"output" description:"File to write result to." default:"btc-crawl.json"`
     Seed []string `short:"s" long:"seed" description:"Override which seeds to use." default-mask:"<bitcoin-core DNS seeds>"`
     Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"`
+    ConnectTimeout time.Duration `short:"t" long:"connect-timeout" description:"Maximum time to wait to connect before giving up." default:"10s"`
     UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"`
     PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"`
     StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"0"`
 }
 
 var logLevels = []log.Level{
@@ -69,7 +69,9 @@ func main() {
     // Create client and crawler
     client := NewClient(options.UserAgent)
-    crawler := NewCrawler(client, seedNodes, options.PeerAge)
+    crawler := NewCrawler(client, seedNodes)
+    crawler.PeerAge = options.PeerAge
+    crawler.ConnectTimeout = options.ConnectTimeout
 
     // Configure output
     var w *bufio.Writer
@@ -99,7 +101,7 @@ func main() {
     // Construct interrupt handler
     sig := make(chan os.Signal, 1)
-    signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
+    signal.Notify(sig, os.Interrupt)
 
     go func() {
         <-sig // Wait for ^C signal
         logger.Warningf("Interrupt signal detected, shutting down gracefully by waiting for active workers to finish.")
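
The new -t/--connect-timeout option is declared as a time.Duration, like --peer-age, so the defaults above ("10s", "24h") are Go duration strings. For illustration, here is the same syntax as accepted by the standard library's time.ParseDuration, shown independently of the flag parser used here:

package main

import (
    "fmt"
    "time"
)

func main() {
    // Go duration strings combine a number and a unit (ns, us, ms, s, m, h).
    for _, s := range []string{"10s", "24h", "1h30m", "250ms"} {
        d, err := time.ParseDuration(s)
        if err != nil {
            fmt.Println(s, "->", err)
            continue
        }
        fmt.Println(s, "->", d)
    }
}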

crawler.go

@@ -10,15 +10,16 @@ import (
 // TODO: Break Client/Peer/Crawler into separate modules.
 type Crawler struct {
     client *Client
     queue *queue.Queue
     numSeen int
     numUnique int
     numConnected int
     numAttempted int
     seenFilter map[string]bool // TODO: Replace with bloom filter?
-    peerAge time.Duration
-    shutdown chan struct{}
+    PeerAge time.Duration
+    ConnectTimeout time.Duration
+    shutdown chan struct{}
 }
 
 type Result struct {
@@ -26,11 +27,10 @@ type Result struct {
     Peers []*btcwire.NetAddress
 }
 
-func NewCrawler(client *Client, seeds []string, peerAge time.Duration) *Crawler {
+func NewCrawler(client *Client, seeds []string) *Crawler {
     c := Crawler{
         client: client,
         seenFilter: map[string]bool{},
-        peerAge: peerAge,
         shutdown: make(chan struct{}, 1),
     }
 
     filter := func(address string) *string {
@@ -57,6 +57,7 @@ func (c *Crawler) handleAddress(address string) *Result {
     client := c.client
     peer := NewPeer(client, address)
+    peer.ConnectTimeout = c.ConnectTimeout
     r := Result{Node: peer}
 
     err := peer.Connect()
@@ -131,7 +132,7 @@ func (c *Crawler) filter(address string) *string {
 }
 
 func (c *Crawler) process(r *Result) *Result {
-    timestampSince := time.Now().Add(-c.peerAge)
+    timestampSince := time.Now().Add(-c.PeerAge)
 
     for _, addr := range r.Peers {
         if !addr.Timestamp.After(timestampSince) {
@@ -155,6 +156,9 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
     numActive := 0
     isActive := true
 
+    // Block until we get the first item
+    c.queue.Wait()
+
     // This is the main "event loop".
     // FIXME: Feels like there should be a better way to manage the number of
     // concurrent workers without limiting slots with workerChan and without
@@ -165,6 +169,14 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
             if !isActive {
                 // Don't start any new workers, leave the slot filled.
                 break
+            } else if c.queue.IsEmpty() {
+                <-workerChan
+                if numActive == 0 {
+                    logger.Infof("Done.")
+                    close(resultChan)
+                    return
+                }
             }
 
             numActive++
@@ -178,13 +190,7 @@ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
             if c.process(&r) != nil {
                 resultChan <- r
             }
             numActive--
-
-            if (!isActive || c.queue.IsEmpty()) && numActive == 0 {
-                logger.Infof("Done.")
-                close(resultChan)
-            }
-
             <-workerChan
 
         case <-c.shutdown:
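
For context on the Run loop being edited here: workerChan is a buffered channel used to limit worker slots (sending claims a slot, receiving frees one), alongside a numActive counter. This commit moves the "Done" decision into the dispatch branch: when a slot is free but the queue is empty, the slot is released and, if nothing is still in flight, the result channel is closed. A stripped-down sketch of that shape, assuming a simple in-memory queue and hypothetical names (no shutdown handling, no deduplication):

package main

import "fmt"

// crawlSketch mirrors the dispatch loop's shape: keep starting workers while
// there is both work and a free slot, and only declare the crawl finished
// when the queue is empty AND nothing is in flight, since a running worker
// may still report new addresses. (The real Run also services a shutdown
// channel and limits slots with the buffered workerChan.)
func crawlSketch(seeds []string, workers int, handle func(string) []string) []string {
    pending := append([]string{}, seeds...)
    visited := []string{}
    finished := make(chan []string) // each worker reports what it discovered
    active := 0

    for {
        for len(pending) > 0 && active < workers {
            addr := pending[0]
            pending = pending[1:]
            visited = append(visited, addr)
            active++
            go func(a string) { finished <- handle(a) }(addr)
        }
        if active == 0 {
            return visited // queue empty and no workers left: done
        }
        more := <-finished // wait for some worker to finish
        pending = append(pending, more...)
        active--
    }
}

func main() {
    peers := map[string][]string{"seed": {"a", "b"}, "a": {"c"}}
    visit := func(addr string) []string { return peers[addr] }
    fmt.Println(crawlSketch([]string{"seed"}, 2, visit))
}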

peer.go

@@ -3,6 +3,7 @@ package main
 import (
     "fmt"
     "net"
+    "time"
 
     "github.com/conformal/btcwire"
 )
@@ -15,13 +16,15 @@ type Peer struct {
     Address string
     UserAgent string
     ProtocolVersion int32
+    ConnectTimeout time.Duration // For the connect phase (can be overridden)
 }
 
 func NewPeer(client *Client, address string) *Peer {
     p := Peer{
         client: client,
         pver: client.pver,
         Address: address,
+        ConnectTimeout: time.Duration(20 * time.Second),
     }
     return &p
 }
@@ -30,7 +33,7 @@ func (p *Peer) Connect() error {
     if p.conn != nil {
         return fmt.Errorf("Peer already connected, can't connect again.")
     }
-    conn, err := net.Dial("tcp", p.Address)
+    conn, err := net.DialTimeout("tcp", p.Address, p.ConnectTimeout)
     if err != nil {
         return err
     }
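
net.DialTimeout behaves like net.Dial but gives up once the supplied duration elapses; a zero duration means no limit, which is presumably why NewPeer seeds a 20-second default that the crawler can then override from --connect-timeout. A small sketch of the same pattern, with a hypothetical helper and address rather than the Peer API:

package main

import (
    "log"
    "net"
    "time"
)

// dialPeer bounds only the connect phase; read/write deadlines would be a
// separate concern (conn.SetDeadline). Hypothetical helper, not the Peer API.
func dialPeer(address string, timeout time.Duration) (net.Conn, error) {
    if timeout == 0 {
        timeout = 20 * time.Second // fall back to a sane default, as NewPeer does
    }
    return net.DialTimeout("tcp", address, timeout)
}

func main() {
    // Hypothetical address of a local bitcoind; expect an error if none is running.
    conn, err := dialPeer("127.0.0.1:8333", 10*time.Second)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    log.Printf("connected to %s", conn.RemoteAddr())
}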

queue.go

@@ -61,5 +61,29 @@ func (q *Queue) next() string {
 }
 
 func (q *Queue) IsEmpty() bool {
-    return len(q.overflow) == 0
+    // FIXME: This effectively cycles the order of the output buffer. Kinda sad.
+    if len(q.overflow) > 0 {
+        return true
+    }
+
+    select {
+    case r := <-q.Output:
+        go func() {
+            // Put it back
+            q.Output <- r
+        }()
+        return true
+    default:
+        return false
+    }
+}
+
+func (q *Queue) Wait() {
+    // Wait until there is an Output. Useful for blocking until the queue is
+    // ramped up.
+    r := <-q.Output
+    go func() {
+        q.Output <- r
+    }()
 }
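
Both new queue methods rely on the same put-back trick: receive from the Output channel to learn that an item exists (or, for Wait, to block until one does), then hand the item back from a goroutine. As the FIXME notes, the returned item re-enters the channel behind anything queued in the meantime, and another consumer can grab it first, so this is a best-effort peek rather than an exact emptiness test. A minimal sketch of the idea on a plain channel, with hypothetical names:

package main

import "fmt"

// hasItem reports whether something is currently available on ch without
// permanently consuming it: it receives the item and re-sends it from a
// goroutine. The put-back can reorder items and race with other receivers.
func hasItem(ch chan string) bool {
    select {
    case v := <-ch:
        go func() { ch <- v }() // put it back
        return true
    default:
        return false
    }
}

func main() {
    ch := make(chan string, 2)
    fmt.Println(hasItem(ch)) // false: nothing queued yet
    ch <- "addr1"
    fmt.Println(hasItem(ch)) // true: an item exists (and gets re-queued)
}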