diff --git a/core/tx_pool.go b/core/tx_pool.go
index 58922f12fc..ca16c1ba3c 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -89,7 +89,7 @@ type TxPool struct {
gasLimit func() *big.Int // The current gas limit function callback
minGasPrice *big.Int
eventMux *event.TypeMux
- events event.Subscription
+ events *event.TypeMuxSubscription
localTx *txSet
signer types.Signer
mu sync.RWMutex
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index e0ee2ff514..3adf8111ad 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -74,7 +74,7 @@ type subscription struct {
// subscription which match the subscription criteria.
type EventSystem struct {
mux *event.TypeMux
- sub event.Subscription
+ sub *event.TypeMuxSubscription
backend Backend
lightMode bool
lastHead *types.Header
@@ -277,7 +277,7 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria.
-func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
+func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) {
if ev == nil {
return
}
diff --git a/eth/handler.go b/eth/handler.go
index e03c891493..0e7eed3520 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -78,8 +78,8 @@ type ProtocolManager struct {
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
- txSub event.Subscription
- minedBlockSub event.Subscription
+ txSub *event.TypeMuxSubscription
+ minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
newPeerCh chan *peer
diff --git a/event/event.go b/event/event.go
index fd0bcfbd48..d3e84f0f7f 100644
--- a/event/event.go
+++ b/event/event.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see .
-// Package event implements an event multiplexer.
+// Package event deals with subscriptions to real-time events.
package event
import (
@@ -25,33 +25,22 @@ import (
"time"
)
-// Event is a time-tagged notification pushed to subscribers.
-type Event struct {
+// TypeMuxEvent is a time-tagged notification pushed to subscribers.
+type TypeMuxEvent struct {
Time time.Time
Data interface{}
}
-// Subscription is implemented by event subscriptions.
-type Subscription interface {
- // Chan returns a channel that carries events.
- // Implementations should return the same channel
- // for any subsequent calls to Chan.
- Chan() <-chan *Event
-
- // Unsubscribe stops delivery of events to a subscription.
- // The event channel is closed.
- // Unsubscribe can be called more than once.
- Unsubscribe()
-}
-
// A TypeMux dispatches events to registered receivers. Receivers can be
// registered to handle events of certain type. Any operation
// called after mux is stopped will return ErrMuxClosed.
//
// The zero value is ready to use.
+//
+// Deprecated: use Feed
type TypeMux struct {
mutex sync.RWMutex
- subm map[reflect.Type][]*muxsub
+ subm map[reflect.Type][]*TypeMuxSubscription
stopped bool
}
@@ -61,7 +50,7 @@ var ErrMuxClosed = errors.New("event: mux closed")
// Subscribe creates a subscription for events of the given types. The
// subscription's channel is closed when it is unsubscribed
// or the mux is closed.
-func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
+func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
sub := newsub(mux)
mux.mutex.Lock()
defer mux.mutex.Unlock()
@@ -72,7 +61,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
close(sub.postC)
} else {
if mux.subm == nil {
- mux.subm = make(map[reflect.Type][]*muxsub)
+ mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
}
for _, t := range types {
rtyp := reflect.TypeOf(t)
@@ -80,7 +69,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
if find(oldsubs, sub) != -1 {
panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
}
- subs := make([]*muxsub, len(oldsubs)+1)
+ subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
copy(subs, oldsubs)
subs[len(oldsubs)] = sub
mux.subm[rtyp] = subs
@@ -92,7 +81,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
func (mux *TypeMux) Post(ev interface{}) error {
- event := &Event{
+ event := &TypeMuxEvent{
Time: time.Now(),
Data: ev,
}
@@ -125,7 +114,7 @@ func (mux *TypeMux) Stop() {
mux.mutex.Unlock()
}
-func (mux *TypeMux) del(s *muxsub) {
+func (mux *TypeMux) del(s *TypeMuxSubscription) {
mux.mutex.Lock()
for typ, subs := range mux.subm {
if pos := find(subs, s); pos >= 0 {
@@ -139,7 +128,7 @@ func (mux *TypeMux) del(s *muxsub) {
s.mux.mutex.Unlock()
}
-func find(slice []*muxsub, item *muxsub) int {
+func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int {
for i, v := range slice {
if v == item {
return i
@@ -148,14 +137,15 @@ func find(slice []*muxsub, item *muxsub) int {
return -1
}
-func posdelete(slice []*muxsub, pos int) []*muxsub {
- news := make([]*muxsub, len(slice)-1)
+func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription {
+ news := make([]*TypeMuxSubscription, len(slice)-1)
copy(news[:pos], slice[:pos])
copy(news[pos:], slice[pos+1:])
return news
}
-type muxsub struct {
+// TypeMuxSubscription is a subscription established through TypeMux.
+type TypeMuxSubscription struct {
mux *TypeMux
created time.Time
closeMu sync.Mutex
@@ -166,13 +156,13 @@ type muxsub struct {
// postC can be set to nil without affecting the return value of
// Chan.
postMu sync.RWMutex
- readC <-chan *Event
- postC chan<- *Event
+ readC <-chan *TypeMuxEvent
+ postC chan<- *TypeMuxEvent
}
-func newsub(mux *TypeMux) *muxsub {
- c := make(chan *Event)
- return &muxsub{
+func newsub(mux *TypeMux) *TypeMuxSubscription {
+ c := make(chan *TypeMuxEvent)
+ return &TypeMuxSubscription{
mux: mux,
created: time.Now(),
readC: c,
@@ -181,16 +171,16 @@ func newsub(mux *TypeMux) *muxsub {
}
}
-func (s *muxsub) Chan() <-chan *Event {
+func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent {
return s.readC
}
-func (s *muxsub) Unsubscribe() {
+func (s *TypeMuxSubscription) Unsubscribe() {
s.mux.del(s)
s.closewait()
}
-func (s *muxsub) closewait() {
+func (s *TypeMuxSubscription) closewait() {
s.closeMu.Lock()
defer s.closeMu.Unlock()
if s.closed {
@@ -205,7 +195,7 @@ func (s *muxsub) closewait() {
s.postMu.Unlock()
}
-func (s *muxsub) deliver(event *Event) {
+func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
// Short circuit delivery if stale event
if s.created.After(event.Time) {
return
diff --git a/event/event_test.go b/event/event_test.go
index 2c56ecf29f..a12945a471 100644
--- a/event/event_test.go
+++ b/event/event_test.go
@@ -149,16 +149,34 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) {
}()
}
-func BenchmarkPost3(b *testing.B) {
- var mux = new(TypeMux)
- defer mux.Stop()
- emptySubscriber(mux, testEvent(0))
- emptySubscriber(mux, testEvent(0))
- emptySubscriber(mux, testEvent(0))
+func BenchmarkPost1000(b *testing.B) {
+ var (
+ mux = new(TypeMux)
+ subscribed, done sync.WaitGroup
+ nsubs = 1000
+ )
+ subscribed.Add(nsubs)
+ done.Add(nsubs)
+ for i := 0; i < nsubs; i++ {
+ go func() {
+ s := mux.Subscribe(testEvent(0))
+ subscribed.Done()
+ for range s.Chan() {
+ }
+ done.Done()
+ }()
+ }
+ subscribed.Wait()
+ // The actual benchmark.
+ b.ResetTimer()
for i := 0; i < b.N; i++ {
mux.Post(testEvent(0))
}
+
+ b.StopTimer()
+ mux.Stop()
+ done.Wait()
}
func BenchmarkPostConcurrent(b *testing.B) {
diff --git a/event/example_feed_test.go b/event/example_feed_test.go
new file mode 100644
index 0000000000..63436b2269
--- /dev/null
+++ b/event/example_feed_test.go
@@ -0,0 +1,73 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event_test
+
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/event"
+)
+
+func ExampleFeed_acknowledgedEvents() {
+ // This example shows how the return value of Send can be used for request/reply
+ // interaction between event consumers and producers.
+ var feed event.Feed
+ type ackedEvent struct {
+ i int
+ ack chan<- struct{}
+ }
+
+ // Consumers wait for events on the feed and acknowledge processing.
+ done := make(chan struct{})
+ defer close(done)
+ for i := 0; i < 3; i++ {
+ ch := make(chan ackedEvent, 100)
+ sub := feed.Subscribe(ch)
+ go func() {
+ defer sub.Unsubscribe()
+ for {
+ select {
+ case ev := <-ch:
+ fmt.Println(ev.i) // "process" the event
+ ev.ack <- struct{}{}
+ case <-done:
+ return
+ }
+ }
+ }()
+ }
+
+ // The producer sends values of type ackedEvent with increasing values of i.
+ // It waits for all consumers to acknowledge before sending the next event.
+ for i := 0; i < 3; i++ {
+ acksignal := make(chan struct{})
+ n := feed.Send(ackedEvent{i, acksignal})
+ for ack := 0; ack < n; ack++ {
+ <-acksignal
+ }
+ }
+ // Output:
+ // 0
+ // 0
+ // 0
+ // 1
+ // 1
+ // 1
+ // 2
+ // 2
+ // 2
+}
diff --git a/event/example_scope_test.go b/event/example_scope_test.go
new file mode 100644
index 0000000000..c517a8324d
--- /dev/null
+++ b/event/example_scope_test.go
@@ -0,0 +1,128 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event_test
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/event"
+)
+
+// This example demonstrates how SubscriptionScope can be used to control the lifetime of
+// subscriptions.
+//
+// Our example program consists of two servers, each of which performs a calculation when
+// requested. The servers also allow subscribing to results of all computations.
+type divServer struct{ results event.Feed }
+type mulServer struct{ results event.Feed }
+
+func (s *divServer) do(a, b int) int {
+ r := a / b
+ s.results.Send(r)
+ return r
+}
+
+func (s *mulServer) do(a, b int) int {
+ r := a * b
+ s.results.Send(r)
+ return r
+}
+
+// The servers are contained in an App. The app controls the servers and exposes them
+// through its API.
+type App struct {
+ divServer
+ mulServer
+ scope event.SubscriptionScope
+}
+
+func (s *App) Calc(op byte, a, b int) int {
+ switch op {
+ case '/':
+ return s.divServer.do(a, b)
+ case '*':
+ return s.mulServer.do(a, b)
+ default:
+ panic("invalid op")
+ }
+}
+
+// The app's SubscribeResults method starts sending calculation results to the given
+// channel. Subscriptions created through this method are tied to the lifetime of the App
+// because they are registered in the scope.
+func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription {
+ switch op {
+ case '/':
+ return s.scope.Track(s.divServer.results.Subscribe(ch))
+ case '*':
+ return s.scope.Track(s.mulServer.results.Subscribe(ch))
+ default:
+ panic("invalid op")
+ }
+}
+
+// Stop stops the App, closing all subscriptions created through SubscribeResults.
+func (s *App) Stop() {
+ s.scope.Close()
+}
+
+func ExampleSubscriptionScope() {
+ // Create the app.
+ var (
+ app App
+ wg sync.WaitGroup
+ divs = make(chan int)
+ muls = make(chan int)
+ )
+
+ // Run a subscriber in the background.
+ divsub := app.SubscribeResults('/', divs)
+ mulsub := app.SubscribeResults('*', muls)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer fmt.Println("subscriber exited")
+ defer divsub.Unsubscribe()
+ defer mulsub.Unsubscribe()
+ for {
+ select {
+ case result := <-divs:
+ fmt.Println("division happened:", result)
+ case result := <-muls:
+ fmt.Println("multiplication happened:", result)
+ case <-divsub.Err():
+ return
+ case <-mulsub.Err():
+ return
+ }
+ }
+ }()
+
+ // Interact with the app.
+ app.Calc('/', 22, 11)
+ app.Calc('*', 3, 4)
+
+ // Stop the app. This shuts down the subscriptions, causing the subscriber to exit.
+ app.Stop()
+ wg.Wait()
+
+ // Output:
+ // division happened: 2
+ // multiplication happened: 12
+ // subscriber exited
+}
diff --git a/event/example_subscription_test.go b/event/example_subscription_test.go
new file mode 100644
index 0000000000..de11266893
--- /dev/null
+++ b/event/example_subscription_test.go
@@ -0,0 +1,56 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event_test
+
+import (
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/event"
+)
+
+func ExampleNewSubscription() {
+ // Create a subscription that sends 10 integers on ch.
+ ch := make(chan int)
+ sub := event.NewSubscription(func(quit <-chan struct{}) error {
+ for i := 0; i < 10; i++ {
+ select {
+ case ch <- i:
+ case <-quit:
+ fmt.Println("unsubscribed")
+ return nil
+ }
+ }
+ return nil
+ })
+
+ // This is the consumer. It reads 5 integers, then aborts the subscription.
+ // Note that Unsubscribe waits until the producer has shut down.
+ for i := range ch {
+ fmt.Println(i)
+ if i == 4 {
+ sub.Unsubscribe()
+ break
+ }
+ }
+ // Output:
+ // 0
+ // 1
+ // 2
+ // 3
+ // 4
+ // unsubscribed
+}
diff --git a/event/feed.go b/event/feed.go
new file mode 100644
index 0000000000..4568304df5
--- /dev/null
+++ b/event/feed.go
@@ -0,0 +1,249 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event
+
+import (
+ "errors"
+ "reflect"
+ "sync"
+)
+
+var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type")
+
+// Feed implements one-to-many subscriptions where the carrier of events is a channel.
+// Values sent to a Feed are delivered to all subscribed channels simultaneously.
+//
+// Feeds can only be used with a single type. The type is determined by the first Send or
+// Subscribe operation. Subsequent calls to these methods panic if the type does not
+// match.
+//
+// The zero value is ready to use.
+type Feed struct {
+ // sendLock has a one-element buffer and is empty when held.
+ // It protects sendCases.
+ sendLock chan struct{}
+ removeSub chan interface{} // interrupts Send
+ sendCases caseList // the active set of select cases used by Send
+
+ // The inbox holds newly subscribed channels until they are added to sendCases.
+ mu sync.Mutex
+ inbox caseList
+ etype reflect.Type
+ closed bool
+}
+
+// This is the index of the first actual subscription channel in sendCases.
+// sendCases[0] is a SelectRecv case for the removeSub channel.
+const firstSubSendCase = 1
+
+type feedTypeError struct {
+ got, want reflect.Type
+ op string
+}
+
+func (e feedTypeError) Error() string {
+ return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String()
+}
+
+func (f *Feed) init() {
+ if f.sendLock != nil {
+ return
+ }
+ f.removeSub = make(chan interface{})
+ f.sendLock = make(chan struct{}, 1)
+ f.sendLock <- struct{}{}
+ f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
+}
+
+// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
+// until the subscription is canceled. All channels added must have the same element type.
+//
+// The channel should have ample buffer space to avoid blocking other subscribers.
+// Slow subscribers are not dropped.
+func (f *Feed) Subscribe(channel interface{}) Subscription {
+ chanval := reflect.ValueOf(channel)
+ chantyp := chanval.Type()
+ if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
+ panic(errBadChannel)
+ }
+ sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}
+
+ f.mu.Lock()
+ defer f.mu.Unlock()
+ f.init()
+ if !f.typecheck(chantyp.Elem()) {
+ panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
+ }
+ // Add the select case to the inbox.
+ // The next Send will add it to f.sendCases.
+ cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
+ f.inbox = append(f.inbox, cas)
+ return sub
+}
+
+// note: callers must hold f.mu
+func (f *Feed) typecheck(typ reflect.Type) bool {
+ if f.etype == nil {
+ f.etype = typ
+ return true
+ }
+ return f.etype == typ
+}
+
+func (f *Feed) remove(sub *feedSub) {
+ // Delete from inbox first, which covers channels
+ // that have not been added to f.sendCases yet.
+ ch := sub.channel.Interface()
+ f.mu.Lock()
+ index := f.inbox.find(ch)
+ if index != -1 {
+ f.inbox = f.inbox.delete(index)
+ f.mu.Unlock()
+ return
+ }
+ f.mu.Unlock()
+
+ select {
+ case f.removeSub <- ch:
+ // Send will remove the channel from f.sendCases.
+ case <-f.sendLock:
+ // No Send is in progress, delete the channel now that we have the send lock.
+ f.sendCases = f.sendCases.delete(f.sendCases.find(ch))
+ f.sendLock <- struct{}{}
+ }
+}
+
+// Send delivers to all subscribed channels simultaneously.
+// It returns the number of subscribers that the value was sent to.
+func (f *Feed) Send(value interface{}) (nsent int) {
+ f.mu.Lock()
+ f.init()
+ f.mu.Unlock()
+
+ <-f.sendLock
+
+ // Add new cases from the inbox after taking the send lock.
+ f.mu.Lock()
+ f.sendCases = append(f.sendCases, f.inbox...)
+ f.inbox = nil
+ f.mu.Unlock()
+
+ // Set the sent value on all channels.
+ rvalue := reflect.ValueOf(value)
+ if !f.typecheck(rvalue.Type()) {
+ f.sendLock <- struct{}{}
+ panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
+ }
+ for i := firstSubSendCase; i < len(f.sendCases); i++ {
+ f.sendCases[i].Send = rvalue
+ }
+
+ // Send until all channels except removeSub have been chosen.
+ cases := f.sendCases
+ for {
+ // Fast path: try sending without blocking before adding to the select set.
+ // This should usually succeed if subscribers are fast enough and have free
+ // buffer space.
+ for i := firstSubSendCase; i < len(cases); i++ {
+ if cases[i].Chan.TrySend(rvalue) {
+ nsent++
+ cases = cases.deactivate(i)
+ i--
+ }
+ }
+ if len(cases) == firstSubSendCase {
+ break
+ }
+ // Select on all the receivers, waiting for them to unblock.
+ chosen, recv, _ := reflect.Select(cases)
+ if chosen == 0 /* <-f.removeSub */ {
+ index := f.sendCases.find(recv.Interface())
+ f.sendCases = f.sendCases.delete(index)
+ if index >= 0 && index < len(cases) {
+ cases = f.sendCases[:len(cases)-1]
+ }
+ } else {
+ cases = cases.deactivate(chosen)
+ nsent++
+ }
+ }
+
+ // Forget about the sent value and hand off the send lock.
+ for i := firstSubSendCase; i < len(f.sendCases); i++ {
+ f.sendCases[i].Send = reflect.Value{}
+ }
+ f.sendLock <- struct{}{}
+ return nsent
+}
+
+type feedSub struct {
+ feed *Feed
+ channel reflect.Value
+ errOnce sync.Once
+ err chan error
+}
+
+func (sub *feedSub) Unsubscribe() {
+ sub.errOnce.Do(func() {
+ sub.feed.remove(sub)
+ close(sub.err)
+ })
+}
+
+func (sub *feedSub) Err() <-chan error {
+ return sub.err
+}
+
+type caseList []reflect.SelectCase
+
+// find returns the index of a case containing the given channel.
+func (cs caseList) find(channel interface{}) int {
+ for i, cas := range cs {
+ if cas.Chan.Interface() == channel {
+ return i
+ }
+ }
+ return -1
+}
+
+// delete removes the given case from cs.
+func (cs caseList) delete(index int) caseList {
+ return append(cs[:index], cs[index+1:]...)
+}
+
+// deactivate moves the case at index into the non-accessible portion of the cs slice.
+func (cs caseList) deactivate(index int) caseList {
+ last := len(cs) - 1
+ cs[index], cs[last] = cs[last], cs[index]
+ return cs[:last]
+}
+
+// func (cs caseList) String() string {
+// s := "["
+// for i, cas := range cs {
+// if i != 0 {
+// s += ", "
+// }
+// switch cas.Dir {
+// case reflect.SelectSend:
+// s += fmt.Sprintf("%v<-", cas.Chan.Interface())
+// case reflect.SelectRecv:
+// s += fmt.Sprintf("<-%v", cas.Chan.Interface())
+// }
+// }
+// return s + "]"
+// }
diff --git a/event/feed_test.go b/event/feed_test.go
new file mode 100644
index 0000000000..a82c103033
--- /dev/null
+++ b/event/feed_test.go
@@ -0,0 +1,294 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestFeedPanics(t *testing.T) {
+ {
+ var f Feed
+ f.Send(int(2))
+ want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+ if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ ch := make(chan int)
+ f.Subscribe(ch)
+ want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+ if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ f.Send(int(2))
+ want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
+ if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
+ t.Error(err)
+ }
+ }
+}
+
+func checkPanic(want error, fn func()) (err error) {
+ defer func() {
+ panic := recover()
+ if panic == nil {
+ err = fmt.Errorf("didn't panic")
+ } else if !reflect.DeepEqual(panic, want) {
+ err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
+ }
+ }()
+ fn()
+ return nil
+}
+
+func TestFeed(t *testing.T) {
+ var feed Feed
+ var done, subscribed sync.WaitGroup
+ subscriber := func(i int) {
+ defer done.Done()
+
+ subchan := make(chan int)
+ sub := feed.Subscribe(subchan)
+ timeout := time.NewTimer(2 * time.Second)
+ subscribed.Done()
+
+ select {
+ case v := <-subchan:
+ if v != 1 {
+ t.Errorf("%d: received value %d, want 1", i, v)
+ }
+ case <-timeout.C:
+ t.Errorf("%d: receive timeout", i)
+ }
+
+ sub.Unsubscribe()
+ select {
+ case _, ok := <-sub.Err():
+ if ok {
+ t.Errorf("%d: error channel not closed after unsubscribe", i)
+ }
+ case <-timeout.C:
+ t.Errorf("%d: unsubscribe timeout", i)
+ }
+ }
+
+ const n = 1000
+ done.Add(n)
+ subscribed.Add(n)
+ for i := 0; i < n; i++ {
+ go subscriber(i)
+ }
+ subscribed.Wait()
+ if nsent := feed.Send(1); nsent != n {
+ t.Errorf("first send delivered %d times, want %d", nsent, n)
+ }
+ if nsent := feed.Send(2); nsent != 0 {
+ t.Errorf("second send delivered %d times, want 0", nsent)
+ }
+ done.Wait()
+}
+
+func TestFeedSubscribeSameChannel(t *testing.T) {
+ var (
+ feed Feed
+ done sync.WaitGroup
+ ch = make(chan int)
+ sub1 = feed.Subscribe(ch)
+ sub2 = feed.Subscribe(ch)
+ _ = feed.Subscribe(ch)
+ )
+ expectSends := func(value, n int) {
+ if nsent := feed.Send(value); nsent != n {
+ t.Errorf("send delivered %d times, want %d", nsent, n)
+ }
+ done.Done()
+ }
+ expectRecv := func(wantValue, n int) {
+ for i := 0; i < n; i++ {
+ if v := <-ch; v != wantValue {
+ t.Errorf("received %d, want %d", v, wantValue)
+ }
+ }
+ }
+
+ done.Add(1)
+ go expectSends(1, 3)
+ expectRecv(1, 3)
+ done.Wait()
+
+ sub1.Unsubscribe()
+
+ done.Add(1)
+ go expectSends(2, 2)
+ expectRecv(2, 2)
+ done.Wait()
+
+ sub2.Unsubscribe()
+
+ done.Add(1)
+ go expectSends(3, 1)
+ expectRecv(3, 1)
+ done.Wait()
+}
+
+func TestFeedSubscribeBlockedPost(t *testing.T) {
+ var (
+ feed Feed
+ nsends = 2000
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ wg sync.WaitGroup
+ )
+ defer wg.Wait()
+
+ feed.Subscribe(ch1)
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+
+ sub2 := feed.Subscribe(ch2)
+ defer sub2.Unsubscribe()
+
+ // We're done when ch1 has received N times.
+ // The number of receives on ch2 depends on scheduling.
+ for i := 0; i < nsends; {
+ select {
+ case <-ch1:
+ i++
+ case <-ch2:
+ }
+ }
+}
+
+func TestFeedUnsubscribeBlockedPost(t *testing.T) {
+ var (
+ feed Feed
+ nsends = 200
+ chans = make([]chan int, 2000)
+ subs = make([]Subscription, len(chans))
+ bchan = make(chan int)
+ bsub = feed.Subscribe(bchan)
+ wg sync.WaitGroup
+ )
+ for i := range chans {
+ chans[i] = make(chan int, nsends)
+ }
+
+ // Queue up some Sends. None of these can make progress while bchan isn't read.
+ wg.Add(nsends)
+ for i := 0; i < nsends; i++ {
+ go func() {
+ feed.Send(99)
+ wg.Done()
+ }()
+ }
+ // Subscribe the other channels.
+ for i, ch := range chans {
+ subs[i] = feed.Subscribe(ch)
+ }
+ // Unsubscribe them again.
+ for _, sub := range subs {
+ sub.Unsubscribe()
+ }
+ // Unblock the Sends.
+ bsub.Unsubscribe()
+ wg.Wait()
+}
+
+func TestFeedUnsubscribeFromInbox(t *testing.T) {
+ var (
+ feed Feed
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ sub1 = feed.Subscribe(ch1)
+ sub2 = feed.Subscribe(ch1)
+ sub3 = feed.Subscribe(ch2)
+ )
+ if len(feed.inbox) != 3 {
+ t.Errorf("inbox length != 3 after subscribe")
+ }
+ if len(feed.sendCases) != 1 {
+ t.Errorf("sendCases is non-empty after unsubscribe")
+ }
+
+ sub1.Unsubscribe()
+ sub2.Unsubscribe()
+ sub3.Unsubscribe()
+ if len(feed.inbox) != 0 {
+ t.Errorf("inbox is non-empty after unsubscribe")
+ }
+ if len(feed.sendCases) != 1 {
+ t.Errorf("sendCases is non-empty after unsubscribe")
+ }
+}
+
+func BenchmarkFeedSend1000(b *testing.B) {
+ var (
+ done sync.WaitGroup
+ feed Feed
+ nsubs = 1000
+ )
+ subscriber := func(ch <-chan int) {
+ for i := 0; i < b.N; i++ {
+ <-ch
+ }
+ done.Done()
+ }
+ done.Add(nsubs)
+ for i := 0; i < nsubs; i++ {
+ ch := make(chan int, 200)
+ feed.Subscribe(ch)
+ go subscriber(ch)
+ }
+
+ // The actual benchmark.
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if feed.Send(i) != nsubs {
+ panic("wrong number of sends")
+ }
+ }
+
+ b.StopTimer()
+ done.Wait()
+}
diff --git a/event/subscription.go b/event/subscription.go
new file mode 100644
index 0000000000..83bd21213d
--- /dev/null
+++ b/event/subscription.go
@@ -0,0 +1,275 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event
+
+import (
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
+ "golang.org/x/net/context"
+)
+
+// Subscription represents a stream of events. The carrier of the events is typically a
+// channel, but isn't part of the interface.
+//
+// Subscriptions can fail while established. Failures are reported through an error
+// channel. It receives a value if there is an issue with the subscription (e.g. the
+// network connection delivering the events has been closed). Only one value will ever be
+// sent.
+//
+// The error channel is closed when the subscription ends successfully (i.e. when the
+// source of events is closed). It is also closed when Unsubscribe is called.
+//
+// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
+// cases to ensure that resources related to the subscription are released. It can be
+// called any number of times.
+type Subscription interface {
+ Err() <-chan error // returns the error channel
+ Unsubscribe() // cancels sending of events, closing the error channel
+}
+
+// NewSubscription runs a producer function as a subscription in a new goroutine. The
+// channel given to the producer is closed when Unsubscribe is called. If fn returns an
+// error, it is sent on the subscription's error channel.
+func NewSubscription(producer func(<-chan struct{}) error) Subscription {
+ s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)}
+ go func() {
+ defer close(s.err)
+ err := producer(s.unsub)
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if !s.unsubscribed {
+ if err != nil {
+ s.err <- err
+ }
+ s.unsubscribed = true
+ }
+ }()
+ return s
+}
+
+type funcSub struct {
+ unsub chan struct{}
+ err chan error
+ mu sync.Mutex
+ unsubscribed bool
+}
+
+func (s *funcSub) Unsubscribe() {
+ s.mu.Lock()
+ if s.unsubscribed {
+ s.mu.Unlock()
+ return
+ }
+ s.unsubscribed = true
+ close(s.unsub)
+ s.mu.Unlock()
+ // Wait for producer shutdown.
+ <-s.err
+}
+
+func (s *funcSub) Err() <-chan error {
+ return s.err
+}
+
+// Resubscribe calls fn repeatedly to keep a subscription established. When the
+// subscription is established, Resubscribe waits for it to fail and calls fn again. This
+// process repeats until Unsubscribe is called or the active subscription ends
+// successfully.
+//
+// Resubscribe applies backoff between calls to fn. The time between calls is adapted
+// based on the error rate, but will never exceed backoffMax.
+func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription {
+ s := &resubscribeSub{
+ waitTime: backoffMax / 10,
+ backoffMax: backoffMax,
+ fn: fn,
+ err: make(chan error),
+ unsub: make(chan struct{}),
+ }
+ go s.loop()
+ return s
+}
+
+// A ResubscribeFunc attempts to establish a subscription.
+type ResubscribeFunc func(context.Context) (Subscription, error)
+
+type resubscribeSub struct {
+ fn ResubscribeFunc
+ err chan error
+ unsub chan struct{}
+ unsubOnce sync.Once
+ lastTry mclock.AbsTime
+ waitTime, backoffMax time.Duration
+}
+
+func (s *resubscribeSub) Unsubscribe() {
+ s.unsubOnce.Do(func() {
+ s.unsub <- struct{}{}
+ <-s.err
+ })
+}
+
+func (s *resubscribeSub) Err() <-chan error {
+ return s.err
+}
+
+func (s *resubscribeSub) loop() {
+ defer close(s.err)
+ var done bool
+ for !done {
+ sub := s.subscribe()
+ if sub == nil {
+ break
+ }
+ done = s.waitForError(sub)
+ sub.Unsubscribe()
+ }
+}
+
+func (s *resubscribeSub) subscribe() Subscription {
+ subscribed := make(chan error)
+ var sub Subscription
+retry:
+ for {
+ s.lastTry = mclock.Now()
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ rsub, err := s.fn(ctx)
+ sub = rsub
+ subscribed <- err
+ }()
+ select {
+ case err := <-subscribed:
+ cancel()
+ if err != nil {
+ // Subscribing failed, wait before launching the next try.
+ if s.backoffWait() {
+ return nil
+ }
+ continue retry
+ }
+ if sub == nil {
+ panic("event: ResubscribeFunc returned nil subscription and no error")
+ }
+ return sub
+ case <-s.unsub:
+ cancel()
+ return nil
+ }
+ }
+}
+
+func (s *resubscribeSub) waitForError(sub Subscription) bool {
+ defer sub.Unsubscribe()
+ select {
+ case err := <-sub.Err():
+ return err == nil
+ case <-s.unsub:
+ return true
+ }
+}
+
+func (s *resubscribeSub) backoffWait() bool {
+ if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax {
+ s.waitTime = s.backoffMax / 10
+ } else {
+ s.waitTime *= 2
+ if s.waitTime > s.backoffMax {
+ s.waitTime = s.backoffMax
+ }
+ }
+
+ t := time.NewTimer(s.waitTime)
+ defer t.Stop()
+ select {
+ case <-t.C:
+ return false
+ case <-s.unsub:
+ return true
+ }
+}
+
+// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once.
+//
+// For code that handle more than one subscription, a scope can be used to conveniently
+// unsubscribe all of them with a single call. The example demonstrates a typical use in a
+// larger program.
+//
+// The zero value is ready to use.
+type SubscriptionScope struct {
+ mu sync.Mutex
+ subs map[*scopeSub]struct{}
+ closed bool
+}
+
+type scopeSub struct {
+ sc *SubscriptionScope
+ s Subscription
+}
+
+// Track starts tracking a subscription. If the scope is closed, Track returns nil. The
+// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the
+// scope.
+func (sc *SubscriptionScope) Track(s Subscription) Subscription {
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ if sc.closed {
+ return nil
+ }
+ if sc.subs == nil {
+ sc.subs = make(map[*scopeSub]struct{})
+ }
+ ss := &scopeSub{sc, s}
+ sc.subs[ss] = struct{}{}
+ return ss
+}
+
+// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to
+// the tracked set. Calls to Track after Close return nil.
+func (sc *SubscriptionScope) Close() {
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ if sc.closed {
+ return
+ }
+ sc.closed = true
+ for s := range sc.subs {
+ s.s.Unsubscribe()
+ }
+ sc.subs = nil
+}
+
+// Count returns the number of tracked subscriptions.
+// It is meant to be used for debugging.
+func (sc *SubscriptionScope) Count() int {
+ sc.mu.Lock()
+ defer sc.mu.Unlock()
+ return len(sc.subs)
+}
+
+func (s *scopeSub) Unsubscribe() {
+ s.s.Unsubscribe()
+ s.sc.mu.Lock()
+ defer s.sc.mu.Unlock()
+ delete(s.sc.subs, s)
+}
+
+func (s *scopeSub) Err() <-chan error {
+ return s.s.Err()
+}
diff --git a/event/subscription_test.go b/event/subscription_test.go
new file mode 100644
index 0000000000..a4fe30298c
--- /dev/null
+++ b/event/subscription_test.go
@@ -0,0 +1,121 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package event
+
+import (
+ "errors"
+ "testing"
+ "time"
+
+ "golang.org/x/net/context"
+)
+
+var errInts = errors.New("error in subscribeInts")
+
+func subscribeInts(max, fail int, c chan<- int) Subscription {
+ return NewSubscription(func(quit <-chan struct{}) error {
+ for i := 0; i < max; i++ {
+ if i >= fail {
+ return errInts
+ }
+ select {
+ case c <- i:
+ case <-quit:
+ return nil
+ }
+ }
+ return nil
+ })
+}
+
+func TestNewSubscriptionError(t *testing.T) {
+ t.Parallel()
+
+ channel := make(chan int)
+ sub := subscribeInts(10, 2, channel)
+loop:
+ for want := 0; want < 10; want++ {
+ select {
+ case got := <-channel:
+ if got != want {
+ t.Fatalf("wrong int %d, want %d", got, want)
+ }
+ case err := <-sub.Err():
+ if err != errInts {
+ t.Fatalf("wrong error: got %q, want %q", err, errInts)
+ }
+ if want != 2 {
+ t.Fatalf("got errInts at int %d, should be received at 2", want)
+ }
+ break loop
+ }
+ }
+ sub.Unsubscribe()
+
+ err, ok := <-sub.Err()
+ if err != nil {
+ t.Fatal("got non-nil error after Unsubscribe")
+ }
+ if ok {
+ t.Fatal("channel still open after Unsubscribe")
+ }
+}
+
+func TestResubscribe(t *testing.T) {
+ t.Parallel()
+
+ var i int
+ nfails := 6
+ sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) {
+ // fmt.Printf("call #%d @ %v\n", i, time.Now())
+ i++
+ if i == 2 {
+ // Delay the second failure a bit to reset the resubscribe interval.
+ time.Sleep(200 * time.Millisecond)
+ }
+ if i < nfails {
+ return nil, errors.New("oops")
+ }
+ sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil })
+ return sub, nil
+ })
+
+ <-sub.Err()
+ if i != nfails {
+ t.Fatalf("resubscribe function called %d times, want %d times", i, nfails)
+ }
+}
+
+func TestResubscribeAbort(t *testing.T) {
+ t.Parallel()
+
+ done := make(chan error)
+ sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) {
+ select {
+ case <-ctx.Done():
+ done <- nil
+ case <-time.After(2 * time.Second):
+ done <- errors.New("context given to resubscribe function not canceled within 2s")
+ }
+ return nil, nil
+ })
+
+ sub.Unsubscribe()
+ if err := <-done; err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/light/txpool.go b/light/txpool.go
index d0781593b0..bcdb6123de 100644
--- a/light/txpool.go
+++ b/light/txpool.go
@@ -47,7 +47,7 @@ type TxPool struct {
signer types.Signer
quit chan bool
eventMux *event.TypeMux
- events event.Subscription
+ events *event.TypeMuxSubscription
mu sync.RWMutex
chain *LightChain
odr OdrBackend
diff --git a/miner/worker.go b/miner/worker.go
index 77e4e02059..49ac602537 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -90,7 +90,7 @@ type worker struct {
// update loop
mux *event.TypeMux
- events event.Subscription
+ events *event.TypeMuxSubscription
wg sync.WaitGroup
agents map[Agent]struct{}
diff --git a/rpc/client.go b/rpc/client.go
index 34a3b78317..269eb78c86 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -682,7 +682,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription
// resubscription when the client connection is closed unexpectedly.
//
// The error channel receives a value when the subscription has ended due
-// to an error. The received error is ErrClientQuit if Close has been called
+// to an error. The received error is nil if Close has been called
// on the underlying client and no other error has occurred.
//
// The error channel is closed when Unsubscribe is called on the subscription.
@@ -707,6 +707,9 @@ func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool)
sub.requestUnsubscribe()
}
if err != nil {
+ if err == ErrClientQuit {
+ err = nil // Adhere to subscription semantics.
+ }
sub.err <- err
}
})