diff --git a/eth/filters/api.go b/eth/filters/api.go index a30eb28bef..f9ae70eba7 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -39,6 +39,7 @@ type filter struct { typ Type deadline *time.Timer // filter is inactive when deadline triggers hashes []common.Hash + fullTx bool txs []*types.Transaction crit FilterCriteria logs []*types.Log @@ -103,14 +104,14 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) { // // It is part of the filter package because this filter can be used through the // `eth_getFilterChanges` polling method that is also used for log filters. -func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID { +func (api *FilterAPI) NewPendingTransactionFilter(fullTx *bool) rpc.ID { var ( pendingTxs = make(chan []*types.Transaction) pendingTxSub = api.events.SubscribePendingTxs(pendingTxs) ) api.filtersMu.Lock() - api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} + api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, fullTx: fullTx != nil && *fullTx, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { @@ -412,6 +413,9 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() + chainConfig := api.sys.backend.ChainConfig() + latest := api.sys.backend.CurrentHeader() + if f, found := api.filters[id]; found { if !f.deadline.Stop() { // timer expired but filter is not yet removed in timeout loop @@ -426,9 +430,21 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { f.hashes = nil return returnHashes(hashes), nil case PendingTransactionsSubscription: - txs := f.txs - f.txs = nil - return txs, nil + if f.fullTx { + txs := make([]*ethapi.RPCTransaction, 0, len(f.txs)) + for _, tx := range f.txs { + txs = append(txs, ethapi.NewRPCPendingTransaction(tx, latest, chainConfig)) + } + f.txs = nil + return txs, nil + } else { + hashes := make([]common.Hash, 0, len(f.txs)) + for _, tx := range f.txs { + hashes = append(hashes, tx.Hash()) + } + f.txs = nil + return hashes, nil + } case LogsSubscription, MinedAndPendingLogsSubscription: logs := f.logs f.logs = nil diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 974483b68f..b70b0158ad 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -37,6 +37,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" ) @@ -52,11 +53,12 @@ type testBackend struct { } func (b *testBackend) ChainConfig() *params.ChainConfig { - panic("implement me") + return params.TestChainConfig } func (b *testBackend) CurrentHeader() *types.Header { - panic("implement me") + hdr, _ := b.HeaderByNumber(context.TODO(), rpc.LatestBlockNumber) + return hdr } func (b *testBackend) ChainDb() ethdb.Database { @@ -256,10 +258,10 @@ func TestPendingTxFilter(t *testing.T) { types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), } - txs []*types.Transaction + hashes []common.Hash ) - fid0 := api.NewPendingTransactionFilter() + fid0 := api.NewPendingTransactionFilter(nil) time.Sleep(1 * time.Second) backend.txFeed.Send(core.NewTxsEvent{Txs: transactions}) @@ -271,7 +273,64 @@ func TestPendingTxFilter(t *testing.T) { t.Fatalf("Unable to retrieve logs: %v", err) } - tx := results.([]*types.Transaction) + h := results.([]common.Hash) + hashes = append(hashes, h...) + if len(hashes) >= len(transactions) { + break + } + // check timeout + if time.Now().After(timeout) { + break + } + + time.Sleep(100 * time.Millisecond) + } + + if len(hashes) != len(transactions) { + t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes)) + return + } + for i := range hashes { + if hashes[i] != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i]) + } + } +} + +// TestPendingTxFilterFullTx tests whether pending tx filters retrieve all pending transactions that are posted to the event mux. +func TestPendingTxFilterFullTx(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) + api = NewFilterAPI(sys, false) + + transactions = []*types.Transaction{ + types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), + } + + txs []*ethapi.RPCTransaction + ) + + fullTx := true + fid0 := api.NewPendingTransactionFilter(&fullTx) + + time.Sleep(1 * time.Second) + backend.txFeed.Send(core.NewTxsEvent{Txs: transactions}) + + timeout := time.Now().Add(1 * time.Second) + for { + results, err := api.GetFilterChanges(fid0) + if err != nil { + t.Fatalf("Unable to retrieve logs: %v", err) + } + + tx := results.([]*ethapi.RPCTransaction) txs = append(txs, tx...) if len(txs) >= len(transactions) { break @@ -289,8 +348,8 @@ func TestPendingTxFilter(t *testing.T) { return } for i := range txs { - if txs[i].Hash() != transactions[i].Hash() { - t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash()) + if txs[i].Hash != transactions[i].Hash() { + t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash) } } } @@ -854,15 +913,15 @@ func TestPendingTxFilterDeadlock(t *testing.T) { // timeout either in 100ms or 200ms fids := make([]rpc.ID, 20) for i := 0; i < len(fids); i++ { - fid := api.NewPendingTransactionFilter() + fid := api.NewPendingTransactionFilter(nil) fids[i] = fid // Wait for at least one tx to arrive in filter for { - txs, err := api.GetFilterChanges(fid) + hashes, err := api.GetFilterChanges(fid) if err != nil { t.Fatalf("Filter should exist: %v\n", err) } - if len(txs.([]*types.Transaction)) > 0 { + if len(hashes.([]common.Hash)) > 0 { break } runtime.Gosched()