diff --git a/javascript/javascript_runtime.go b/javascript/javascript_runtime.go index ffc672a63a..16e22964f9 100644 --- a/javascript/javascript_runtime.go +++ b/javascript/javascript_runtime.go @@ -11,9 +11,9 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethpipe" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/event" "github.com/ethereum/go-ethereum/utils" "github.com/obscuren/otto" ) @@ -25,9 +25,8 @@ type JSRE struct { Vm *otto.Otto pipe *ethpipe.JSPipe - blockChan chan ethreact.Event - changeChan chan ethreact.Event - quitChan chan bool + events event.Subscription + quitChan chan bool objectCb map[string][]otto.Value } @@ -51,8 +50,7 @@ func NewJSRE(ethereum *eth.Ethereum) *JSRE { ethereum, otto.New(), ethpipe.NewJSPipe(ethereum), - make(chan ethreact.Event, 10), - make(chan ethreact.Event, 10), + nil, make(chan bool), make(map[string][]otto.Value), } @@ -68,8 +66,8 @@ func NewJSRE(ethereum *eth.Ethereum) *JSRE { go re.mainLoop() // Subscribe to events - reactor := ethereum.Reactor() - reactor.Subscribe("newBlock", re.blockChan) + mux := ethereum.EventMux() + re.events = mux.Subscribe(ethchain.NewBlockEvent{}) re.Bind("eth", &JSEthereum{re.pipe, re.Vm, ethereum}) @@ -105,25 +103,16 @@ func (self *JSRE) Require(file string) error { } func (self *JSRE) Stop() { + self.events.Unsubscribe() // Kill the main loop self.quitChan <- true - close(self.blockChan) close(self.quitChan) - close(self.changeChan) jsrelogger.Infoln("stopped") } func (self *JSRE) mainLoop() { -out: - for { - select { - case <-self.quitChan: - break out - case block := <-self.blockChan: - if _, ok := block.Resource.(*ethchain.Block); ok { - } - } + for _ = range self.events.Chan() { } } @@ -201,13 +190,13 @@ func (self *JSRE) watch(call otto.FunctionCall) otto.Value { if storageCallback { self.objectCb[addr+storageAddr] = append(self.objectCb[addr+storageAddr], cb) - event := "storage:" + string(ethutil.Hex2Bytes(addr)) + ":" + string(ethutil.Hex2Bytes(storageAddr)) - self.ethereum.Reactor().Subscribe(event, self.changeChan) + // event := "storage:" + string(ethutil.Hex2Bytes(addr)) + ":" + string(ethutil.Hex2Bytes(storageAddr)) + // self.ethereum.EventMux().Subscribe(event, self.changeChan) } else { self.objectCb[addr] = append(self.objectCb[addr], cb) - event := "object:" + string(ethutil.Hex2Bytes(addr)) - self.ethereum.Reactor().Subscribe(event, self.changeChan) + // event := "object:" + string(ethutil.Hex2Bytes(addr)) + // self.ethereum.EventMux().Subscribe(event, self.changeChan) } return otto.UndefinedValue() diff --git a/mist/ext_app.go b/mist/ext_app.go index 514084c974..36241e58c4 100644 --- a/mist/ext_app.go +++ b/mist/ext_app.go @@ -5,8 +5,8 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethpipe" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" + "github.com/ethereum/eth-go/event" "github.com/ethereum/go-ethereum/javascript" "gopkg.in/qml.v1" ) @@ -28,9 +28,7 @@ type ExtApplication struct { *ethpipe.JSPipe eth ethchain.EthManager - blockChan chan ethreact.Event - messageChan chan ethreact.Event - quitChan chan bool + events event.Subscription watcherQuitChan chan bool filters map[string]*ethchain.Filter @@ -40,19 +38,14 @@ type ExtApplication struct { } func NewExtApplication(container AppContainer, lib *UiLib) *ExtApplication { - app := &ExtApplication{ - ethpipe.NewJSPipe(lib.eth), - lib.eth, - make(chan ethreact.Event, 100), - make(chan ethreact.Event, 100), - make(chan bool), - make(chan bool), - make(map[string]*ethchain.Filter), - container, - lib, + return &ExtApplication{ + JSPipe: ethpipe.NewJSPipe(lib.eth), + eth: lib.eth, + watcherQuitChan: make(chan bool), + filters: make(map[string]*ethchain.Filter), + container: container, + lib: lib, } - - return app } func (app *ExtApplication) run() { @@ -67,14 +60,13 @@ func (app *ExtApplication) run() { return } + // Subscribe to events + mux := app.lib.eth.EventMux() + app.events = mux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil)) + // Call the main loop go app.mainLoop() - // Subscribe to events - reactor := app.lib.eth.Reactor() - reactor.Subscribe("newBlock", app.blockChan) - reactor.Subscribe("messages", app.messageChan) - app.container.NewWatcher(app.watcherQuitChan) win := app.container.Window() @@ -85,42 +77,29 @@ func (app *ExtApplication) run() { } func (app *ExtApplication) stop() { - // Clean up - reactor := app.lib.eth.Reactor() - reactor.Unsubscribe("newBlock", app.blockChan) + app.events.Unsubscribe() // Kill the main loop - app.quitChan <- true app.watcherQuitChan <- true - close(app.blockChan) - close(app.quitChan) - app.container.Destroy() } func (app *ExtApplication) mainLoop() { -out: - for { - select { - case <-app.quitChan: - break out - case block := <-app.blockChan: - if block, ok := block.Resource.(*ethchain.Block); ok { - app.container.NewBlock(block) - } - case msg := <-app.messageChan: - if messages, ok := msg.Resource.(ethstate.Messages); ok { - for id, filter := range app.filters { - msgs := filter.FilterMessages(messages) - if len(msgs) > 0 { - app.container.Messages(msgs, id) - } + for ev := range app.events.Chan() { + switch ev := ev.(type) { + case ethchain.NewBlockEvent: + app.container.NewBlock(ev.Block) + + case ethstate.Messages: + for id, filter := range app.filters { + msgs := filter.FilterMessages(ev) + if len(msgs) > 0 { + app.container.Messages(msgs, id) } } } } - } func (self *ExtApplication) Watch(filterOptions map[string]interface{}, identifier string) { diff --git a/mist/gui.go b/mist/gui.go index 80d4a1fc3f..84ee1ea00a 100644 --- a/mist/gui.go +++ b/mist/gui.go @@ -19,7 +19,6 @@ import ( "github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethminer" "github.com/ethereum/eth-go/ethpipe" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" "gopkg.in/qml.v1" @@ -376,15 +375,6 @@ func (gui *Gui) update() { gui.win.Root().Call("addPlugin", plugin.Path, "") } - var ( - blockChan = make(chan ethreact.Event, 100) - txChan = make(chan ethreact.Event, 100) - objectChan = make(chan ethreact.Event, 100) - peerChan = make(chan ethreact.Event, 100) - chainSyncChan = make(chan ethreact.Event, 100) - miningChan = make(chan ethreact.Event, 100) - ) - peerUpdateTicker := time.NewTicker(5 * time.Second) generalUpdateTicker := time.NewTicker(500 * time.Millisecond) statsUpdateTicker := time.NewTicker(5 * time.Second) @@ -397,61 +387,82 @@ func (gui *Gui) update() { lastBlockLabel := gui.getObjectByName("lastBlockLabel") miningLabel := gui.getObjectByName("miningLabel") + events := gui.eth.EventMux().Subscribe( + eth.ChainSyncEvent{}, + eth.PeerListEvent{}, + ethchain.NewBlockEvent{}, + ethchain.TxEvent{}, + ethminer.Event{}, + ) + + // nameReg := gui.pipe.World().Config().Get("NameReg") + // mux.Subscribe("object:"+string(nameReg.Address()), objectChan) + go func() { + defer events.Unsubscribe() for { select { - case b := <-blockChan: - block := b.Resource.(*ethchain.Block) - gui.processBlock(block, false) - if bytes.Compare(block.Coinbase, gui.address()) == 0 { - gui.setWalletValue(gui.eth.StateManager().CurrentState().GetAccount(gui.address()).Balance, nil) + case ev, isopen := <-events.Chan(): + if !isopen { + return } - case txMsg := <-txChan: - tx := txMsg.Resource.(*ethchain.Transaction) - - if txMsg.Name == "newTx:pre" { - object := state.GetAccount(gui.address()) - - if bytes.Compare(tx.Sender(), gui.address()) == 0 { - unconfirmedFunds.Sub(unconfirmedFunds, tx.Value) - } else if bytes.Compare(tx.Recipient, gui.address()) == 0 { - unconfirmedFunds.Add(unconfirmedFunds, tx.Value) + switch ev := ev.(type) { + case ethchain.NewBlockEvent: + gui.processBlock(ev.Block, false) + if bytes.Compare(ev.Block.Coinbase, gui.address()) == 0 { + gui.setWalletValue(gui.eth.StateManager().CurrentState().GetAccount(gui.address()).Balance, nil) } - gui.setWalletValue(object.Balance, unconfirmedFunds) + case ethchain.TxEvent: + tx := ev.Tx + if ev.Type == ethchain.TxPre { + object := state.GetAccount(gui.address()) - gui.insertTransaction("pre", tx) - } else { - object := state.GetAccount(gui.address()) - if bytes.Compare(tx.Sender(), gui.address()) == 0 { - object.SubAmount(tx.Value) + if bytes.Compare(tx.Sender(), gui.address()) == 0 { + unconfirmedFunds.Sub(unconfirmedFunds, tx.Value) + } else if bytes.Compare(tx.Recipient, gui.address()) == 0 { + unconfirmedFunds.Add(unconfirmedFunds, tx.Value) + } - //gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "send") - gui.txDb.Put(tx.Hash(), tx.RlpEncode()) - } else if bytes.Compare(tx.Recipient, gui.address()) == 0 { - object.AddAmount(tx.Value) + gui.setWalletValue(object.Balance, unconfirmedFunds) - //gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "recv") - gui.txDb.Put(tx.Hash(), tx.RlpEncode()) + gui.insertTransaction("pre", tx) + + } else if ev.Type == ethchain.TxPost { + object := state.GetAccount(gui.address()) + if bytes.Compare(tx.Sender(), gui.address()) == 0 { + object.SubAmount(tx.Value) + + //gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "send") + gui.txDb.Put(tx.Hash(), tx.RlpEncode()) + } else if bytes.Compare(tx.Recipient, gui.address()) == 0 { + object.AddAmount(tx.Value) + + //gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "recv") + gui.txDb.Put(tx.Hash(), tx.RlpEncode()) + } + + gui.setWalletValue(object.Balance, nil) + + state.UpdateStateObject(object) } - gui.setWalletValue(object.Balance, nil) + // case object: + // gui.loadAddressBook() - state.UpdateStateObject(object) + case eth.PeerListEvent: + gui.setPeerInfo() + + case ethminer.Event: + if ev.Type == ethminer.Started { + gui.miner = ev.Miner + } else { + gui.miner = nil + } } - case <-objectChan: - gui.loadAddressBook() - case <-peerChan: - gui.setPeerInfo() case <-peerUpdateTicker.C: gui.setPeerInfo() - case msg := <-miningChan: - if msg.Name == "miner:start" { - gui.miner = msg.Resource.(*ethminer.Miner) - } else { - gui.miner = nil - } case <-generalUpdateTicker.C: statusText := "#" + gui.eth.BlockChain().CurrentBlock.Number.String() lastBlockLabel.Set("text", statusText) @@ -478,20 +489,6 @@ func (gui *Gui) update() { } } }() - - reactor := gui.eth.Reactor() - - reactor.Subscribe("newBlock", blockChan) - reactor.Subscribe("newTx:pre", txChan) - reactor.Subscribe("newTx:post", txChan) - reactor.Subscribe("chainSync", chainSyncChan) - reactor.Subscribe("miner:start", miningChan) - reactor.Subscribe("miner:stop", miningChan) - - nameReg := gui.pipe.World().Config().Get("NameReg") - reactor.Subscribe("object:"+string(nameReg.Address()), objectChan) - - reactor.Subscribe("peerList", peerChan) } func (gui *Gui) setStatsPane() {