diff --git a/btc-crawl.go b/btc-crawl.go index 9b19468..8840c1e 100644 --- a/btc-crawl.go +++ b/btc-crawl.go @@ -1,8 +1,10 @@ -// TODO: Export to a reasonable format. // TODO: Namespace packages properly (outside of `main`) package main import ( + "encoding/json" + "fmt" + "io/ioutil" "os" "time" @@ -23,10 +25,12 @@ var defaultDnsSeeds = []string{ type Options struct { Verbose []bool `short:"v" long:"verbose" description:"Show verbose logging."` + Output string `short:"o" long:"output" description:"File to write result to." default:"btc-crawl.json"` Seed []string `short:"s" long:"seed" description:"Override which seeds to use." default-mask:""` Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"` UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"` PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"` + StopAfter int `long:"stop-after" description:"Stop crawling after this many results."` } var logLevels = []log.Level{ @@ -39,9 +43,11 @@ func main() { options := Options{} parser := flags.NewParser(&options, flags.Default) - _, err := parser.Parse() + p, err := parser.Parse() if err != nil { - // FIXME: Print on some specific errors? Seems Parse prints in most cases. + if p == nil { + fmt.Print(err) + } return } @@ -60,6 +66,25 @@ func main() { } client := NewClient(options.UserAgent) - crawler := NewCrawler(client, seedNodes, options.Concurrency, options.PeerAge) - crawler.Start() + crawler := NewCrawler(client, seedNodes, options.PeerAge) + results := crawler.Run(options.Concurrency, options.StopAfter) + + b, err := json.Marshal(results) + if err != nil { + logger.Errorf("Failed to export JSON: %v", err) + return + } + + if options.Output == "-" { + os.Stdout.Write(b) + return + } + + err = ioutil.WriteFile(options.Output, b, 0644) + if err != nil { + logger.Errorf("Failed to write to %s: %v", options.Output, err) + return + } + + logger.Infof("Written %d results: %s", len(*results), options.Output) } diff --git a/crawler.go b/crawler.go index b5157a3..3fe1141 100644 --- a/crawler.go +++ b/crawler.go @@ -11,19 +11,20 @@ type Crawler struct { client *Client count int seenFilter map[string]bool // TODO: Replace with bloom filter? - results chan []string - workers chan struct{} queue []string peerAge time.Duration } -func NewCrawler(client *Client, queue []string, numWorkers int, peerAge time.Duration) *Crawler { +type Result struct { + Node *Peer + Peers []*btcwire.NetAddress +} + +func NewCrawler(client *Client, queue []string, peerAge time.Duration) *Crawler { c := Crawler{ client: client, count: 0, seenFilter: map[string]bool{}, - results: make(chan []string), - workers: make(chan struct{}, numWorkers), queue: []string{}, peerAge: peerAge, } @@ -36,11 +37,10 @@ func NewCrawler(client *Client, queue []string, numWorkers int, peerAge time.Dur return &c } -func (c *Crawler) handleAddress(address string) *[]string { - r := []string{} - +func (c *Crawler) handleAddress(address string) *Result { client := c.client peer := NewPeer(client, address) + r := Result{Node: peer} err := peer.Connect() if err != nil { @@ -66,7 +66,6 @@ func (c *Crawler) handleAddress(address string) *[]string { firstReceived := -1 tolerateMessages := 3 otherMessages := []string{} - timestampSince := time.Now().Add(-c.peerAge) for { // We can't really tell when we're done receiving peers, so we stop either @@ -80,12 +79,7 @@ func (c *Crawler) handleAddress(address string) *[]string { switch tmsg := msg.(type) { case *btcwire.MsgAddr: - for _, addr := range tmsg.AddrList { - if addr.Timestamp.After(timestampSince) { - // TODO: Move this check to .Start()? - r = append(r, NetAddressKey(addr)) - } - } + r.Peers = append(r.Peers, tmsg.AddrList...) if firstReceived == -1 { firstReceived = len(tmsg.AddrList) @@ -96,7 +90,7 @@ func (c *Crawler) handleAddress(address string) *[]string { default: otherMessages = append(otherMessages, tmsg.Command()) if len(otherMessages) > tolerateMessages { - logger.Debugf("[%s] Giving up with %d results after tolerating messages: %v.", address, len(r), otherMessages) + logger.Debugf("[%s] Giving up with %d results after tolerating messages: %v.", address, len(r.Peers), otherMessages) return &r } } @@ -117,36 +111,46 @@ func (c *Crawler) addAddress(address string) bool { return true } -func (c *Crawler) Start() { - numWorkers := 0 +func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result { + numActive := 0 numGood := 0 + resultChan := make(chan Result) + workerChan := make(chan struct{}, numWorkers) + + results := []Result{} + // This is the main "event loop". Feels like there may be a better way to // manage the number of concurrent workers but I can't think of it right now. for { select { - case c.workers <- struct{}{}: + case workerChan <- struct{}{}: if len(c.queue) == 0 { // No work yet. - <-c.workers + <-workerChan continue } // Pop from the queue address := c.queue[0] c.queue = c.queue[1:] - numWorkers += 1 + numActive += 1 go func() { logger.Debugf("[%s] Worker started.", address) - results := *c.handleAddress(address) - c.results <- results + resultChan <- *c.handleAddress(address) }() - case r := <-c.results: + case r := <-resultChan: newAdded := 0 - for _, address := range r { - if c.addAddress(address) { + timestampSince := time.Now().Add(-c.peerAge) + + for _, addr := range r.Peers { + if !addr.Timestamp.After(timestampSince) { + continue + } + + if c.addAddress(NetAddressKey(addr)) { newAdded += 1 } } @@ -154,18 +158,21 @@ func (c *Crawler) Start() { if newAdded > 0 { numGood += 1 } - numWorkers -= 1 + numActive -= 1 - if len(r) > 0 { - logger.Infof("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r), c.count, numGood) + if len(r.Peers) > 0 { + stopAfter-- + results = append(results, r) + + logger.Infof("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r.Peers), c.count, numGood) } - if len(c.queue) == 0 && numWorkers == 0 { + if stopAfter == 0 || (len(c.queue) == 0 && numActive == 0) { logger.Infof("Done.") - return + return &results } - <-c.workers + <-workerChan } } } diff --git a/peer.go b/peer.go index f3c1f86..8243d28 100644 --- a/peer.go +++ b/peer.go @@ -8,16 +8,20 @@ import ( ) type Peer struct { - client *Client - address string - conn net.Conn - nonce uint64 // Nonce we're sending to the peer + client *Client + conn net.Conn + nonce uint64 // Nonce we're sending to the peer + pver uint32 // Negotiated ProtocolVersion + Address string + UserAgent string + ProtocolVersion int32 } func NewPeer(client *Client, address string) *Peer { p := Peer{ client: client, - address: address, + pver: client.pver, + Address: address, } return &p } @@ -26,7 +30,7 @@ func (p *Peer) Connect() error { if p.conn != nil { return fmt.Errorf("Peer already connected, can't connect again.") } - conn, err := net.Dial("tcp", p.address) + conn, err := net.Dial("tcp", p.Address) if err != nil { return err } @@ -37,7 +41,7 @@ func (p *Peer) Connect() error { func (p *Peer) Disconnect() { p.conn.Close() - logger.Debugf("[%s] Closed.", p.address) + logger.Debugf("[%s] Closed.", p.Address) } func (p *Peer) Handshake() error { @@ -45,7 +49,7 @@ func (p *Peer) Handshake() error { return fmt.Errorf("Peer is not connected, can't handshake.") } - logger.Debugf("[%s] Starting handshake.", p.address) + logger.Debugf("[%s] Starting handshake.", p.Address) nonce, err := btcwire.RandomUint64() if err != nil { @@ -53,16 +57,14 @@ func (p *Peer) Handshake() error { } p.nonce = nonce - pver, btcnet := p.client.pver, p.client.btcnet - msgVersion, err := btcwire.NewMsgVersionFromConn(p.conn, p.nonce, p.client.userAgent, 0) msgVersion.DisableRelayTx = true - if err := btcwire.WriteMessage(p.conn, msgVersion, pver, btcnet); err != nil { + if err := p.WriteMessage(msgVersion); err != nil { return err } // Read the response version. - msg, _, err := btcwire.ReadMessage(p.conn, pver, btcnet) + msg, _, err := p.ReadMessage() if err != nil { return err } @@ -70,17 +72,21 @@ func (p *Peer) Handshake() error { if !ok { return fmt.Errorf("Did not receive version message: %T", vmsg) } + + p.ProtocolVersion = vmsg.ProtocolVersion + p.UserAgent = vmsg.UserAgent + // Negotiate protocol version. - if uint32(vmsg.ProtocolVersion) < pver { - pver = uint32(vmsg.ProtocolVersion) + if uint32(vmsg.ProtocolVersion) < p.pver { + p.pver = uint32(vmsg.ProtocolVersion) } - logger.Debugf("[%s] -> Version: %s", p.address, vmsg.UserAgent) + logger.Debugf("[%s] -> Version: %s", p.Address, vmsg.UserAgent) // Normally we'd check if vmsg.Nonce == p.nonce but the crawler does not // accept external connections so we skip it. // Send verack. - if err := btcwire.WriteMessage(p.conn, btcwire.NewMsgVerAck(), pver, btcnet); err != nil { + if err := p.WriteMessage(btcwire.NewMsgVerAck()); err != nil { return err } @@ -88,9 +94,9 @@ func (p *Peer) Handshake() error { } func (p *Peer) WriteMessage(msg btcwire.Message) error { - return btcwire.WriteMessage(p.conn, msg, p.client.pver, p.client.btcnet) + return btcwire.WriteMessage(p.conn, msg, p.pver, p.client.btcnet) } func (p *Peer) ReadMessage() (btcwire.Message, []byte, error) { - return btcwire.ReadMessage(p.conn, p.client.pver, p.client.btcnet) + return btcwire.ReadMessage(p.conn, p.pver, p.client.btcnet) }