From 47ff8130124b479f1f051312eed50c33f0a38e6f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Wed, 27 Jul 2016 17:47:46 +0200 Subject: [PATCH] rpc: refactor subscriptions and filters --- core/types/block.go | 22 + eth/downloader/api.go | 163 +++- eth/filters/api.go | 838 ++++++++---------- eth/filters/api_test.go | 27 +- eth/filters/filter.go | 120 +-- eth/filters/filter_system.go | 380 +++++--- eth/filters/filter_system_test.go | 357 ++++++-- internal/ethapi/api.go | 126 +-- rpc/notification.go | 297 ------- rpc/server.go | 12 +- rpc/server_test.go | 2 +- rpc/subscription.go | 135 +++ ...ification_test.go => subscription_test.go} | 34 +- rpc/types.go | 4 +- rpc/utils.go | 42 +- 15 files changed, 1304 insertions(+), 1255 deletions(-) delete mode 100644 rpc/notification.go create mode 100644 rpc/subscription.go rename rpc/{notification_test.go => subscription_test.go} (84%) diff --git a/core/types/block.go b/core/types/block.go index 37b6f3ec17..5993592478 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -114,6 +114,28 @@ func (h *Header) UnmarshalJSON(data []byte) error { return nil } +func (h *Header) MarshalJSON() ([]byte, error) { + fields := map[string]interface{}{ + "hash": h.Hash(), + "parentHash": h.ParentHash, + "number": fmt.Sprintf("%#x", h.Number), + "nonce": h.Nonce, + "receiptRoot": h.ReceiptHash, + "logsBloom": h.Bloom, + "sha3Uncles": h.UncleHash, + "stateRoot": h.Root, + "miner": h.Coinbase, + "difficulty": fmt.Sprintf("%#x", h.Difficulty), + "extraData": fmt.Sprintf("0x%x", h.Extra), + "gasLimit": fmt.Sprintf("%#x", h.GasLimit), + "gasUsed": fmt.Sprintf("%#x", h.GasUsed), + "timestamp": fmt.Sprintf("%#x", h.Time), + "transactionsRoot": h.TxHash, + } + + return json.Marshal(fields) +} + func rlpHash(x interface{}) (h common.Hash) { hw := sha3.NewKeccak256() rlp.Encode(hw, x) diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 94cd6515f8..c36dfb7e07 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -19,55 +19,104 @@ package downloader import ( "sync" - "golang.org/x/net/context" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) // PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // It offers only methods that operates on data that can be available to anyone without security risks. type PublicDownloaderAPI struct { - d *Downloader - mux *event.TypeMux - muSyncSubscriptions sync.Mutex - syncSubscriptions map[string]rpc.Subscription + d *Downloader + mux *event.TypeMux + installSyncSubscription chan chan interface{} + uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest } -// NewPublicDownloaderAPI create a new PublicDownloaderAPI. +// NewPublicDownloaderAPI create a new PublicDownloaderAPI. The API has an internal event loop that +// listens for events from the downloader through the global event mux. In case it receives one of +// these events it broadcasts it to all syncing subscriptions that are installed through the +// installSyncSubscription channel. func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI { - api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)} + api := &PublicDownloaderAPI{ + d: d, + mux: m, + installSyncSubscription: make(chan chan interface{}), + uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), + } - go api.run() + go api.eventLoop() return api } -func (api *PublicDownloaderAPI) run() { - sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) +// eventLoop runs an loop until the event mux closes. It will install and uninstall new +// sync subscriptions and broadcasts sync status updates to the installed sync subscriptions. +func (api *PublicDownloaderAPI) eventLoop() { + var ( + sub = api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + syncSubscriptions = make(map[chan interface{}]struct{}) + ) - for event := range sub.Chan() { - var notification interface{} + for { + select { + case i := <-api.installSyncSubscription: + syncSubscriptions[i] = struct{}{} + case u := <-api.uninstallSyncSubscription: + delete(syncSubscriptions, u.c) + close(u.uninstalled) + case event := <-sub.Chan(): + if event == nil { + return + } - switch event.Data.(type) { - case StartEvent: - result := &SyncingResult{Syncing: true} - result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() - notification = result - case DoneEvent, FailedEvent: - notification = false - } - - api.muSyncSubscriptions.Lock() - for id, sub := range api.syncSubscriptions { - if sub.Notify(notification) == rpc.ErrNotificationNotFound { - delete(api.syncSubscriptions, id) + var notification interface{} + switch event.Data.(type) { + case StartEvent: + result := &SyncingResult{Syncing: true} + result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() + notification = result + case DoneEvent, FailedEvent: + notification = false + } + // broadcast + for c := range syncSubscriptions { + c <- notification } } - api.muSyncSubscriptions.Unlock() } } +// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. +func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + statuses := make(chan interface{}) + sub := api.SubscribeSyncStatus(statuses) + + for { + select { + case status := <-statuses: + notifier.Notify(rpcSub.ID, status) + case <-rpcSub.Err(): + sub.Unsubscribe() + return + case <-notifier.Closed(): + sub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + // Progress gives progress indications when the node is synchronising with the Ethereum network. type Progress struct { Origin uint64 `json:"startingBlock"` @@ -83,26 +132,42 @@ type SyncingResult struct { Status Progress `json:"status"` } -// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. -func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - subscription, err := notifier.NewSubscription(func(id string) { - api.muSyncSubscriptions.Lock() - delete(api.syncSubscriptions, id) - api.muSyncSubscriptions.Unlock() - }) - - if err != nil { - return nil, err - } - - api.muSyncSubscriptions.Lock() - api.syncSubscriptions[subscription.ID()] = subscription - api.muSyncSubscriptions.Unlock() - - return subscription, nil +// uninstallSyncSubscriptionRequest uninstalles a syncing subscription in the API event loop. +type uninstallSyncSubscriptionRequest struct { + c chan interface{} + uninstalled chan interface{} +} + +// SyncStatusSubscription represents a syncing subscription. +type SyncStatusSubscription struct { + api *PublicDownloaderAPI // register subscription in event loop of this api instance + c chan interface{} // channel where events are broadcasted to + unsubOnce sync.Once // make sure unsubscribe logic is executed once +} + +// Unsubscribe uninstalls the subscription from the DownloadAPI event loop. +// The status channel that was passed to subscribeSyncStatus isn't used anymore +// after this method returns. +func (s *SyncStatusSubscription) Unsubscribe() { + s.unsubOnce.Do(func() { + req := uninstallSyncSubscriptionRequest{s.c, make(chan interface{})} + s.api.uninstallSyncSubscription <- &req + + for { + select { + case <-s.c: + // drop new status events until uninstall confirmation + continue + case <-req.uninstalled: + return + } + } + }) +} + +// SubscribeSyncStatus creates a subscription that will broadcast new synchronisation updates. +// The given channel must receive interface values, the result can either +func (api *PublicDownloaderAPI) SubscribeSyncStatus(status chan interface{}) *SyncStatusSubscription { + api.installSyncSubscription <- status + return &SyncStatusSubscription{api: api, c: status} } diff --git a/eth/filters/api.go b/eth/filters/api.go index 65c5b9380d..3bc2203481 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,292 +17,414 @@ package filters import ( - "crypto/rand" "encoding/hex" "encoding/json" "errors" "fmt" + "math/big" "sync" "time" + "golang.org/x/net/context" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" - - "golang.org/x/net/context" ) var ( - filterTickerTime = 5 * time.Minute + deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline ) -// byte will be inferred -const ( - unknownFilterTy = iota - blockFilterTy - transactionFilterTy - logFilterTy -) +// filter is a helper struct that holds meta information over the filter type +// and associated subscription in the event system. +type filter struct { + typ Type + deadline *time.Timer // filter is inactiv when deadline triggers + hashes []common.Hash + crit FilterCriteria + logs []Log + s *Subscription // associated subscription in event system +} // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type PublicFilterAPI struct { - mux *event.TypeMux - - quit chan struct{} - chainDb ethdb.Database - - filterManager *FilterSystem - - filterMapMu sync.RWMutex - filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers - - logMu sync.RWMutex - logQueue map[int]*logQueue - - blockMu sync.RWMutex - blockQueue map[int]*hashQueue - - transactionMu sync.RWMutex - transactionQueue map[int]*hashQueue + mux *event.TypeMux + quit chan struct{} + chainDb ethdb.Database + events *EventSystem + filtersMu sync.Mutex + filters map[rpc.ID]*filter } // NewPublicFilterAPI returns a new PublicFilterAPI instance. func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI { - svc := &PublicFilterAPI{ - mux: mux, - chainDb: chainDb, - filterManager: NewFilterSystem(mux), - filterMapping: make(map[string]int), - logQueue: make(map[int]*logQueue), - blockQueue: make(map[int]*hashQueue), - transactionQueue: make(map[int]*hashQueue), + api := &PublicFilterAPI{ + mux: mux, + chainDb: chainDb, + events: NewEventSystem(mux), + filters: make(map[rpc.ID]*filter), } - go svc.start() - return svc + + go api.timeoutLoop() + + return api } -// Stop quits the work loop. -func (s *PublicFilterAPI) Stop() { - close(s.quit) -} - -// start the work loop, wait and process events. -func (s *PublicFilterAPI) start() { - timer := time.NewTicker(2 * time.Second) - defer timer.Stop() -done: +// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. +// Tt is started when the api is created. +func (api *PublicFilterAPI) timeoutLoop() { + ticker := time.NewTicker(5 * time.Minute) for { - select { - case <-timer.C: - s.filterManager.Lock() // lock order like filterLoop() - s.logMu.Lock() - for id, filter := range s.logQueue { - if time.Since(filter.timeout) > filterTickerTime { - s.filterManager.Remove(id) - delete(s.logQueue, id) - } + <-ticker.C + api.filtersMu.Lock() + for id, f := range api.filters { + select { + case <-f.deadline.C: + f.s.Unsubscribe() + delete(api.filters, id) + default: + continue } - s.logMu.Unlock() - - s.blockMu.Lock() - for id, filter := range s.blockQueue { - if time.Since(filter.timeout) > filterTickerTime { - s.filterManager.Remove(id) - delete(s.blockQueue, id) - } - } - s.blockMu.Unlock() - - s.transactionMu.Lock() - for id, filter := range s.transactionQueue { - if time.Since(filter.timeout) > filterTickerTime { - s.filterManager.Remove(id) - delete(s.transactionQueue, id) - } - } - s.transactionMu.Unlock() - s.filterManager.Unlock() - case <-s.quit: - break done } + api.filtersMu.Unlock() } - } -// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain. -func (s *PublicFilterAPI) NewBlockFilter() (string, error) { - // protect filterManager.Add() and setting of filter fields - s.filterManager.Lock() - defer s.filterManager.Unlock() +// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes +// as transactions enter the pending state. +// +// It is part of the filter package because this filter can be used throug the +// `eth_getFilterChanges` polling method that is also used for log filters. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter +func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { + var ( + pendingTxs = make(chan common.Hash) + pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) + ) - externalId, err := newFilterId() - if err != nil { - return "", err - } + api.filtersMu.Lock() + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} + api.filtersMu.Unlock() - filter := New(s.chainDb) - id, err := s.filterManager.Add(filter, ChainFilter) - if err != nil { - return "", err - } - - s.blockMu.Lock() - s.blockQueue[id] = &hashQueue{timeout: time.Now()} - s.blockMu.Unlock() - - filter.BlockCallback = func(block *types.Block, logs vm.Logs) { - s.blockMu.Lock() - defer s.blockMu.Unlock() - - if queue := s.blockQueue[id]; queue != nil { - queue.add(block.Hash()) - } - } - - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() - - return externalId, nil -} - -// NewPendingTransactionFilter creates a filter that returns new pending transactions. -func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { - // protect filterManager.Add() and setting of filter fields - s.filterManager.Lock() - defer s.filterManager.Unlock() - - externalId, err := newFilterId() - if err != nil { - return "", err - } - - filter := New(s.chainDb) - id, err := s.filterManager.Add(filter, PendingTxFilter) - if err != nil { - return "", err - } - - s.transactionMu.Lock() - s.transactionQueue[id] = &hashQueue{timeout: time.Now()} - s.transactionMu.Unlock() - - filter.TransactionCallback = func(tx *types.Transaction) { - s.transactionMu.Lock() - defer s.transactionMu.Unlock() - - if queue := s.transactionQueue[id]; queue != nil { - queue.add(tx.Hash()) - } - } - - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() - - return externalId, nil -} - -// newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { - // protect filterManager.Add() and setting of filter fields - s.filterManager.Lock() - defer s.filterManager.Unlock() - - filter := New(s.chainDb) - id, err := s.filterManager.Add(filter, LogFilter) - if err != nil { - return 0, err - } - - s.logMu.Lock() - s.logQueue[id] = &logQueue{timeout: time.Now()} - s.logMu.Unlock() - - filter.SetBeginBlock(earliest) - filter.SetEndBlock(latest) - filter.SetAddresses(addresses) - filter.SetTopics(topics) - filter.LogCallback = func(log *vm.Log, removed bool) { - if callback != nil { - callback(log, removed) - } else { - s.logMu.Lock() - defer s.logMu.Unlock() - if queue := s.logQueue[id]; queue != nil { - queue.add(vmlog{log, removed}) + go func() { + for { + select { + case ph := <-pendingTxs: + api.filtersMu.Lock() + if f, found := api.filters[pendingTxSub.ID]; found { + f.hashes = append(f.hashes, ph) + } + api.filtersMu.Unlock() + case <-pendingTxSub.Err(): + api.filtersMu.Lock() + delete(api.filters, pendingTxSub.ID) + api.filtersMu.Unlock() + return } } + }() + + return pendingTxSub.ID +} + +// NewPendingTransactions creates a subscription that is triggered each time a transaction +// enters the transaction pool and was signed from one of the transactions this nodes manages. +func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - return id, nil + rpcSub := notifier.CreateSubscription() + + go func() { + txHashes := make(chan common.Hash) + pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) + + for { + select { + case h := <-txHashes: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + pendingTxSub.Unsubscribe() + return + case <-notifier.Closed(): + pendingTxSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil +} + +// NewBlockFilter creates a filter that fetches blocks that are imported into the chain. +// It is part of the filter package since polling goes with eth_getFilterChanges. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter +func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { + var ( + headers = make(chan *types.Header) + headerSub = api.events.SubscribeNewHeads(headers) + ) + + api.filtersMu.Lock() + api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case h := <-headers: + api.filtersMu.Lock() + if f, found := api.filters[headerSub.ID]; found { + f.hashes = append(f.hashes, h.Hash()) + } + api.filtersMu.Unlock() + case <-headerSub.Err(): + api.filtersMu.Lock() + delete(api.filters, headerSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return headerSub.ID +} + +// NewHeads send a notification each time a new (header) block is appended to the chain. +func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + headers := make(chan *types.Header) + headersSub := api.events.SubscribeNewHeads(headers) + + for { + select { + case h := <-headers: + notifier.Notify(rpcSub.ID, h) + case <-rpcSub.Err(): + headersSub.Unsubscribe() + return + case <-notifier.Closed(): + headersSub.Unsubscribe() + return + } + } + }() + + return rpcSub, nil } // Logs creates a subscription that fires for all new log that match the given filter criteria. -func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { +func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { - return nil, rpc.ErrNotificationsUnsupported + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - var ( - externalId string - subscription rpc.Subscription - err error - ) + rpcSub := notifier.CreateSubscription() - if externalId, err = newFilterId(); err != nil { - return nil, err - } + go func() { + matchedLogs := make(chan []Log) + logsSub := api.events.SubscribeLogs(crit, matchedLogs) - // uninstall filter when subscription is unsubscribed/cancelled - if subscription, err = notifier.NewSubscription(func(string) { - s.UninstallFilter(externalId) - }); err != nil { - return nil, err - } - - notifySubscriber := func(log *vm.Log, removed bool) { - rpcLog := toRPCLogs(vm.Logs{log}, removed) - if err := subscription.Notify(rpcLog); err != nil { - subscription.Cancel() + for { + select { + case logs := <-matchedLogs: + for _, log := range logs { + notifier.Notify(rpcSub.ID, &log) + } + case <-rpcSub.Err(): // client send an unsubscribe request + logsSub.Unsubscribe() + return + case <-notifier.Closed(): // connection dropped + logsSub.Unsubscribe() + return + } } - } + }() - // from and to block number are not used since subscriptions don't allow you to travel to "time" - var id int - if len(args.Addresses) > 0 { - id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber) - } else { - id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber) - } - - if err != nil { - subscription.Cancel() - return nil, err - } - - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() - - return subscription, err + return rpcSub, nil } -// NewFilterArgs represents a request to create a new filter. -type NewFilterArgs struct { - FromBlock rpc.BlockNumber - ToBlock rpc.BlockNumber +// FilterCriteria represents a request to create a new filter. +type FilterCriteria struct { + FromBlock *big.Int + ToBlock *big.Int Addresses []common.Address Topics [][]common.Hash } +// NewFilter creates a new filter and returns the filter id. It can be +// used to retrieve logs when the state changes. This method cannot be +// used to fetch logs that are already stored in the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { + var ( + logs = make(chan []Log) + logsSub = api.events.SubscribeLogs(crit, logs) + ) + + if crit.FromBlock == nil { + crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + } + if crit.ToBlock == nil { + crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + } + + api.filtersMu.Lock() + api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub} + api.filtersMu.Unlock() + + go func() { + for { + select { + case l := <-logs: + api.filtersMu.Lock() + if f, found := api.filters[logsSub.ID]; found { + f.logs = append(f.logs, l...) + } + api.filtersMu.Unlock() + case <-logsSub.Err(): + api.filtersMu.Lock() + delete(api.filters, logsSub.ID) + api.filtersMu.Unlock() + return + } + } + }() + + return logsSub.ID +} + +// GetLogs returns logs matching the given argument that are stored within the state. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs +func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log { + if crit.FromBlock == nil { + crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + } + if crit.ToBlock == nil { + crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) + } + + filter := New(api.chainDb) + filter.SetBeginBlock(crit.FromBlock.Int64()) + filter.SetEndBlock(crit.ToBlock.Int64()) + filter.SetAddresses(crit.Addresses) + filter.SetTopics(crit.Topics) + + return returnLogs(filter.Find()) +} + +// UninstallFilter removes the filter with the given filter id. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter +func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { + api.filtersMu.Lock() + f, found := api.filters[id] + if found { + delete(api.filters, id) + } + api.filtersMu.Unlock() + if found { + f.s.Unsubscribe() + } + + return found +} + +// GetFilterLogs returns the logs for the filter with the given id. +// If the filter could not be found an empty array of logs is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs +func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log { + api.filtersMu.Lock() + f, found := api.filters[id] + api.filtersMu.Unlock() + + if !found || f.typ != LogsSubscription { + return []Log{} + } + + filter := New(api.chainDb) + filter.SetBeginBlock(f.crit.FromBlock.Int64()) + filter.SetEndBlock(f.crit.ToBlock.Int64()) + filter.SetAddresses(f.crit.Addresses) + filter.SetTopics(f.crit.Topics) + + return returnLogs(filter.Find()) +} + +// GetFilterChanges returns the logs for the filter with the given id since +// last time is was called. This can be used for polling. +// +// For pending transaction and block filters the result is []common.Hash. +// (pending)Log filters return []Log. If the filter could not be found +// []interface{}{} is returned. +// +// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { + api.filtersMu.Lock() + defer api.filtersMu.Unlock() + + if f, found := api.filters[id]; found { + if !f.deadline.Stop() { + // timer expired but filter is not yet removed in timeout loop + // receive timer value and reset timer + <-f.deadline.C + } + f.deadline.Reset(deadline) + + switch f.typ { + case PendingTransactionsSubscription, BlocksSubscription: + hashes := f.hashes + f.hashes = nil + return returnHashes(hashes) + case PendingLogsSubscription, LogsSubscription: + logs := f.logs + f.logs = nil + return returnLogs(logs) + } + } + + return []interface{}{} +} + +// returnHashes is a helper that will return an empty hash array case the given hash array is nil, +// otherwise the given hashes array is returned. +func returnHashes(hashes []common.Hash) []common.Hash { + if hashes == nil { + return []common.Hash{} + } + return hashes +} + +// returnLogs is a helper that will return an empty log array in case the given logs array is nil, +// otherwise the given logs array is returned. +func returnLogs(logs []Log) []Log { + if logs == nil { + return []Log{} + } + return logs +} + // UnmarshalJSON sets *args fields with given data. -func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { +func (args *FilterCriteria) UnmarshalJSON(data []byte) error { type input struct { From *rpc.BlockNumber `json:"fromBlock"` ToBlock *rpc.BlockNumber `json:"toBlock"` @@ -316,15 +438,15 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { } if raw.From == nil || raw.From.Int64() < 0 { - args.FromBlock = rpc.LatestBlockNumber + args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } else { - args.FromBlock = *raw.From + args.FromBlock = big.NewInt(raw.From.Int64()) } if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 { - args.ToBlock = rpc.LatestBlockNumber + args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } else { - args.ToBlock = *raw.ToBlock + args.ToBlock = big.NewInt(raw.ToBlock.Int64()) } args.Addresses = []common.Address{} @@ -414,255 +536,3 @@ func (args *NewFilterArgs) UnmarshalJSON(data []byte) error { return nil } - -// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs. -func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { - externalId, err := newFilterId() - if err != nil { - return "", err - } - - var id int - if len(args.Addresses) > 0 { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil) - } else { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil) - } - if err != nil { - return "", err - } - - s.filterMapMu.Lock() - s.filterMapping[externalId] = id - s.filterMapMu.Unlock() - - return externalId, nil -} - -// GetLogs returns the logs matching the given argument. -func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog { - filter := New(s.chainDb) - filter.SetBeginBlock(args.FromBlock.Int64()) - filter.SetEndBlock(args.ToBlock.Int64()) - filter.SetAddresses(args.Addresses) - filter.SetTopics(args.Topics) - - return toRPCLogs(filter.Find(), false) -} - -// UninstallFilter removes the filter with the given filter id. -func (s *PublicFilterAPI) UninstallFilter(filterId string) bool { - s.filterManager.Lock() - defer s.filterManager.Unlock() - - s.filterMapMu.Lock() - id, ok := s.filterMapping[filterId] - if !ok { - s.filterMapMu.Unlock() - return false - } - delete(s.filterMapping, filterId) - s.filterMapMu.Unlock() - - s.filterManager.Remove(id) - - s.logMu.Lock() - if _, ok := s.logQueue[id]; ok { - delete(s.logQueue, id) - s.logMu.Unlock() - return true - } - s.logMu.Unlock() - - s.blockMu.Lock() - if _, ok := s.blockQueue[id]; ok { - delete(s.blockQueue, id) - s.blockMu.Unlock() - return true - } - s.blockMu.Unlock() - - s.transactionMu.Lock() - if _, ok := s.transactionQueue[id]; ok { - delete(s.transactionQueue, id) - s.transactionMu.Unlock() - return true - } - s.transactionMu.Unlock() - - return false -} - -// getFilterType is a helper utility that determine the type of filter for the given filter id. -func (s *PublicFilterAPI) getFilterType(id int) byte { - if _, ok := s.blockQueue[id]; ok { - return blockFilterTy - } else if _, ok := s.transactionQueue[id]; ok { - return transactionFilterTy - } else if _, ok := s.logQueue[id]; ok { - return logFilterTy - } - - return unknownFilterTy -} - -// blockFilterChanged returns a collection of block hashes for the block filter with the given id. -func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash { - s.blockMu.Lock() - defer s.blockMu.Unlock() - - if s.blockQueue[id] != nil { - return s.blockQueue[id].get() - } - return nil -} - -// transactionFilterChanged returns a collection of transaction hashes for the pending -// transaction filter with the given id. -func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash { - s.blockMu.Lock() - defer s.blockMu.Unlock() - - if s.transactionQueue[id] != nil { - return s.transactionQueue[id].get() - } - return nil -} - -// logFilterChanged returns a collection of logs for the log filter with the given id. -func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog { - s.logMu.Lock() - defer s.logMu.Unlock() - - if s.logQueue[id] != nil { - return s.logQueue[id].get() - } - return nil -} - -// GetFilterLogs returns the logs for the filter with the given id. -func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog { - s.filterMapMu.RLock() - id, ok := s.filterMapping[filterId] - s.filterMapMu.RUnlock() - if !ok { - return toRPCLogs(nil, false) - } - - if filter := s.filterManager.Get(id); filter != nil { - return toRPCLogs(filter.Find(), false) - } - - return toRPCLogs(nil, false) -} - -// GetFilterChanges returns the logs for the filter with the given id since last time is was called. -// This can be used for polling. -func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} { - s.filterMapMu.RLock() - id, ok := s.filterMapping[filterId] - s.filterMapMu.RUnlock() - - if !ok { // filter not found - return []interface{}{} - } - - switch s.getFilterType(id) { - case blockFilterTy: - return returnHashes(s.blockFilterChanged(id)) - case transactionFilterTy: - return returnHashes(s.transactionFilterChanged(id)) - case logFilterTy: - return s.logFilterChanged(id) - } - - return []interface{}{} -} - -type vmlog struct { - *vm.Log - Removed bool `json:"removed"` -} - -type logQueue struct { - mu sync.Mutex - - logs []vmlog - timeout time.Time - id int -} - -func (l *logQueue) add(logs ...vmlog) { - l.mu.Lock() - defer l.mu.Unlock() - - l.logs = append(l.logs, logs...) -} - -func (l *logQueue) get() []vmlog { - l.mu.Lock() - defer l.mu.Unlock() - - l.timeout = time.Now() - tmp := l.logs - l.logs = nil - return tmp -} - -type hashQueue struct { - mu sync.Mutex - - hashes []common.Hash - timeout time.Time - id int -} - -func (l *hashQueue) add(hashes ...common.Hash) { - l.mu.Lock() - defer l.mu.Unlock() - - l.hashes = append(l.hashes, hashes...) -} - -func (l *hashQueue) get() []common.Hash { - l.mu.Lock() - defer l.mu.Unlock() - - l.timeout = time.Now() - tmp := l.hashes - l.hashes = nil - return tmp -} - -// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random -// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them -// causing the affected DApp to miss data. -func newFilterId() (string, error) { - var subid [16]byte - n, _ := rand.Read(subid[:]) - if n != 16 { - return "", errors.New("Unable to generate filter id") - } - return "0x" + hex.EncodeToString(subid[:]), nil -} - -// toRPCLogs is a helper that will convert a vm.Logs array to an structure which -// can hold additional information about the logs such as whether it was deleted. -// Additionally when nil is given it will by default instead create an empty slice -// instead. This is required by the RPC specification. -func toRPCLogs(logs vm.Logs, removed bool) []vmlog { - convertedLogs := make([]vmlog, len(logs)) - for i, log := range logs { - convertedLogs[i] = vmlog{Log: log, Removed: removed} - } - return convertedLogs -} - -// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will -// return the given hashes. The RPC interfaces defines that always an array is returned. -func returnHashes(hashes []common.Hash) []common.Hash { - if hashes == nil { - return []common.Hash{} - } - return hashes -} diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 9e8edc2413..98eb6cbaa2 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package filters_test +package filters import ( "encoding/json" @@ -22,7 +22,6 @@ import ( "testing" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/rpc" ) @@ -39,14 +38,14 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { ) // default values - var test0 filters.NewFilterArgs + var test0 FilterCriteria if err := json.Unmarshal([]byte("{}"), &test0); err != nil { t.Fatal(err) } - if test0.FromBlock != rpc.LatestBlockNumber { + if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() { t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock) } - if test0.ToBlock != rpc.LatestBlockNumber { + if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() { t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock) } if len(test0.Addresses) != 0 { @@ -57,20 +56,20 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { } // from, to block number - var test1 filters.NewFilterArgs + var test1 FilterCriteria vector := fmt.Sprintf(`{"fromBlock":"0x%x","toBlock":"0x%x"}`, fromBlock, toBlock) if err := json.Unmarshal([]byte(vector), &test1); err != nil { t.Fatal(err) } - if test1.FromBlock != fromBlock { + if test1.FromBlock.Int64() != fromBlock.Int64() { t.Fatalf("expected FromBlock %d, got %d", fromBlock, test1.FromBlock) } - if test1.ToBlock != toBlock { + if test1.ToBlock.Int64() != toBlock.Int64() { t.Fatalf("expected ToBlock %d, got %d", toBlock, test1.ToBlock) } // single address - var test2 filters.NewFilterArgs + var test2 FilterCriteria vector = fmt.Sprintf(`{"address": "%s"}`, address0.Hex()) if err := json.Unmarshal([]byte(vector), &test2); err != nil { t.Fatal(err) @@ -83,7 +82,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { } // multiple address - var test3 filters.NewFilterArgs + var test3 FilterCriteria vector = fmt.Sprintf(`{"address": ["%s", "%s"]}`, address0.Hex(), address1.Hex()) if err := json.Unmarshal([]byte(vector), &test3); err != nil { t.Fatal(err) @@ -99,7 +98,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { } // single topic - var test4 filters.NewFilterArgs + var test4 FilterCriteria vector = fmt.Sprintf(`{"topics": ["%s"]}`, topic0.Hex()) if err := json.Unmarshal([]byte(vector), &test4); err != nil { t.Fatal(err) @@ -115,7 +114,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { } // test multiple "AND" topics - var test5 filters.NewFilterArgs + var test5 FilterCriteria vector = fmt.Sprintf(`{"topics": ["%s", "%s"]}`, topic0.Hex(), topic1.Hex()) if err := json.Unmarshal([]byte(vector), &test5); err != nil { t.Fatal(err) @@ -137,7 +136,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { } // test optional topic - var test6 filters.NewFilterArgs + var test6 FilterCriteria vector = fmt.Sprintf(`{"topics": ["%s", null, "%s"]}`, topic0.Hex(), topic2.Hex()) if err := json.Unmarshal([]byte(vector), &test6); err != nil { t.Fatal(err) @@ -165,7 +164,7 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { } // test OR topics - var test7 filters.NewFilterArgs + var test7 FilterCriteria vector = fmt.Sprintf(`{"topics": [["%s", "%s"], null, ["%s", null]]}`, topic0.Hex(), topic1.Hex(), topic2.Hex()) if err := json.Unmarshal([]byte(vector), &test7); err != nil { t.Fatal(err) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index fd739bf0ee..4226620dc4 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -23,15 +23,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethdb" ) -type AccountChange struct { - Address, StateAddress []byte -} - -// Filtering interface +// Filter can be used to retrieve and filter logs type Filter struct { created time.Time @@ -39,70 +34,72 @@ type Filter struct { begin, end int64 addresses []common.Address topics [][]common.Hash - - BlockCallback func(*types.Block, vm.Logs) - TransactionCallback func(*types.Transaction) - LogCallback func(*vm.Log, bool) } -// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block -// is interesting or not. +// New creates a new filter which uses a bloom filter on blocks to figure out whether +// a particular block is interesting or not. func New(db ethdb.Database) *Filter { return &Filter{db: db} } -// Set the earliest and latest block for filtering. +// SetBeginBlock sets the earliest block for filtering. // -1 = latest block (i.e., the current block) // hash = particular hash from-to -func (self *Filter) SetBeginBlock(begin int64) { - self.begin = begin +func (f *Filter) SetBeginBlock(begin int64) { + f.begin = begin } -func (self *Filter) SetEndBlock(end int64) { - self.end = end +// SetEndBlock sets the latest block for filtering. +func (f *Filter) SetEndBlock(end int64) { + f.end = end } -func (self *Filter) SetAddresses(addr []common.Address) { - self.addresses = addr +// SetAddresses matches only logs that are generated from addresses that are included +// in the given addresses. +func (f *Filter) SetAddresses(addr []common.Address) { + f.addresses = addr } -func (self *Filter) SetTopics(topics [][]common.Hash) { - self.topics = topics +// SetTopics matches only logs that have topics matching the given topics. +func (f *Filter) SetTopics(topics [][]common.Hash) { + f.topics = topics } // Run filters logs with the current parameters set -func (self *Filter) Find() vm.Logs { - latestHash := core.GetHeadBlockHash(self.db) - latestBlock := core.GetBlock(self.db, latestHash, core.GetBlockNumber(self.db, latestHash)) +func (f *Filter) Find() []Log { + latestHash := core.GetHeadBlockHash(f.db) + latestBlock := core.GetBlock(f.db, latestHash, core.GetBlockNumber(f.db, latestHash)) if latestBlock == nil { - return vm.Logs{} + return []Log{} } - var beginBlockNo uint64 = uint64(self.begin) - if self.begin == -1 { + + var beginBlockNo uint64 = uint64(f.begin) + if f.begin == -1 { beginBlockNo = latestBlock.NumberU64() } - var endBlockNo uint64 = uint64(self.end) - if self.end == -1 { + + endBlockNo := uint64(f.end) + if f.end == -1 { endBlockNo = latestBlock.NumberU64() } // if no addresses are present we can't make use of fast search which // uses the mipmap bloom filters to check for fast inclusion and uses // higher range probability in order to ensure at least a false positive - if len(self.addresses) == 0 { - return self.getLogs(beginBlockNo, endBlockNo) + if len(f.addresses) == 0 { + return f.getLogs(beginBlockNo, endBlockNo) } - return self.mipFind(beginBlockNo, endBlockNo, 0) + return f.mipFind(beginBlockNo, endBlockNo, 0) } -func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) { +func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) { level := core.MIPMapLevels[depth] // normalise numerator so we can work in level specific batches and // work with the proper range checks for num := start / level * level; num <= end; num += level { // find addresses in bloom filters - bloom := core.GetMipmapBloom(self.db, num, level) - for _, addr := range self.addresses { + bloom := core.GetMipmapBloom(f.db, num, level) + for _, addr := range f.addresses { if bloom.TestBytes(addr[:]) { // range check normalised values and make sure that // we're resolving the correct range instead of the @@ -110,9 +107,9 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) { start := uint64(math.Max(float64(num), float64(start))) end := uint64(math.Min(float64(num+level-1), float64(end))) if depth+1 == len(core.MIPMapLevels) { - logs = append(logs, self.getLogs(start, end)...) + logs = append(logs, f.getLogs(start, end)...) } else { - logs = append(logs, self.mipFind(start, end, depth+1)...) + logs = append(logs, f.mipFind(start, end, depth+1)...) } // break so we don't check the same range for each // possible address. Checks on multiple addresses @@ -125,12 +122,15 @@ func (self *Filter) mipFind(start, end uint64, depth int) (logs vm.Logs) { return logs } -func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) { +func (f *Filter) getLogs(start, end uint64) (logs []Log) { + var block *types.Block + for i := start; i <= end; i++ { - var block *types.Block - hash := core.GetCanonicalHash(self.db, i) + hash := core.GetCanonicalHash(f.db, i) if hash != (common.Hash{}) { - block = core.GetBlock(self.db, hash, i) + block = core.GetBlock(f.db, hash, i) + } else { // block not found + return logs } if block == nil { // block not found/written return logs @@ -138,16 +138,20 @@ func (self *Filter) getLogs(start, end uint64) (logs vm.Logs) { // Use bloom filtering to see if this block is interesting given the // current parameters - if self.bloomFilter(block) { + if f.bloomFilter(block) { // Get the logs of the block var ( - receipts = core.GetBlockReceipts(self.db, block.Hash(), i) - unfiltered vm.Logs + receipts = core.GetBlockReceipts(f.db, block.Hash(), i) + unfiltered []Log ) for _, receipt := range receipts { - unfiltered = append(unfiltered, receipt.Logs...) + rl := make([]Log, len(receipt.Logs)) + for i, l := range receipt.Logs { + rl[i] = Log{l, false} + } + unfiltered = append(unfiltered, rl...) } - logs = append(logs, self.FilterLogs(unfiltered)...) + logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...) } } @@ -164,26 +168,25 @@ func includes(addresses []common.Address, a common.Address) bool { return false } -func (self *Filter) FilterLogs(logs vm.Logs) vm.Logs { - var ret vm.Logs +func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log { + var ret []Log // Filter the logs for interesting stuff Logs: for _, log := range logs { - if len(self.addresses) > 0 && !includes(self.addresses, log.Address) { + if len(addresses) > 0 && !includes(addresses, log.Address) { continue } - logTopics := make([]common.Hash, len(self.topics)) + logTopics := make([]common.Hash, len(topics)) copy(logTopics, log.Topics) - // If the to filtered topics is greater than the amount of topics in - // logs, skip. - if len(self.topics) > len(log.Topics) { + // If the to filtered topics is greater than the amount of topics in logs, skip. + if len(topics) > len(log.Topics) { continue Logs } - for i, topics := range self.topics { + for i, topics := range topics { var match bool for _, topic := range topics { // common.Hash{} is a match all (wildcard) @@ -196,7 +199,6 @@ Logs: if !match { continue Logs } - } ret = append(ret, log) @@ -205,10 +207,10 @@ Logs: return ret } -func (self *Filter) bloomFilter(block *types.Block) bool { - if len(self.addresses) > 0 { +func (f *Filter) bloomFilter(block *types.Block) bool { + if len(f.addresses) > 0 { var included bool - for _, addr := range self.addresses { + for _, addr := range f.addresses { if types.BloomLookup(block.Bloom(), addr) { included = true break @@ -220,7 +222,7 @@ func (self *Filter) bloomFilter(block *types.Block) bool { } } - for _, sub := range self.topics { + for _, sub := range f.topics { var included bool for _, topic := range sub { if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 2564642131..04a55fd099 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -14,179 +14,305 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// package filters implements an ethereum filtering system for block, +// Package filters implements an ethereum filtering system for block, // transactions and log events. package filters import ( + "encoding/json" + "errors" "fmt" "sync" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" ) -// FilterType determines the type of filter and is used to put the filter in to +// Type determines the kind of filter and is used to put the filter in to // the correct bucket when added. -type FilterType byte +type Type byte const ( - ChainFilter FilterType = iota // new block events filter - PendingTxFilter // pending transaction filter - LogFilter // new or removed log filter - PendingLogFilter // pending log filter + // UnknownSubscription indicates an unkown subscription type + UnknownSubscription Type = iota + // LogsSubscription queries for new or removed (chain reorg) logs + LogsSubscription + // PendingLogsSubscription queries for logs for the pending block + PendingLogsSubscription + // PendingTransactionsSubscription queries tx hashes for pending + // transactions entering the pending state + PendingTransactionsSubscription + // BlocksSubscription queries hashes for blocks that are imported + BlocksSubscription ) -// FilterSystem manages filters that filter specific events such as -// block, transaction and log events. The Filtering system can be used to listen -// for specific LOG events fired by the EVM (Ethereum Virtual Machine). -type FilterSystem struct { - filterMu sync.RWMutex - filterId int +var ( + ErrInvalidSubscriptionID = errors.New("invalid id") +) - chainFilters map[int]*Filter - pendingTxFilters map[int]*Filter - logFilters map[int]*Filter - pendingLogFilters map[int]*Filter - - // generic is an ugly hack for Get - generic map[int]*Filter - - sub event.Subscription +// Log is a helper that can hold additional information about vm.Log +// necessary for the RPC interface. +type Log struct { + *vm.Log + Removed bool `json:"removed"` } -// NewFilterSystem returns a newly allocated filter manager -func NewFilterSystem(mux *event.TypeMux) *FilterSystem { - fs := &FilterSystem{ - chainFilters: make(map[int]*Filter), - pendingTxFilters: make(map[int]*Filter), - logFilters: make(map[int]*Filter), - pendingLogFilters: make(map[int]*Filter), - generic: make(map[int]*Filter), +func (l *Log) MarshalJSON() ([]byte, error) { + fields := map[string]interface{}{ + "address": l.Address, + "data": fmt.Sprintf("0x%x", l.Data), + "blockNumber": fmt.Sprintf("%#x", l.BlockNumber), + "logIndex": fmt.Sprintf("%#x", l.Index), + "blockHash": l.BlockHash, + "transactionHash": l.TxHash, + "transactionIndex": fmt.Sprintf("%#x", l.TxIndex), + "topics": l.Topics, + "removed": l.Removed, } - fs.sub = mux.Subscribe( - core.PendingLogsEvent{}, - core.RemovedLogsEvent{}, - core.ChainEvent{}, - core.TxPreEvent{}, - vm.Logs(nil), - ) - go fs.filterLoop() - return fs + + return json.Marshal(fields) } -// Stop quits the filter loop required for polling events -func (fs *FilterSystem) Stop() { - fs.sub.Unsubscribe() +type subscription struct { + id rpc.ID + typ Type + created time.Time + logsCrit FilterCriteria + logs chan []Log + hashes chan common.Hash + headers chan *types.Header + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } -// Acquire filter system maps lock, required to force lock acquisition -// sequence with filterMu acquired first to avoid deadlocks by callbacks -func (fs *FilterSystem) Lock() { - fs.filterMu.Lock() +// EventSystem creates subscriptions, processes events and broadcasts them to the +// subscription which match the subscription criteria. +type EventSystem struct { + mux *event.TypeMux + sub event.Subscription + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification } -// Release filter system maps lock -func (fs *FilterSystem) Unlock() { - fs.filterMu.Unlock() -} - -// Add adds a filter to the filter manager -// Expects filterMu to be locked. -func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) { - id := fs.filterId - filter.created = time.Now() - - switch filterType { - case ChainFilter: - fs.chainFilters[id] = filter - case PendingTxFilter: - fs.pendingTxFilters[id] = filter - case LogFilter: - fs.logFilters[id] = filter - case PendingLogFilter: - fs.pendingLogFilters[id] = filter - default: - return 0, fmt.Errorf("unknown filter type %v", filterType) +// NewEventSystem creates a new manager that listens for event on the given mux, +// parses and filters them. It uses the all map to retrieve filter changes. The +// work loop holds its own index that is used to forward events to filters. +// +// The returned manager has a loop that needs to be stopped with the Stop function +// or by stopping the given mux. +func NewEventSystem(mux *event.TypeMux) *EventSystem { + m := &EventSystem{ + mux: mux, + install: make(chan *subscription), + uninstall: make(chan *subscription), } - fs.generic[id] = filter - fs.filterId++ + go m.eventLoop() - return id, nil + return m } -// Remove removes a filter by filter id -// Expects filterMu to be locked. -func (fs *FilterSystem) Remove(id int) { - delete(fs.chainFilters, id) - delete(fs.pendingTxFilters, id) - delete(fs.logFilters, id) - delete(fs.pendingLogFilters, id) - delete(fs.generic, id) +// Subscription is created when the client registers itself for a particular event. +type Subscription struct { + ID rpc.ID + f *subscription + es *EventSystem + unsubOnce sync.Once } -func (fs *FilterSystem) Get(id int) *Filter { - fs.filterMu.RLock() - defer fs.filterMu.RUnlock() - - return fs.generic[id] +// Err returns a channel that is closed when unsubscribed. +func (sub *Subscription) Err() <-chan error { + return sub.f.err } -// filterLoop waits for specific events from ethereum and fires their handlers -// when the filter matches the requirements. -func (fs *FilterSystem) filterLoop() { - for event := range fs.sub.Chan() { - switch ev := event.Data.(type) { - case core.ChainEvent: - fs.filterMu.RLock() - for _, filter := range fs.chainFilters { - if filter.BlockCallback != nil && !filter.created.After(event.Time) { - filter.BlockCallback(ev.Block, ev.Logs) - } +// Unsubscribe uninstalls the subscription from the event broadcast loop. +func (sub *Subscription) Unsubscribe() { + sub.unsubOnce.Do(func() { + uninstallLoop: + for { + // write uninstall request and consume logs/hashes. This prevents + // the eventLoop broadcast method to deadlock when writing to the + // filter event channel while the subscription loop is waiting for + // this method to return (and thus not reading these events). + select { + case sub.es.uninstall <- sub.f: + break uninstallLoop + case <-sub.f.logs: + case <-sub.f.hashes: + case <-sub.f.headers: } - fs.filterMu.RUnlock() - case core.TxPreEvent: - fs.filterMu.RLock() - for _, filter := range fs.pendingTxFilters { - if filter.TransactionCallback != nil && !filter.created.After(event.Time) { - filter.TransactionCallback(ev.Tx) - } - } - fs.filterMu.RUnlock() + } - case vm.Logs: - fs.filterMu.RLock() - for _, filter := range fs.logFilters { - if filter.LogCallback != nil && !filter.created.After(event.Time) { - for _, log := range filter.FilterLogs(ev) { - filter.LogCallback(log, false) + // wait for filter to be uninstalled in work loop before returning + // this ensures that the manager won't use the event channel which + // will probably be closed by the client asap after this method returns. + <-sub.Err() + }) +} + +// subscribe installs the subscription in the event broadcast loop. +func (es *EventSystem) subscribe(sub *subscription) *Subscription { + es.install <- sub + <-sub.installed + return &Subscription{ID: sub.id, f: sub, es: es} +} + +// SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. +func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: LogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + +// SubscribePendingLogs creates a subscription that will write pending logs matching the +// given criteria to the given channel. +func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + +// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []Log), + hashes: hashes, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + +// SubscribeNewHeads creates a subscription that writes the header of a block that is +// imported in the chain. +func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: BlocksSubscription, + created: time.Now(), + logs: make(chan []Log), + hashes: make(chan common.Hash), + headers: headers, + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + +type filterIndex map[Type]map[rpc.ID]*subscription + +// broadcast event to filters that match criteria. +func broadcast(filters filterIndex, ev *event.Event) { + if ev == nil { + return + } + + switch e := ev.Data.(type) { + case vm.Logs: + if len(e) > 0 { + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } } - fs.filterMu.RUnlock() - case core.RemovedLogsEvent: - fs.filterMu.RLock() - for _, filter := range fs.logFilters { - if filter.LogCallback != nil && !filter.created.After(event.Time) { - for _, removedLog := range filter.FilterLogs(ev.Logs) { - filter.LogCallback(removedLog, true) - } + } + case core.RemovedLogsEvent: + for _, f := range filters[LogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - fs.filterMu.RUnlock() - case core.PendingLogsEvent: - fs.filterMu.RLock() - for _, filter := range fs.pendingLogFilters { - if filter.LogCallback != nil && !filter.created.After(event.Time) { - for _, pendingLog := range ev.Logs { - filter.LogCallback(pendingLog, false) - } + } + case core.PendingLogsEvent: + for _, f := range filters[PendingLogsSubscription] { + if ev.Time.After(f.created) { + if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + f.logs <- matchedLogs } } - fs.filterMu.RUnlock() + } + case core.TxPreEvent: + for _, f := range filters[PendingTransactionsSubscription] { + if ev.Time.After(f.created) { + f.hashes <- e.Tx.Hash() + } + } + case core.ChainEvent: + for _, f := range filters[BlocksSubscription] { + if ev.Time.After(f.created) { + f.headers <- e.Block.Header() + } } } } + +// eventLoop (un)installs filters and processes mux events. +func (es *EventSystem) eventLoop() { + var ( + index = make(filterIndex) + sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{}) + ) + for { + select { + case ev, active := <-sub.Chan(): + if !active { // system stopped + return + } + broadcast(index, ev) + case f := <-es.install: + if _, found := index[f.typ]; !found { + index[f.typ] = make(map[rpc.ID]*subscription) + } + index[f.typ][f.id] = f + close(f.installed) + case f := <-es.uninstall: + delete(index[f.typ], f.id) + close(f.err) + } + } +} + +// convertLogs is a helper utility that converts vm.Logs to []filter.Log. +func convertLogs(in vm.Logs, removed bool) []Log { + logs := make([]Log, len(in)) + for i, l := range in { + logs[i] = Log{l, removed} + } + return logs +} diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 72824cb088..9e6fde1c64 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -17,101 +17,310 @@ package filters import ( + "math/big" + "reflect" "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" ) -func TestCallbacks(t *testing.T) { +var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + api = NewPublicFilterAPI(db, mux) +) + +// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. +// It creates multiple subscriptions: +// - one at the start and should receive all posted chain events and a second (blockHashes) +// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2]) +// - one that is created after the second cutoff moment (blockHashes[cutoff2:]) +func TestBlockSubscription(t *testing.T) { + t.Parallel() + var ( - mux event.TypeMux - fs = NewFilterSystem(&mux) - blockDone = make(chan struct{}) - txDone = make(chan struct{}) - logDone = make(chan struct{}) - removedLogDone = make(chan struct{}) - pendingLogDone = make(chan struct{}) + genesis = core.WriteGenesisBlockForTesting(db) + chain, _ = core.GenerateChain(nil, genesis, db, 10, func(i int, gen *core.BlockGen) {}) + chainEvents = []core.ChainEvent{} ) - blockFilter := &Filter{ - BlockCallback: func(*types.Block, vm.Logs) { - close(blockDone) - }, + for _, blk := range chain { + chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk}) } - txFilter := &Filter{ - TransactionCallback: func(*types.Transaction) { - close(txDone) - }, - } - logFilter := &Filter{ - LogCallback: func(l *vm.Log, oob bool) { - if !oob { - close(logDone) + + chan0 := make(chan *types.Header) + sub0 := api.events.SubscribeNewHeads(chan0) + chan1 := make(chan *types.Header) + sub1 := api.events.SubscribeNewHeads(chan1) + + go func() { // simulate client + i1, i2 := 0, 0 + for i1 != len(chainEvents) || i2 != len(chainEvents) { + select { + case header := <-chan0: + if chainEvents[i1].Hash != header.Hash() { + t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash()) + } + i1++ + case header := <-chan1: + if chainEvents[i2].Hash != header.Hash() { + t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash()) + } + i2++ } - }, - } - removedLogFilter := &Filter{ - LogCallback: func(l *vm.Log, oob bool) { - if oob { - close(removedLogDone) - } - }, - } - pendingLogFilter := &Filter{ - LogCallback: func(*vm.Log, bool) { - close(pendingLogDone) - }, + } + + sub0.Unsubscribe() + sub1.Unsubscribe() + }() + + time.Sleep(1 * time.Second) + for _, e := range chainEvents { + mux.Post(e) } - fs.Add(blockFilter, ChainFilter) - fs.Add(txFilter, PendingTxFilter) - fs.Add(logFilter, LogFilter) - fs.Add(removedLogFilter, LogFilter) - fs.Add(pendingLogFilter, PendingLogFilter) + <-sub0.Err() + <-sub1.Err() +} - mux.Post(core.ChainEvent{}) - mux.Post(core.TxPreEvent{}) - mux.Post(vm.Logs{&vm.Log{}}) - mux.Post(core.RemovedLogsEvent{Logs: vm.Logs{&vm.Log{}}}) - mux.Post(core.PendingLogsEvent{Logs: vm.Logs{&vm.Log{}}}) +// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux. +func TestPendingTxFilter(t *testing.T) { + t.Parallel() - const dura = 5 * time.Second - failTimer := time.NewTimer(dura) - select { - case <-blockDone: - case <-failTimer.C: - t.Error("block filter failed to trigger (timeout)") + var ( + transactions = []*types.Transaction{ + types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), + } + + hashes []common.Hash + ) + + fid0 := api.NewPendingTransactionFilter() + + time.Sleep(1 * time.Second) + for _, tx := range transactions { + ev := core.TxPreEvent{Tx: tx} + mux.Post(ev) } - failTimer.Reset(dura) - select { - case <-txDone: - case <-failTimer.C: - t.Error("transaction filter failed to trigger (timeout)") + for { + h := api.GetFilterChanges(fid0).([]common.Hash) + hashes = append(hashes, h...) + + if len(hashes) >= len(transactions) { + break + } + + time.Sleep(100 * time.Millisecond) } - failTimer.Reset(dura) - select { - case <-logDone: - case <-failTimer.C: - t.Error("log filter failed to trigger (timeout)") - } - - failTimer.Reset(dura) - select { - case <-removedLogDone: - case <-failTimer.C: - t.Error("removed log filter failed to trigger (timeout)") - } - - failTimer.Reset(dura) - select { - case <-pendingLogDone: - case <-failTimer.C: - t.Error("pending log filter failed to trigger (timeout)") + for i := range hashes { + if hashes[i] != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + } + } +} + +// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. +func TestLogFilter(t *testing.T) { + t.Parallel() + + var ( + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") + secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") + thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") + notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") + firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") + + allLogs = vm.Logs{ + // Note, these are used for comparison of the test cases. + vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0), + vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1), + vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1), + vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 2), + vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3), + } + + testCases = []struct { + crit FilterCriteria + expected vm.Logs + id rpc.ID + }{ + // match all + {FilterCriteria{}, allLogs, ""}, + // match none due to no matching addresses + {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, + // match logs based on addresses, ignore topics + {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, + // match none due to no matching topics (match with address) + {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, + // match logs based on addresses and topics + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, + // match logs based on multiple addresses and "or" topics + {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, + // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes + {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""}, + } + + err error + ) + + // create all filters + for i := range testCases { + testCases[i].id = api.NewFilter(testCases[i].crit) + } + + // raise events + time.Sleep(1 * time.Second) + if err = mux.Post(allLogs); err != nil { + t.Fatal(err) + } + + for i, tt := range testCases { + var fetched []Log + for { // fetch all expected logs + fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...) + if len(fetched) >= len(tt.expected) { + break + } + + time.Sleep(100 * time.Millisecond) + } + + if len(fetched) != len(tt.expected) { + t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + return + } + + for l := range fetched { + if fetched[l].Removed { + t.Errorf("expected log not to be removed for log %d in case %d", l, i) + } + if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { + t.Errorf("invalid log on index %d for case %d", l, i) + } + + } + } +} + +// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event mux. +func TestPendingLogsSubscription(t *testing.T) { + t.Parallel() + + var ( + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") + secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") + thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") + notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") + firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") + forthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") + notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") + + allLogs = []core.PendingLogsEvent{ + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 2)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3)}}, + core.PendingLogsEvent{Logs: vm.Logs{vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 4)}}, + core.PendingLogsEvent{Logs: vm.Logs{ + vm.NewLog(thirdAddress, []common.Hash{firstTopic}, []byte(""), 5), + vm.NewLog(thirdAddress, []common.Hash{thirdTopic}, []byte(""), 5), + vm.NewLog(thirdAddress, []common.Hash{forthTopic}, []byte(""), 5), + vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 5), + }}, + } + + convertLogs = func(pl []core.PendingLogsEvent) vm.Logs { + var logs vm.Logs + for _, l := range pl { + logs = append(logs, l.Logs...) + } + return logs + } + + testCases = []struct { + crit FilterCriteria + expected vm.Logs + c chan []Log + sub *Subscription + }{ + // match all + {FilterCriteria{}, convertLogs(allLogs), nil, nil}, + // match none due to no matching addresses + {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{[]common.Hash{}}}, vm.Logs{}, nil, nil}, + // match logs based on addresses, ignore topics + {FilterCriteria{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + // match none due to no matching topics (match with address) + {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, nil, nil}, + // match logs based on addresses and topics + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil}, + // match logs based on multiple addresses and "or" topics + {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil}, + // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes + {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil}, + // multiple pending logs, should match only 2 topics from the logs in block 5 + {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, forthTopic}}}, vm.Logs{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil}, + } + ) + + // create all subscriptions, this ensures all subscriptions are created before the events are posted. + // on slow machines this could otherwise lead to missing events when the subscription is created after + // (some) events are posted. + for i := range testCases { + testCases[i].c = make(chan []Log) + testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c) + } + + for n, test := range testCases { + i := n + tt := test + go func() { + var fetched []Log + fetchLoop: + for { + logs := <-tt.c + fetched = append(fetched, logs...) + if len(fetched) >= len(tt.expected) { + break fetchLoop + } + } + + if len(fetched) != len(tt.expected) { + t.Fatalf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) + } + + for l := range fetched { + if fetched[l].Removed { + t.Errorf("expected log not to be removed for log %d in case %d", l, i) + } + if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { + t.Errorf("invalid log on index %d for case %d", l, i) + } + } + }() + } + + // raise events + time.Sleep(1 * time.Second) + for _, l := range allLogs { + if err := mux.Post(l); err != nil { + t.Fatal(err) + } } } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 88bacc45bf..e1729d1d29 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -24,7 +24,6 @@ import ( "fmt" "math/big" "strings" - "sync" "time" "github.com/ethereum/ethash" @@ -345,37 +344,12 @@ func (s *PrivateAccountAPI) SignAndSendTransaction(ctx context.Context, args Sen // PublicBlockChainAPI provides an API to access the Ethereum blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { - b Backend - muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions - newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions + b Backend } // NewPublicBlockChainAPI creates a new Etheruem blockchain API. func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI { - api := &PublicBlockChainAPI{ - b: b, - newBlockSubscriptions: make(map[string]func(core.ChainEvent) error), - } - - go api.subscriptionLoop() - - return api -} - -// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions. -func (s *PublicBlockChainAPI) subscriptionLoop() { - sub := s.b.EventMux().Subscribe(core.ChainEvent{}) - for event := range sub.Chan() { - if chainEvent, ok := event.Data.(core.ChainEvent); ok { - s.muNewBlockSubscriptions.Lock() - for id, notifyOf := range s.newBlockSubscriptions { - if notifyOf(chainEvent) == rpc.ErrNotificationNotFound { - delete(s.newBlockSubscriptions, id) - } - } - s.muNewBlockSubscriptions.Unlock() - } - } + return &PublicBlockChainAPI{b} } // BlockNumber returns the block number of the chain head. @@ -470,45 +444,6 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, bloc return nil } -// NewBlocksArgs allows the user to specify if the returned block should include transactions and in which format. -type NewBlocksArgs struct { - IncludeTransactions bool `json:"includeTransactions"` - TransactionDetails bool `json:"transactionDetails"` -} - -// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows -// the caller to specify whether the output should contain transactions and in what format. -func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - // create a subscription that will remove itself when unsubscribed/cancelled - subscription, err := notifier.NewSubscription(func(subId string) { - s.muNewBlockSubscriptions.Lock() - delete(s.newBlockSubscriptions, subId) - s.muNewBlockSubscriptions.Unlock() - }) - - if err != nil { - return nil, err - } - - // add a callback that is called on chain events which will format the block and notify the client - s.muNewBlockSubscriptions.Lock() - s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error { - notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails) - if err == nil { - return subscription.Notify(notification) - } - glog.V(logger.Warn).Info("unable to format block %v\n", err) - return nil - } - s.muNewBlockSubscriptions.Unlock() - return subscription, nil -} - // GetCode returns the code stored at the given address in the state for the given block number. func (s *PublicBlockChainAPI) GetCode(ctx context.Context, address common.Address, blockNr rpc.BlockNumber) (string, error) { state, _, err := s.b.StateAndHeaderByNumber(blockNr) @@ -867,40 +802,12 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - b Backend - muPendingTxSubs sync.Mutex - pendingTxSubs map[string]rpc.Subscription + b Backend } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. func NewPublicTransactionPoolAPI(b Backend) *PublicTransactionPoolAPI { - api := &PublicTransactionPoolAPI{ - b: b, - pendingTxSubs: make(map[string]rpc.Subscription), - } - - go api.subscriptionLoop() - - return api -} - -// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions. -func (s *PublicTransactionPoolAPI) subscriptionLoop() { - sub := s.b.EventMux().Subscribe(core.TxPreEvent{}) - for event := range sub.Chan() { - tx := event.Data.(core.TxPreEvent) - if from, err := tx.Tx.FromFrontier(); err == nil { - if s.b.AccountManager().HasAddress(from) { - s.muPendingTxSubs.Lock() - for id, sub := range s.pendingTxSubs { - if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound { - delete(s.pendingTxSubs, id) - } - } - s.muPendingTxSubs.Unlock() - } - } - } + return &PublicTransactionPoolAPI{b} } func getTransaction(chainDb ethdb.Database, b Backend, txHash common.Hash) (*types.Transaction, bool, error) { @@ -1353,31 +1260,6 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction { return transactions } -// NewPendingTransactions creates a subscription that is triggered each time a transaction enters the transaction pool -// and is send from one of the transactions this nodes manages. -func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) { - notifier, supported := rpc.NotifierFromContext(ctx) - if !supported { - return nil, rpc.ErrNotificationsUnsupported - } - - subscription, err := notifier.NewSubscription(func(id string) { - s.muPendingTxSubs.Lock() - delete(s.pendingTxSubs, id) - s.muPendingTxSubs.Unlock() - }) - - if err != nil { - return nil, err - } - - s.muPendingTxSubs.Lock() - s.pendingTxSubs[subscription.ID()] = subscription - s.muPendingTxSubs.Unlock() - - return subscription, nil -} - // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the // pool and reinsert it with the new gas price and limit. func (s *PublicTransactionPoolAPI) Resend(ctx context.Context, tx *Tx, gasPrice, gasLimit *rpc.HexNumber) (common.Hash, error) { diff --git a/rpc/notification.go b/rpc/notification.go deleted file mode 100644 index 8754330716..0000000000 --- a/rpc/notification.go +++ /dev/null @@ -1,297 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package rpc - -import ( - "errors" - "sync" - "time" - - "github.com/ethereum/go-ethereum/logger" - "github.com/ethereum/go-ethereum/logger/glog" - "golang.org/x/net/context" -) - -var ( - // ErrNotificationsUnsupported is returned when the connection doesn't support notifications - ErrNotificationsUnsupported = errors.New("subscription notifications not supported by the current transport") - - // ErrNotificationNotFound is returned when the notification for the given id is not found - ErrNotificationNotFound = errors.New("notification not found") - - // errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed) - errNotifierStopped = errors.New("unable to send notification") - - // errNotificationQueueFull is returns when there are too many notifications in the queue - errNotificationQueueFull = errors.New("too many pending notifications") -) - -// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered -// notifications that might be pending in the internal queue. -var unsubSignal = new(struct{}) - -// UnsubscribeCallback defines a callback that is called when a subcription ends. -// It receives the subscription id as argument. -type UnsubscribeCallback func(id string) - -// notification is a helper object that holds event data for a subscription -type notification struct { - sub *bufferedSubscription // subscription id - data interface{} // event data -} - -// A Notifier type describes the interface for objects that can send create subscriptions -type Notifier interface { - // Create a new subscription. The given callback is called when this subscription - // is cancelled (e.g. client send an unsubscribe, connection closed). - NewSubscription(UnsubscribeCallback) (Subscription, error) - // Cancel subscription - Unsubscribe(id string) error -} - -type notifierKey struct{} - -// NotifierFromContext returns the Notifier value stored in ctx, if any. -func NotifierFromContext(ctx context.Context) (Notifier, bool) { - n, ok := ctx.Value(notifierKey{}).(Notifier) - return n, ok -} - -// Subscription defines the interface for objects that can notify subscribers -type Subscription interface { - // Inform client of an event - Notify(data interface{}) error - // Unique identifier - ID() string - // Cancel subscription - Cancel() error -} - -// bufferedSubscription is a subscription that uses a bufferedNotifier to send -// notifications to subscribers. -type bufferedSubscription struct { - id string - unsubOnce sync.Once // call unsub method once - unsub UnsubscribeCallback // called on Unsubscribed - notifier *bufferedNotifier // forward notifications to - pending chan interface{} // closed when active - flushed chan interface{} // closed when all buffered notifications are send - lastNotification time.Time // last time a notification was send -} - -// ID returns the subscription identifier that the client uses to refer to this instance. -func (s *bufferedSubscription) ID() string { - return s.id -} - -// Cancel informs the notifier that this subscription is cancelled by the API -func (s *bufferedSubscription) Cancel() error { - return s.notifier.Unsubscribe(s.id) -} - -// Notify the subscriber of a particular event. -func (s *bufferedSubscription) Notify(data interface{}) error { - return s.notifier.send(s.id, data) -} - -// bufferedNotifier is a notifier that queues notifications in an internal queue and -// send them as fast as possible to the client from this queue. It will stop if the -// queue grows past a given size. -type bufferedNotifier struct { - codec ServerCodec // underlying connection - mu sync.Mutex // guard internal state - subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec - queueSize int // max number of items in queue - queue chan *notification // notification queue - stopped bool // indication if this notifier is ordered to stop -} - -// newBufferedNotifier returns a notifier that queues notifications in an internal queue -// from which notifications are send as fast as possible to the client. If the queue size -// limit is reached (client is unable to keep up) it will stop and closes the codec. -func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier { - notifier := &bufferedNotifier{ - codec: codec, - subscriptions: make(map[string]*bufferedSubscription), - queue: make(chan *notification, size), - queueSize: size, - } - - go notifier.run() - - return notifier -} - -// NewSubscription creates a new subscription that forwards events to this instance internal -// queue. The given callback is called when the subscription is unsubscribed/cancelled. -func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) { - id, err := newSubscriptionID() - if err != nil { - return nil, err - } - - n.mu.Lock() - defer n.mu.Unlock() - - if n.stopped { - return nil, errNotifierStopped - } - - sub := &bufferedSubscription{ - id: id, - unsub: callback, - notifier: n, - pending: make(chan interface{}), - flushed: make(chan interface{}), - lastNotification: time.Now(), - } - - n.subscriptions[id] = sub - - return sub, nil -} - -// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned. -func (n *bufferedNotifier) Unsubscribe(subid string) error { - n.mu.Lock() - sub, found := n.subscriptions[subid] - n.mu.Unlock() - - if found { - // send the unsubscribe signal, this will cause the notifier not to accept new events - // for this subscription and will close the flushed channel after the last (buffered) - // notification was send to the client. - if err := n.send(subid, unsubSignal); err != nil { - return err - } - - // wait for confirmation that all (buffered) events are send for this subscription. - // this ensures that the unsubscribe method response is not send before all buffered - // events for this subscription are send. - <-sub.flushed - - return nil - } - - return ErrNotificationNotFound -} - -// Send enques the given data for the subscription with public ID on the internal queue. t returns -// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it -// will remove the subscription with the given id from the subscription collection. -func (n *bufferedNotifier) send(id string, data interface{}) error { - n.mu.Lock() - defer n.mu.Unlock() - - if n.stopped { - return errNotifierStopped - } - - var ( - subscription *bufferedSubscription - found bool - ) - - // check if subscription is associated with this connection, it might be cancelled - // (subscribe/connection closed) - if subscription, found = n.subscriptions[id]; !found { - glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id) - return ErrNotificationNotFound - } - - // received the unsubscribe signal. Add it to the queue to make sure any pending notifications - // for this subscription are send. When the run loop receives this singal it will signal that - // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be - // send to the user. Remove the subscriptions to make sure new notifications are not accepted. - if data == unsubSignal { - delete(n.subscriptions, id) - if subscription.unsub != nil { - subscription.unsubOnce.Do(func() { subscription.unsub(id) }) - } - } - - subscription.lastNotification = time.Now() - - if len(n.queue) >= n.queueSize { - glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection") - n.codec.Close() - return errNotificationQueueFull - } - - n.queue <- ¬ification{subscription, data} - return nil -} - -// run reads notifications from the internal queue and sends them to the client. In case of an -// error, or when the codec is closed it will cancel all active subscriptions and returns. -func (n *bufferedNotifier) run() { - defer func() { - n.mu.Lock() - defer n.mu.Unlock() - - n.stopped = true - close(n.queue) - - // on exit call unsubscribe callback - for id, sub := range n.subscriptions { - if sub.unsub != nil { - sub.unsubOnce.Do(func() { sub.unsub(id) }) - } - close(sub.flushed) - delete(n.subscriptions, id) - } - }() - - for { - select { - case notification := <-n.queue: - // It can happen that an event is raised before the RPC server was able to send the sub - // id to the client. Therefore subscriptions are marked as pending until the sub id was - // send. The RPC server will activate the subscription by closing the pending chan. - <-notification.sub.pending - - if notification.data == unsubSignal { - // unsubSignal is the last accepted message for this subscription. Raise the signal - // that all buffered notifications are sent by closing the flushed channel. This - // indicates that the response for the unsubscribe can be send to the client. - close(notification.sub.flushed) - } else { - msg := n.codec.CreateNotification(notification.sub.id, notification.data) - if err := n.codec.Write(msg); err != nil { - n.codec.Close() - // unable to send notification to client, unsubscribe all subscriptions - glog.V(logger.Warn).Infof("unable to send notification - %v\n", err) - return - } - } - case <-n.codec.Closed(): // connection was closed - glog.V(logger.Debug).Infoln("codec closed, stop subscriptions") - return - } - } -} - -// Marks the subscription as active. This will causes the notifications for this subscription to be -// forwarded to the client. -func (n *bufferedNotifier) activate(subid string) { - n.mu.Lock() - defer n.mu.Unlock() - - if sub, found := n.subscriptions[subid]; found { - close(sub.pending) - } -} diff --git a/rpc/server.go b/rpc/server.go index 040805a5c2..996c637004 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -166,7 +166,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO // to send notification to clients. It is thight to the codec/connection. If the // connection is closed the notifier will stop and cancels all active subscriptions. if options&OptionSubscriptions == OptionSubscriptions { - ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize)) + ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec)) } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped @@ -247,7 +247,7 @@ func (s *Server) Stop() { } // createSubscription will call the subscription callback and returns the subscription id or error. -func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) { +func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (ID, error) { // subscription have as first argument the context following optional arguments args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} args = append(args, req.args...) @@ -257,7 +257,7 @@ func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *ser return "", reply[1].Interface().(error) } - return reply[0].Interface().(Subscription).ID(), nil + return reply[0].Interface().(*Subscription).ID, nil } // handle executes a request and returns the response from the callback. @@ -273,8 +273,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil } - subid := req.args[0].String() - if err := notifier.Unsubscribe(subid); err != nil { + subid := ID(req.args[0].String()) + if err := notifier.unsubscribe(subid); err != nil { return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil } @@ -292,7 +292,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque // active the subscription after the sub id was successfully sent to the client activateSub := func() { notifier, _ := NotifierFromContext(ctx) - notifier.(*bufferedNotifier).activate(subid) + notifier.activate(subid) } return codec.CreateResponse(req.id, subid), activateSub diff --git a/rpc/server_test.go b/rpc/server_test.go index e6840bde47..c3c88fab75 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -72,7 +72,7 @@ func (s *Service) InvalidRets3() (string, string, error) { return "", "", nil } -func (s *Service) Subscription(ctx context.Context) (Subscription, error) { +func (s *Service) Subscription(ctx context.Context) (*Subscription, error) { return nil, nil } diff --git a/rpc/subscription.go b/rpc/subscription.go new file mode 100644 index 0000000000..863d34b202 --- /dev/null +++ b/rpc/subscription.go @@ -0,0 +1,135 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "errors" + "sync" + + "golang.org/x/net/context" +) + +var ( + // ErrNotificationsUnsupported is returned when the connection doesn't support notifications + ErrNotificationsUnsupported = errors.New("notifications not supported") + // ErrNotificationNotFound is returned when the notification for the given id is not found + ErrSubscriptionNotFound = errors.New("subscription not found") +) + +// ID defines a psuedo random number that is used to identify RPC subscriptions. +type ID string + +// a Subscription is created by a notifier and tight to that notifier. The client can use +// this subscription to wait for an unsubscribe request for the client, see Err(). +type Subscription struct { + ID ID + err chan error // closed on unsubscribe +} + +// Err returns a channel that is closed when the client send an unsubscribe request. +func (s *Subscription) Err() <-chan error { + return s.err +} + +// notifierKey is used to store a notifier within the connection context. +type notifierKey struct{} + +// Notifier is tight to a RPC connection that supports subscriptions. +// Server callbacks use the notifier to send notifications. +type Notifier struct { + codec ServerCodec + subMu sync.RWMutex // guards active and inactive maps + stopped bool + active map[ID]*Subscription + inactive map[ID]*Subscription +} + +// newNotifier creates a new notifier that can be used to send subscription +// notifications to the client. +func newNotifier(codec ServerCodec) *Notifier { + return &Notifier{ + codec: codec, + active: make(map[ID]*Subscription), + inactive: make(map[ID]*Subscription), + } +} + +// NotifierFromContext returns the Notifier value stored in ctx, if any. +func NotifierFromContext(ctx context.Context) (*Notifier, bool) { + n, ok := ctx.Value(notifierKey{}).(*Notifier) + return n, ok +} + +// CreateSubscription returns a new subscription that is coupled to the +// RPC connection. By default subscriptions are inactive and notifications +// are dropped until the subscription is marked as active. This is done +// by the RPC server after the subscription ID is send to the client. +func (n *Notifier) CreateSubscription() *Subscription { + s := &Subscription{NewID(), make(chan error)} + n.subMu.Lock() + n.inactive[s.ID] = s + n.subMu.Unlock() + return s +} + +// Notify sends a notification to the client with the given data as payload. +// If an error occurs the RPC connection is closed and the error is returned. +func (n *Notifier) Notify(id ID, data interface{}) error { + n.subMu.RLock() + defer n.subMu.RUnlock() + + _, active := n.active[id] + if active { + notification := n.codec.CreateNotification(string(id), data) + if err := n.codec.Write(notification); err != nil { + n.codec.Close() + return err + } + } + return nil +} + +// Closed returns a channel that is closed when the RPC connection is closed. +func (n *Notifier) Closed() <-chan interface{} { + return n.codec.Closed() +} + +// unsubscribe a subscription. +// If the subscription could not be found ErrSubscriptionNotFound is returned. +func (n *Notifier) unsubscribe(id ID) error { + n.subMu.Lock() + defer n.subMu.Unlock() + if s, found := n.active[id]; found { + close(s.err) + delete(n.active, id) + return nil + } + return ErrSubscriptionNotFound +} + +// activate enables a subscription. Until a subscription is enabled all +// notifications are dropped. This method is called by the RPC server after +// the subscription ID was sent to client. This prevents notifications being +// send to the client before the subscription ID is send to the client. +func (n *Notifier) activate(id ID) { + n.subMu.Lock() + defer n.subMu.Unlock() + if sub, found := n.inactive[id]; found { + n.active[id] = sub + delete(n.inactive, id) + } +} diff --git a/rpc/notification_test.go b/rpc/subscription_test.go similarity index 84% rename from rpc/notification_test.go rename to rpc/subscription_test.go index 52352848c4..8bb3416947 100644 --- a/rpc/notification_test.go +++ b/rpc/subscription_test.go @@ -50,7 +50,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) { s.mu.Unlock() } -func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { +func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (*Subscription, error) { notifier, supported := NotifierFromContext(ctx) if !supported { return nil, ErrNotificationsUnsupported @@ -59,17 +59,29 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i // by explicitly creating an subscription we make sure that the subscription id is send back to the client // before the first subscription.Notify is called. Otherwise the events might be send before the response // for the eth_subscribe method. - subscription, err := notifier.NewSubscription(s.Unsubscribe) - if err != nil { - return nil, err - } + subscription := notifier.CreateSubscription() go func() { + // test expects n events, if we begin sending event immediatly some events + // will probably be dropped since the subscription ID might not be send to + // the client. + time.Sleep(5 * time.Second) for i := 0; i < n; i++ { - if err := subscription.Notify(val + i); err != nil { + if err := notifier.Notify(subscription.ID, val+i); err != nil { return } } + + select { + case <-notifier.Closed(): + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() + case <-subscription.Err(): + s.mu.Lock() + s.unsubscribed = true + s.mu.Unlock() + } }() return subscription, nil @@ -77,7 +89,7 @@ func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val i // HangSubscription blocks on s.unblockHangSubscription before // sending anything. -func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (Subscription, error) { +func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) (*Subscription, error) { notifier, supported := NotifierFromContext(ctx) if !supported { return nil, ErrNotificationsUnsupported @@ -85,12 +97,10 @@ func (s *NotificationTestService) HangSubscription(ctx context.Context, val int) s.gotHangSubscriptionReq <- struct{}{} <-s.unblockHangSubscription - subscription, err := notifier.NewSubscription(s.Unsubscribe) - if err != nil { - return nil, err - } + subscription := notifier.CreateSubscription() + go func() { - subscription.Notify(val) + notifier.Notify(subscription.ID, val) }() return subscription, nil } diff --git a/rpc/types.go b/rpc/types.go index 2a7268ad8b..89c5b5bc9b 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -269,6 +269,6 @@ func (bn *BlockNumber) UnmarshalJSON(data []byte) error { return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber) } -func (bn *BlockNumber) Int64() int64 { - return (int64)(*bn) +func (bn BlockNumber) Int64() int64 { + return (int64)(bn) } diff --git a/rpc/utils.go b/rpc/utils.go index 1ac6698f56..b590ba62f3 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -17,17 +17,26 @@ package rpc import ( - "crypto/rand" + "bufio" + crand "crypto/rand" + "encoding/binary" "encoding/hex" - "errors" "math/big" + "math/rand" "reflect" + "sync" + "time" "unicode" "unicode/utf8" "golang.org/x/net/context" ) +var ( + subscriptionIDGenMu sync.Mutex + subscriptionIDGen = idGenerator() +) + // Is this an exported - upper case - name? func isExported(name string) bool { rune, _ := utf8.DecodeRuneInString(name) @@ -218,11 +227,28 @@ METHODS: return callbacks, subscriptions } -func newSubscriptionID() (string, error) { - var subid [16]byte - n, _ := rand.Read(subid[:]) - if n != 16 { - return "", errors.New("Unable to generate subscription id") +// idGenerator helper utility that generates a (pseudo) random sequence of +// bytes that are used to generate identifiers. +func idGenerator() *rand.Rand { + if seed, err := binary.ReadVarint(bufio.NewReader(crand.Reader)); err == nil { + return rand.New(rand.NewSource(seed)) } - return "0x" + hex.EncodeToString(subid[:]), nil + return rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) +} + +// NewID generates a identifier that can be used as an identifier in the RPC interface. +// e.g. filter and subscription identifier. +func NewID() ID { + subscriptionIDGenMu.Lock() + defer subscriptionIDGenMu.Unlock() + + id := make([]byte, 16) + for i := 0; i < len(id); i += 7 { + val := subscriptionIDGen.Int63() + for j := 0; i+j < len(id) && j < 7; j++ { + id[i+j] = byte(val) + val >>= 8 + } + } + return ID("0x" + hex.EncodeToString(id)) }