use ethreact.Event and ethreact.ReactorEngine
This commit is contained in:
parent
0ecc5c815e
commit
1735ec0362
|
@ -3,6 +3,7 @@ package ethchain
|
|||
import (
|
||||
"github.com/ethereum/eth-go/ethcrypto"
|
||||
"github.com/ethereum/eth-go/ethlog"
|
||||
"github.com/ethereum/eth-go/ethreact"
|
||||
"github.com/ethereum/eth-go/ethutil"
|
||||
"github.com/obscuren/sha3"
|
||||
"hash"
|
||||
|
@ -14,7 +15,7 @@ import (
|
|||
var powlogger = ethlog.NewLogger("POW")
|
||||
|
||||
type PoW interface {
|
||||
Search(block *Block, reactChan chan ethutil.React) []byte
|
||||
Search(block *Block, reactChan chan ethreact.Event) []byte
|
||||
Verify(hash []byte, diff *big.Int, nonce []byte) bool
|
||||
}
|
||||
|
||||
|
@ -22,7 +23,7 @@ type EasyPow struct {
|
|||
hash *big.Int
|
||||
}
|
||||
|
||||
func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte {
|
||||
func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
hash := block.HashNoNonce()
|
||||
diff := block.Difficulty
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"github.com/ethereum/eth-go/ethcrypto"
|
||||
"github.com/ethereum/eth-go/ethlog"
|
||||
"github.com/ethereum/eth-go/ethreact"
|
||||
"github.com/ethereum/eth-go/ethtrie"
|
||||
"github.com/ethereum/eth-go/ethutil"
|
||||
"github.com/ethereum/eth-go/ethwire"
|
||||
|
@ -36,7 +37,7 @@ type EthManager interface {
|
|||
BlockChain() *BlockChain
|
||||
TxPool() *TxPool
|
||||
Broadcast(msgType ethwire.MsgType, data []interface{})
|
||||
Reactor() *ethutil.ReactorEngine
|
||||
Reactor() *ethreact.ReactorEngine
|
||||
PeerCount() int
|
||||
IsMining() bool
|
||||
IsListening() bool
|
||||
|
|
10
ethereum.go
10
ethereum.go
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/ethereum/eth-go/ethchain"
|
||||
"github.com/ethereum/eth-go/ethcrypto"
|
||||
"github.com/ethereum/eth-go/ethlog"
|
||||
"github.com/ethereum/eth-go/ethreact"
|
||||
"github.com/ethereum/eth-go/ethrpc"
|
||||
"github.com/ethereum/eth-go/ethutil"
|
||||
"github.com/ethereum/eth-go/ethwire"
|
||||
|
@ -73,7 +74,7 @@ type Ethereum struct {
|
|||
|
||||
listening bool
|
||||
|
||||
reactor *ethutil.ReactorEngine
|
||||
reactor *ethreact.ReactorEngine
|
||||
|
||||
RpcServer *ethrpc.JsonRpcServer
|
||||
|
||||
|
@ -108,7 +109,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
|
|||
keyManager: keyManager,
|
||||
clientIdentity: clientIdentity,
|
||||
}
|
||||
ethereum.reactor = ethutil.NewReactorEngine()
|
||||
ethereum.reactor = ethreact.New()
|
||||
|
||||
ethereum.txPool = ethchain.NewTxPool(ethereum)
|
||||
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
|
||||
|
@ -120,7 +121,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
|
|||
return ethereum, nil
|
||||
}
|
||||
|
||||
func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
|
||||
func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
|
||||
return s.reactor
|
||||
}
|
||||
|
||||
|
@ -352,6 +353,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {
|
|||
|
||||
// Start the ethereum
|
||||
func (s *Ethereum) Start(seed bool) {
|
||||
s.reactor.Start()
|
||||
// Bind to addr and port
|
||||
ln, err := net.Listen("tcp", ":"+s.Port)
|
||||
if err != nil {
|
||||
|
@ -462,6 +464,8 @@ func (s *Ethereum) Stop() {
|
|||
}
|
||||
s.txPool.Stop()
|
||||
s.stateManager.Stop()
|
||||
s.reactor.Flush()
|
||||
s.reactor.Stop()
|
||||
|
||||
ethlogger.Infoln("Server stopped")
|
||||
close(s.shutdownChan)
|
||||
|
|
|
@ -4,7 +4,7 @@ import (
|
|||
"bytes"
|
||||
"github.com/ethereum/eth-go/ethchain"
|
||||
"github.com/ethereum/eth-go/ethlog"
|
||||
"github.com/ethereum/eth-go/ethutil"
|
||||
"github.com/ethereum/eth-go/ethreact"
|
||||
"github.com/ethereum/eth-go/ethwire"
|
||||
"sort"
|
||||
)
|
||||
|
@ -15,19 +15,19 @@ type Miner struct {
|
|||
pow ethchain.PoW
|
||||
ethereum ethchain.EthManager
|
||||
coinbase []byte
|
||||
reactChan chan ethutil.React
|
||||
reactChan chan ethreact.Event
|
||||
txs ethchain.Transactions
|
||||
uncles []*ethchain.Block
|
||||
block *ethchain.Block
|
||||
powChan chan []byte
|
||||
powQuitChan chan ethutil.React
|
||||
powQuitChan chan ethreact.Event
|
||||
quitChan chan bool
|
||||
}
|
||||
|
||||
func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner {
|
||||
reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
|
||||
reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
|
||||
powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
|
||||
powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread
|
||||
powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
|
||||
quitChan := make(chan bool, 1)
|
||||
|
||||
ethereum.Reactor().Subscribe("newBlock", reactChan)
|
||||
|
|
|
@ -1,87 +0,0 @@
|
|||
package ethutil
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type ReactorEvent struct {
|
||||
mut sync.Mutex
|
||||
event string
|
||||
chans []chan React
|
||||
}
|
||||
|
||||
// Post the specified reactor resource on the channels
|
||||
// currently subscribed
|
||||
func (e *ReactorEvent) Post(react React) {
|
||||
e.mut.Lock()
|
||||
defer e.mut.Unlock()
|
||||
|
||||
for _, ch := range e.chans {
|
||||
go func(ch chan React) {
|
||||
ch <- react
|
||||
}(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// Add a subscriber to this event
|
||||
func (e *ReactorEvent) Add(ch chan React) {
|
||||
e.mut.Lock()
|
||||
defer e.mut.Unlock()
|
||||
|
||||
e.chans = append(e.chans, ch)
|
||||
}
|
||||
|
||||
// Remove a subscriber
|
||||
func (e *ReactorEvent) Remove(ch chan React) {
|
||||
e.mut.Lock()
|
||||
defer e.mut.Unlock()
|
||||
|
||||
for i, c := range e.chans {
|
||||
if c == ch {
|
||||
e.chans = append(e.chans[:i], e.chans[i+1:]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Basic reactor resource
|
||||
type React struct {
|
||||
Resource interface{}
|
||||
Event string
|
||||
}
|
||||
|
||||
// The reactor basic engine. Acts as bridge
|
||||
// between the events and the subscribers/posters
|
||||
type ReactorEngine struct {
|
||||
patterns map[string]*ReactorEvent
|
||||
}
|
||||
|
||||
func NewReactorEngine() *ReactorEngine {
|
||||
return &ReactorEngine{patterns: make(map[string]*ReactorEvent)}
|
||||
}
|
||||
|
||||
// Subscribe a channel to the specified event
|
||||
func (reactor *ReactorEngine) Subscribe(event string, ch chan React) {
|
||||
ev := reactor.patterns[event]
|
||||
// Create a new event if one isn't available
|
||||
if ev == nil {
|
||||
ev = &ReactorEvent{event: event}
|
||||
reactor.patterns[event] = ev
|
||||
}
|
||||
|
||||
// Add the channel to reactor event handler
|
||||
ev.Add(ch)
|
||||
}
|
||||
|
||||
func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) {
|
||||
ev := reactor.patterns[event]
|
||||
if ev != nil {
|
||||
ev.Remove(ch)
|
||||
}
|
||||
}
|
||||
|
||||
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
|
||||
ev := reactor.patterns[event]
|
||||
if ev != nil {
|
||||
ev.Post(React{Resource: resource, Event: event})
|
||||
}
|
||||
}
|
|
@ -1,30 +0,0 @@
|
|||
package ethutil
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestReactorAdd(t *testing.T) {
|
||||
engine := NewReactorEngine()
|
||||
ch := make(chan React)
|
||||
engine.Subscribe("test", ch)
|
||||
if len(engine.patterns) != 1 {
|
||||
t.Error("Expected patterns to be 1, got", len(engine.patterns))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactorEvent(t *testing.T) {
|
||||
engine := NewReactorEngine()
|
||||
|
||||
// Buffer 1, so it doesn't block for this test
|
||||
ch := make(chan React, 1)
|
||||
engine.Subscribe("test", ch)
|
||||
engine.Post("test", "hello")
|
||||
|
||||
value := <-ch
|
||||
if val, ok := value.Resource.(string); ok {
|
||||
if val != "hello" {
|
||||
t.Error("Expected Resource to be 'hello', got", val)
|
||||
}
|
||||
} else {
|
||||
t.Error("Unable to cast")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue