diff --git a/eth/api.go b/eth/api.go index 41387c27f0..a0b1f8ac27 100644 --- a/eth/api.go +++ b/eth/api.go @@ -603,7 +603,7 @@ type NewBlocksArgs struct { // NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows // the caller to specify whether the output should contain transactions and in what format. func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) { - notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } @@ -1345,7 +1345,7 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction { // NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool // and is send from one of the transactions this nodes manages. func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) { - notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 670a3ade39..94cd6515f8 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -85,7 +85,7 @@ type SyncingResult struct { // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) { - notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } diff --git a/eth/filters/api.go b/eth/filters/api.go index 9e95ebd83f..d9bd4d4b7f 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -234,7 +234,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo } func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { - notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return nil, rpc.ErrNotificationsUnsupported } diff --git a/rpc/notification.go b/rpc/notification.go index 146d785c90..e84e26a582 100644 --- a/rpc/notification.go +++ b/rpc/notification.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "golang.org/x/net/context" ) var ( @@ -62,6 +63,14 @@ type Notifier interface { Unsubscribe(id string) error } +type notifierKey struct{} + +// NotifierFromContext returns the Notifier value stored in ctx, if any. +func NotifierFromContext(ctx context.Context) (Notifier, bool) { + n, ok := ctx.Value(notifierKey{}).(Notifier) + return n, ok +} + // Subscription defines the interface for objects that can notify subscribers type Subscription interface { // Inform client of an event diff --git a/rpc/notification_test.go b/rpc/notification_test.go index cd05d73fe5..1bcede177a 100644 --- a/rpc/notification_test.go +++ b/rpc/notification_test.go @@ -36,7 +36,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) { } func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { - notifier, supported := ctx.Value(NotifierContextKey).(Notifier) + notifier, supported := NotifierFromContext(ctx) if !supported { return nil, ErrNotificationsUnsupported } diff --git a/rpc/server.go b/rpc/server.go index cf90eba02a..001107a1b7 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -32,9 +32,6 @@ import ( const ( stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped - // NotifierContextKey is the key where the notifier associated with the codec is stored in the context - NotifierContextKey = 1 - notificationBufferSize = 10000 // max buffered notifications before codec is closed DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" @@ -171,7 +168,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO // to send notification to clients. It is thight to the codec/connection. If the // connection is closed the notifier will stop and cancels all active subscriptions. if options&OptionSubscriptions == OptionSubscriptions { - ctx = context.WithValue(ctx, NotifierContextKey, newBufferedNotifier(codec, notificationBufferSize)) + ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize)) } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped @@ -275,7 +272,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque if req.isUnsubscribe { // cancel subscription, first param must be the subscription id if len(req.args) >= 1 && req.args[0].Kind() == reflect.String { - notifier, supported := ctx.Value(NotifierContextKey).(*bufferedNotifier) + notifier, supported := NotifierFromContext(ctx) if !supported { // interface doesn't support subscriptions (e.g. http) return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil } @@ -298,8 +295,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successful sent to the client activateSub := func() { - notifier, _ := ctx.Value(NotifierContextKey).(*bufferedNotifier) - notifier.activate(subid) + notifier, _ := NotifierFromContext(ctx) + notifier.(*bufferedNotifier).activate(subid) } return codec.CreateResponse(req.id, subid), activateSub