diff --git a/core/blockchain.go b/core/blockchain.go
index 0fe4812626..6c72432dcc 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -894,7 +894,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num)
}
- // Todo(rjl493456442) txlookup, bloombits, etc
+ // Todo(rjl493456442) txlookup, log index, etc
}
// If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken
diff --git a/core/bloom_indexer.go b/core/bloom_indexer.go
deleted file mode 100644
index 68a35d811e..0000000000
--- a/core/bloom_indexer.go
+++ /dev/null
@@ -1,92 +0,0 @@
-// Copyright 2021 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package core
-
-import (
- "context"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/common/bitutil"
- "github.com/ethereum/go-ethereum/core/bloombits"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
-)
-
-const (
- // bloomThrottling is the time to wait between processing two consecutive index
- // sections. It's useful during chain upgrades to prevent disk overload.
- bloomThrottling = 100 * time.Millisecond
-)
-
-// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
-// for the Ethereum header bloom filters, permitting blazing fast filtering.
-type BloomIndexer struct {
- size uint64 // section size to generate bloombits for
- db ethdb.Database // database instance to write index data and metadata into
- gen *bloombits.Generator // generator to rotate the bloom bits creating the bloom index
- section uint64 // Section is the section number being processed currently
- head common.Hash // Head is the hash of the last header processed
-}
-
-// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
-// canonical chain for fast logs filtering.
-func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *ChainIndexer {
- backend := &BloomIndexer{
- db: db,
- size: size,
- }
- table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
-
- return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits")
-}
-
-// Reset implements core.ChainIndexerBackend, starting a new bloombits index
-// section.
-func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
- gen, err := bloombits.NewGenerator(uint(b.size))
- b.gen, b.section, b.head = gen, section, common.Hash{}
- return err
-}
-
-// Process implements core.ChainIndexerBackend, adding a new header's bloom into
-// the index.
-func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
- b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
- b.head = header.Hash()
- return nil
-}
-
-// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
-// writing it out into the database.
-func (b *BloomIndexer) Commit() error {
- batch := b.db.NewBatchWithSize((int(b.size) / 8) * types.BloomBitLength)
- for i := 0; i < types.BloomBitLength; i++ {
- bits, err := b.gen.Bitset(uint(i))
- if err != nil {
- return err
- }
- rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
- }
- return batch.Write()
-}
-
-// Prune returns an empty error since we don't support pruning here.
-func (b *BloomIndexer) Prune(threshold uint64) error {
- return nil
-}
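
For context on what this removal takes out: before this diff, the eth backend constructed and started the bloom indexer roughly as in the sketch below. This is a sketch against the pre-removal tree only; it assumes the params.BloomBitsBlocks and params.BloomConfirms constants from that tree and will not compile once the diff is applied.

package example

import (
    "github.com/ethereum/go-ethereum/core"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/params"
)

// startBloomIndexer builds the rotated-bloom index in the background:
// one section per params.BloomBitsBlocks headers, finalized only after
// params.BloomConfirms confirmations to stay clear of shallow reorgs.
func startBloomIndexer(db ethdb.Database, chain *core.BlockChain) *core.ChainIndexer {
    indexer := core.NewBloomIndexer(db, params.BloomBitsBlocks, params.BloomConfirms)
    indexer.Start(chain) // subscribe to chain head events and index new sections
    return indexer
}
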
diff --git a/core/bloombits/doc.go b/core/bloombits/doc.go
deleted file mode 100644
index 3d159e74f7..0000000000
--- a/core/bloombits/doc.go
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-// Package bloombits implements bloom filtering on batches of data.
-package bloombits
diff --git a/core/bloombits/generator.go b/core/bloombits/generator.go
deleted file mode 100644
index 646151db0b..0000000000
--- a/core/bloombits/generator.go
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package bloombits
-
-import (
- "errors"
-
- "github.com/ethereum/go-ethereum/core/types"
-)
-
-var (
- // errSectionOutOfBounds is returned if the user tried to add more bloom filters
- // to the batch than available space, or tries to retrieve above the capacity.
- errSectionOutOfBounds = errors.New("section out of bounds")
-
- // errBloomBitOutOfBounds is returned if the user tried to retrieve a specified
- // bloom bit above the capacity.
- errBloomBitOutOfBounds = errors.New("bloom bit out of bounds")
-)
-
-// Generator takes a number of bloom filters and generates the rotated bloom bits
-// to be used for batched filtering.
-type Generator struct {
- blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
- sections uint // Number of sections to batch together
- nextSec uint // Next section to set when adding a bloom
-}
-
-// NewGenerator creates a rotated bloom generator that can iteratively fill a
-// batched bloom filter's bits.
-func NewGenerator(sections uint) (*Generator, error) {
- if sections%8 != 0 {
- return nil, errors.New("section count not multiple of 8")
- }
- b := &Generator{sections: sections}
- for i := 0; i < types.BloomBitLength; i++ {
- b.blooms[i] = make([]byte, sections/8)
- }
- return b, nil
-}
-
-// AddBloom takes a single bloom filter and sets the corresponding bit column
-// in memory accordingly.
-func (b *Generator) AddBloom(index uint, bloom types.Bloom) error {
- // Make sure we're not adding more bloom filters than our capacity
- if b.nextSec >= b.sections {
- return errSectionOutOfBounds
- }
- if b.nextSec != index {
- return errors.New("bloom filter with unexpected index")
- }
- // Rotate the bloom and insert into our collection
- byteIndex := b.nextSec / 8
- bitIndex := byte(7 - b.nextSec%8)
- for byt := 0; byt < types.BloomByteLength; byt++ {
- bloomByte := bloom[types.BloomByteLength-1-byt]
- if bloomByte == 0 {
- continue
- }
- base := 8 * byt
- b.blooms[base+7][byteIndex] |= ((bloomByte >> 7) & 1) << bitIndex
- b.blooms[base+6][byteIndex] |= ((bloomByte >> 6) & 1) << bitIndex
- b.blooms[base+5][byteIndex] |= ((bloomByte >> 5) & 1) << bitIndex
- b.blooms[base+4][byteIndex] |= ((bloomByte >> 4) & 1) << bitIndex
- b.blooms[base+3][byteIndex] |= ((bloomByte >> 3) & 1) << bitIndex
- b.blooms[base+2][byteIndex] |= ((bloomByte >> 2) & 1) << bitIndex
- b.blooms[base+1][byteIndex] |= ((bloomByte >> 1) & 1) << bitIndex
- b.blooms[base][byteIndex] |= (bloomByte & 1) << bitIndex
- }
- b.nextSec++
- return nil
-}
-
-// Bitset returns the bit vector belonging to the given bit index after all
-// blooms have been added.
-func (b *Generator) Bitset(idx uint) ([]byte, error) {
- if b.nextSec != b.sections {
- return nil, errors.New("bloom not fully generated yet")
- }
- if idx >= types.BloomBitLength {
- return nil, errBloomBitOutOfBounds
- }
- return b.blooms[idx], nil
-}
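
The comments above describe the rotation but the data layout is easier to see in a toy: the sketch below transposes a batch of hypothetical 8-bit blooms into per-bit vectors, the same transform the removed Generator applies to 2048-bit header blooms. It is self-contained and uses only the standard library.

package main

import "fmt"

const (
    bloomBits = 8  // toy bloom width; the real filter uses types.BloomBitLength (2048) bits
    sections  = 16 // blooms batched per section; must stay a multiple of 8
)

// rotate transposes per-block blooms into per-bit vectors: bit i of block j in
// the input becomes bit j of vector i in the output, so a query only has to
// scan one compact vector per bloom bit it cares about instead of every bloom.
func rotate(blooms [sections]byte) [bloomBits][]byte {
    var out [bloomBits][]byte
    for i := range out {
        out[i] = make([]byte, sections/8)
    }
    for j, bloom := range blooms {
        byteIdx := j / 8
        bitIdx := byte(7 - j%8)
        for i := 0; i < bloomBits; i++ {
            if bloom&(1<<uint(bloomBits-1-i)) != 0 {
                out[i][byteIdx] |= 1 << bitIdx
            }
        }
    }
    return out
}

func main() {
    var blooms [sections]byte
    blooms[3] = 0b10000001 // block 3 sets the first and last bloom bits
    vectors := rotate(blooms)
    fmt.Printf("bit 0: %08b\n", vectors[0]) // block 3 shows up as a single set bit
    fmt.Printf("bit 7: %08b\n", vectors[7])
}
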
diff --git a/core/bloombits/generator_test.go b/core/bloombits/generator_test.go
deleted file mode 100644
index ac1aee0b25..0000000000
--- a/core/bloombits/generator_test.go
+++ /dev/null
@@ -1,100 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package bloombits
-
-import (
- "bytes"
- crand "crypto/rand"
- "math/rand"
- "testing"
-
- "github.com/ethereum/go-ethereum/core/types"
-)
-
-// Tests that batched bloom bits are correctly rotated from the input bloom
-// filters.
-func TestGenerator(t *testing.T) {
- // Generate the input and the rotated output
- var input, output [types.BloomBitLength][types.BloomByteLength]byte
-
- for i := 0; i < types.BloomBitLength; i++ {
- for j := 0; j < types.BloomBitLength; j++ {
- bit := byte(rand.Int() % 2)
-
- input[i][j/8] |= bit << byte(7-j%8)
- output[types.BloomBitLength-1-j][i/8] |= bit << byte(7-i%8)
- }
- }
- // Crunch the input through the generator and verify the result
- gen, err := NewGenerator(types.BloomBitLength)
- if err != nil {
- t.Fatalf("failed to create bloombit generator: %v", err)
- }
- for i, bloom := range input {
- if err := gen.AddBloom(uint(i), bloom); err != nil {
- t.Fatalf("bloom %d: failed to add: %v", i, err)
- }
- }
- for i, want := range output {
- have, err := gen.Bitset(uint(i))
- if err != nil {
- t.Fatalf("output %d: failed to retrieve bits: %v", i, err)
- }
- if !bytes.Equal(have, want[:]) {
- t.Errorf("output %d: bit vector mismatch have %x, want %x", i, have, want)
- }
- }
-}
-
-func BenchmarkGenerator(b *testing.B) {
- var input [types.BloomBitLength][types.BloomByteLength]byte
- b.Run("empty", func(b *testing.B) {
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- // Crunch the input through the generator and verify the result
- gen, err := NewGenerator(types.BloomBitLength)
- if err != nil {
- b.Fatalf("failed to create bloombit generator: %v", err)
- }
- for j, bloom := range &input {
- if err := gen.AddBloom(uint(j), bloom); err != nil {
- b.Fatalf("bloom %d: failed to add: %v", i, err)
- }
- }
- }
- })
- for i := 0; i < types.BloomBitLength; i++ {
- crand.Read(input[i][:])
- }
- b.Run("random", func(b *testing.B) {
- b.ReportAllocs()
- b.ResetTimer()
- for i := 0; i < b.N; i++ {
- // Crunch the input through the generator and verify the result
- gen, err := NewGenerator(types.BloomBitLength)
- if err != nil {
- b.Fatalf("failed to create bloombit generator: %v", err)
- }
- for j, bloom := range &input {
- if err := gen.AddBloom(uint(j), bloom); err != nil {
- b.Fatalf("bloom %d: failed to add: %v", i, err)
- }
- }
- }
- })
-}
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
deleted file mode 100644
index 486581fe23..0000000000
--- a/core/bloombits/matcher.go
+++ /dev/null
@@ -1,649 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package bloombits
-
-import (
- "bytes"
- "context"
- "errors"
- "math"
- "sort"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/common/bitutil"
- "github.com/ethereum/go-ethereum/crypto"
-)
-
-// bloomIndexes represents the bit indexes inside the bloom filter that belong
-// to some key.
-type bloomIndexes [3]uint
-
-// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
-func calcBloomIndexes(b []byte) bloomIndexes {
- b = crypto.Keccak256(b)
-
- var idxs bloomIndexes
- for i := 0; i < len(idxs); i++ {
- idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
- }
- return idxs
-}
-
-// partialMatches with a non-nil vector represents a section in which some sub-
-// matchers have already found potential matches. Subsequent sub-matchers will
-// binary AND their matches with this vector. If vector is nil, it represents a
-// section to be processed by the first sub-matcher.
-type partialMatches struct {
- section uint64
- bitset []byte
-}
-
-// Retrieval represents a request for retrieval task assignments for a given
-// bit with the given number of fetch elements, or a response for such a request.
-// It can also have the actual results set to be used as a delivery data struct.
-//
-// The context and error fields are used by the light client to terminate matching
-// early if an error is encountered on some path of the pipeline.
-type Retrieval struct {
- Bit uint
- Sections []uint64
- Bitsets [][]byte
-
- Context context.Context
- Error error
-}
-
-// Matcher is a pipelined system of schedulers and logic matchers which perform
-// binary AND/OR operations on the bit-streams, creating a stream of potential
-// blocks to inspect for data content.
-type Matcher struct {
- sectionSize uint64 // Size of the data batches to filter on
-
- filters [][]bloomIndexes // Filter the system is matching for
- schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
-
- retrievers chan chan uint // Retriever processes waiting for bit allocations
- counters chan chan uint // Retriever processes waiting for task count reports
- retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
- deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
-
- running atomic.Bool // Atomic flag whether a session is live or not
-}
-
-// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
-// address and topic filtering on them. Setting a filter component to `nil` is
-// allowed and will result in that filter rule being skipped (OR 0x11...1).
-func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
- // Create the matcher instance
- m := &Matcher{
- sectionSize: sectionSize,
- schedulers: make(map[uint]*scheduler),
- retrievers: make(chan chan uint),
- counters: make(chan chan uint),
- retrievals: make(chan chan *Retrieval),
- deliveries: make(chan *Retrieval),
- }
- // Calculate the bloom bit indexes for the groups we're interested in
- m.filters = nil
-
- for _, filter := range filters {
- // Gather the bit indexes of the filter rule, special casing the nil filter
- if len(filter) == 0 {
- continue
- }
- bloomBits := make([]bloomIndexes, len(filter))
- for i, clause := range filter {
- if clause == nil {
- bloomBits = nil
- break
- }
- bloomBits[i] = calcBloomIndexes(clause)
- }
- // Accumulate the filter rules if no nil rule was within
- if bloomBits != nil {
- m.filters = append(m.filters, bloomBits)
- }
- }
- // For every bit, create a scheduler to load/download the bit vectors
- for _, bloomIndexLists := range m.filters {
- for _, bloomIndexList := range bloomIndexLists {
- for _, bloomIndex := range bloomIndexList {
- m.addScheduler(bloomIndex)
- }
- }
- }
- return m
-}
-
-// addScheduler adds a bit stream retrieval scheduler for the given bit index if
-// it has not existed before. If the bit is already selected for filtering, the
-// existing scheduler can be used.
-func (m *Matcher) addScheduler(idx uint) {
- if _, ok := m.schedulers[idx]; ok {
- return
- }
- m.schedulers[idx] = newScheduler(idx)
-}
-
-// Start starts the matching process and returns a stream of bloom matches in
-// a given range of blocks. If there are no more matches in the range, the result
-// channel is closed.
-func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
- // Make sure we're not creating concurrent sessions
- if m.running.Swap(true) {
- return nil, errors.New("matcher already running")
- }
- defer m.running.Store(false)
-
- // Initiate a new matching round
- session := &MatcherSession{
- matcher: m,
- quit: make(chan struct{}),
- ctx: ctx,
- }
- for _, scheduler := range m.schedulers {
- scheduler.reset()
- }
- sink := m.run(begin, end, cap(results), session)
-
- // Read the output from the result sink and deliver to the user
- session.pend.Add(1)
- go func() {
- defer session.pend.Done()
- defer close(results)
-
- for {
- select {
- case <-session.quit:
- return
-
- case res, ok := <-sink:
- // New match result found
- if !ok {
- return
- }
- // Calculate the first and last blocks of the section
- sectionStart := res.section * m.sectionSize
-
- first := sectionStart
- if begin > first {
- first = begin
- }
- last := sectionStart + m.sectionSize - 1
- if end < last {
- last = end
- }
- // Iterate over all the blocks in the section and return the matching ones
- for i := first; i <= last; i++ {
- // Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
- next := res.bitset[(i-sectionStart)/8]
- if next == 0 {
- if i%8 == 0 {
- i += 7
- }
- continue
- }
- // Some bit is set, do the actual submatching
- if bit := 7 - i%8; next&(1<<bit) != 0 {
- index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
- requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
-
- // If it's a new bit and we have waiting fetchers, allocate to them
- if len(queue) == 0 {
- assign(req.bit)
- }
-
- case fetcher := <-retrievers:
- // New retriever arrived, find the lowest section-ed bit to assign
- bit, best := uint(0), uint64(math.MaxUint64)
- for idx := range unallocs {
- if requests[idx][0] < best {
- bit, best = idx, requests[idx][0]
- }
- }
- // Stop tracking this bit (and alloc notifications if no more work is available)
- delete(unallocs, bit)
- if len(unallocs) == 0 {
- retrievers = nil
- }
- allocs++
- fetcher <- bit
-
- case fetcher := <-m.counters:
- // New task count request arrives, return number of items
- fetcher <- uint(len(requests[<-fetcher]))
-
- case fetcher := <-m.retrievals:
- // New fetcher waiting for tasks to retrieve, assign
- task := <-fetcher
- if want := len(task.Sections); want >= len(requests[task.Bit]) {
- task.Sections = requests[task.Bit]
- delete(requests, task.Bit)
- } else {
- task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
- requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
- }
- fetcher <- task
-
- // If anything was left unallocated, try to assign to someone else
- if len(requests[task.Bit]) > 0 {
- assign(task.Bit)
- }
-
- case result := <-m.deliveries:
- // New retrieval task response from fetcher, split out missing sections and
- // deliver complete ones
- var (
- sections = make([]uint64, 0, len(result.Sections))
- bitsets = make([][]byte, 0, len(result.Bitsets))
- missing = make([]uint64, 0, len(result.Sections))
- )
- for i, bitset := range result.Bitsets {
- if len(bitset) == 0 {
- missing = append(missing, result.Sections[i])
- continue
- }
- sections = append(sections, result.Sections[i])
- bitsets = append(bitsets, bitset)
- }
- m.schedulers[result.Bit].deliver(sections, bitsets)
- allocs--
-
- // Reschedule missing sections and allocate bit if newly available
- if len(missing) > 0 {
- queue := requests[result.Bit]
- for _, section := range missing {
- index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
- queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
- }
- requests[result.Bit] = queue
-
- if len(queue) == len(missing) {
- assign(result.Bit)
- }
- }
-
- // End the session when all pending deliveries have arrived.
- if shutdown == nil && allocs == 0 {
- return
- }
- }
- }
-}
-
-// MatcherSession is returned by a started matcher to be used as a terminator
-// for the actively running matching operation.
-type MatcherSession struct {
- matcher *Matcher
-
- closer sync.Once // Sync object to ensure we only ever close once
- quit chan struct{} // Quit channel to request pipeline termination
-
- ctx context.Context // Context used by the light client to abort filtering
- err error // Global error to track retrieval failures deep in the chain
- errLock sync.Mutex
-
- pend sync.WaitGroup
-}
-
-// Close stops the matching process and waits for all subprocesses to terminate
-// before returning. The timeout may be used for graceful shutdown, allowing the
-// currently running retrievals to complete before this time.
-func (s *MatcherSession) Close() {
- s.closer.Do(func() {
- // Signal termination and wait for all goroutines to tear down
- close(s.quit)
- s.pend.Wait()
- })
-}
-
-// Error returns any failure encountered during the matching session.
-func (s *MatcherSession) Error() error {
- s.errLock.Lock()
- defer s.errLock.Unlock()
-
- return s.err
-}
-
-// allocateRetrieval assigns a bloom bit index to a client process that can either
-// immediately request and fetch the section contents assigned to this bit or wait
-// a little while for more sections to be requested.
-func (s *MatcherSession) allocateRetrieval() (uint, bool) {
- fetcher := make(chan uint)
-
- select {
- case <-s.quit:
- return 0, false
- case s.matcher.retrievers <- fetcher:
- bit, ok := <-fetcher
- return bit, ok
- }
-}
-
-// pendingSections returns the number of pending section retrievals belonging to
-// the given bloom bit index.
-func (s *MatcherSession) pendingSections(bit uint) int {
- fetcher := make(chan uint)
-
- select {
- case <-s.quit:
- return 0
- case s.matcher.counters <- fetcher:
- fetcher <- bit
- return int(<-fetcher)
- }
-}
-
-// allocateSections assigns all or part of an already allocated bit-task queue
-// to the requesting process.
-func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
- fetcher := make(chan *Retrieval)
-
- select {
- case <-s.quit:
- return nil
- case s.matcher.retrievals <- fetcher:
- task := &Retrieval{
- Bit: bit,
- Sections: make([]uint64, count),
- }
- fetcher <- task
- return (<-fetcher).Sections
- }
-}
-
-// deliverSections delivers a batch of section bit-vectors for a specific bloom
-// bit index to be injected into the processing pipeline.
-func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
- s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
-}
-
-// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
-// the requested retrieval queue to be serviced together with other sessions.
-//
-// This method will block for the lifetime of the session. Even after termination
-// of the session, any requests in-flight need to be responded to! Empty responses
-// are fine though in that case.
-func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
- waitTimer := time.NewTimer(wait)
- defer waitTimer.Stop()
-
- for {
- // Allocate a new bloom bit index to retrieve data for, stopping when done
- bit, ok := s.allocateRetrieval()
- if !ok {
- return
- }
- // Bit allocated, throttle a bit if we're below our batch limit
- if s.pendingSections(bit) < batch {
- waitTimer.Reset(wait)
- select {
- case <-s.quit:
- // Session terminating, we can't meaningfully service, abort
- s.allocateSections(bit, 0)
- s.deliverSections(bit, []uint64{}, [][]byte{})
- return
-
- case <-waitTimer.C:
- // Throttling up, fetch whatever is available
- }
- }
- // Allocate as much as we can handle and request servicing
- sections := s.allocateSections(bit, batch)
- request := make(chan *Retrieval)
-
- select {
- case <-s.quit:
- // Session terminating, we can't meaningfully service, abort
- s.deliverSections(bit, sections, make([][]byte, len(sections)))
- return
-
- case mux <- request:
- // Retrieval accepted, something must arrive before we're aborting
- request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
-
- result := <-request
-
- // Deliver a result before s.Close() to avoid a deadlock
- s.deliverSections(result.Bit, result.Sections, result.Bitsets)
-
- if result.Error != nil {
- s.errLock.Lock()
- s.err = result.Error
- s.errLock.Unlock()
- s.Close()
- }
- }
- }
-}
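
calcBloomIndexes, removed above, is the piece that ties a filter key (an address or topic) to three bit positions in the 2048-bit header bloom. The sketch below reuses the same arithmetic; it assumes go-ethereum's crypto.Keccak256 helper (untouched by this diff), and the Transfer event signature is only an illustrative key.

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/crypto"
)

// bloomPositions mirrors calcBloomIndexes from the removed matcher: hash the
// key with Keccak-256 and build three 11-bit indexes (range [0, 2048)) from
// the first six bytes of the digest.
func bloomPositions(key []byte) [3]uint {
    h := crypto.Keccak256(key)
    var idxs [3]uint
    for i := range idxs {
        idxs[i] = (uint(h[2*i])<<8)&2047 + uint(h[2*i+1])
    }
    return idxs
}

func main() {
    // Topics in logs are themselves Keccak-256 hashes of the event signature,
    // so the bloom positions are derived from that 32-byte topic value.
    topic := crypto.Keccak256([]byte("Transfer(address,address,uint256)"))
    fmt.Println(bloomPositions(topic)) // three bits a matching block must have set
}
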
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
deleted file mode 100644
index 7f3d5f279c..0000000000
--- a/core/bloombits/matcher_test.go
+++ /dev/null
@@ -1,292 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package bloombits
-
-import (
- "context"
- "math/rand"
- "sync/atomic"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
-)
-
-const testSectionSize = 4096
-
-// Tests that wildcard filter rules (nil) can be specified and are handled well.
-func TestMatcherWildcards(t *testing.T) {
- t.Parallel()
- matcher := NewMatcher(testSectionSize, [][][]byte{
- {common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
- {common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard
- {common.Hash{0x01}.Bytes()}, // Plain rule, sanity check
- {common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule
- {nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule
- {nil, nil}, // Wildcard combo, drop rule
- {}, // Inited wildcard rule, drop rule
- nil, // Proper wildcard rule, drop rule
- })
- if len(matcher.filters) != 3 {
- t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3)
- }
- if len(matcher.filters[0]) != 2 {
- t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2)
- }
- if len(matcher.filters[1]) != 2 {
- t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2)
- }
- if len(matcher.filters[2]) != 1 {
- t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1)
- }
-}
-
-// Tests the matcher pipeline on a single continuous workflow without interrupts.
-func TestMatcherContinuous(t *testing.T) {
- t.Parallel()
- testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75)
- testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81)
- testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36)
-}
-
-// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
-// with the aim of ensuring data items are requested only once.
-func TestMatcherIntermittent(t *testing.T) {
- t.Parallel()
- testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75)
- testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81)
- testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36)
-}
-
-// Tests the matcher pipeline on random input to hopefully catch anomalies.
-func TestMatcherRandom(t *testing.T) {
- t.Parallel()
- for i := 0; i < 10; i++ {
- testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0)
- testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0)
- testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0)
- testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0)
- testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0)
- }
-}
-
-// Tests that the matcher can properly find matches if the starting block is
-// shifted from a multiple of 8. This is needed to cover an optimisation with
-// bitset matching https://github.com/ethereum/go-ethereum/issues/15309.
-func TestMatcherShifted(t *testing.T) {
- t.Parallel()
- // Block 0 always matches in the tests, skip ahead of first 8 blocks with the
- // start to get a potential zero byte in the matcher bitset.
-
- // To keep the second bitset byte zero, the filter must only match for the first
- // time in block 16, so doing an all-16 bit filter should suffice.
-
- // To keep the starting block non divisible by 8, block number 9 is the first
- // that would introduce a shift and not match block 0.
- testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0)
-}
-
-// Tests that matching on everything doesn't crash (special case internally).
-func TestWildcardMatcher(t *testing.T) {
- t.Parallel()
- testMatcherBothModes(t, nil, 0, 10000, 0)
-}
-
-// makeRandomIndexes generates a random filter system, composed of multiple filter
-// criteria, each having one bloom list component for the address and arbitrarily
-// many topic bloom list components.
-func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
- res := make([][]bloomIndexes, len(lengths))
- for i, topics := range lengths {
- res[i] = make([]bloomIndexes, topics)
- for j := 0; j < topics; j++ {
- for k := 0; k < len(res[i][j]); k++ {
- res[i][j][k] = uint(rand.Intn(max-1) + 2)
- }
- }
- }
- return res
-}
-
-// testMatcherDiffBatches runs the given matches test in single-delivery and also
-// in batches delivery mode, verifying that all kinds of deliveries are handled
-// correctly within.
-func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) {
- singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1)
- batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16)
-
- if singleton != batched {
- t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in singleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
- }
-}
-
-// testMatcherBothModes runs the given matcher test in both continuous as well as
-// in intermittent mode, verifying that the request counts match each other.
-func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) {
- continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16)
- intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16)
-
- if continuous != intermittent {
- t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
- }
-}
-
-// testMatcher is a generic tester to run the given matcher test and return the
-// number of requests made for cross validation between different modes.
-func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
- // Create a new matcher and simulate our explicit random bitsets
- matcher := NewMatcher(testSectionSize, nil)
- matcher.filters = filter
-
- for _, rule := range filter {
- for _, topic := range rule {
- for _, bit := range topic {
- matcher.addScheduler(bit)
- }
- }
- }
- // Track the number of retrieval requests made
- var requested atomic.Uint32
-
- // Start the matching session for the filter and the retriever goroutines
- quit := make(chan struct{})
- matches := make(chan uint64, 16)
-
- session, err := matcher.Start(context.Background(), start, blocks-1, matches)
- if err != nil {
- t.Fatalf("failed to stat matcher session: %v", err)
- }
- startRetrievers(session, quit, &requested, maxReqCount)
-
- // Iterate over all the blocks and verify that the pipeline produces the correct matches
- for i := start; i < blocks; i++ {
- if expMatch3(filter, i) {
- match, ok := <-matches
- if !ok {
- t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
- return 0
- }
- if match != i {
- t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
- }
- // If we're testing intermittent mode, abort and restart the pipeline
- if intermittent {
- session.Close()
- close(quit)
-
- quit = make(chan struct{})
- matches = make(chan uint64, 16)
-
- session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
- if err != nil {
- t.Fatalf("failed to stat matcher session: %v", err)
- }
- startRetrievers(session, quit, &requested, maxReqCount)
- }
- }
- }
- // Ensure the result channel is torn down after the last block
- match, ok := <-matches
- if ok {
- t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
- }
- // Clean up the session and ensure we match the expected retrieval count
- session.Close()
- close(quit)
-
- if retrievals != 0 && requested.Load() != retrievals {
- t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals)
- }
- return requested.Load()
-}
-
-// startRetrievers starts a batch of goroutines listening for section requests
-// and serving them.
-func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) {
- requests := make(chan chan *Retrieval)
-
- for i := 0; i < 10; i++ {
- // Start a multiplexer to test multiple threaded execution
- go session.Multiplex(batch, 100*time.Microsecond, requests)
-
- // Start a service to match the above multiplexer
- go func() {
- for {
- // Wait for a service request or a shutdown
- select {
- case <-quit:
- return
-
- case request := <-requests:
- task := <-request
-
- task.Bitsets = make([][]byte, len(task.Sections))
- for i, section := range task.Sections {
- if rand.Int()%4 != 0 { // Handle occasional missing deliveries
- task.Bitsets[i] = generateBitset(task.Bit, section)
- retrievals.Add(1)
- }
- }
- request <- task
- }
- }
- }()
- }
-}
-
-// generateBitset generates the rotated bitset for the given bloom bit and section
-// numbers.
-func generateBitset(bit uint, section uint64) []byte {
- bitset := make([]byte, testSectionSize/8)
- for i := 0; i < len(bitset); i++ {
- for b := 0; b < 8; b++ {
- blockIdx := section*testSectionSize + uint64(i*8+b)
- bitset[i] += bitset[i]
- if (blockIdx % uint64(bit)) == 0 {
- bitset[i]++
- }
- }
- }
- return bitset
-}
-
-func expMatch1(filter bloomIndexes, i uint64) bool {
- for _, ii := range filter {
- if (i % uint64(ii)) != 0 {
- return false
- }
- }
- return true
-}
-
-func expMatch2(filter []bloomIndexes, i uint64) bool {
- for _, ii := range filter {
- if expMatch1(ii, i) {
- return true
- }
- }
- return false
-}
-
-func expMatch3(filter [][]bloomIndexes, i uint64) bool {
- for _, ii := range filter {
- if !expMatch2(ii, i) {
- return false
- }
- }
- return true
-}
diff --git a/core/bloombits/scheduler.go b/core/bloombits/scheduler.go
deleted file mode 100644
index a523bc55ab..0000000000
--- a/core/bloombits/scheduler.go
+++ /dev/null
@@ -1,181 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package bloombits
-
-import (
- "sync"
-)
-
-// request represents a bloom retrieval task to prioritize and pull from the local
-// database or remotely from the network.
-type request struct {
- section uint64 // Section index to retrieve the bit-vector from
- bit uint // Bit index within the section to retrieve the vector of
-}
-
-// response represents the state of a requested bit-vector through a scheduler.
-type response struct {
- cached []byte // Cached bits to dedup multiple requests
- done chan struct{} // Channel to allow waiting for completion
-}
-
-// scheduler handles the scheduling of bloom-filter retrieval operations for
-// entire section-batches belonging to a single bloom bit. Besides scheduling the
-// retrieval operations, this struct also deduplicates the requests and caches
-// the results to minimize network/database overhead even in complex filtering
-// scenarios.
-type scheduler struct {
- bit uint // Index of the bit in the bloom filter this scheduler is responsible for
- responses map[uint64]*response // Currently pending retrieval requests or already cached responses
- lock sync.Mutex // Lock protecting the responses from concurrent access
-}
-
-// newScheduler creates a new bloom-filter retrieval scheduler for a specific
-// bit index.
-func newScheduler(idx uint) *scheduler {
- return &scheduler{
- bit: idx,
- responses: make(map[uint64]*response),
- }
-}
-
-// run creates a retrieval pipeline, receiving section indexes from sections and
-// returning the results in the same order through the done channel. Concurrent
-// runs of the same scheduler are allowed, leading to retrieval task deduplication.
-func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
- // Create a forwarder channel between requests and responses of the same size as
- // the distribution channel (since that will block the pipeline anyway).
- pend := make(chan uint64, cap(dist))
-
- // Start the pipeline schedulers to forward between user -> distributor -> user
- wg.Add(2)
- go s.scheduleRequests(sections, dist, pend, quit, wg)
- go s.scheduleDeliveries(pend, done, quit, wg)
-}
-
-// reset cleans up any leftovers from previous runs. This is required before a
-// restart to ensure that no previously requested but never delivered state will
-// cause a lockup.
-func (s *scheduler) reset() {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- for section, res := range s.responses {
- if res.cached == nil {
- delete(s.responses, section)
- }
- }
-}
-
-// scheduleRequests reads section retrieval requests from the input channel,
-// deduplicates the stream and pushes unique retrieval tasks into the distribution
-// channel for a database or network layer to honour.
-func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
- // Clean up the goroutine and pipeline when done
- defer wg.Done()
- defer close(pend)
-
- // Keep reading and scheduling section requests
- for {
- select {
- case <-quit:
- return
-
- case section, ok := <-reqs:
- // New section retrieval requested
- if !ok {
- return
- }
- // Deduplicate retrieval requests
- unique := false
-
- s.lock.Lock()
- if s.responses[section] == nil {
- s.responses[section] = &response{
- done: make(chan struct{}),
- }
- unique = true
- }
- s.lock.Unlock()
-
- // Schedule the section for retrieval and notify the deliverer to expect this section
- if unique {
- select {
- case <-quit:
- return
- case dist <- &request{bit: s.bit, section: section}:
- }
- }
- select {
- case <-quit:
- return
- case pend <- section:
- }
- }
- }
-}
-
-// scheduleDeliveries reads section acceptance notifications and waits for them
-// to be delivered, pushing them into the output data buffer.
-func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
- // Clean up the goroutine and pipeline when done
- defer wg.Done()
- defer close(done)
-
- // Keep reading notifications and scheduling deliveries
- for {
- select {
- case <-quit:
- return
-
- case idx, ok := <-pend:
- // New section retrieval pending
- if !ok {
- return
- }
- // Wait until the request is honoured
- s.lock.Lock()
- res := s.responses[idx]
- s.lock.Unlock()
-
- select {
- case <-quit:
- return
- case <-res.done:
- }
- // Deliver the result
- select {
- case <-quit:
- return
- case done <- res.cached:
- }
- }
- }
-}
-
-// deliver is called by the request distributor when a reply to a request arrives.
-func (s *scheduler) deliver(sections []uint64, data [][]byte) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- for i, section := range sections {
- if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
- res.cached = data[i]
- close(res.done)
- }
- }
-}
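
Stripped of the pipeline plumbing, the removed scheduler is built around one pattern: concurrent requests for the same section share a single retrieval through a per-section done channel. The sketch below shows just that deduplication idea; the names are hypothetical and not a go-ethereum API.

package main

import (
    "fmt"
    "sync"
)

// entry holds one section's bit-vector plus a channel that is closed once the
// data has been delivered, letting duplicate requesters wait for it.
type entry struct {
    data []byte
    done chan struct{}
}

// dedupCache coalesces concurrent requests: only the first caller for a
// section triggers a fetch, everyone else blocks on the shared done channel.
type dedupCache struct {
    mu      sync.Mutex
    pending map[uint64]*entry
}

func newDedupCache() *dedupCache {
    return &dedupCache{pending: make(map[uint64]*entry)}
}

func (c *dedupCache) get(section uint64, fetch func(uint64) []byte) []byte {
    c.mu.Lock()
    e, ok := c.pending[section]
    if !ok {
        e = &entry{done: make(chan struct{})}
        c.pending[section] = e
        c.mu.Unlock()
        e.data = fetch(section) // first caller pays for the retrieval
        close(e.done)
        return e.data
    }
    c.mu.Unlock()
    <-e.done // later callers reuse the cached result
    return e.data
}

func main() {
    cache := newDedupCache()
    var fetches int
    fetch := func(sec uint64) []byte { fetches++; return []byte{byte(sec)} }

    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            cache.get(7, fetch) // four concurrent requests for the same section
        }()
    }
    wg.Wait()
    fmt.Println("fetches:", fetches) // 1: duplicates were coalesced
}
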
diff --git a/core/bloombits/scheduler_test.go b/core/bloombits/scheduler_test.go
deleted file mode 100644
index dcaaa91525..0000000000
--- a/core/bloombits/scheduler_test.go
+++ /dev/null
@@ -1,103 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package bloombits
-
-import (
- "bytes"
- "math/big"
- "sync"
- "sync/atomic"
- "testing"
-)
-
-// Tests that the scheduler can deduplicate and forward retrieval requests to
-// underlying fetchers and serve responses back, regardless of the concurrency
-// of the requesting clients or serving data fetchers.
-func TestSchedulerSingleClientSingleFetcher(t *testing.T) { testScheduler(t, 1, 1, 5000) }
-func TestSchedulerSingleClientMultiFetcher(t *testing.T) { testScheduler(t, 1, 10, 5000) }
-func TestSchedulerMultiClientSingleFetcher(t *testing.T) { testScheduler(t, 10, 1, 5000) }
-func TestSchedulerMultiClientMultiFetcher(t *testing.T) { testScheduler(t, 10, 10, 5000) }
-
-func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
- t.Parallel()
- f := newScheduler(0)
-
- // Create a batch of handler goroutines that respond to bloom bit requests and
- // deliver them to the scheduler.
- var fetchPend sync.WaitGroup
- fetchPend.Add(fetchers)
- defer fetchPend.Wait()
-
- fetch := make(chan *request, 16)
- defer close(fetch)
-
- var delivered atomic.Uint32
- for i := 0; i < fetchers; i++ {
- go func() {
- defer fetchPend.Done()
-
- for req := range fetch {
- delivered.Add(1)
-
- f.deliver([]uint64{
- req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
- req.section, // Requested data
- req.section, // Duplicated data (ensure it doesn't double close anything)
- }, [][]byte{
- {},
- new(big.Int).SetUint64(req.section).Bytes(),
- new(big.Int).SetUint64(req.section).Bytes(),
- })
- }
- }()
- }
- // Start a batch of goroutines to concurrently run scheduling tasks
- quit := make(chan struct{})
-
- var pend sync.WaitGroup
- pend.Add(clients)
-
- for i := 0; i < clients; i++ {
- go func() {
- defer pend.Done()
-
- in := make(chan uint64, 16)
- out := make(chan []byte, 16)
-
- f.run(in, fetch, out, quit, &pend)
-
- go func() {
- for j := 0; j < requests; j++ {
- in <- uint64(j)
- }
- close(in)
- }()
- b := new(big.Int)
- for j := 0; j < requests; j++ {
- bits := <-out
- if want := b.SetUint64(uint64(j)).Bytes(); !bytes.Equal(bits, want) {
- t.Errorf("vector %d: delivered content mismatch: have %x, want %x", j, bits, want)
- }
- }
- }()
- }
- pend.Wait()
-
- if have := delivered.Load(); int(have) != requests {
- t.Errorf("request count mismatch: have %v, want %v", have, requests)
- }
-}
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
deleted file mode 100644
index 2865daa1ff..0000000000
--- a/core/chain_indexer.go
+++ /dev/null
@@ -1,522 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
-
-package core
-
-import (
- "context"
- "encoding/binary"
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/log"
-)
-
-// ChainIndexerBackend defines the methods needed to process chain segments in
-// the background and write the segment results into the database. These can be
-// used to create filter blooms or CHTs.
-type ChainIndexerBackend interface {
- // Reset initiates the processing of a new chain segment, potentially terminating
- // any partially completed operations (in case of a reorg).
- Reset(ctx context.Context, section uint64, prevHead common.Hash) error
-
- // Process crunches through the next header in the chain segment. The caller
- // will ensure a sequential order of headers.
- Process(ctx context.Context, header *types.Header) error
-
- // Commit finalizes the section metadata and stores it into the database.
- Commit() error
-
- // Prune deletes the chain index older than the given threshold.
- Prune(threshold uint64) error
-}
-
-// ChainIndexerChain interface is used for connecting the indexer to a blockchain
-type ChainIndexerChain interface {
- // CurrentHeader retrieves the latest locally known header.
- CurrentHeader() *types.Header
-
- // SubscribeChainHeadEvent subscribes to new head header notifications.
- SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
-}
-
-// ChainIndexer does a post-processing job for equally sized sections of the
-// canonical chain (like BloomBits and CHT structures). A ChainIndexer is
-// connected to the blockchain through the event system by starting a
-// ChainHeadEventLoop in a goroutine.
-//
-// Further child ChainIndexers can be added which use the output of the parent
-// section indexer. These child indexers receive new head notifications only
-// after an entire section has been finished or in case of rollbacks that might
-// affect already finished sections.
-type ChainIndexer struct {
- chainDb ethdb.Database // Chain database to index the data from
- indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
- backend ChainIndexerBackend // Background processor generating the index data content
- children []*ChainIndexer // Child indexers to cascade chain updates to
-
- active atomic.Bool // Flag whether the event loop was started
- update chan struct{} // Notification channel that headers should be processed
- quit chan chan error // Quit channel to tear down running goroutines
- ctx context.Context
- ctxCancel func()
-
- sectionSize uint64 // Number of blocks in a single chain segment to process
- confirmsReq uint64 // Number of confirmations before processing a completed segment
-
- storedSections uint64 // Number of sections successfully indexed into the database
- knownSections uint64 // Number of sections known to be complete (block wise)
- cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
-
- checkpointSections uint64 // Number of sections covered by the checkpoint
- checkpointHead common.Hash // Section head belonging to the checkpoint
-
- throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
-
- log log.Logger
- lock sync.Mutex
-}
-
-// NewChainIndexer creates a new chain indexer to do background processing on
-// chain segments of a given size after a certain number of confirmations have passed.
-// The throttling parameter might be used to prevent database thrashing.
-func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
- c := &ChainIndexer{
- chainDb: chainDb,
- indexDb: indexDb,
- backend: backend,
- update: make(chan struct{}, 1),
- quit: make(chan chan error),
- sectionSize: section,
- confirmsReq: confirm,
- throttling: throttling,
- log: log.New("type", kind),
- }
- // Initialize database dependent fields and start the updater
- c.loadValidSections()
- c.ctx, c.ctxCancel = context.WithCancel(context.Background())
-
- go c.updateLoop()
-
- return c
-}
-
-// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
-// is not expected to be available before this point. The indexer assumes that
-// the backend has sufficient information available to process subsequent sections.
-//
-// Note: knownSections == 0 and storedSections == checkpointSections until
-// syncing reaches the checkpoint
-func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
- c.lock.Lock()
- defer c.lock.Unlock()
-
- // Short circuit if the given checkpoint is below the local one.
- if c.checkpointSections >= section+1 || section < c.storedSections {
- return
- }
- c.checkpointSections = section + 1
- c.checkpointHead = shead
-
- c.setSectionHead(section, shead)
- c.setValidSections(section + 1)
-}
-
-// Start creates a goroutine to feed chain head events into the indexer for
-// cascading background processing. Children do not need to be started, they
-// are notified about new events by their parents.
-func (c *ChainIndexer) Start(chain ChainIndexerChain) {
- events := make(chan ChainHeadEvent, 10)
- sub := chain.SubscribeChainHeadEvent(events)
-
- go c.eventLoop(chain.CurrentHeader(), events, sub)
-}
-
-// Close tears down all goroutines belonging to the indexer and returns any error
-// that might have occurred internally.
-func (c *ChainIndexer) Close() error {
- var errs []error
-
- c.ctxCancel()
-
- // Tear down the primary update loop
- errc := make(chan error)
- c.quit <- errc
- if err := <-errc; err != nil {
- errs = append(errs, err)
- }
- // If needed, tear down the secondary event loop
- if c.active.Load() {
- c.quit <- errc
- if err := <-errc; err != nil {
- errs = append(errs, err)
- }
- }
- // Close all children
- for _, child := range c.children {
- if err := child.Close(); err != nil {
- errs = append(errs, err)
- }
- }
- // Return any failures
- switch {
- case len(errs) == 0:
- return nil
-
- case len(errs) == 1:
- return errs[0]
-
- default:
- return fmt.Errorf("%v", errs)
- }
-}
-
-// eventLoop is a secondary - optional - event loop of the indexer which is only
-// started for the outermost indexer to push chain head events into a processing
-// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
- // Mark the chain indexer as active, requiring an additional teardown
- c.active.Store(true)
-
- defer sub.Unsubscribe()
-
- // Fire the initial new head event to start any outstanding processing
- c.newHead(currentHeader.Number.Uint64(), false)
-
- var (
- prevHeader = currentHeader
- prevHash = currentHeader.Hash()
- )
- for {
- select {
- case errc := <-c.quit:
- // Chain indexer terminating, report no failure and abort
- errc <- nil
- return
-
- case ev, ok := <-events:
- // Received a new event, ensure it's not nil (closing) and update
- if !ok {
- errc := <-c.quit
- errc <- nil
- return
- }
- if ev.Header.ParentHash != prevHash {
- // Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
- // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
-
- if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
- if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
- c.newHead(h.Number.Uint64(), true)
- }
- }
- }
- c.newHead(ev.Header.Number.Uint64(), false)
-
- prevHeader, prevHash = ev.Header, ev.Header.Hash()
- }
- }
-}
-
-// newHead notifies the indexer about new chain heads and/or reorgs.
-func (c *ChainIndexer) newHead(head uint64, reorg bool) {
- c.lock.Lock()
- defer c.lock.Unlock()
-
- // If a reorg happened, invalidate all sections until that point
- if reorg {
- // Revert the known section number to the reorg point
- known := (head + 1) / c.sectionSize
- stored := known
- if known < c.checkpointSections {
- known = 0
- }
- if stored < c.checkpointSections {
- stored = c.checkpointSections
- }
- if known < c.knownSections {
- c.knownSections = known
- }
- // Revert the stored sections from the database to the reorg point
- if stored < c.storedSections {
- c.setValidSections(stored)
- }
- // Update the new head number to the finalized section end and notify children
- head = known * c.sectionSize
-
- if head < c.cascadedHead {
- c.cascadedHead = head
- for _, child := range c.children {
- child.newHead(c.cascadedHead, true)
- }
- }
- return
- }
- // No reorg, calculate the number of newly known sections and update if high enough
- var sections uint64
- if head >= c.confirmsReq {
- sections = (head + 1 - c.confirmsReq) / c.sectionSize
- if sections < c.checkpointSections {
- sections = 0
- }
- if sections > c.knownSections {
- if c.knownSections < c.checkpointSections {
- // syncing reached the checkpoint, verify section head
- syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1)
- if syncedHead != c.checkpointHead {
- c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead)
- return
- }
- }
- c.knownSections = sections
-
- select {
- case c.update <- struct{}{}:
- default:
- }
- }
- }
-}
-
-// updateLoop is the main event loop of the indexer which pushes chain segments
-// down into the processing backend.
-func (c *ChainIndexer) updateLoop() {
- var (
- updating bool
- updated time.Time
- )
-
- for {
- select {
- case errc := <-c.quit:
- // Chain indexer terminating, report no failure and abort
- errc <- nil
- return
-
- case <-c.update:
- // Section headers completed (or rolled back), update the index
- c.lock.Lock()
- if c.knownSections > c.storedSections {
- // Periodically print an upgrade log message to the user
- if time.Since(updated) > 8*time.Second {
- if c.knownSections > c.storedSections+1 {
- updating = true
- c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
- }
- updated = time.Now()
- }
- // Cache the current section count and head to allow unlocking the mutex
- c.verifyLastHead()
- section := c.storedSections
- var oldHead common.Hash
- if section > 0 {
- oldHead = c.SectionHead(section - 1)
- }
- // Process the newly defined section in the background
- c.lock.Unlock()
- newHead, err := c.processSection(section, oldHead)
- if err != nil {
- select {
- case <-c.ctx.Done():
- <-c.quit <- nil
- return
- default:
- }
- c.log.Error("Section processing failed", "error", err)
- }
- c.lock.Lock()
-
- // If processing succeeded and no reorgs occurred, mark the section completed
- if err == nil && (section == 0 || oldHead == c.SectionHead(section-1)) {
- c.setSectionHead(section, newHead)
- c.setValidSections(section + 1)
- if c.storedSections == c.knownSections && updating {
- updating = false
- c.log.Info("Finished upgrading chain index")
- }
- c.cascadedHead = c.storedSections*c.sectionSize - 1
- for _, child := range c.children {
- c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
- child.newHead(c.cascadedHead, false)
- }
- } else {
- // If processing failed, don't retry until further notification
- c.log.Debug("Chain index processing failed", "section", section, "err", err)
- c.verifyLastHead()
- c.knownSections = c.storedSections
- }
- }
- // If there are still further sections to process, reschedule
- if c.knownSections > c.storedSections {
- time.AfterFunc(c.throttling, func() {
- select {
- case c.update <- struct{}{}:
- default:
- }
- })
- }
- c.lock.Unlock()
- }
- }
-}
-
-// processSection processes an entire section by calling backend functions while
-// ensuring the continuity of the passed headers. Since the chain mutex is not
-// held while processing, the continuity can be broken by a long reorg, in which
-// case the function returns with an error.
-func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
- c.log.Trace("Processing new chain section", "section", section)
-
- // Reset and partial processing
- if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
- c.setValidSections(0)
- return common.Hash{}, err
- }
-
- for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
- hash := rawdb.ReadCanonicalHash(c.chainDb, number)
- if hash == (common.Hash{}) {
- return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
- }
- header := rawdb.ReadHeader(c.chainDb, hash, number)
- if header == nil {
- return common.Hash{}, fmt.Errorf("block #%d [%x..] not found", number, hash[:4])
- } else if header.ParentHash != lastHead {
- return common.Hash{}, errors.New("chain reorged during section processing")
- }
- if err := c.backend.Process(c.ctx, header); err != nil {
- return common.Hash{}, err
- }
- lastHead = header.Hash()
- }
- if err := c.backend.Commit(); err != nil {
- return common.Hash{}, err
- }
- return lastHead, nil
-}
-
-// verifyLastHead compares the last stored section head with the corresponding block hash in the
-// actual canonical chain and rolls back reorged sections if necessary to ensure that the stored
-// sections are all valid.
-func (c *ChainIndexer) verifyLastHead() {
- for c.storedSections > 0 && c.storedSections > c.checkpointSections {
- if c.SectionHead(c.storedSections-1) == rawdb.ReadCanonicalHash(c.chainDb, c.storedSections*c.sectionSize-1) {
- return
- }
- c.setValidSections(c.storedSections - 1)
- }
-}
-
-// Sections returns the number of processed sections maintained by the indexer
-// and also the information about the last header indexed for potential canonical
-// verifications.
-func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
- c.lock.Lock()
- defer c.lock.Unlock()
-
- c.verifyLastHead()
- return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
-}
-
-// AddChildIndexer adds a child ChainIndexer that can use the output of this one
-func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
- if indexer == c {
- panic("can't add indexer as a child of itself")
- }
- c.lock.Lock()
- defer c.lock.Unlock()
-
- c.children = append(c.children, indexer)
-
- // Cascade any pending updates to new children too
- sections := c.storedSections
- if c.knownSections < sections {
- // if a section is "stored" but not "known" then it is a checkpoint without
- // available chain data so we should not cascade it yet
- sections = c.knownSections
- }
- if sections > 0 {
- indexer.newHead(sections*c.sectionSize-1, false)
- }
-}
-
-// Prune deletes all chain data older than the given threshold.
-func (c *ChainIndexer) Prune(threshold uint64) error {
- return c.backend.Prune(threshold)
-}
-
-// loadValidSections reads the number of valid sections from the index database
-// and caches it into the local state.
-func (c *ChainIndexer) loadValidSections() {
- data, _ := c.indexDb.Get([]byte("count"))
- if len(data) == 8 {
- c.storedSections = binary.BigEndian.Uint64(data)
- }
-}
-
-// setValidSections writes the number of valid sections to the index database
-func (c *ChainIndexer) setValidSections(sections uint64) {
- // Set the current number of valid sections in the database
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], sections)
- c.indexDb.Put([]byte("count"), data[:])
-
- // Remove any reorged sections, caching the valid ones in the meantime
- for c.storedSections > sections {
- c.storedSections--
- c.removeSectionHead(c.storedSections)
- }
- c.storedSections = sections // needed if new > old
-}
-
-// SectionHead retrieves the last block hash of a processed section from the
-// index database.
-func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], section)
-
- hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
- if len(hash) == len(common.Hash{}) {
- return common.BytesToHash(hash)
- }
- return common.Hash{}
-}
-
-// setSectionHead writes the last block hash of a processed section to the index
-// database.
-func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) {
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], section)
-
- c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes())
-}
-
-// removeSectionHead removes the reference to a processed section from the index
-// database.
-func (c *ChainIndexer) removeSectionHead(section uint64) {
- var data [8]byte
- binary.BigEndian.PutUint64(data[:], section)
-
- c.indexDb.Delete(append([]byte("shead"), data[:]...))
-}
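// Illustrative sketch (not part of the diff): the removed helpers above persist the
// indexer state under two key families in the index database — a "count" key holding
// the number of valid sections as a big-endian uint64, and per-section "shead"+num
// keys holding each section's last block hash. The snippet below mirrors that layout
// against a plain map standing in for ethdb; the map and helper names are assumptions
// used for illustration only.
package main

import (
	"encoding/binary"
	"fmt"
)

func sectionCountKey() []byte { return []byte("count") }

func sectionHeadKey(section uint64) []byte {
	var data [8]byte
	binary.BigEndian.PutUint64(data[:], section)
	return append([]byte("shead"), data[:]...)
}

func main() {
	store := map[string][]byte{}

	// Record three valid sections, as setValidSections does under "count".
	var count [8]byte
	binary.BigEndian.PutUint64(count[:], 3)
	store[string(sectionCountKey())] = count[:]

	// Record the head hash of section 2, as setSectionHead does under "shead"+num.
	store[string(sectionHeadKey(2))] = make([]byte, 32) // zero hash placeholder

	fmt.Printf("count -> %x\n", store[string(sectionCountKey())])
	fmt.Printf("shead key length = %d bytes\n", len(sectionHeadKey(2))) // 5 + 8 = 13
}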
diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go
deleted file mode 100644
index bf3bde756c..0000000000
--- a/core/chain_indexer_test.go
+++ /dev/null
@@ -1,246 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package core
-
-import (
- "context"
- "errors"
- "fmt"
- "math/big"
- "math/rand"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/types"
-)
-
-// Runs multiple tests with randomized parameters.
-func TestChainIndexerSingle(t *testing.T) {
- for i := 0; i < 10; i++ {
- testChainIndexer(t, 1)
- }
-}
-
-// Runs multiple tests with randomized parameters and different numbers of
-// chain backends.
-func TestChainIndexerWithChildren(t *testing.T) {
- for i := 2; i < 8; i++ {
- testChainIndexer(t, i)
- }
-}
-
-// testChainIndexer runs a test with either a single chain indexer or a chain of
-// multiple backends. The section size and required confirmation count parameters
-// are randomized.
-func testChainIndexer(t *testing.T, count int) {
- db := rawdb.NewMemoryDatabase()
- defer db.Close()
-
- // Create a chain of indexers and ensure they all report empty
- backends := make([]*testChainIndexBackend, count)
- for i := 0; i < count; i++ {
- var (
- sectionSize = uint64(rand.Intn(100) + 1)
- confirmsReq = uint64(rand.Intn(10))
- )
- backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)}
- backends[i].indexer = NewChainIndexer(db, rawdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i))
-
- if sections, _, _ := backends[i].indexer.Sections(); sections != 0 {
- t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0)
- }
- if i > 0 {
- backends[i-1].indexer.AddChildIndexer(backends[i].indexer)
- }
- }
- defer backends[0].indexer.Close() // parent indexer shuts down children
- // notify pings the root indexer about a new head or reorg, then expects
- // processed blocks if a section is processable
- notify := func(headNum, failNum uint64, reorg bool) {
- backends[0].indexer.newHead(headNum, reorg)
- if reorg {
- for _, backend := range backends {
- headNum = backend.reorg(headNum)
- backend.assertSections()
- }
- return
- }
- var cascade bool
- for _, backend := range backends {
- headNum, cascade = backend.assertBlocks(headNum, failNum)
- if !cascade {
- break
- }
- backend.assertSections()
- }
- }
- // inject inserts a new random canonical header into the database directly
- inject := func(number uint64) {
- header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()}
- if number > 0 {
- header.ParentHash = rawdb.ReadCanonicalHash(db, number-1)
- }
- rawdb.WriteHeader(db, header)
- rawdb.WriteCanonicalHash(db, header.Hash(), number)
- }
- // Start indexer with an already existing chain
- for i := uint64(0); i <= 100; i++ {
- inject(i)
- }
- notify(100, 100, false)
-
- // Add new blocks one by one
- for i := uint64(101); i <= 1000; i++ {
- inject(i)
- notify(i, i, false)
- }
- // Do a reorg
- notify(500, 500, true)
-
- // Create new fork
- for i := uint64(501); i <= 1000; i++ {
- inject(i)
- notify(i, i, false)
- }
- for i := uint64(1001); i <= 1500; i++ {
- inject(i)
- }
- // Failed processing scenario where fewer blocks are available than notified
- notify(2000, 1500, false)
-
- // Notify about a reorg (which could have caused the missing blocks if it happened during processing)
- notify(1500, 1500, true)
-
- // Create new fork
- for i := uint64(1501); i <= 2000; i++ {
- inject(i)
- notify(i, i, false)
- }
-}
-
-// testChainIndexBackend implements ChainIndexerBackend
-type testChainIndexBackend struct {
- t *testing.T
- indexer *ChainIndexer
- section, headerCnt, stored uint64
- processCh chan uint64
-}
-
-// assertSections verifies that a chain indexer has the correct number of sections.
-func (b *testChainIndexBackend) assertSections() {
- // Keep trying for 3 seconds if it does not match
- var sections uint64
- for i := 0; i < 300; i++ {
- sections, _, _ = b.indexer.Sections()
- if sections == b.stored {
- return
- }
- time.Sleep(10 * time.Millisecond)
- }
- b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored)
-}
-
-// assertBlocks expects processing calls after new blocks have arrived. If
-// failNum < headNum then we are simulating a scenario where a reorg has happened
-// after the processing has started and the processing of a section fails.
-func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) {
- var sections uint64
- if headNum >= b.indexer.confirmsReq {
- sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize
- if sections > b.stored {
- // expect processed blocks
- for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ {
- if expectd > failNum {
- // rolled back after processing started, no more process calls expected
- // wait until updating is done to make sure that processing actually fails
- var updating bool
- for i := 0; i < 300; i++ {
- b.indexer.lock.Lock()
- updating = b.indexer.knownSections > b.indexer.storedSections
- b.indexer.lock.Unlock()
- if !updating {
- break
- }
- time.Sleep(10 * time.Millisecond)
- }
- if updating {
- b.t.Fatalf("update did not finish")
- }
- sections = expectd / b.indexer.sectionSize
- break
- }
- select {
- case <-time.After(10 * time.Second):
- b.t.Fatalf("Expected processed block #%d, got nothing", expectd)
- case processed := <-b.processCh:
- if processed != expectd {
- b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed)
- }
- }
- }
- b.stored = sections
- }
- }
- if b.stored == 0 {
- return 0, false
- }
- return b.stored*b.indexer.sectionSize - 1, true
-}
-
-func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
- firstChanged := (headNum + 1) / b.indexer.sectionSize
- if firstChanged < b.stored {
- b.stored = firstChanged
- }
- return b.stored * b.indexer.sectionSize
-}
-
-func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
- b.section = section
- b.headerCnt = 0
- return nil
-}
-
-func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error {
- b.headerCnt++
- if b.headerCnt > b.indexer.sectionSize {
- b.t.Error("Processing too many headers")
- }
- //t.processCh <- header.Number.Uint64()
- select {
- case <-time.After(10 * time.Second):
- b.t.Error("Unexpected call to Process")
- // Can't use Fatal since this is not the test's goroutine.
- // Returning error stops the chainIndexer's updateLoop
- return errors.New("unexpected call to Process")
- case b.processCh <- header.Number.Uint64():
- }
- return nil
-}
-
-func (b *testChainIndexBackend) Commit() error {
- if b.headerCnt != b.indexer.sectionSize {
- b.t.Error("Not enough headers processed")
- }
- return nil
-}
-
-func (b *testChainIndexBackend) Prune(threshold uint64) error {
- return nil
-}
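// Illustrative sketch (not part of the diff): both the removed newHead and the test
// helper assertBlocks derive the number of fully processable sections from the head
// number, the required confirmations and the section size with the same integer
// arithmetic. A standalone rendering with example values (function name is a
// stand-in chosen for this sketch):
package main

import "fmt"

// processableSections mirrors the formula used above:
// sections = (head + 1 - confirmsReq) / sectionSize, or 0 while the head has not
// yet accumulated enough confirmations.
func processableSections(head, confirmsReq, sectionSize uint64) uint64 {
	if head < confirmsReq {
		return 0
	}
	return (head + 1 - confirmsReq) / sectionSize
}

func main() {
	// With 4096-block sections and 256 confirmations (the removed defaults),
	// head 8447 covers exactly two finished sections: (8447+1-256)/4096 = 2.
	fmt.Println(processableSections(8447, 256, 4096)) // 2
}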
diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go
index d1b0cf5053..68c3454e43 100644
--- a/core/rawdb/accessors_indexes.go
+++ b/core/rawdb/accessors_indexes.go
@@ -17,7 +17,6 @@
package rawdb
import (
- "bytes"
"encoding/binary"
"errors"
"math/big"
@@ -147,41 +146,6 @@ func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig)
return nil, common.Hash{}, 0, 0
}
-// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
-// section and bit index from the database.
-func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) {
- return db.Get(bloomBitsKey(bit, section, head))
-}
-
-// WriteBloomBits stores the compressed bloom bits vector belonging to the given
-// section and bit index.
-func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) {
- if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil {
- log.Crit("Failed to store bloom bits", "err", err)
- }
-}
-
-// DeleteBloombits removes all compressed bloom bits vectors belonging to the
-// given section range and bit index.
-func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
- start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{})
- it := db.NewIterator(nil, start)
- defer it.Release()
-
- for it.Next() {
- if bytes.Compare(it.Key(), end) >= 0 {
- break
- }
- if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 {
- continue
- }
- db.Delete(it.Key())
- }
- if it.Error() != nil {
- log.Crit("Failed to delete bloom bits", "err", it.Error())
- }
-}
-
var emptyRow = []uint32{}
// ReadFilterMapRow retrieves a filter map row at the given mapRowIndex
diff --git a/core/rawdb/accessors_indexes_test.go b/core/rawdb/accessors_indexes_test.go
index 1bee455503..fc93b4ed19 100644
--- a/core/rawdb/accessors_indexes_test.go
+++ b/core/rawdb/accessors_indexes_test.go
@@ -111,46 +111,3 @@ func TestLookupStorage(t *testing.T) {
})
}
}
-
-func TestDeleteBloomBits(t *testing.T) {
- // Prepare testing data
- db := NewMemoryDatabase()
- for i := uint(0); i < 2; i++ {
- for s := uint64(0); s < 2; s++ {
- WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02})
- WriteBloomBits(db, i, s, params.SepoliaGenesisHash, []byte{0x01, 0x02})
- }
- }
- check := func(bit uint, section uint64, head common.Hash, exist bool) {
- bits, _ := ReadBloomBits(db, bit, section, head)
- if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) {
- t.Fatalf("Bloombits mismatch")
- }
- if !exist && len(bits) > 0 {
- t.Fatalf("Bloombits should be removed")
- }
- }
- // Check the existence of written data.
- check(0, 0, params.MainnetGenesisHash, true)
- check(0, 0, params.SepoliaGenesisHash, true)
-
- // Check the existence of deleted data.
- DeleteBloombits(db, 0, 0, 1)
- check(0, 0, params.MainnetGenesisHash, false)
- check(0, 0, params.SepoliaGenesisHash, false)
- check(0, 1, params.MainnetGenesisHash, true)
- check(0, 1, params.SepoliaGenesisHash, true)
-
- // Check the existence of deleted data.
- DeleteBloombits(db, 0, 0, 2)
- check(0, 0, params.MainnetGenesisHash, false)
- check(0, 0, params.SepoliaGenesisHash, false)
- check(0, 1, params.MainnetGenesisHash, false)
- check(0, 1, params.SepoliaGenesisHash, false)
-
- // Bit 1 shouldn't be affected.
- check(1, 0, params.MainnetGenesisHash, true)
- check(1, 0, params.SepoliaGenesisHash, true)
- check(1, 1, params.MainnetGenesisHash, true)
- check(1, 1, params.SepoliaGenesisHash, true)
-}
diff --git a/core/rawdb/database.go b/core/rawdb/database.go
index 53418d1646..1bf5ca8d3f 100644
--- a/core/rawdb/database.go
+++ b/core/rawdb/database.go
@@ -375,7 +375,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
accountSnaps stat
storageSnaps stat
preimages stat
- bloomBits stat
beaconHeaders stat
cliqueSnaps stat
@@ -436,10 +435,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
metadata.Add(size)
case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength):
metadata.Add(size)
- case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
- bloomBits.Add(size)
- case bytes.HasPrefix(key, BloomBitsIndexPrefix):
- bloomBits.Add(size)
case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8):
beaconHeaders.Add(size)
case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength:
@@ -504,7 +499,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
{"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()},
{"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()},
{"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()},
- {"Key-Value store", "Bloombit index", bloomBits.Size(), bloomBits.Count()},
{"Key-Value store", "Contract codes", codes.Size(), codes.Count()},
{"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()},
{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},
diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go
index 25c4d88a54..60d6a58ade 100644
--- a/core/rawdb/schema.go
+++ b/core/rawdb/schema.go
@@ -230,16 +230,6 @@ func storageSnapshotsKey(accountHash common.Hash) []byte {
return append(SnapshotStoragePrefix, accountHash.Bytes()...)
}
-// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash
-func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
- key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...)
-
- binary.BigEndian.PutUint16(key[1:], uint16(bit))
- binary.BigEndian.PutUint64(key[3:], section)
-
- return key
-}
-
// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian)
func skeletonHeaderKey(number uint64) []byte {
return append(skeletonHeaderPrefix, encodeBlockNumber(number)...)
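// Illustrative sketch (not part of the diff): the removed bloomBitsKey helper above
// packs prefix + bit (uint16 big endian) + section (uint64 big endian) + head hash
// into a single key. Assuming a one-byte prefix (as the removed DeleteBloombits
// length check of len(bloomBitsPrefix)+2+8+32 suggests), the key is 43 bytes long.
// The helper name and prefix value below are local stand-ins for this sketch.
package main

import (
	"encoding/binary"
	"fmt"
)

func bloomBitsKeyDemo(prefix []byte, bit uint, section uint64, hash [32]byte) []byte {
	// prefix + 10 bytes (2 for the bit, 8 for the section) + 32-byte hash.
	key := append(append(append([]byte{}, prefix...), make([]byte, 10)...), hash[:]...)
	binary.BigEndian.PutUint16(key[len(prefix):], uint16(bit))
	binary.BigEndian.PutUint64(key[len(prefix)+2:], section)
	return key
}

func main() {
	key := bloomBitsKeyDemo([]byte("B"), 5, 7, [32]byte{})
	fmt.Println(len(key)) // 43 = 1 (prefix) + 2 (bit) + 8 (section) + 32 (hash)
}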
diff --git a/eth/api_backend.go b/eth/api_backend.go
index b7dfaad4c4..44e66e78d0 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
@@ -401,17 +400,6 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 {
return b.eth.config.RPCTxFeeCap
}
-func (b *EthAPIBackend) BloomStatus() (uint64, uint64) {
- sections, _, _ := b.eth.bloomIndexer.Sections()
- return params.BloomBitsBlocks, sections
-}
-
-func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
- for i := 0; i < bloomFilterThreads; i++ {
- go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
- }
-}
-
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend()
}
diff --git a/eth/backend.go b/eth/backend.go
index 06255add21..2acded6554 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
@@ -82,10 +81,6 @@ type Ethereum struct {
engine consensus.Engine
accountManager *accounts.Manager
- bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
- bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
- closeBloomHandler chan struct{}
-
filterMaps *filtermaps.FilterMaps
APIBackend *EthAPIBackend
@@ -154,19 +149,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
networkID = chainConfig.ChainID.Uint64()
}
eth := &Ethereum{
- config: config,
- chainDb: chainDb,
- eventMux: stack.EventMux(),
- accountManager: stack.AccountManager(),
- engine: engine,
- closeBloomHandler: make(chan struct{}),
- networkID: networkID,
- gasPrice: config.Miner.GasPrice,
- bloomRequests: make(chan chan *bloombits.Retrieval),
- bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
- p2pServer: stack.Server(),
- discmix: enode.NewFairMix(0),
- shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
+ config: config,
+ chainDb: chainDb,
+ eventMux: stack.EventMux(),
+ accountManager: stack.AccountManager(),
+ engine: engine,
+ networkID: networkID,
+ gasPrice: config.Miner.GasPrice,
+ p2pServer: stack.Server(),
+ discmix: enode.NewFairMix(0),
+ shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
var dbVer = ""
@@ -224,7 +216,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
- eth.bloomIndexer.Start(eth.blockchain)
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)
if config.BlobPool.Datadir != "" {
@@ -261,10 +252,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
eth.APIBackend = &EthAPIBackend{
- extRPCEnabled: stack.Config().ExtRPCEnabled(),
- allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs,
- eth: eth,
- gpo: nil,
+ extRPCEnabled: stack.Config().ExtRPCEnabled(),
+ allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs,
+ eth: eth,
+ gpo: nil,
}
if eth.APIBackend.allowUnprotectedTxs {
log.Info("Unprotected transactions allowed")
@@ -348,7 +339,6 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downlo
func (s *Ethereum) Synced() bool { return s.handler.synced.Load() }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
-func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
// Protocols returns all the currently configured
// network protocols to start.
@@ -365,9 +355,6 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
func (s *Ethereum) Start() error {
s.setupDiscovery()
- // Start the bloom bits servicing goroutines
- s.startBloomHandlers(params.BloomBitsBlocks)
-
// Regularly update shutdown marker
s.shutdownTracker.Start()
@@ -416,9 +403,7 @@ func (s *Ethereum) Stop() error {
s.handler.Stop()
// Then stop everything else.
- s.bloomIndexer.Close()
s.filterMaps.Close()
- close(s.closeBloomHandler)
s.txPool.Close()
s.blockchain.Stop()
s.engine.Close()
diff --git a/eth/bloombits.go b/eth/bloombits.go
deleted file mode 100644
index 0cb7050d23..0000000000
--- a/eth/bloombits.go
+++ /dev/null
@@ -1,74 +0,0 @@
-// Copyright 2017 The go-ethereum Authors
-// This file is part of the go-ethereum library.
-//
-// The go-ethereum library is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Lesser General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-//
-// The go-ethereum library is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Lesser General Public License for more details.
-//
-// You should have received a copy of the GNU Lesser General Public License
-// along with the go-ethereum library. If not, see .
-
-package eth
-
-import (
- "time"
-
- "github.com/ethereum/go-ethereum/common/bitutil"
- "github.com/ethereum/go-ethereum/core/rawdb"
-)
-
-const (
- // bloomServiceThreads is the number of goroutines used globally by an Ethereum
- // instance to service bloombits lookups for all running filters.
- bloomServiceThreads = 16
-
- // bloomFilterThreads is the number of goroutines used locally per filter to
- // multiplex requests onto the global servicing goroutines.
- bloomFilterThreads = 3
-
- // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
- // in a single batch.
- bloomRetrievalBatch = 16
-
- // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
-// to accumulate before requesting an entire batch (avoiding hysteresis).
- bloomRetrievalWait = time.Duration(0)
-)
-
-// startBloomHandlers starts a batch of goroutines to accept bloom bit database
-// retrievals from possibly a range of filters and serve the data to satisfy them.
-func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
- for i := 0; i < bloomServiceThreads; i++ {
- go func() {
- for {
- select {
- case <-eth.closeBloomHandler:
- return
-
- case request := <-eth.bloomRequests:
- task := <-request
- task.Bitsets = make([][]byte, len(task.Sections))
- for i, section := range task.Sections {
- head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1)
- if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
- if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
- task.Bitsets[i] = blob
- } else {
- task.Error = err
- }
- } else {
- task.Error = err
- }
- }
- request <- task
- }
- }
- }()
- }
-}
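// Illustrative sketch (not part of the diff): the removed handler above answered each
// retrieval by expanding a stored, compressed bit vector back to sectionSize/8 bytes
// with bitutil.DecompressBytes. A minimal round-trip with the same sizing; the sample
// vector contents are arbitrary placeholders.
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/bitutil"
)

func main() {
	const sectionSize = 4096 // blocks per section; one bit per block -> 512 bytes

	// A sparse bit vector compresses well, which is what made storing it worthwhile.
	raw := make([]byte, sectionSize/8)
	raw[0], raw[511] = 0x80, 0x01
	comp := bitutil.CompressBytes(raw)

	// Serving side: expand back to the full sectionSize/8 bytes, as the handler did.
	blob, err := bitutil.DecompressBytes(comp, sectionSize/8)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(comp), len(blob)) // small compressed size, 512
}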
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index 7b0be8d024..2fcf0945ba 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -25,7 +25,6 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
@@ -42,37 +41,13 @@ type Filter struct {
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
bbMatchCount uint64
-
- matcher *bloombits.Matcher
}
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not.
func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
- // Flatten the address and topic filter clauses into a single bloombits filter
- // system. Since the bloombits are not positional, nil topics are permitted,
- // which get flattened into a nil byte slice.
- var filters [][][]byte
- if len(addresses) > 0 {
- filter := make([][]byte, len(addresses))
- for i, address := range addresses {
- filter[i] = address.Bytes()
- }
- filters = append(filters, filter)
- }
- for _, topicList := range topics {
- filter := make([][]byte, len(topicList))
- for i, topic := range topicList {
- filter[i] = topic.Bytes()
- }
- filters = append(filters, filter)
- }
- size, _ := sys.backend.BloomStatus()
-
// Create a generic filter and convert it into a range filter
filter := newFilter(sys, addresses, topics)
-
- filter.matcher = bloombits.NewMatcher(size, filters)
filter.begin = begin
filter.end = end
@@ -197,23 +172,7 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan erro
close(logChan)
}()
- // Gather all indexed logs, and finish with non indexed ones
- var (
- end = uint64(f.end)
- size, sections = f.sys.backend.BloomStatus()
- err error
- )
- if indexed := sections * size; indexed > uint64(f.begin) {
- if indexed > end {
- indexed = end + 1
- }
- if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
- errChan <- err
- return
- }
- }
-
- if err := f.unindexedLogs(ctx, end, logChan); err != nil {
+ if err := f.unindexedLogs(ctx, uint64(f.end), logChan); err != nil {
errChan <- err
return
}
@@ -224,53 +183,6 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan erro
return logChan, errChan
}
-// indexedLogs returns the logs matching the filter criteria based on the bloom
-// bits index available locally or via the network.
-func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
- // Create a matcher session and request servicing from the backend
- matches := make(chan uint64, 64)
-
- session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
- if err != nil {
- return err
- }
- defer session.Close()
-
- f.sys.backend.ServiceFilter(ctx, session)
-
- for {
- select {
- case number, ok := <-matches:
- f.bbMatchCount++
- // Abort if all matches have been fulfilled
- if !ok {
- err := session.Error()
- if err == nil {
- f.begin = int64(end) + 1
- }
- return err
- }
- f.begin = int64(number) + 1
-
- // Retrieve the suggested block and pull any truly matching logs
- header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
- if header == nil || err != nil {
- return err
- }
- found, err := f.checkMatches(ctx, header)
- if err != nil {
- return err
- }
- for _, log := range found {
- logChan <- log
- }
-
- case <-ctx.Done():
- return ctx.Err()
- }
- }
-}
-
// unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
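// Illustrative sketch (not part of the diff): the removed NewRangeFilter code above
// flattened the address list and each topic list into one [][][]byte "filter system"
// for the bloombits matcher — one inner group per criterion, where any value in a
// group may match. A standalone rendering of that flattening, using plain 20-byte
// and 32-byte arrays in place of common.Address and common.Hash; the function name
// and example values are assumptions for this sketch.
package main

import "fmt"

func flattenCriteria(addresses [][20]byte, topics [][][32]byte) [][][]byte {
	var filters [][][]byte
	if len(addresses) > 0 {
		group := make([][]byte, len(addresses))
		for i, addr := range addresses {
			a := addr // copy, so each slice points at its own backing array
			group[i] = a[:]
		}
		filters = append(filters, group)
	}
	for _, topicList := range topics {
		group := make([][]byte, len(topicList))
		for i, topic := range topicList {
			t := topic
			group[i] = t[:]
		}
		filters = append(filters, group)
	}
	return filters
}

func main() {
	addrs := [][20]byte{{0x01}}
	tops := [][][32]byte{{{0xaa}, {0xbb}}} // one topic position matching either value
	fmt.Println(len(flattenCriteria(addrs, tops))) // 2 criteria groups
}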
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 6c3d8be86b..7531a1ecfc 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
@@ -70,9 +69,6 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
- BloomStatus() (uint64, uint64)
- ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
-
NewMatcherBackend() filtermaps.MatcherBackend
}
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index aec5ee4166..b6119448d9 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
@@ -137,37 +136,6 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
-func (b *testBackend) BloomStatus() (uint64, uint64) {
- return params.BloomBitsBlocks, b.sections
-}
-
-func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
- requests := make(chan chan *bloombits.Retrieval)
-
- go session.Multiplex(16, 0, requests)
- go func() {
- for {
- // Wait for a service request or a shutdown
- select {
- case <-ctx.Done():
- return
-
- case request := <-requests:
- task := <-request
-
- task.Bitsets = make([][]byte, len(task.Sections))
- for i, section := range task.Sections {
- if rand.Int()%4 != 0 { // Handle occasional missing deliveries
- head := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
- task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head)
- }
- }
- request <- task
- }
- }
- }()
-}
-
func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {
b.pendingBlock = block
b.pendingReceipts = receipts
diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go
index ae2ec9f0f0..2b5e535b13 100644
--- a/internal/ethapi/api_test.go
+++ b/internal/ethapi/api_test.go
@@ -45,7 +45,6 @@ import (
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -620,11 +619,6 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
panic("implement me")
}
-func (b testBackend) BloomStatus() (uint64, uint64) { panic("implement me") }
-func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
- panic("implement me")
-}
-
func TestEstimateGas(t *testing.T) {
t.Parallel()
// Initialize test accounts
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go
index f9ce2ef474..9e2ea2c876 100644
--- a/internal/ethapi/backend.go
+++ b/internal/ethapi/backend.go
@@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@@ -94,8 +93,6 @@ type Backend interface {
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
- BloomStatus() (uint64, uint64)
- ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
NewMatcherBackend() filtermaps.MatcherBackend
}
diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go
index 7172fc883f..2855bc1803 100644
--- a/internal/ethapi/transaction_args_test.go
+++ b/internal/ethapi/transaction_args_test.go
@@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
@@ -393,10 +392,8 @@ func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction,
func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) {
return nil, nil
}
-func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil }
-func (b *backendMock) BloomStatus() (uint64, uint64) { return 0, 0 }
-func (b *backendMock) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {}
-func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil }
+func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil }
+func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil }
func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return nil
}
diff --git a/params/network_params.go b/params/network_params.go
index 61bd6b2f42..c016e7fcf3 100644
--- a/params/network_params.go
+++ b/params/network_params.go
@@ -20,14 +20,6 @@ package params
// aren't necessarily consensus related.
const (
- // BloomBitsBlocks is the number of blocks a single bloom bit section vector
- // contains on the server side.
- BloomBitsBlocks uint64 = 4096
-
- // BloomConfirms is the number of confirmation blocks before a bloom section is
- // considered probably final and its rotated bits are calculated.
- BloomConfirms = 256
-
// FullImmutabilityThreshold is the number of blocks after which a chain segment is
// considered immutable (i.e. soft finality). It is used by the downloader as a
// hard limit against deep ancestors, by the blockchain against deep reorgs, by