Refactor into separate modules.

This commit is contained in:
Andrey Petrov 2014-04-22 16:28:10 -07:00
parent 85df8fcf35
commit 60ea0ebb76
4 changed files with 282 additions and 267 deletions

View File

@ -1,280 +1,14 @@
package main
import (
"fmt"
"github.com/conformal/btcwire"
"log"
"net"
"time"
)
// TODO: Unhardcode these:
var seedNodes []string = []string{"85.214.251.25:8333", "62.75.216.13:8333"}
var userAgent string = "/btc-crawl:0.0.1"
var lastBlock int32 = 0
// TODO: Break Client/Peer/Crawler into separate modules.
type Client struct {
btcnet btcwire.BitcoinNet // Bitcoin Network
pver uint32 // Protocl Version
userAgent string // User Agent
lastBlock int32
}
func NewDefaultClient() *Client {
return &Client{
btcnet: btcwire.MainNet,
pver: btcwire.ProtocolVersion,
userAgent: userAgent,
lastBlock: lastBlock,
}
}
type Peer struct {
client *Client
address string
conn net.Conn
nonce uint64 // Nonce we're sending to the peer
}
func NewPeer(client *Client, address string) *Peer {
p := Peer{
client: client,
address: address,
}
return &p
}
func (p *Peer) Connect() error {
if p.conn != nil {
return fmt.Errorf("Peer already connected, can't connect again.")
}
conn, err := net.Dial("tcp", p.address)
if err != nil {
return err
}
p.conn = conn
return nil
}
func (p *Peer) Disconnect() {
p.conn.Close()
}
func (p *Peer) Handshake() error {
if p.conn == nil {
return fmt.Errorf("Peer is not connected, can't handshake.")
}
log.Printf("[%s] Starting handshake.", p.address)
nonce, err := btcwire.RandomUint64()
if err != nil {
return err
}
p.nonce = nonce
pver, btcnet := p.client.pver, p.client.btcnet
msgVersion, err := btcwire.NewMsgVersionFromConn(p.conn, p.nonce, p.client.userAgent, 0)
msgVersion.DisableRelayTx = true
if err := btcwire.WriteMessage(p.conn, msgVersion, pver, btcnet); err != nil {
return err
}
// Read the response version.
msg, _, err := btcwire.ReadMessage(p.conn, pver, btcnet)
if err != nil {
return err
}
vmsg, ok := msg.(*btcwire.MsgVersion)
if !ok {
return fmt.Errorf("Did not receive version message: %T", vmsg)
}
// Negotiate protocol version.
if uint32(vmsg.ProtocolVersion) < pver {
pver = uint32(vmsg.ProtocolVersion)
}
log.Printf("[%s] -> Version: %s", p.address, vmsg.UserAgent)
// Normally we'd check if vmsg.Nonce == p.nonce but the crawler does not
// accept external connections so we skip it.
// Send verack.
if err := btcwire.WriteMessage(p.conn, btcwire.NewMsgVerAck(), pver, btcnet); err != nil {
return err
}
return nil
}
type Crawler struct {
client *Client
count int
seenFilter map[string]bool // TODO: Replace with bloom filter?
results chan []string
workers chan struct{}
queue []string
activeSince time.Duration
}
func NewCrawler(client *Client, queue []string, numWorkers int) *Crawler {
c := Crawler{
client: client,
count: 0,
seenFilter: map[string]bool{},
results: make(chan []string),
workers: make(chan struct{}, numWorkers),
queue: []string{},
activeSince: time.Hour * -24,
}
// Prefill the queue
for _, address := range queue {
c.addAddress(address)
}
return &c
}
func (c *Crawler) handleAddress(address string) *[]string {
r := []string{}
client := c.client
peer := NewPeer(client, address)
err := peer.Connect()
if err != nil {
log.Printf("[%s] Connection failed: %v", address, err)
return &r
}
defer peer.Disconnect()
err = peer.Handshake()
if err != nil {
log.Printf("[%s] Handsake failed: %v", address, err)
return &r
}
// Send getaddr.
if err := btcwire.WriteMessage(peer.conn, btcwire.NewMsgGetAddr(), client.pver, client.btcnet); err != nil {
log.Printf("[%s] GetAddr failed: %v", address, err)
return &r
}
// Listen for tx inv messages.
firstReceived := -1
tolerateMessages := 3
otherMessages := []string{}
timestampSince := time.Now().Add(c.activeSince)
for {
// We can't really tell when we're done receiving peers, so we stop either
// when we get a smaller-than-normal set size or when we've received too
// many unrelated messages.
msg, _, err := btcwire.ReadMessage(peer.conn, client.pver, client.btcnet)
if err != nil {
log.Printf("[%s] Failed to read message: %v", address, err)
continue
}
switch tmsg := msg.(type) {
case *btcwire.MsgAddr:
for _, addr := range tmsg.AddrList {
if addr.Timestamp.After(timestampSince) {
r = append(r, NetAddressKey(addr))
}
}
if firstReceived == -1 {
firstReceived = len(tmsg.AddrList)
} else if firstReceived > len(tmsg.AddrList) || firstReceived == 0 {
// Probably done.
return &r
}
default:
otherMessages = append(otherMessages, tmsg.Command())
if len(otherMessages) > tolerateMessages {
log.Printf("[%s] Giving up with %d results after tolerating messages: %v.", address, len(r), otherMessages)
return &r
}
}
}
}
func (c *Crawler) addAddress(address string) bool {
// Returns true if not seen before, otherwise false
state, ok := c.seenFilter[address]
if ok == true && state == true {
return false
}
c.seenFilter[address] = true
c.count += 1
c.queue = append(c.queue, address)
return true
}
func (c *Crawler) Start() (chan struct{}, error) {
done := make(chan struct{}, 1)
numWorkers := 0
numGood := 0
// This is the main "event loop". Feels like there may be a better way to
// manage the number of concurrent workers but I can't think of it right now.
for {
select {
case c.workers <- struct{}{}:
if len(c.queue) == 0 {
// No work yet.
<-c.workers
continue
}
// Pop from the queue
address := c.queue[0]
c.queue = c.queue[1:]
numWorkers += 1
go func() {
log.Printf("[%s] Worker started.", address)
results := *c.handleAddress(address)
c.results <- results
}()
case r := <-c.results:
newAdded := 0
for _, address := range r {
if c.addAddress(address) {
newAdded += 1
}
}
if newAdded > 0 {
numGood += 1
}
numWorkers -= 1
log.Printf("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r), c.count, numGood)
if len(c.queue) == 0 && numWorkers == 0 {
log.Printf("Done.")
done <- struct{}{}
return done, nil
}
<-c.workers
}
}
}
func main() {
// TODO: Parse args.
// TODO: Export to a reasonable format.
// TODO: Use proper logger for logging.
var seedNodes []string = []string{"85.214.251.25:8333", "62.75.216.13:8333"}
client := NewDefaultClient()
crawler := NewCrawler(client, seedNodes, 10)

25
client.go Normal file
View File

@ -0,0 +1,25 @@
package main
import (
"github.com/conformal/btcwire"
)
// TODO: Unhardcode these:
var userAgent string = "/btc-crawl:0.0.1"
var lastBlock int32 = 0
type Client struct {
btcnet btcwire.BitcoinNet // Bitcoin Network
pver uint32 // Protocl Version
userAgent string // User Agent
lastBlock int32
}
func NewDefaultClient() *Client {
return &Client{
btcnet: btcwire.MainNet,
pver: btcwire.ProtocolVersion,
userAgent: userAgent,
lastBlock: lastBlock,
}
}

169
crawler.go Normal file
View File

@ -0,0 +1,169 @@
package main
import (
"github.com/conformal/btcwire"
"log"
"time"
)
// TODO: Break Client/Peer/Crawler into separate modules.
type Crawler struct {
client *Client
count int
seenFilter map[string]bool // TODO: Replace with bloom filter?
results chan []string
workers chan struct{}
queue []string
activeSince time.Duration
}
func NewCrawler(client *Client, queue []string, numWorkers int) *Crawler {
c := Crawler{
client: client,
count: 0,
seenFilter: map[string]bool{},
results: make(chan []string),
workers: make(chan struct{}, numWorkers),
queue: []string{},
activeSince: time.Hour * -24,
}
// Prefill the queue
for _, address := range queue {
c.addAddress(address)
}
return &c
}
func (c *Crawler) handleAddress(address string) *[]string {
r := []string{}
client := c.client
peer := NewPeer(client, address)
err := peer.Connect()
if err != nil {
log.Printf("[%s] Connection failed: %v", address, err)
return &r
}
defer peer.Disconnect()
err = peer.Handshake()
if err != nil {
log.Printf("[%s] Handsake failed: %v", address, err)
return &r
}
// Send getaddr.
if err := btcwire.WriteMessage(peer.conn, btcwire.NewMsgGetAddr(), client.pver, client.btcnet); err != nil {
log.Printf("[%s] GetAddr failed: %v", address, err)
return &r
}
// Listen for tx inv messages.
firstReceived := -1
tolerateMessages := 3
otherMessages := []string{}
timestampSince := time.Now().Add(c.activeSince)
for {
// We can't really tell when we're done receiving peers, so we stop either
// when we get a smaller-than-normal set size or when we've received too
// many unrelated messages.
msg, _, err := btcwire.ReadMessage(peer.conn, client.pver, client.btcnet)
if err != nil {
log.Printf("[%s] Failed to read message: %v", address, err)
continue
}
switch tmsg := msg.(type) {
case *btcwire.MsgAddr:
for _, addr := range tmsg.AddrList {
if addr.Timestamp.After(timestampSince) {
r = append(r, NetAddressKey(addr))
}
}
if firstReceived == -1 {
firstReceived = len(tmsg.AddrList)
} else if firstReceived > len(tmsg.AddrList) || firstReceived == 0 {
// Probably done.
return &r
}
default:
otherMessages = append(otherMessages, tmsg.Command())
if len(otherMessages) > tolerateMessages {
log.Printf("[%s] Giving up with %d results after tolerating messages: %v.", address, len(r), otherMessages)
return &r
}
}
}
}
func (c *Crawler) addAddress(address string) bool {
// Returns true if not seen before, otherwise false
state, ok := c.seenFilter[address]
if ok == true && state == true {
return false
}
c.seenFilter[address] = true
c.count += 1
c.queue = append(c.queue, address)
return true
}
func (c *Crawler) Start() (chan struct{}, error) {
done := make(chan struct{}, 1)
numWorkers := 0
numGood := 0
// This is the main "event loop". Feels like there may be a better way to
// manage the number of concurrent workers but I can't think of it right now.
for {
select {
case c.workers <- struct{}{}:
if len(c.queue) == 0 {
// No work yet.
<-c.workers
continue
}
// Pop from the queue
address := c.queue[0]
c.queue = c.queue[1:]
numWorkers += 1
go func() {
log.Printf("[%s] Worker started.", address)
results := *c.handleAddress(address)
c.results <- results
}()
case r := <-c.results:
newAdded := 0
for _, address := range r {
if c.addAddress(address) {
newAdded += 1
}
}
if newAdded > 0 {
numGood += 1
}
numWorkers -= 1
log.Printf("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r), c.count, numGood)
if len(c.queue) == 0 && numWorkers == 0 {
log.Printf("Done.")
done <- struct{}{}
return done, nil
}
<-c.workers
}
}
}

87
peer.go Normal file
View File

@ -0,0 +1,87 @@
package main
import (
"fmt"
"github.com/conformal/btcwire"
"log"
"net"
)
type Peer struct {
client *Client
address string
conn net.Conn
nonce uint64 // Nonce we're sending to the peer
}
func NewPeer(client *Client, address string) *Peer {
p := Peer{
client: client,
address: address,
}
return &p
}
func (p *Peer) Connect() error {
if p.conn != nil {
return fmt.Errorf("Peer already connected, can't connect again.")
}
conn, err := net.Dial("tcp", p.address)
if err != nil {
return err
}
p.conn = conn
return nil
}
func (p *Peer) Disconnect() {
p.conn.Close()
}
func (p *Peer) Handshake() error {
if p.conn == nil {
return fmt.Errorf("Peer is not connected, can't handshake.")
}
log.Printf("[%s] Starting handshake.", p.address)
nonce, err := btcwire.RandomUint64()
if err != nil {
return err
}
p.nonce = nonce
pver, btcnet := p.client.pver, p.client.btcnet
msgVersion, err := btcwire.NewMsgVersionFromConn(p.conn, p.nonce, p.client.userAgent, 0)
msgVersion.DisableRelayTx = true
if err := btcwire.WriteMessage(p.conn, msgVersion, pver, btcnet); err != nil {
return err
}
// Read the response version.
msg, _, err := btcwire.ReadMessage(p.conn, pver, btcnet)
if err != nil {
return err
}
vmsg, ok := msg.(*btcwire.MsgVersion)
if !ok {
return fmt.Errorf("Did not receive version message: %T", vmsg)
}
// Negotiate protocol version.
if uint32(vmsg.ProtocolVersion) < pver {
pver = uint32(vmsg.ProtocolVersion)
}
log.Printf("[%s] -> Version: %s", p.address, vmsg.UserAgent)
// Normally we'd check if vmsg.Nonce == p.nonce but the crawler does not
// accept external connections so we skip it.
// Send verack.
if err := btcwire.WriteMessage(p.conn, btcwire.NewMsgVerAck(), pver, btcnet); err != nil {
return err
}
return nil
}