New queue.

Andrey Petrov 2014-05-10 18:48:33 -07:00
parent fc1642a06b
commit f24223dbda
4 changed files with 159 additions and 41 deletions

Makefile (new file, 10 lines added)

@@ -0,0 +1,10 @@
btc-crawl: *.go
	go build .

build: btc-crawl

clean:
	rm btc-crawl

run: btc-crawl
	./btc-crawl -v

cmd.go (2 lines changed)

@@ -29,7 +29,7 @@ type Options struct {
Concurrency int `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"`
UserAgent string `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"`
PeerAge time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"`
- StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"0"`
+ StopAfter int `long:"stop-after" description:"Stop crawling after this many results." default:"-1"`
}
var logLevels = []log.Level{
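Note on the default change above: Run() below now decrements stopAfter once for each result that returns peers and stops when the counter reaches exactly 0, so the new default of -1 (or any negative value) never triggers the limit and the crawl only ends when the queue drains. A minimal sketch of that countdown, with the loop bounds and output purely illustrative:

package main

import "fmt"

func main() {
	// A positive stop-after value stops after that many results; a negative
	// one (the new default of -1) never reaches zero, so no limit applies.
	for _, stopAfter := range []int{3, -1} {
		stopped := false
		for result := 1; result <= 5 && !stopped; result++ {
			stopAfter--
			if stopAfter == 0 {
				fmt.Printf("stop-after limit hit on result %d\n", result)
				stopped = true
			}
		}
		if !stopped {
			fmt.Println("no limit hit; the crawl would run until the queue is empty")
		}
	}
}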

crawler.go

@@ -3,16 +3,21 @@ package main
import (
"time"
"./queue"
"github.com/conformal/btcwire"
)
// TODO: Break Client/Peer/Crawler into separate modules.
type Crawler struct {
- client *Client
- count int
- seenFilter map[string]bool // TODO: Replace with bloom filter?
- queue []string
- peerAge time.Duration
+ client *Client
+ queue *queue.Queue
+ numSeen int
+ numUnique int
+ numConnected int
+ numAttempted int
+ seenFilter map[string]bool // TODO: Replace with bloom filter?
+ peerAge time.Duration
}
type Result struct {
@@ -20,24 +25,30 @@ type Result struct {
Peers []*btcwire.NetAddress
}
- func NewCrawler(client *Client, queue []string, peerAge time.Duration) *Crawler {
+ func NewCrawler(client *Client, seeds []string, peerAge time.Duration) *Crawler {
c := Crawler{
client: client,
- count: 0,
seenFilter: map[string]bool{},
- queue: []string{},
peerAge: peerAge,
}
- // Prefill the queue
- for _, address := range queue {
- c.addAddress(address)
+ filter := func(address string) *string {
+ return c.filter(address)
}
+ c.queue = queue.NewQueue(filter, 10)
+ go func() {
+ // Prefill the queue
+ for _, address := range seeds {
+ c.queue.Input <- address
+ }
+ }()
return &c
}
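Two notes on the new constructor: the filter closure just adapts the c.filter method to the func(string) *string signature queue.NewQueue expects (the bound method value c.filter would satisfy it as well), and the seeds are pushed from a goroutine because Input is a buffered channel (size 10 here), so prefilling a longer seed list synchronously would block NewCrawler until workers start draining the queue. A generic sketch of that blocking behaviour, with a made-up buffer size and addresses:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Sends on a full buffered channel block. Doing the prefill in a goroutine
	// keeps that wait off the caller; only the prefill goroutine stalls until
	// a consumer starts reading.
	input := make(chan string, 2)
	seeds := []string{"a:8333", "b:8333", "c:8333", "d:8333"}

	go func() {
		for _, s := range seeds {
			input <- s
		}
		close(input)
	}()

	time.Sleep(10 * time.Millisecond) // the caller keeps going in the meantime
	for s := range input {
		fmt.Println("dequeued", s)
	}
}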
func (c *Crawler) handleAddress(address string) *Result {
+ c.numAttempted++
client := c.client
peer := NewPeer(client, address)
r := Result{Node: peer}
@@ -62,6 +73,8 @@ func (c *Crawler) handleAddress(address string) *Result {
return &r
}
+ c.numConnected++
// Listen for tx inv messages.
firstReceived := -1
tolerateMessages := 3
@@ -97,23 +110,69 @@ func (c *Crawler) handleAddress(address string) *Result {
}
}
- func (c *Crawler) addAddress(address string) bool {
+ func (c *Crawler) filter(address string) *string {
// Returns true if not seen before, otherwise false
+ c.numSeen++
state, ok := c.seenFilter[address]
if ok == true && state == true {
- return false
+ return nil
}
c.seenFilter[address] = true
- c.count += 1
- c.queue = append(c.queue, address)
- return true
+ c.numUnique++
+ return &address
}
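filter() is now the queue's admission test rather than a direct enqueue: it returns a pointer to the address when it has not been seen before and nil for a duplicate, updating numSeen and numUnique along the way. A standalone sketch of that contract, with the dedup type and names invented for illustration:

package main

import "fmt"

type dedup struct {
	seen map[string]bool
}

// filter keeps the first occurrence of an address and drops repeats,
// mirroring the *string-or-nil contract the queue expects.
func (d *dedup) filter(address string) *string {
	if d.seen[address] {
		return nil // duplicate: the queue discards it
	}
	d.seen[address] = true
	return &address // new: the queue will eventually emit it on Output
}

func main() {
	d := &dedup{seen: map[string]bool{}}
	for _, a := range []string{"1.2.3.4:8333", "1.2.3.4:8333", "5.6.7.8:8333"} {
		fmt.Printf("%s kept=%v\n", a, d.filter(a) != nil)
	}
}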
+ /*
+ func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
+ workChan := make(chan string, numWorkers)
+ queueChan := make(chan string)
+ tempResult := make(chan Result)
+ go func(queueChan <-chan string) {
+ // Single thread to safely manage the queue
+ c.addAddress(<-queueChan)
+ nextAddress, _ := c.popAddress()
+ for {
+ select {
+ case address := <-queueChan:
+ // Enque address
+ c.addAddress(address)
+ case workChan <- nextAddress:
+ nextAddress, err := c.popAddress()
+ if err != nil {
+ // Block until we get more work
+ c.addAddress(<-queueChan)
+ nextAddress, _ = c.popAddress()
+ }
+ }
+ }
+ }(queueChan)
+ go func(tempResult <-chan Result, workChan chan<- string) {
+ // Convert from result to queue.
+ for {
+ select {
+ case r := <-tempResult:
+ }
+ }
+ }(tempResult, workChan)
+ for address := range workChan {
+ // Spawn more workers as we get buffered work
+ go func() {
+ logger.Debugf("[%s] Worker started.", address)
+ tempResult <- *c.handleAddress(address)
+ }()
+ }
+ }
+ */
func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result {
numActive := 0
- numGood := 0
resultChan := make(chan Result)
workerChan := make(chan struct{}, numWorkers)
@@ -130,24 +189,13 @@ func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result {
for {
select {
case workerChan <- struct{}{}:
- if len(c.queue) == 0 {
- // No work yet.
- <-workerChan
- continue
- }
- // Pop from the queue
- address := c.queue[0]
- c.queue = c.queue[1:]
numActive += 1
go func() {
+ address := <-c.queue.Output
logger.Debugf("[%s] Worker started.", address)
resultChan <- *c.handleAddress(address)
}()
case r := <-resultChan:
- newAdded := 0
timestampSince := time.Now().Add(-c.peerAge)
for _, addr := range r.Peers {
@@ -155,24 +203,19 @@ func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result {
continue
}
- if c.addAddress(NetAddressKey(addr)) {
- newAdded += 1
- }
+ c.queue.Input <- NetAddressKey(addr)
}
- if newAdded > 0 {
- numGood += 1
- }
- numActive -= 1
+ numActive--
if len(r.Peers) > 0 {
+ stopAfter--
results = append(results, r)
logger.Infof("Added %d new peers of %d returned. Total %d known peers via %d connected.", newAdded, len(r.Peers), c.count, numGood)
logger.Infof("[%s] Returned %d peers. Total %d unique peers via %d connected (of %d attempted).", r.Node.Address, len(r.Peers), c.numUnique, c.numConnected, c.numAttempted)
}
- if stopAfter == 0 || (len(c.queue) == 0 && numActive == 0) {
+ if stopAfter == 0 || (c.queue.IsEmpty() && numActive == 0) {
logger.Infof("Done.")
return &results
}
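The select in Run() pairs two channels: workerChan, a buffered chan struct{} that acts as a counting semaphore capping in-flight workers at numWorkers, and c.queue.Output, which each worker now reads its address from instead of popping a slice. A generic sketch of the semaphore pattern (not the crawler's exact wiring, which acquires and releases slots inside the select loop):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numWorkers = 3
	sem := make(chan struct{}, numWorkers) // one slot per concurrent worker
	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		sem <- struct{}{} // blocks while all slots are taken
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the worker finishes
			fmt.Println("handling address", n)
		}(i)
	}
	wg.Wait()
}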

queue/queue.go (new file, 65 lines added)

@@ -0,0 +1,65 @@
package queue

// A single goroutine manages the overflow queue for thread-safety, funneling
// data between the Input and Output channels through a specified filter.
type Queue struct {
	Input    chan string
	Output   chan string
	overflow []string
	filter   func(string) *string
}

func NewQueue(filter func(string) *string, bufferSize int) *Queue {
	q := Queue{
		Input:    make(chan string, bufferSize),
		Output:   make(chan string, bufferSize),
		overflow: []string{},
		filter:   filter,
	}

	go func(input <-chan string, output chan<- string) {
		// Block until we have a next item
		nextItem := q.next()

		for {
			select {
			case input := <-q.Input:
				// New input
				r := q.filter(input)
				if r != nil {
					// Store in the overflow
					q.overflow = append(q.overflow, *r)
				}
			case output <- nextItem:
				// Block until we have more inputs
				nextItem = q.next()
			}
		}
	}(q.Input, q.Output)

	return &q
}

func (q *Queue) next() string {
	// Block until a next item is available.
	if len(q.overflow) > 0 {
		// Pop off the overflow queue.
		r := q.overflow[0]
		q.overflow = q.overflow[1:]
		return r
	}

	for {
		// Block until we have a viable output
		r := q.filter(<-q.Input)
		if r != nil {
			return *r
		}
	}
}

func (q *Queue) IsEmpty() bool {
	return len(q.overflow) == 0
}