599 lines
15 KiB
599 lines
15 KiB
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
var (
// filterTickerTime is the idle period after which the sweep in start drops a
// filter's queue. It is compared against the queue's timeout stamp, which is
// refreshed every time the queue is drained through get.
filterTickerTime = 5 * time.Minute
// Filter kind tags returned by getFilterType. The constants are untyped, so
// they take the byte type from that function's return type.
const (
// unknownFilterTy is returned when an id is not found in any queue.
unknownFilterTy = iota
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
mux *event.TypeMux
// quit is selected on by the work loop in start; a receive on it ends the loop.
quit chan struct{}
// chainDb is handed to New whenever a filter is created.
chainDb ethdb.Database
// filterManager registers filters (Add), looks them up (Get) and drops them (Remove).
filterManager *FilterSystem
// filterMapMu is intended to guard filterMapping.
filterMapMu sync.RWMutex
filterMapping map[string]int // maps external filter identifiers (handed to clients) to internal filter identifiers
// logMu is intended to guard logQueue, which holds the pending logs per internal filter id.
logMu sync.RWMutex
logQueue map[int]*logQueue
// blockMu is intended to guard blockQueue, which holds the pending block hashes per internal filter id.
blockMu sync.RWMutex
blockQueue map[int]*hashQueue
// transactionMu is intended to guard transactionQueue, which holds the pending
// transaction hashes per internal filter id.
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
// NOTE(review): transactMu is not referenced by any code visible in this file — confirm it is still needed.
transactMu sync.Mutex
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
svc := &PublicFilterAPI{
mux: mux,
chainDb: chainDb,
filterManager: NewFilterSystem(mux),
filterMapping: make(map[string]int),
logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue),
go svc.start()
return svc
// Stop quits the work loop.
// NOTE(review): the method body is not visible in this view; presumably it
// signals s.quit, which the loop in start selects on — confirm.
func (s *PublicFilterAPI) Stop() {
// start the work loop, wait and process events.
func (s *PublicFilterAPI) start() {
timer := time.NewTicker(2 * time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
delete(s.logQueue, id)
for id, filter := range s.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
delete(s.blockQueue, id)
for id, filter := range s.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
delete(s.transactionQueue, id)
case <-s.quit:
break done
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
defer s.blockMu.Unlock()
if queue := s.blockQueue[id]; queue != nil {
defer s.blockMu.Unlock()
s.filterMapping[externalId] = id
return externalId, nil
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
defer s.transactionMu.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
defer s.transactionMu.Unlock()
if queue := s.transactionQueue[id]; queue != nil {
s.filterMapping[externalId] = id
return externalId, nil
// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
defer s.logMu.Unlock()
filter := New(s.chainDb)
id, err := s.filterManager.Add(filter, LogFilter)
if err != nil {
return 0, err
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.LogCallback = func(log *vm.Log, removed bool) {
defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
queue.add(vmlog{log, removed})
return id, nil
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
// FromBlock is taken from the JSON "fromBlock" field; UnmarshalJSON stores
// rpc.LatestBlockNumber when the field is absent or negative.
FromBlock rpc.BlockNumber
// ToBlock is taken from the JSON "toBlock" field; UnmarshalJSON stores
// rpc.LatestBlockNumber when the field is absent or negative.
ToBlock rpc.BlockNumber
// Addresses is decoded from the JSON "address" field, which may hold a single
// hex string or an array of hex strings.
Addresses []common.Address
// Topics is decoded from the JSON "topics" array; every position holds the
// list of hashes supplied for that position.
Topics [][]common.Hash
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
Topics interface{} `json:"topics"`
var raw input
if err := json.Unmarshal(data, &raw); err != nil {
return err
if raw.From == nil || raw.From.Int64() < 0 {
args.FromBlock = rpc.LatestBlockNumber
} else {
args.FromBlock = *raw.From
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
args.ToBlock = rpc.LatestBlockNumber
} else {
args.ToBlock = *raw.ToBlock
args.Addresses = []common.Address{}
if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
var addresses []common.Address
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
for i, addr := range strAddrs {
if strAddr, ok := addr.(string); ok {
if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') {
strAddr = strAddr[2:]
if decAddr, err := hex.DecodeString(strAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
fmt.Errorf("invalid address given")
} else {
return fmt.Errorf("invalid address on index %d", i)
} else if singleAddr, ok := raw.Addresses.(string); ok {
if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') {
singleAddr = singleAddr[2:]
if decAddr, err := hex.DecodeString(singleAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
fmt.Errorf("invalid address given")
} else {
errors.New("invalid address(es) given")
args.Addresses = addresses
topicConverter := func(raw string) (common.Hash, error) {
if len(raw) == 0 {
return common.Hash{}, nil
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
raw = raw[2:]
if decAddr, err := hex.DecodeString(raw); err == nil {
return common.BytesToHash(decAddr), nil
return common.Hash{}, errors.New("invalid topic given")
// topics is an array consisting of strings or arrays of strings
if raw.Topics != nil {
topics, ok := raw.Topics.([]interface{})
if ok {
parsedTopics := make([][]common.Hash, len(topics))
for i, topic := range topics {
if topic == nil {
parsedTopics[i] = []common.Hash{common.StringToHash("")}
} else if strTopic, ok := topic.(string); ok {
if t, err := topicConverter(strTopic); err != nil {
return fmt.Errorf("invalid topic on index %d", i)
} else {
parsedTopics[i] = []common.Hash{t}
} else if arrTopic, ok := topic.([]interface{}); ok {
parsedTopics[i] = make([]common.Hash, len(arrTopic))
for j := 0; j < len(parsedTopics[i]); i++ {
if arrTopic[j] == nil {
parsedTopics[i][j] = common.StringToHash("")
} else if str, ok := arrTopic[j].(string); ok {
if t, err := topicConverter(str); err != nil {
return fmt.Errorf("invalid topic on index %d", i)
} else {
parsedTopics[i] = []common.Hash{t}
} else {
fmt.Errorf("topic[%d][%d] not a string", i, j)
} else {
return fmt.Errorf("topic[%d] invalid", i)
args.Topics = parsedTopics
return nil
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
if err != nil {
return "", err
s.filterMapping[externalId] = id
return externalId, nil
// GetLogs returns the logs matching the given argument.
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
filter := New(s.chainDb)
return toRPCLogs(filter.Find(), false)
// UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
defer s.filterMapMu.Unlock()
id, ok := s.filterMapping[filterId]
if !ok {
return false
defer s.filterManager.Remove(id)
delete(s.filterMapping, filterId)
if _, ok := s.logQueue[id]; ok {
defer s.logMu.Unlock()
delete(s.logQueue, id)
return true
if _, ok := s.blockQueue[id]; ok {
defer s.blockMu.Unlock()
delete(s.blockQueue, id)
return true
if _, ok := s.transactionQueue[id]; ok {
defer s.transactionMu.Unlock()
delete(s.transactionQueue, id)
return true
return false
// getFilterType is a helper utility that determine the type of filter for the given filter id.
func (s *PublicFilterAPI) getFilterType(id int) byte {
if _, ok := s.blockQueue[id]; ok {
return blockFilterTy
} else if _, ok := s.transactionQueue[id]; ok {
return transactionFilterTy
} else if _, ok := s.logQueue[id]; ok {
return logFilterTy
return unknownFilterTy
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
defer s.blockMu.Unlock()
if s.blockQueue[id] != nil {
return s.blockQueue[id].get()
return nil
// transactionFilterChanged returns a collection of transaction hashes for the pending
// transaction filter with the given id.
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
defer s.blockMu.Unlock()
if s.transactionQueue[id] != nil {
return s.transactionQueue[id].get()
return nil
// logFilterChanged returns a collection of logs for the log filter with the given id.
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
defer s.logMu.Unlock()
if s.logQueue[id] != nil {
return s.logQueue[id].get()
return nil
// GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
id, ok := s.filterMapping[filterId]
if !ok {
return toRPCLogs(nil, false)
if filter := s.filterManager.Get(id); filter != nil {
return toRPCLogs(filter.Find(), false)
return toRPCLogs(nil, false)
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
id, ok := s.filterMapping[filterId]
if !ok { // filter not found
return []interface{}{}
switch s.getFilterType(id) {
case blockFilterTy:
return returnHashes(s.blockFilterChanged(id))
case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id))
case logFilterTy:
return s.logFilterChanged(id)
return []interface{}{}
// vmlog is the element type handed back by the log related RPC methods; it
// pairs a log with a flag telling whether that log was removed.
// NOTE(review): only the Removed field is visible here, yet vmlog literals
// elsewhere in this file also set a Log field — confirm the full definition.
type vmlog struct {
Removed bool `json:"removed"`
type logQueue struct {
mu sync.Mutex
logs []vmlog
timeout time.Time
id int
func (l *logQueue) add(logs ...vmlog) {
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
func (l *logQueue) get() []vmlog {
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
return tmp
type hashQueue struct {
mu sync.Mutex
hashes []common.Hash
timeout time.Time
id int
func (l *hashQueue) add(hashes ...common.Hash) {
defer l.mu.Unlock()
l.hashes = append(l.hashes, hashes...)
func (l *hashQueue) get() []common.Hash {
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil
return tmp
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
// identifiers it is not feasible for DApps to guess filter ids of other DApps and uninstall or poll for them,
// causing the affected DApp to miss data.
//
// The identifier is "0x" followed by the hex encoding of 16 random bytes. An
// error is returned when the random source fails or yields too few bytes.
func newFilterId() (string, error) {
	var subid [16]byte

	n, err := rand.Read(subid[:])
	if err != nil {
		// Report the cause instead of silently dropping it.
		return "", errors.New("Unable to generate filter id: " + err.Error())
	}
	if n != len(subid) {
		return "", errors.New("Unable to generate filter id")
	}

	return "0x" + hex.EncodeToString(subid[:]), nil
}
// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
// can hold additional information about the logs such as whether it was deleted.
// Additionally when nil is given it will by default instead create an empty slice
// instead. This is required by the RPC specification.
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
convertedLogs := make([]vmlog, len(logs))
for i, log := range logs {
convertedLogs[i] = vmlog{Log: log, Removed: removed}
return convertedLogs
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
// return the given hashes. The RPC interfaces defines that always an array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
return hashes