Streaming json action.
parent f24223dbda
commit 00f80c1ffe

cmd.go | 90 lines changed

diff --git a/cmd.go b/cmd.go
@@ -1,10 +1,11 @@
 package main
 
 import (
+	"bufio"
 	"encoding/json"
 	"fmt"
-	"io/ioutil"
 	"os"
+	"os/signal"
 	"time"
 
 	"github.com/alexcesaro/log"
@@ -29,7 +30,7 @@ type Options struct {
 	Concurrency int           `short:"c" long:"concurrency" description:"Maximum number of concurrent connections to open." default:"10"`
 	UserAgent   string        `short:"A" long:"user-agent" description:"Client name to advertise while crawling. Should be in format of '/name:x.y.z/'." default:"/btc-crawl:0.1.1/"`
 	PeerAge     time.Duration `long:"peer-age" description:"Ignore discovered peers older than this." default:"24h"`
-	StopAfter   int           `long:"stop-after" description:"Stop crawling after this many results." default:"-1"`
+	StopAfter   int           `long:"stop-after" description:"Stop crawling after this many results." default:"0"`
 }
 
 var logLevels = []log.Level{
@@ -65,26 +66,87 @@ func main() {
 		seedNodes = GetSeedsFromDNS(defaultDnsSeeds)
 	}
 
+	// Create client and crawler
 	client := NewClient(options.UserAgent)
 	crawler := NewCrawler(client, seedNodes, options.PeerAge)
-	results := crawler.Run(options.Concurrency, options.StopAfter)
 
-	b, err := json.Marshal(results)
-	if err != nil {
-		logger.Errorf("Failed to export JSON: %v", err)
-		return
-	}
+	// Configure output
+	var w *bufio.Writer
+	if options.Output == "-" || options.Output == "" {
+		w = bufio.NewWriter(os.Stdout)
+		defer w.Flush()
+	} else {
+		fp, err := os.Create(options.Output)
+		if err != nil {
+			logger.Errorf("Failed to create file: %v", err)
+			return
+		}
 
-	if options.Output == "-" {
-		os.Stdout.Write(b)
-		return
-	}
+		w = bufio.NewWriter(fp)
+		defer w.Flush()
+		defer fp.Close()
+	}
 
-	err = ioutil.WriteFile(options.Output, b, 0644)
-	if err != nil {
-		logger.Errorf("Failed to write to %s: %v", options.Output, err)
-		return
-	}
+	// Make the first write, make sure everything is cool
+	_, err = w.Write([]byte("["))
+	if err != nil {
+		logger.Errorf("Failed to write result, aborting immediately: %v", err)
+		return
+	}
+
+	isActive := true
+
+	// Construct interrupt handler
+	sig := make(chan os.Signal, 1)
+	signal.Notify(sig, os.Interrupt)
+	go func() {
+		<-sig // Wait for ^C signal
+		logger.Warningf("Interrupt signal detected, shutting down gracefully by waiting for active workers to finish.")
+		crawler.Shutdown()
+
+		// FIXME: This isn't working?
+		<-sig // Hurry up?
+		logger.Warningf("Super-interrupt. Abandoning in-progress workers.")
+		isActive = false
+	}()
+
+	// Launch crawler
+	resultChan := make(chan Result)
+	go crawler.Run(resultChan, options.Concurrency)
+	logger.Infof("Crawler started with %d concurrency limit.", options.Concurrency)
+
+	// Start processing results
+	count := 0
+	for result := range resultChan {
+		b, err := json.Marshal(result)
+		if err != nil {
+			logger.Warningf("Failed to export JSON, skipping: %v", err)
+		}
+
+		if count > 0 {
+			b = append([]byte(","), b...)
+		}
+
+		_, err = w.Write(b)
+		if err != nil {
+			logger.Errorf("Failed to write result, aborting gracefully: %v", err)
+			crawler.Shutdown()
+			break
+		}
+
+		count++
+		if options.StopAfter > 0 && count > options.StopAfter {
+			logger.Infof("StopAfter count reached, shutting down gracefully.")
+			crawler.Shutdown()
+		}
+
+		if !isActive {
+			// No time to wait, finish writing and quit.
+			break
+		}
+	}
 
-	logger.Infof("Written %d results after %s: %s", len(*results), time.Now().Sub(now), options.Output)
+	w.Write([]byte("]")) // No error checking here because it's too late to care.
+
+	logger.Infof("Written %d results after %s: %s", count, time.Now().Sub(now), options.Output)
 }
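The substance of the cmd.go change is the output path: rather than collecting every result and calling `json.Marshal` once at the end, `main` now opens a JSON array, writes each result as it arrives on `resultChan` (comma-prefixed after the first), and closes the array on the way out, so output is produced incrementally over a long crawl. Below is a minimal standalone sketch of the same pattern, assuming a stand-in `result` struct and producer goroutine rather than the crawler's real types:

```go
package main

import (
	"bufio"
	"encoding/json"
	"log"
	"os"
)

// result is a placeholder for whatever gets streamed; the crawler's
// real Result struct lives in crawler.go.
type result struct {
	Address string `json:"address"`
	Peers   int    `json:"peers"`
}

func main() {
	w := bufio.NewWriter(os.Stdout)
	defer w.Flush()

	results := make(chan result)
	go func() {
		// Stand-in producer; the crawler feeds this channel from its workers.
		results <- result{"1.2.3.4:8333", 12}
		results <- result{"5.6.7.8:8333", 3}
		close(results)
	}()

	// Open the JSON array before the first result arrives.
	if _, err := w.Write([]byte("[")); err != nil {
		log.Fatalf("write failed: %v", err)
	}

	count := 0
	for r := range results {
		b, err := json.Marshal(r)
		if err != nil {
			log.Printf("skipping unmarshalable result: %v", err)
			continue
		}
		// Comma-separate every element after the first.
		if count > 0 {
			b = append([]byte(","), b...)
		}
		if _, err := w.Write(b); err != nil {
			log.Fatalf("write failed: %v", err)
		}
		count++
	}

	// Close the array once the producer is done.
	w.Write([]byte("]"))
}
```

Because the `]` is written unconditionally after the loop, even an interrupted run ends as a syntactically complete JSON array.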
crawler.go | 115 lines changed

diff --git a/crawler.go b/crawler.go
@@ -18,6 +18,7 @@ type Crawler struct {
 	numAttempted int
 	seenFilter   map[string]bool // TODO: Replace with bloom filter?
 	peerAge      time.Duration
+	shutdown     chan struct{}
 }
 
 type Result struct {
@@ -46,6 +47,10 @@ func NewCrawler(client *Client, seeds []string, peerAge time.Duration) *Crawler
 	return &c
 }
 
+func (c *Crawler) Shutdown() {
+	c.shutdown <- struct{}{}
+}
+
 func (c *Crawler) handleAddress(address string) *Result {
 	c.numAttempted++
 
@@ -124,103 +129,65 @@ func (c *Crawler) filter(address string) *string {
 	return &address
 }
 
-/*
-func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
-	workChan := make(chan string, numWorkers)
-	queueChan := make(chan string)
-	tempResult := make(chan Result)
-
-	go func(queueChan <-chan string) {
-		// Single thread to safely manage the queue
-		c.addAddress(<-queueChan)
-		nextAddress, _ := c.popAddress()
-
-		for {
-			select {
-			case address := <-queueChan:
-				// Enque address
-				c.addAddress(address)
-			case workChan <- nextAddress:
-				nextAddress, err := c.popAddress()
-				if err != nil {
-					// Block until we get more work
-					c.addAddress(<-queueChan)
-					nextAddress, _ = c.popAddress()
-				}
-			}
-		}
-	}(queueChan)
-
-	go func(tempResult <-chan Result, workChan chan<- string) {
-		// Convert from result to queue.
-		for {
-			select {
-			case r := <-tempResult:
-			}
-		}
-	}(tempResult, workChan)
-
-	for address := range workChan {
-		// Spawn more workers as we get buffered work
-		go func() {
-			logger.Debugf("[%s] Worker started.", address)
-			tempResult <- *c.handleAddress(address)
-		}()
-	}
-}
-*/
-
-func (c *Crawler) Run(numWorkers int, stopAfter int) *[]Result {
-	numActive := 0
-
-	resultChan := make(chan Result)
+func (c *Crawler) process(r *Result) *Result {
+	timestampSince := time.Now().Add(-c.peerAge)
+
+	for _, addr := range r.Peers {
+		if !addr.Timestamp.After(timestampSince) {
+			continue
+		}
+
+		c.queue.Input <- NetAddressKey(addr)
+	}
+
+	if len(r.Peers) > 0 {
+		logger.Infof("[%s] Returned %d peers. Total %d unique peers via %d connected (of %d attempted).", r.Node.Address, len(r.Peers), c.numUnique, c.numConnected, c.numAttempted)
+		return r
+	}
+
+	return nil
+}
+
+func (c *Crawler) Run(resultChan chan<- Result, numWorkers int) {
 	workerChan := make(chan struct{}, numWorkers)
-
-	results := []Result{}
-
-	if stopAfter == 0 {
-		// No stopping.
-		stopAfter = -1
-	}
-
-	// This is the main "event loop". Feels like there may be a better way to
-	// manage the number of concurrent workers but I can't think of it right now.
+	tempResult := make(chan Result)
+	numActive := 0
+	isActive := true
+
+	// This is the main "event loop".
+	// FIXME: Feels like there should be a better way to manage the number of
+	// concurrent workers without limiting slots with workerChan and without
+	// using a numActive counter.
 	for {
 		select {
 		case workerChan <- struct{}{}:
+			if !isActive {
+				// Don't start any new workers, leave the slot filled.
+				break
+			}
+
+			numActive++
 			go func() {
 				address := <-c.queue.Output
 				logger.Debugf("[%s] Worker started.", address)
-				resultChan <- *c.handleAddress(address)
+				tempResult <- *c.handleAddress(address)
 			}()
-		case r := <-resultChan:
-			timestampSince := time.Now().Add(-c.peerAge)
-
-			for _, addr := range r.Peers {
-				if !addr.Timestamp.After(timestampSince) {
-					continue
-				}
-
-				c.queue.Input <- NetAddressKey(addr)
-			}
-
-			numActive--
-
-			if len(r.Peers) > 0 {
-				stopAfter--
-				results = append(results, r)
-
-				logger.Infof("[%s] Returned %d peers. Total %d unique peers via %d connected (of %d attempted).", r.Node.Address, len(r.Peers), c.numUnique, c.numConnected, c.numAttempted)
-			}
-
-			if stopAfter == 0 || (c.queue.IsEmpty() && numActive == 0) {
+		case r := <-tempResult:
+			if c.process(&r) != nil {
+				resultChan <- r
+			}
+
+			numActive--
+			if (!isActive || c.queue.IsEmpty()) && numActive == 0 {
 				logger.Infof("Done.")
-				return &results
+				close(resultChan)
+				return
 			}
 
 			<-workerChan
+
+		case <-c.shutdown:
+			isActive = false
 		}
 	}
 }
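The rewritten `Run` is a single event loop built on three pieces: a buffered channel (`workerChan`) used as a counting semaphore for worker slots, a `numActive` counter so the loop can tell when work has drained, and a `shutdown` channel that flips `isActive` off so no new workers start while in-flight ones finish. Here is a condensed, self-contained sketch of that loop shape; `work`, `jobs`, and the `int` channels are hypothetical stand-ins for `handleAddress`, the address queue, and the `Result` channel:

```go
package main

import (
	"fmt"
	"time"
)

// work is a stand-in for the crawler's handleAddress; it just echoes input.
func work(job int) int {
	time.Sleep(10 * time.Millisecond)
	return job * 2
}

func run(jobs []int, results chan<- int, numWorkers int, shutdown <-chan struct{}) {
	workerChan := make(chan struct{}, numWorkers) // counting semaphore: one slot per worker
	tempResult := make(chan int)
	numActive := 0
	isActive := true
	next := 0

	for {
		select {
		case workerChan <- struct{}{}: // a worker slot is free
			if !isActive || next >= len(jobs) {
				// Leave the slot filled so this case stops firing.
				break
			}
			job := jobs[next]
			next++
			numActive++
			go func() { tempResult <- work(job) }()
		case r := <-tempResult: // a worker finished
			results <- r
			numActive--
			// Like the original, completion is only detected when a result
			// arrives, so this sketch assumes at least one job.
			if (!isActive || next >= len(jobs)) && numActive == 0 {
				close(results)
				return
			}
			<-workerChan // release the slot
		case <-shutdown:
			isActive = false // drain: finish active workers, start no new ones
		}
	}
}

func main() {
	results := make(chan int)
	shutdown := make(chan struct{})
	go run([]int{1, 2, 3, 4, 5}, results, 2, shutdown)
	for r := range results {
		fmt.Println(r)
	}
}
```

As in the diff, a shutdown only stops new dispatches; the loop still waits for in-flight results to come back, which is why a stopped loop leaves worker slots filled rather than releasing them.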