// Copyright 2016 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. package rpc import ( "errors" "sync" "time" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "golang.org/x/net/context" ) var ( // ErrNotificationsUnsupported is returned when the connection doesn't support notifications ErrNotificationsUnsupported = errors.New("notifications not supported") // ErrNotificationNotFound is returned when the notification for the given id is not found ErrNotificationNotFound = errors.New("notification not found") // errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed) errNotifierStopped = errors.New("unable to send notification") // errNotificationQueueFull is returns when there are too many notifications in the queue errNotificationQueueFull = errors.New("too many pending notifications") ) // unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered // notifications that might be pending in the internal queue. var unsubSignal = new(struct{}) // UnsubscribeCallback defines a callback that is called when a subcription ends. // It receives the subscription id as argument. type UnsubscribeCallback func(id string) // notification is a helper object that holds event data for a subscription type notification struct { sub *bufferedSubscription // subscription id data interface{} // event data } // A Notifier type describes the interface for objects that can send create subscriptions type Notifier interface { // Create a new subscription. The given callback is called when this subscription // is cancelled (e.g. client send an unsubscribe, connection closed). NewSubscription(UnsubscribeCallback) (Subscription, error) // Cancel subscription Unsubscribe(id string) error } type notifierKey struct{} // NotifierFromContext returns the Notifier value stored in ctx, if any. func NotifierFromContext(ctx context.Context) (Notifier, bool) { n, ok := ctx.Value(notifierKey{}).(Notifier) return n, ok } // Subscription defines the interface for objects that can notify subscribers type Subscription interface { // Inform client of an event Notify(data interface{}) error // Unique identifier ID() string // Cancel subscription Cancel() error } // bufferedSubscription is a subscription that uses a bufferedNotifier to send // notifications to subscribers. type bufferedSubscription struct { id string unsubOnce sync.Once // call unsub method once unsub UnsubscribeCallback // called on Unsubscribed notifier *bufferedNotifier // forward notifications to pending chan interface{} // closed when active flushed chan interface{} // closed when all buffered notifications are send lastNotification time.Time // last time a notification was send } // ID returns the subscription identifier that the client uses to refer to this instance. func (s *bufferedSubscription) ID() string { return s.id } // Cancel informs the notifier that this subscription is cancelled by the API func (s *bufferedSubscription) Cancel() error { return s.notifier.Unsubscribe(s.id) } // Notify the subscriber of a particular event. func (s *bufferedSubscription) Notify(data interface{}) error { return s.notifier.send(s.id, data) } // bufferedNotifier is a notifier that queues notifications in an internal queue and // send them as fast as possible to the client from this queue. It will stop if the // queue grows past a given size. type bufferedNotifier struct { codec ServerCodec // underlying connection mu sync.Mutex // guard internal state subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec queueSize int // max number of items in queue queue chan *notification // notification queue stopped bool // indication if this notifier is ordered to stop } // newBufferedNotifier returns a notifier that queues notifications in an internal queue // from which notifications are send as fast as possible to the client. If the queue size // limit is reached (client is unable to keep up) it will stop and closes the codec. func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier { notifier := &bufferedNotifier{ codec: codec, subscriptions: make(map[string]*bufferedSubscription), queue: make(chan *notification, size), queueSize: size, } go notifier.run() return notifier } // NewSubscription creates a new subscription that forwards events to this instance internal // queue. The given callback is called when the subscription is unsubscribed/cancelled. func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) { id, err := newSubscriptionID() if err != nil { return nil, err } n.mu.Lock() defer n.mu.Unlock() if n.stopped { return nil, errNotifierStopped } sub := &bufferedSubscription{ id: id, unsub: callback, notifier: n, pending: make(chan interface{}), flushed: make(chan interface{}), lastNotification: time.Now(), } n.subscriptions[id] = sub return sub, nil } // Remove the given subscription. If subscription is not found notificationNotFoundErr is returned. func (n *bufferedNotifier) Unsubscribe(subid string) error { n.mu.Lock() sub, found := n.subscriptions[subid] n.mu.Unlock() if found { // send the unsubscribe signal, this will cause the notifier not to accept new events // for this subscription and will close the flushed channel after the last (buffered) // notification was send to the client. if err := n.send(subid, unsubSignal); err != nil { return err } // wait for confirmation that all (buffered) events are send for this subscription. // this ensures that the unsubscribe method response is not send before all buffered // events for this subscription are send. <-sub.flushed return nil } return ErrNotificationNotFound } // Send enques the given data for the subscription with public ID on the internal queue. t returns // an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it // will remove the subscription with the given id from the subscription collection. func (n *bufferedNotifier) send(id string, data interface{}) error { n.mu.Lock() defer n.mu.Unlock() if n.stopped { return errNotifierStopped } var ( subscription *bufferedSubscription found bool ) // check if subscription is associated with this connection, it might be cancelled // (subscribe/connection closed) if subscription, found = n.subscriptions[id]; !found { glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id) return ErrNotificationNotFound } // received the unsubscribe signal. Add it to the queue to make sure any pending notifications // for this subscription are send. When the run loop receives this singal it will signal that // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be // send to the user. Remove the subscriptions to make sure new notifications are not accepted. if data == unsubSignal { delete(n.subscriptions, id) if subscription.unsub != nil { subscription.unsubOnce.Do(func() { subscription.unsub(id) }) } } subscription.lastNotification = time.Now() if len(n.queue) >= n.queueSize { glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection") n.codec.Close() return errNotificationQueueFull } n.queue <- ¬ification{subscription, data} return nil } // run reads notifications from the internal queue and sends them to the client. In case of an // error, or when the codec is closed it will cancel all active subscriptions and returns. func (n *bufferedNotifier) run() { defer func() { n.mu.Lock() defer n.mu.Unlock() n.stopped = true close(n.queue) // on exit call unsubscribe callback for id, sub := range n.subscriptions { if sub.unsub != nil { sub.unsubOnce.Do(func() { sub.unsub(id) }) } close(sub.flushed) delete(n.subscriptions, id) } }() for { select { case notification := <-n.queue: // It can happen that an event is raised before the RPC server was able to send the sub // id to the client. Therefore subscriptions are marked as pending until the sub id was // send. The RPC server will activate the subscription by closing the pending chan. <-notification.sub.pending if notification.data == unsubSignal { // unsubSignal is the last accepted message for this subscription. Raise the signal // that all buffered notifications are sent by closing the flushed channel. This // indicates that the response for the unsubscribe can be send to the client. close(notification.sub.flushed) } else { msg := n.codec.CreateNotification(notification.sub.id, notification.data) if err := n.codec.Write(msg); err != nil { n.codec.Close() // unable to send notification to client, unsubscribe all subscriptions glog.V(logger.Warn).Infof("unable to send notification - %v\n", err) return } } case <-n.codec.Closed(): // connection was closed glog.V(logger.Debug).Infoln("codec closed, stop subscriptions") return } } } // Marks the subscription as active. This will causes the notifications for this subscription to be // forwarded to the client. func (n *bufferedNotifier) activate(subid string) { n.mu.Lock() defer n.mu.Unlock() if sub, found := n.subscriptions[subid]; found { close(sub.pending) } }