Output to json.

This commit is contained in:
Andrey Petrov 2014-04-25 16:24:27 -07:00
parent d0e4fa43a6
commit ffacb0a6c6
3 changed files with 93 additions and 55 deletions

View File

@ -1,8 +1,10 @@
// TODO: Export to a reasonable format.
// TODO: Namespace packages properly (outside of `main`) // TODO: Namespace packages properly (outside of `main`)
package main package main
import ( import (
"encoding/json"
"fmt"
"io/ioutil"
"os" "os"
"time" "time"
@ -23,10 +25,12 @@ var defaultDnsSeeds = []string{
type Options struct { type Options struct {
Verbose []bool `short:"v" long:"verbose" description:"Show verbose logging."` Verbose []bool `short:"v" long:"verbose" description:"Show verbose logging."`
Output string `short:"o" long:"output" description:"File to write result to." default:"btc-crawl.json"`
Seed []string `short:"s" long:"seed" description:"Override which seeds to use." default-mask:"<bitcoin-core DNS seeds>"` Seed []string `short:"s" long:"seed" description:"Override which seeds to use." default-mask:"<bitcoin-core DNS seeds>"`
Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"` Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"`
UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"` UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"`
PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"` PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"`
StopAfter int `long:"stop-after" description:"Stop crawling after this many results."`
} }
var logLevels = []log.Level{ var logLevels = []log.Level{
@ -39,9 +43,11 @@ func main() {
options := Options{} options := Options{}
parser := flags.NewParser(&options, flags.Default) parser := flags.NewParser(&options, flags.Default)
_, err := parser.Parse() p, err := parser.Parse()
if err != nil { if err != nil {
// FIXME: Print on some specific errors? Seems Parse prints in most cases. if p == nil {
fmt.Print(err)
}
return return
} }
@ -60,6 +66,25 @@ func main() {
} }
client := NewClient(options.UserAgent) client := NewClient(options.UserAgent)
crawler := NewCrawler(client, seedNodes, options.Concurrency, options.PeerAge) crawler := NewCrawler(client, seedNodes, options.PeerAge)
crawler.Start() results := crawler.Run(options.Concurrency, options.StopAfter)
b, err := json.Marshal(results)
if err != nil {
logger.Errorf("Failed to export JSON: %v", err)
return
}
if options.Output == "-" {
os.Stdout.Write(b)
return
}
err = ioutil.WriteFile(options.Output, b, 0644)
if err != nil {
logger.Errorf("Failed to write to %s: %v", options.Output, err)
return
}
logger.Infof("Written %d results: %s", len(*results), options.Output)
} }

View File

@ -11,19 +11,20 @@ type Crawler struct {
client *Client client *Client
count int count int
seenFilter map[string]bool // TODO: Replace with bloom filter? seenFilter map[string]bool // TODO: Replace with bloom filter?
results chan []string
workers chan struct{}
queue []string queue []string
peerAge time.Duration peerAge time.Duration
} }
func NewCrawler(client *Client, queue []string, numWorkers int, peerAge time.Duration) *Crawler { type Result struct {
Node *Peer
Peers []*btcwire.NetAddress
}
func NewCrawler(client *Client, queue []string, peerAge time.Duration) *Crawler {
c := Crawler{ c := Crawler{
client: client, client: client,
count: 0, count: 0,
seenFilter: map[string]bool{}, seenFilter: map[string]bool{},
results: make(chan []string),
workers: make(chan struct{}, numWorkers),
queue: []string{}, queue: []string{},
peerAge: peerAge, peerAge: peerAge,
} }
@ -36,11 +37,10 @@ func NewCrawler(client *Client, queue []string, numWorkers int, peerAge time.Dur
return &c return &c
} }
func (c *Crawler) handleAddress(address string) *[]string { func (c *Crawler) handleAddress(address string) *Result {
r := []string{}
client := c.client client := c.client
peer := NewPeer(client, address) peer := NewPeer(client, address)
r := Result{Node: peer}
err := peer.Connect() err := peer.Connect()
if err != nil { if err != nil {
@ -66,7 +66,6 @@ func (c *Crawler) handleAddress(address string) *[]string {
firstReceived := -1 firstReceived := -1
tolerateMessages := 3 tolerateMessages := 3
otherMessages := []string{} otherMessages := []string{}
timestampSince := time.Now().Add(-c.peerAge)
for { for {
// We can't really tell when we're done receiving peers, so we stop either // We can't really tell when we're done receiving peers, so we stop either
@ -80,12 +79,7 @@ func (c *Crawler) handleAddress(address string) *[]string {
switch tmsg := msg.(type) { switch tmsg := msg.(type) {
case *btcwire.MsgAddr: case *btcwire.MsgAddr:
for _, addr := range tmsg.AddrList { r.Peers = append(r.Peers, tmsg.AddrList...)
if addr.Timestamp.After(timestampSince) {
// TODO: Move this check to .Start()?
r = append(r, NetAddressKey(addr))
}
}
if firstReceived == -1 { if firstReceived == -1 {
firstReceived = len(tmsg.AddrList) firstReceived = len(tmsg.AddrList)
@ -96,7 +90,7 @@ func (c *Crawler) handleAddress(address string) *[]string {
default: default:
otherMessages = append(otherMessages, tmsg.Command()) otherMessages = append(otherMessages, tmsg.Command())
if len(otherMessages) > tolerateMessages { if len(otherMessages) > tolerateMessages {
logger.Debugf("[%s] Giving up with %d results after tolerating messages: %v.", address, len(r), otherMessages) logger.Debugf("[%s] Giving up with %d results after tolerating messages: %v.", address, len(r.Peers), otherMessages)
return &r return &r
} }
} }
@ -117,36 +111,46 @@ func (c *Crawler) addAddress(address string) bool {
return true return true
} }
func (c *Crawler) Start() { func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result {
numWorkers := 0 numActive := 0
numGood := 0 numGood := 0
resultChan := make(chan Result)
workerChan := make(chan struct{}, numWorkers)
results := []Result{}
// This is the main "event loop". Feels like there may be a better way to // This is the main "event loop". Feels like there may be a better way to
// manage the number of concurrent workers but I can't think of it right now. // manage the number of concurrent workers but I can't think of it right now.
for { for {
select { select {
case c.workers <- struct{}{}: case workerChan <- struct{}{}:
if len(c.queue) == 0 { if len(c.queue) == 0 {
// No work yet. // No work yet.
<-c.workers <-workerChan
continue continue
} }
// Pop from the queue // Pop from the queue
address := c.queue[0] address := c.queue[0]
c.queue = c.queue[1:] c.queue = c.queue[1:]
numWorkers += 1 numActive += 1
go func() { go func() {
logger.Debugf("[%s] Worker started.", address) logger.Debugf("[%s] Worker started.", address)
results := *c.handleAddress(address) resultChan <- *c.handleAddress(address)
c.results <- results
}() }()
case r := <-c.results: case r := <-resultChan:
newAdded := 0 newAdded := 0
for _, address := range r { timestampSince := time.Now().Add(-c.peerAge)
if c.addAddress(address) {
for _, addr := range r.Peers {
if !addr.Timestamp.After(timestampSince) {
continue
}
if c.addAddress(NetAddressKey(addr)) {
newAdded += 1 newAdded += 1
} }
} }
@ -154,18 +158,21 @@ func (c *Crawler) Start() {
if newAdded > 0 { if newAdded > 0 {
numGood += 1 numGood += 1
} }
numWorkers -= 1 numActive -= 1
if len(r) > 0 { if len(r.Peers) > 0 {
logger.Infof("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r), c.count, numGood) stopAfter--
results = append(results, r)
logger.Infof("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r.Peers), c.count, numGood)
} }
if len(c.queue) == 0 && numWorkers == 0 { if stopAfter == 0 || (len(c.queue) == 0 && numActive == 0) {
logger.Infof("Done.") logger.Infof("Done.")
return return &results
} }
<-c.workers <-workerChan
} }
} }
} }

42
peer.go
View File

@ -8,16 +8,20 @@ import (
) )
type Peer struct { type Peer struct {
client *Client client *Client
address string conn net.Conn
conn net.Conn nonce uint64 // Nonce we're sending to the peer
nonce uint64 // Nonce we're sending to the peer pver uint32 // Negotiated ProtocolVersion
Address string
UserAgent string
ProtocolVersion int32
} }
func NewPeer(client *Client, address string) *Peer { func NewPeer(client *Client, address string) *Peer {
p := Peer{ p := Peer{
client: client, client: client,
address: address, pver: client.pver,
Address: address,
} }
return &p return &p
} }
@ -26,7 +30,7 @@ func (p *Peer) Connect() error {
if p.conn != nil { if p.conn != nil {
return fmt.Errorf("Peer already connected, can't connect again.") return fmt.Errorf("Peer already connected, can't connect again.")
} }
conn, err := net.Dial("tcp", p.address) conn, err := net.Dial("tcp", p.Address)
if err != nil { if err != nil {
return err return err
} }
@ -37,7 +41,7 @@ func (p *Peer) Connect() error {
func (p *Peer) Disconnect() { func (p *Peer) Disconnect() {
p.conn.Close() p.conn.Close()
logger.Debugf("[%s] Closed.", p.address) logger.Debugf("[%s] Closed.", p.Address)
} }
func (p *Peer) Handshake() error { func (p *Peer) Handshake() error {
@ -45,7 +49,7 @@ func (p *Peer) Handshake() error {
return fmt.Errorf("Peer is not connected, can't handshake.") return fmt.Errorf("Peer is not connected, can't handshake.")
} }
logger.Debugf("[%s] Starting handshake.", p.address) logger.Debugf("[%s] Starting handshake.", p.Address)
nonce, err := btcwire.RandomUint64() nonce, err := btcwire.RandomUint64()
if err != nil { if err != nil {
@ -53,16 +57,14 @@ func (p *Peer) Handshake() error {
} }
p.nonce = nonce p.nonce = nonce
pver, btcnet := p.client.pver, p.client.btcnet
msgVersion, err := btcwire.NewMsgVersionFromConn(p.conn, p.nonce, p.client.userAgent, 0) msgVersion, err := btcwire.NewMsgVersionFromConn(p.conn, p.nonce, p.client.userAgent, 0)
msgVersion.DisableRelayTx = true msgVersion.DisableRelayTx = true
if err := btcwire.WriteMessage(p.conn, msgVersion, pver, btcnet); err != nil { if err := p.WriteMessage(msgVersion); err != nil {
return err return err
} }
// Read the response version. // Read the response version.
msg, _, err := btcwire.ReadMessage(p.conn, pver, btcnet) msg, _, err := p.ReadMessage()
if err != nil { if err != nil {
return err return err
} }
@ -70,17 +72,21 @@ func (p *Peer) Handshake() error {
if !ok { if !ok {
return fmt.Errorf("Did not receive version message: %T", vmsg) return fmt.Errorf("Did not receive version message: %T", vmsg)
} }
p.ProtocolVersion = vmsg.ProtocolVersion
p.UserAgent = vmsg.UserAgent
// Negotiate protocol version. // Negotiate protocol version.
if uint32(vmsg.ProtocolVersion) < pver { if uint32(vmsg.ProtocolVersion) < p.pver {
pver = uint32(vmsg.ProtocolVersion) p.pver = uint32(vmsg.ProtocolVersion)
} }
logger.Debugf("[%s] -> Version: %s", p.address, vmsg.UserAgent) logger.Debugf("[%s] -> Version: %s", p.Address, vmsg.UserAgent)
// Normally we'd check if vmsg.Nonce == p.nonce but the crawler does not // Normally we'd check if vmsg.Nonce == p.nonce but the crawler does not
// accept external connections so we skip it. // accept external connections so we skip it.
// Send verack. // Send verack.
if err := btcwire.WriteMessage(p.conn, btcwire.NewMsgVerAck(), pver, btcnet); err != nil { if err := p.WriteMessage(btcwire.NewMsgVerAck()); err != nil {
return err return err
} }
@ -88,9 +94,9 @@ func (p *Peer) Handshake() error {
} }
func (p *Peer) WriteMessage(msg btcwire.Message) error { func (p *Peer) WriteMessage(msg btcwire.Message) error {
return btcwire.WriteMessage(p.conn, msg, p.client.pver, p.client.btcnet) return btcwire.WriteMessage(p.conn, msg, p.pver, p.client.btcnet)
} }
func (p *Peer) ReadMessage() (btcwire.Message, []byte, error) { func (p *Peer) ReadMessage() (btcwire.Message, []byte, error) {
return btcwire.ReadMessage(p.conn, p.client.pver, p.client.btcnet) return btcwire.ReadMessage(p.conn, p.pver, p.client.btcnet)
} }