core/bloombits, eth/filters: removed bloombits

This commit is contained in:
Zsolt Felfoldi 2024-09-27 03:49:41 +02:00
parent 197c77792f
commit 66ab6aab4a
25 changed files with 18 additions and 2659 deletions

View File

@ -894,7 +894,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
rawdb.DeleteBody(db, hash, num) rawdb.DeleteBody(db, hash, num)
rawdb.DeleteReceipts(db, hash, num) rawdb.DeleteReceipts(db, hash, num)
} }
// Todo(rjl493456442) txlookup, bloombits, etc // Todo(rjl493456442) txlookup, log index, etc
} }
// If SetHead was only called as a chain reparation method, try to skip // If SetHead was only called as a chain reparation method, try to skip
// touching the header chain altogether, unless the freezer is broken // touching the header chain altogether, unless the freezer is broken

View File

@ -1,92 +0,0 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
const (
// bloomThrottling is the time to wait between processing two consecutive index
// sections. It's useful during chain upgrades to prevent disk overload.
bloomThrottling = 100 * time.Millisecond
)
// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index
// for the Ethereum header bloom filters, permitting blazing fast filtering.
type BloomIndexer struct {
size uint64 // section size to generate bloombits for
db ethdb.Database // database instance to write index data and metadata into
gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index
section uint64 // Section is the section number being processed currently
head common.Hash // Head is the hash of the last header processed
}
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
// canonical chain for fast logs filtering.
func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *ChainIndexer {
backend := &BloomIndexer{
db: db,
size: size,
}
table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits")
}
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
// section.
func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error {
gen, err := bloombits.NewGenerator(uint(b.size))
b.gen, b.section, b.head = gen, section, common.Hash{}
return err
}
// Process implements core.ChainIndexerBackend, adding a new header's bloom into
// the index.
func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error {
b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom)
b.head = header.Hash()
return nil
}
// Commit implements core.ChainIndexerBackend, finalizing the bloom section and
// writing it out into the database.
func (b *BloomIndexer) Commit() error {
batch := b.db.NewBatchWithSize((int(b.size) / 8) * types.BloomBitLength)
for i := 0; i < types.BloomBitLength; i++ {
bits, err := b.gen.Bitset(uint(i))
if err != nil {
return err
}
rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits))
}
return batch.Write()
}
// Prune returns an empty error since we don't support pruning here.
func (b *BloomIndexer) Prune(threshold uint64) error {
return nil
}

View File

@ -1,18 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package bloombits implements bloom filtering on batches of data.
package bloombits

View File

@ -1,98 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"errors"
"github.com/ethereum/go-ethereum/core/types"
)
var (
// errSectionOutOfBounds is returned if the user tried to add more bloom filters
// to the batch than available space, or if tries to retrieve above the capacity.
errSectionOutOfBounds = errors.New("section out of bounds")
// errBloomBitOutOfBounds is returned if the user tried to retrieve specified
// bit bloom above the capacity.
errBloomBitOutOfBounds = errors.New("bloom bit out of bounds")
)
// Generator takes a number of bloom filters and generates the rotated bloom bits
// to be used for batched filtering.
type Generator struct {
blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching
sections uint // Number of sections to batch together
nextSec uint // Next section to set when adding a bloom
}
// NewGenerator creates a rotated bloom generator that can iteratively fill a
// batched bloom filter's bits.
func NewGenerator(sections uint) (*Generator, error) {
if sections%8 != 0 {
return nil, errors.New("section count not multiple of 8")
}
b := &Generator{sections: sections}
for i := 0; i < types.BloomBitLength; i++ {
b.blooms[i] = make([]byte, sections/8)
}
return b, nil
}
// AddBloom takes a single bloom filter and sets the corresponding bit column
// in memory accordingly.
func (b *Generator) AddBloom(index uint, bloom types.Bloom) error {
// Make sure we're not adding more bloom filters than our capacity
if b.nextSec >= b.sections {
return errSectionOutOfBounds
}
if b.nextSec != index {
return errors.New("bloom filter with unexpected index")
}
// Rotate the bloom and insert into our collection
byteIndex := b.nextSec / 8
bitIndex := byte(7 - b.nextSec%8)
for byt := 0; byt < types.BloomByteLength; byt++ {
bloomByte := bloom[types.BloomByteLength-1-byt]
if bloomByte == 0 {
continue
}
base := 8 * byt
b.blooms[base+7][byteIndex] |= ((bloomByte >> 7) & 1) << bitIndex
b.blooms[base+6][byteIndex] |= ((bloomByte >> 6) & 1) << bitIndex
b.blooms[base+5][byteIndex] |= ((bloomByte >> 5) & 1) << bitIndex
b.blooms[base+4][byteIndex] |= ((bloomByte >> 4) & 1) << bitIndex
b.blooms[base+3][byteIndex] |= ((bloomByte >> 3) & 1) << bitIndex
b.blooms[base+2][byteIndex] |= ((bloomByte >> 2) & 1) << bitIndex
b.blooms[base+1][byteIndex] |= ((bloomByte >> 1) & 1) << bitIndex
b.blooms[base][byteIndex] |= (bloomByte & 1) << bitIndex
}
b.nextSec++
return nil
}
// Bitset returns the bit vector belonging to the given bit index after all
// blooms have been added.
func (b *Generator) Bitset(idx uint) ([]byte, error) {
if b.nextSec != b.sections {
return nil, errors.New("bloom not fully generated yet")
}
if idx >= types.BloomBitLength {
return nil, errBloomBitOutOfBounds
}
return b.blooms[idx], nil
}

View File

@ -1,100 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
crand "crypto/rand"
"math/rand"
"testing"
"github.com/ethereum/go-ethereum/core/types"
)
// Tests that batched bloom bits are correctly rotated from the input bloom
// filters.
func TestGenerator(t *testing.T) {
// Generate the input and the rotated output
var input, output [types.BloomBitLength][types.BloomByteLength]byte
for i := 0; i < types.BloomBitLength; i++ {
for j := 0; j < types.BloomBitLength; j++ {
bit := byte(rand.Int() % 2)
input[i][j/8] |= bit << byte(7-j%8)
output[types.BloomBitLength-1-j][i/8] |= bit << byte(7-i%8)
}
}
// Crunch the input through the generator and verify the result
gen, err := NewGenerator(types.BloomBitLength)
if err != nil {
t.Fatalf("failed to create bloombit generator: %v", err)
}
for i, bloom := range input {
if err := gen.AddBloom(uint(i), bloom); err != nil {
t.Fatalf("bloom %d: failed to add: %v", i, err)
}
}
for i, want := range output {
have, err := gen.Bitset(uint(i))
if err != nil {
t.Fatalf("output %d: failed to retrieve bits: %v", i, err)
}
if !bytes.Equal(have, want[:]) {
t.Errorf("output %d: bit vector mismatch have %x, want %x", i, have, want)
}
}
}
func BenchmarkGenerator(b *testing.B) {
var input [types.BloomBitLength][types.BloomByteLength]byte
b.Run("empty", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Crunch the input through the generator and verify the result
gen, err := NewGenerator(types.BloomBitLength)
if err != nil {
b.Fatalf("failed to create bloombit generator: %v", err)
}
for j, bloom := range &input {
if err := gen.AddBloom(uint(j), bloom); err != nil {
b.Fatalf("bloom %d: failed to add: %v", i, err)
}
}
}
})
for i := 0; i < types.BloomBitLength; i++ {
crand.Read(input[i][:])
}
b.Run("random", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Crunch the input through the generator and verify the result
gen, err := NewGenerator(types.BloomBitLength)
if err != nil {
b.Fatalf("failed to create bloombit generator: %v", err)
}
for j, bloom := range &input {
if err := gen.AddBloom(uint(j), bloom); err != nil {
b.Fatalf("bloom %d: failed to add: %v", i, err)
}
}
}
})
}

View File

@ -1,649 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
"context"
"errors"
"math"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/crypto"
)
// bloomIndexes represents the bit indexes inside the bloom filter that belong
// to some key.
type bloomIndexes [3]uint
// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
func calcBloomIndexes(b []byte) bloomIndexes {
b = crypto.Keccak256(b)
var idxs bloomIndexes
for i := 0; i < len(idxs); i++ {
idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
}
return idxs
}
// partialMatches with a non-nil vector represents a section in which some sub-
// matchers have already found potential matches. Subsequent sub-matchers will
// binary AND their matches with this vector. If vector is nil, it represents a
// section to be processed by the first sub-matcher.
type partialMatches struct {
section uint64
bitset []byte
}
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
//
// The context and error fields are used by the light client to terminate matching
// early if an error is encountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Context context.Context
Error error
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
// binary AND/OR operations on the bit-streams, creating a stream of potential
// blocks to inspect for data content.
type Matcher struct {
sectionSize uint64 // Size of the data batches to filter on
filters [][]bloomIndexes // Filter the system is matching for
schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
retrievers chan chan uint // Retriever processes waiting for bit allocations
counters chan chan uint // Retriever processes waiting for task count reports
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
running atomic.Bool // Atomic flag whether a session is live or not
}
// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
// address and topic filtering on them. Setting a filter component to `nil` is
// allowed and will result in that filter rule being skipped (OR 0x11...1).
func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
// Create the matcher instance
m := &Matcher{
sectionSize: sectionSize,
schedulers: make(map[uint]*scheduler),
retrievers: make(chan chan uint),
counters: make(chan chan uint),
retrievals: make(chan chan *Retrieval),
deliveries: make(chan *Retrieval),
}
// Calculate the bloom bit indexes for the groups we're interested in
m.filters = nil
for _, filter := range filters {
// Gather the bit indexes of the filter rule, special casing the nil filter
if len(filter) == 0 {
continue
}
bloomBits := make([]bloomIndexes, len(filter))
for i, clause := range filter {
if clause == nil {
bloomBits = nil
break
}
bloomBits[i] = calcBloomIndexes(clause)
}
// Accumulate the filter rules if no nil rule was within
if bloomBits != nil {
m.filters = append(m.filters, bloomBits)
}
}
// For every bit, create a scheduler to load/download the bit vectors
for _, bloomIndexLists := range m.filters {
for _, bloomIndexList := range bloomIndexLists {
for _, bloomIndex := range bloomIndexList {
m.addScheduler(bloomIndex)
}
}
}
return m
}
// addScheduler adds a bit stream retrieval scheduler for the given bit index if
// it has not existed before. If the bit is already selected for filtering, the
// existing scheduler can be used.
func (m *Matcher) addScheduler(idx uint) {
if _, ok := m.schedulers[idx]; ok {
return
}
m.schedulers[idx] = newScheduler(idx)
}
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if m.running.Swap(true) {
return nil, errors.New("matcher already running")
}
defer m.running.Store(false)
// Initiate a new matching round
session := &MatcherSession{
matcher: m,
quit: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
}
sink := m.run(begin, end, cap(results), session)
// Read the output from the result sink and deliver to the user
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(results)
for {
select {
case <-session.quit:
return
case res, ok := <-sink:
// New match result found
if !ok {
return
}
// Calculate the first and last blocks of the section
sectionStart := res.section * m.sectionSize
first := sectionStart
if begin > first {
first = begin
}
last := sectionStart + m.sectionSize - 1
if end < last {
last = end
}
// Iterate over all the blocks in the section and return the matching ones
for i := first; i <= last; i++ {
// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
next := res.bitset[(i-sectionStart)/8]
if next == 0 {
if i%8 == 0 {
i += 7
}
continue
}
// Some bit it set, do the actual submatching
if bit := 7 - i%8; next&(1<<bit) != 0 {
select {
case <-session.quit:
return
case results <- i:
}
}
}
}
}
}()
return session, nil
}
// run creates a daisy-chain of sub-matchers, one for the address set and one
// for each topic set, each sub-matcher receiving a section only if the previous
// ones have all found a potential match in one of the blocks of the section,
// then binary AND-ing its own matches and forwarding the result to the next one.
//
// The method starts feeding the section indexes into the first sub-matcher on a
// new goroutine and returns a sink channel receiving the results.
func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
// Create the source channel and feed section indexes into
source := make(chan *partialMatches, buffer)
session.pend.Add(1)
go func() {
defer session.pend.Done()
defer close(source)
for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
select {
case <-session.quit:
return
case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
}
}
}()
// Assemble the daisy-chained filtering pipeline
next := source
dist := make(chan *request, buffer)
for _, bloom := range m.filters {
next = m.subMatch(next, dist, bloom, session)
}
// Start the request distribution
session.pend.Add(1)
go m.distributor(dist, session)
return next
}
// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
// that address/topic, and binary AND-ing those vectors together.
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
// Start the concurrent schedulers for each bit required by the bloom filter
sectionSources := make([][3]chan uint64, len(bloom))
sectionSinks := make([][3]chan []byte, len(bloom))
for i, bits := range bloom {
for j, bit := range bits {
sectionSources[i][j] = make(chan uint64, cap(source))
sectionSinks[i][j] = make(chan []byte, cap(source))
m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
}
}
process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
results := make(chan *partialMatches, cap(source))
session.pend.Add(2)
go func() {
// Tear down the goroutine and terminate all source channels
defer session.pend.Done()
defer close(process)
defer func() {
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
close(bitSource)
}
}
}()
// Read sections from the source channel and multiplex into all bit-schedulers
for {
select {
case <-session.quit:
return
case subres, ok := <-source:
// New subresult from previous link
if !ok {
return
}
// Multiplex the section index to all bit-schedulers
for _, bloomSources := range sectionSources {
for _, bitSource := range bloomSources {
select {
case <-session.quit:
return
case bitSource <- subres.section:
}
}
}
// Notify the processor that this section will become available
select {
case <-session.quit:
return
case process <- subres:
}
}
}
}()
go func() {
// Tear down the goroutine and terminate the final sink channel
defer session.pend.Done()
defer close(results)
// Read the source notifications and collect the delivered results
for {
select {
case <-session.quit:
return
case subres, ok := <-process:
// Notified of a section being retrieved
if !ok {
return
}
// Gather all the sub-results and merge them together
var orVector []byte
for _, bloomSinks := range sectionSinks {
var andVector []byte
for _, bitSink := range bloomSinks {
var data []byte
select {
case <-session.quit:
return
case data = <-bitSink:
}
if andVector == nil {
andVector = make([]byte, int(m.sectionSize/8))
copy(andVector, data)
} else {
bitutil.ANDBytes(andVector, andVector, data)
}
}
if orVector == nil {
orVector = andVector
} else {
bitutil.ORBytes(orVector, orVector, andVector)
}
}
if orVector == nil {
orVector = make([]byte, int(m.sectionSize/8))
}
if subres.bitset != nil {
bitutil.ANDBytes(orVector, orVector, subres.bitset)
}
if bitutil.TestBytes(orVector) {
select {
case <-session.quit:
return
case results <- &partialMatches{subres.section, orVector}:
}
}
}
}
}()
return results
}
// distributor receives requests from the schedulers and queues them into a set
// of pending requests, which are assigned to retrievers wanting to fulfil them.
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
defer session.pend.Done()
var (
requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
)
// assign is a helper method to try to assign a pending bit an actively
// listening servicer, or schedule it up for later when one arrives.
assign := func(bit uint) {
select {
case fetcher := <-m.retrievers:
allocs++
fetcher <- bit
default:
// No retrievers active, start listening for new ones
retrievers = m.retrievers
unallocs[bit] = struct{}{}
}
}
for {
select {
case <-shutdown:
// Shutdown requested. No more retrievers can be allocated,
// but we still need to wait until all pending requests have returned.
shutdown = nil
if allocs == 0 {
return
}
case req := <-dist:
// New retrieval request arrived to be distributed to some fetcher process
queue := requests[req.bit]
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
// If it's a new bit and we have waiting fetchers, allocate to them
if len(queue) == 0 {
assign(req.bit)
}
case fetcher := <-retrievers:
// New retriever arrived, find the lowest section-ed bit to assign
bit, best := uint(0), uint64(math.MaxUint64)
for idx := range unallocs {
if requests[idx][0] < best {
bit, best = idx, requests[idx][0]
}
}
// Stop tracking this bit (and alloc notifications if no more work is available)
delete(unallocs, bit)
if len(unallocs) == 0 {
retrievers = nil
}
allocs++
fetcher <- bit
case fetcher := <-m.counters:
// New task count request arrives, return number of items
fetcher <- uint(len(requests[<-fetcher]))
case fetcher := <-m.retrievals:
// New fetcher waiting for tasks to retrieve, assign
task := <-fetcher
if want := len(task.Sections); want >= len(requests[task.Bit]) {
task.Sections = requests[task.Bit]
delete(requests, task.Bit)
} else {
task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
}
fetcher <- task
// If anything was left unallocated, try to assign to someone else
if len(requests[task.Bit]) > 0 {
assign(task.Bit)
}
case result := <-m.deliveries:
// New retrieval task response from fetcher, split out missing sections and
// deliver complete ones
var (
sections = make([]uint64, 0, len(result.Sections))
bitsets = make([][]byte, 0, len(result.Bitsets))
missing = make([]uint64, 0, len(result.Sections))
)
for i, bitset := range result.Bitsets {
if len(bitset) == 0 {
missing = append(missing, result.Sections[i])
continue
}
sections = append(sections, result.Sections[i])
bitsets = append(bitsets, bitset)
}
m.schedulers[result.Bit].deliver(sections, bitsets)
allocs--
// Reschedule missing sections and allocate bit if newly available
if len(missing) > 0 {
queue := requests[result.Bit]
for _, section := range missing {
index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
}
requests[result.Bit] = queue
if len(queue) == len(missing) {
assign(result.Bit)
}
}
// End the session when all pending deliveries have arrived.
if shutdown == nil && allocs == 0 {
return
}
}
}
}
// MatcherSession is returned by a started matcher to be used as a terminator
// for the actively running matching operation.
type MatcherSession struct {
matcher *Matcher
closer sync.Once // Sync object to ensure we only ever close once
quit chan struct{} // Quit channel to request pipeline termination
ctx context.Context // Context used by the light client to abort filtering
err error // Global error to track retrieval failures deep in the chain
errLock sync.Mutex
pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
s.closer.Do(func() {
// Signal termination and wait for all goroutines to tear down
close(s.quit)
s.pend.Wait()
})
}
// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
s.errLock.Lock()
defer s.errLock.Unlock()
return s.err
}
// allocateRetrieval assigns a bloom bit index to a client process that can either
// immediately request and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
func (s *MatcherSession) allocateRetrieval() (uint, bool) {
fetcher := make(chan uint)
select {
case <-s.quit:
return 0, false
case s.matcher.retrievers <- fetcher:
bit, ok := <-fetcher
return bit, ok
}
}
// pendingSections returns the number of pending section retrievals belonging to
// the given bloom bit index.
func (s *MatcherSession) pendingSections(bit uint) int {
fetcher := make(chan uint)
select {
case <-s.quit:
return 0
case s.matcher.counters <- fetcher:
fetcher <- bit
return int(<-fetcher)
}
}
// allocateSections assigns all or part of an already allocated bit-task queue
// to the requesting process.
func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
fetcher := make(chan *Retrieval)
select {
case <-s.quit:
return nil
case s.matcher.retrievals <- fetcher:
task := &Retrieval{
Bit: bit,
Sections: make([]uint64, count),
}
fetcher <- task
return (<-fetcher).Sections
}
}
// deliverSections delivers a batch of section bit-vectors for a specific bloom
// bit index to be injected into the processing pipeline.
func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
}
// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
// the requested retrieval queue to be serviced together with other sessions.
//
// This method will block for the lifetime of the session. Even after termination
// of the session, any request in-flight need to be responded to! Empty responses
// are fine though in that case.
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
waitTimer := time.NewTimer(wait)
defer waitTimer.Stop()
for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.allocateRetrieval()
if !ok {
return
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.pendingSections(bit) < batch {
waitTimer.Reset(wait)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return
case <-waitTimer.C:
// Throttling up, fetch whatever is available
}
}
// Allocate as much as we can handle and request servicing
sections := s.allocateSections(bit, batch)
request := make(chan *Retrieval)
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.deliverSections(bit, sections, make([][]byte, len(sections)))
return
case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
// Deliver a result before s.Close() to avoid a deadlock
s.deliverSections(result.Bit, result.Sections, result.Bitsets)
if result.Error != nil {
s.errLock.Lock()
s.err = result.Error
s.errLock.Unlock()
s.Close()
}
}
}
}

View File

@ -1,292 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"context"
"math/rand"
"sync/atomic"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
)
const testSectionSize = 4096
// Tests that wildcard filter rules (nil) can be specified and are handled well.
func TestMatcherWildcards(t *testing.T) {
t.Parallel()
matcher := NewMatcher(testSectionSize, [][][]byte{
{common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard
{common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard
{common.Hash{0x01}.Bytes()}, // Plain rule, sanity check
{common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule
{nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule
{nil, nil}, // Wildcard combo, drop rule
{}, // Inited wildcard rule, drop rule
nil, // Proper wildcard rule, drop rule
})
if len(matcher.filters) != 3 {
t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3)
}
if len(matcher.filters[0]) != 2 {
t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2)
}
if len(matcher.filters[1]) != 2 {
t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2)
}
if len(matcher.filters[2]) != 1 {
t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1)
}
}
// Tests the matcher pipeline on a single continuous workflow without interrupts.
func TestMatcherContinuous(t *testing.T) {
t.Parallel()
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75)
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81)
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36)
}
// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
// with the aim of ensuring data items are requested only once.
func TestMatcherIntermittent(t *testing.T) {
t.Parallel()
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75)
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81)
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36)
}
// Tests the matcher pipeline on random input to hopefully catch anomalies.
func TestMatcherRandom(t *testing.T) {
t.Parallel()
for i := 0; i < 10; i++ {
testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0)
testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0)
}
}
// Tests that the matcher can properly find matches if the starting block is
// shifted from a multiple of 8. This is needed to cover an optimisation with
// bitset matching https://github.com/ethereum/go-ethereum/issues/15309.
func TestMatcherShifted(t *testing.T) {
t.Parallel()
// Block 0 always matches in the tests, skip ahead of first 8 blocks with the
// start to get a potential zero byte in the matcher bitset.
// To keep the second bitset byte zero, the filter must only match for the first
// time in block 16, so doing an all-16 bit filter should suffice.
// To keep the starting block non divisible by 8, block number 9 is the first
// that would introduce a shift and not match block 0.
testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0)
}
// Tests that matching on everything doesn't crash (special case internally).
func TestWildcardMatcher(t *testing.T) {
t.Parallel()
testMatcherBothModes(t, nil, 0, 10000, 0)
}
// makeRandomIndexes generates a random filter system, composed of multiple filter
// criteria, each having one bloom list component for the address and arbitrarily
// many topic bloom list components.
func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
res := make([][]bloomIndexes, len(lengths))
for i, topics := range lengths {
res[i] = make([]bloomIndexes, topics)
for j := 0; j < topics; j++ {
for k := 0; k < len(res[i][j]); k++ {
res[i][j][k] = uint(rand.Intn(max-1) + 2)
}
}
}
return res
}
// testMatcherDiffBatches runs the given matches test in single-delivery and also
// in batches delivery mode, verifying that all kinds of deliveries are handled
// correctly within.
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) {
singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1)
batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16)
if singleton != batched {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in singleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
}
}
// testMatcherBothModes runs the given matcher test in both continuous as well as
// in intermittent mode, verifying that the request counts match each other.
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) {
continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16)
intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16)
if continuous != intermittent {
t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
}
}
// testMatcher is a generic tester to run the given matcher test and return the
// number of requests made for cross validation between different modes.
func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
// Create a new matcher an simulate our explicit random bitsets
matcher := NewMatcher(testSectionSize, nil)
matcher.filters = filter
for _, rule := range filter {
for _, topic := range rule {
for _, bit := range topic {
matcher.addScheduler(bit)
}
}
}
// Track the number of retrieval requests made
var requested atomic.Uint32
// Start the matching session for the filter and the retriever goroutines
quit := make(chan struct{})
matches := make(chan uint64, 16)
session, err := matcher.Start(context.Background(), start, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
startRetrievers(session, quit, &requested, maxReqCount)
// Iterate over all the blocks and verify that the pipeline produces the correct matches
for i := start; i < blocks; i++ {
if expMatch3(filter, i) {
match, ok := <-matches
if !ok {
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i)
return 0
}
if match != i {
t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match)
}
// If we're testing intermittent mode, abort and restart the pipeline
if intermittent {
session.Close()
close(quit)
quit = make(chan struct{})
matches = make(chan uint64, 16)
session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
startRetrievers(session, quit, &requested, maxReqCount)
}
}
}
// Ensure the result channel is torn down after the last block
match, ok := <-matches
if ok {
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
// Clean up the session and ensure we match the expected retrieval count
session.Close()
close(quit)
if retrievals != 0 && requested.Load() != retrievals {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals)
}
return requested.Load()
}
// startRetrievers starts a batch of goroutines listening for section requests
// and serving them.
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) {
requests := make(chan chan *Retrieval)
for i := 0; i < 10; i++ {
// Start a multiplexer to test multiple threaded execution
go session.Multiplex(batch, 100*time.Microsecond, requests)
// Start a services to match the above multiplexer
go func() {
for {
// Wait for a service request or a shutdown
select {
case <-quit:
return
case request := <-requests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
if rand.Int()%4 != 0 { // Handle occasional missing deliveries
task.Bitsets[i] = generateBitset(task.Bit, section)
retrievals.Add(1)
}
}
request <- task
}
}
}()
}
}
// generateBitset generates the rotated bitset for the given bloom bit and section
// numbers.
func generateBitset(bit uint, section uint64) []byte {
bitset := make([]byte, testSectionSize/8)
for i := 0; i < len(bitset); i++ {
for b := 0; b < 8; b++ {
blockIdx := section*testSectionSize + uint64(i*8+b)
bitset[i] += bitset[i]
if (blockIdx % uint64(bit)) == 0 {
bitset[i]++
}
}
}
return bitset
}
func expMatch1(filter bloomIndexes, i uint64) bool {
for _, ii := range filter {
if (i % uint64(ii)) != 0 {
return false
}
}
return true
}
func expMatch2(filter []bloomIndexes, i uint64) bool {
for _, ii := range filter {
if expMatch1(ii, i) {
return true
}
}
return false
}
func expMatch3(filter [][]bloomIndexes, i uint64) bool {
for _, ii := range filter {
if !expMatch2(ii, i) {
return false
}
}
return true
}

View File

@ -1,181 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"sync"
)
// request represents a bloom retrieval task to prioritize and pull from the local
// database or remotely from the network.
type request struct {
section uint64 // Section index to retrieve the bit-vector from
bit uint // Bit index within the section to retrieve the vector of
}
// response represents the state of a requested bit-vector through a scheduler.
type response struct {
cached []byte // Cached bits to dedup multiple requests
done chan struct{} // Channel to allow waiting for completion
}
// scheduler handles the scheduling of bloom-filter retrieval operations for
// entire section-batches belonging to a single bloom bit. Beside scheduling the
// retrieval operations, this struct also deduplicates the requests and caches
// the results to minimize network/database overhead even in complex filtering
// scenarios.
type scheduler struct {
bit uint // Index of the bit in the bloom filter this scheduler is responsible for
responses map[uint64]*response // Currently pending retrieval requests or already cached responses
lock sync.Mutex // Lock protecting the responses from concurrent access
}
// newScheduler creates a new bloom-filter retrieval scheduler for a specific
// bit index.
func newScheduler(idx uint) *scheduler {
return &scheduler{
bit: idx,
responses: make(map[uint64]*response),
}
}
// run creates a retrieval pipeline, receiving section indexes from sections and
// returning the results in the same order through the done channel. Concurrent
// runs of the same scheduler are allowed, leading to retrieval task deduplication.
func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
// Create a forwarder channel between requests and responses of the same size as
// the distribution channel (since that will block the pipeline anyway).
pend := make(chan uint64, cap(dist))
// Start the pipeline schedulers to forward between user -> distributor -> user
wg.Add(2)
go s.scheduleRequests(sections, dist, pend, quit, wg)
go s.scheduleDeliveries(pend, done, quit, wg)
}
// reset cleans up any leftovers from previous runs. This is required before a
// restart to ensure the no previously requested but never delivered state will
// cause a lockup.
func (s *scheduler) reset() {
s.lock.Lock()
defer s.lock.Unlock()
for section, res := range s.responses {
if res.cached == nil {
delete(s.responses, section)
}
}
}
// scheduleRequests reads section retrieval requests from the input channel,
// deduplicates the stream and pushes unique retrieval tasks into the distribution
// channel for a database or network layer to honour.
func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(pend)
// Keep reading and scheduling section requests
for {
select {
case <-quit:
return
case section, ok := <-reqs:
// New section retrieval requested
if !ok {
return
}
// Deduplicate retrieval requests
unique := false
s.lock.Lock()
if s.responses[section] == nil {
s.responses[section] = &response{
done: make(chan struct{}),
}
unique = true
}
s.lock.Unlock()
// Schedule the section for retrieval and notify the deliverer to expect this section
if unique {
select {
case <-quit:
return
case dist <- &request{bit: s.bit, section: section}:
}
}
select {
case <-quit:
return
case pend <- section:
}
}
}
}
// scheduleDeliveries reads section acceptance notifications and waits for them
// to be delivered, pushing them into the output data buffer.
func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) {
// Clean up the goroutine and pipeline when done
defer wg.Done()
defer close(done)
// Keep reading notifications and scheduling deliveries
for {
select {
case <-quit:
return
case idx, ok := <-pend:
// New section retrieval pending
if !ok {
return
}
// Wait until the request is honoured
s.lock.Lock()
res := s.responses[idx]
s.lock.Unlock()
select {
case <-quit:
return
case <-res.done:
}
// Deliver the result
select {
case <-quit:
return
case done <- res.cached:
}
}
}
}
// deliver is called by the request distributor when a reply to a request arrives.
func (s *scheduler) deliver(sections []uint64, data [][]byte) {
s.lock.Lock()
defer s.lock.Unlock()
for i, section := range sections {
if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries
res.cached = data[i]
close(res.done)
}
}
}

View File

@ -1,103 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package bloombits
import (
"bytes"
"math/big"
"sync"
"sync/atomic"
"testing"
)
// Tests that the scheduler can deduplicate and forward retrieval requests to
// underlying fetchers and serve responses back, irrelevant of the concurrency
// of the requesting clients or serving data fetchers.
func TestSchedulerSingleClientSingleFetcher(t *testing.T) { testScheduler(t, 1, 1, 5000) }
func TestSchedulerSingleClientMultiFetcher(t *testing.T) { testScheduler(t, 1, 10, 5000) }
func TestSchedulerMultiClientSingleFetcher(t *testing.T) { testScheduler(t, 10, 1, 5000) }
func TestSchedulerMultiClientMultiFetcher(t *testing.T) { testScheduler(t, 10, 10, 5000) }
func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
t.Parallel()
f := newScheduler(0)
// Create a batch of handler goroutines that respond to bloom bit requests and
// deliver them to the scheduler.
var fetchPend sync.WaitGroup
fetchPend.Add(fetchers)
defer fetchPend.Wait()
fetch := make(chan *request, 16)
defer close(fetch)
var delivered atomic.Uint32
for i := 0; i < fetchers; i++ {
go func() {
defer fetchPend.Done()
for req := range fetch {
delivered.Add(1)
f.deliver([]uint64{
req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
req.section, // Requested data
req.section, // Duplicated data (ensure it doesn't double close anything)
}, [][]byte{
{},
new(big.Int).SetUint64(req.section).Bytes(),
new(big.Int).SetUint64(req.section).Bytes(),
})
}
}()
}
// Start a batch of goroutines to concurrently run scheduling tasks
quit := make(chan struct{})
var pend sync.WaitGroup
pend.Add(clients)
for i := 0; i < clients; i++ {
go func() {
defer pend.Done()
in := make(chan uint64, 16)
out := make(chan []byte, 16)
f.run(in, fetch, out, quit, &pend)
go func() {
for j := 0; j < requests; j++ {
in <- uint64(j)
}
close(in)
}()
b := new(big.Int)
for j := 0; j < requests; j++ {
bits := <-out
if want := b.SetUint64(uint64(j)).Bytes(); !bytes.Equal(bits, want) {
t.Errorf("vector %d: delivered content mismatch: have %x, want %x", j, bits, want)
}
}
}()
}
pend.Wait()
if have := delivered.Load(); int(have) != requests {
t.Errorf("request count mismatch: have %v, want %v", have, requests)
}
}

View File

@ -1,522 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
// ChainIndexerBackend defines the methods needed to process chain segments in
// the background and write the segment results into the database. These can be
// used to create filter blooms or CHTs.
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(ctx context.Context, section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(ctx context.Context, header *types.Header) error
// Commit finalizes the section metadata and stores it into the database.
Commit() error
// Prune deletes the chain index older than the given threshold.
Prune(threshold uint64) error
}
// ChainIndexerChain interface is used for connecting the indexer to a blockchain
type ChainIndexerChain interface {
// CurrentHeader retrieves the latest locally known header.
CurrentHeader() *types.Header
// SubscribeChainHeadEvent subscribes to new head header notifications.
SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
}
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
// ChainHeadEventLoop in a goroutine.
//
// Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only
// after an entire section has been finished or in case of rollbacks that might
// affect already finished sections.
type ChainIndexer struct {
chainDb ethdb.Database // Chain database to index the data from
indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into
backend ChainIndexerBackend // Background processor generating the index data content
children []*ChainIndexer // Child indexers to cascade chain updates to
active atomic.Bool // Flag whether the event loop was started
update chan struct{} // Notification channel that headers should be processed
quit chan chan error // Quit channel to tear down running goroutines
ctx context.Context
ctxCancel func()
sectionSize uint64 // Number of blocks in a single chain segment to process
confirmsReq uint64 // Number of confirmations before processing a completed segment
storedSections uint64 // Number of sections successfully indexed into the database
knownSections uint64 // Number of sections known to be complete (block wise)
cascadedHead uint64 // Block number of the last completed section cascaded to subindexers
checkpointSections uint64 // Number of sections covered by the checkpoint
checkpointHead common.Hash // Section head belonging to the checkpoint
throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources
log log.Logger
lock sync.Mutex
}
// NewChainIndexer creates a new chain indexer to do background processing on
// chain segments of a given size after certain number of confirmations passed.
// The throttling parameter might be used to prevent database thrashing.
func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer {
c := &ChainIndexer{
chainDb: chainDb,
indexDb: indexDb,
backend: backend,
update: make(chan struct{}, 1),
quit: make(chan chan error),
sectionSize: section,
confirmsReq: confirm,
throttling: throttling,
log: log.New("type", kind),
}
// Initialize database dependent fields and start the updater
c.loadValidSections()
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
go c.updateLoop()
return c
}
// AddCheckpoint adds a checkpoint. Sections are never processed and the chain
// is not expected to be available before this point. The indexer assumes that
// the backend has sufficient information available to process subsequent sections.
//
// Note: knownSections == 0 and storedSections == checkpointSections until
// syncing reaches the checkpoint
func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
// Short circuit if the given checkpoint is below than local's.
if c.checkpointSections >= section+1 || section < c.storedSections {
return
}
c.checkpointSections = section + 1
c.checkpointHead = shead
c.setSectionHead(section, shead)
c.setValidSections(section + 1)
}
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainHeadEvent, 10)
sub := chain.SubscribeChainHeadEvent(events)
go c.eventLoop(chain.CurrentHeader(), events, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
// that might have occurred internally.
func (c *ChainIndexer) Close() error {
var errs []error
c.ctxCancel()
// Tear down the primary update loop
errc := make(chan error)
c.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
}
// If needed, tear down the secondary event loop
if c.active.Load() {
c.quit <- errc
if err := <-errc; err != nil {
errs = append(errs, err)
}
}
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
// Return any failures
switch {
case len(errs) == 0:
return nil
case len(errs) == 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
c.active.Store(true)
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
c.newHead(currentHeader.Number.Uint64(), false)
var (
prevHeader = currentHeader
prevHash = currentHeader.Hash()
)
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
return
case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
errc <- nil
return
}
if ev.Header.ParentHash != prevHash {
// Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then)
// TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash {
if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
}
c.newHead(ev.Header.Number.Uint64(), false)
prevHeader, prevHash = ev.Header, ev.Header.Hash()
}
}
}
// newHead notifies the indexer about new chain heads and/or reorgs.
func (c *ChainIndexer) newHead(head uint64, reorg bool) {
c.lock.Lock()
defer c.lock.Unlock()
// If a reorg happened, invalidate all sections until that point
if reorg {
// Revert the known section number to the reorg point
known := (head + 1) / c.sectionSize
stored := known
if known < c.checkpointSections {
known = 0
}
if stored < c.checkpointSections {
stored = c.checkpointSections
}
if known < c.knownSections {
c.knownSections = known
}
// Revert the stored sections from the database to the reorg point
if stored < c.storedSections {
c.setValidSections(stored)
}
// Update the new head number to the finalized section end and notify children
head = known * c.sectionSize
if head < c.cascadedHead {
c.cascadedHead = head
for _, child := range c.children {
child.newHead(c.cascadedHead, true)
}
}
return
}
// No reorg, calculate the number of newly known sections and update if high enough
var sections uint64
if head >= c.confirmsReq {
sections = (head + 1 - c.confirmsReq) / c.sectionSize
if sections < c.checkpointSections {
sections = 0
}
if sections > c.knownSections {
if c.knownSections < c.checkpointSections {
// syncing reached the checkpoint, verify section head
syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1)
if syncedHead != c.checkpointHead {
c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead)
return
}
}
c.knownSections = sections
select {
case c.update <- struct{}{}:
default:
}
}
}
}
// updateLoop is the main event loop of the indexer which pushes chain segments
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
updating bool
updated time.Time
)
for {
select {
case errc := <-c.quit:
// Chain indexer terminating, report no failure and abort
errc <- nil
return
case <-c.update:
// Section headers completed (or rolled back), update the index
c.lock.Lock()
if c.knownSections > c.storedSections {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
}
// Cache the current section count and head to allow unlocking the mutex
c.verifyLastHead()
section := c.storedSections
var oldHead common.Hash
if section > 0 {
oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
newHead, err := c.processSection(section, oldHead)
if err != nil {
select {
case <-c.ctx.Done():
<-c.quit <- nil
return
default:
}
c.log.Error("Section processing failed", "error", err)
}
c.lock.Lock()
// If processing succeeded and no reorgs occurred, mark the section completed
if err == nil && (section == 0 || oldHead == c.SectionHead(section-1)) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
updating = false
c.log.Info("Finished upgrading chain index")
}
c.cascadedHead = c.storedSections*c.sectionSize - 1
for _, child := range c.children {
c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
child.newHead(c.cascadedHead, false)
}
} else {
// If processing failed, don't retry until further notification
c.log.Debug("Chain index processing failed", "section", section, "err", err)
c.verifyLastHead()
c.knownSections = c.storedSections
}
}
// If there are still further sections to process, reschedule
if c.knownSections > c.storedSections {
time.AfterFunc(c.throttling, func() {
select {
case c.update <- struct{}{}:
default:
}
})
}
c.lock.Unlock()
}
}
}
// processSection processes an entire section by calling backend functions while
// ensuring the continuity of the passed headers. Since the chain mutex is not
// held while processing, the continuity can be broken by a long reorg, in which
// case the function returns with an error.
func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) {
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
if err := c.backend.Reset(c.ctx, section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := rawdb.ReadCanonicalHash(c.chainDb, number)
if hash == (common.Hash{}) {
return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number)
}
header := rawdb.ReadHeader(c.chainDb, hash, number)
if header == nil {
return common.Hash{}, fmt.Errorf("block #%d [%x..] not found", number, hash[:4])
} else if header.ParentHash != lastHead {
return common.Hash{}, errors.New("chain reorged during section processing")
}
if err := c.backend.Process(c.ctx, header); err != nil {
return common.Hash{}, err
}
lastHead = header.Hash()
}
if err := c.backend.Commit(); err != nil {
return common.Hash{}, err
}
return lastHead, nil
}
// verifyLastHead compares last stored section head with the corresponding block hash in the
// actual canonical chain and rolls back reorged sections if necessary to ensure that stored
// sections are all valid
func (c *ChainIndexer) verifyLastHead() {
for c.storedSections > 0 && c.storedSections > c.checkpointSections {
if c.SectionHead(c.storedSections-1) == rawdb.ReadCanonicalHash(c.chainDb, c.storedSections*c.sectionSize-1) {
return
}
c.setValidSections(c.storedSections - 1)
}
}
// Sections returns the number of processed sections maintained by the indexer
// and also the information about the last header indexed for potential canonical
// verifications.
func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
c.verifyLastHead()
return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) {
if indexer == c {
panic("can't add indexer as a child of itself")
}
c.lock.Lock()
defer c.lock.Unlock()
c.children = append(c.children, indexer)
// Cascade any pending updates to new children too
sections := c.storedSections
if c.knownSections < sections {
// if a section is "stored" but not "known" then it is a checkpoint without
// available chain data so we should not cascade it yet
sections = c.knownSections
}
if sections > 0 {
indexer.newHead(sections*c.sectionSize-1, false)
}
}
// Prune deletes all chain data older than given threshold.
func (c *ChainIndexer) Prune(threshold uint64) error {
return c.backend.Prune(threshold)
}
// loadValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func (c *ChainIndexer) loadValidSections() {
data, _ := c.indexDb.Get([]byte("count"))
if len(data) == 8 {
c.storedSections = binary.BigEndian.Uint64(data)
}
}
// setValidSections writes the number of valid sections to the index database
func (c *ChainIndexer) setValidSections(sections uint64) {
// Set the current number of valid sections in the database
var data [8]byte
binary.BigEndian.PutUint64(data[:], sections)
c.indexDb.Put([]byte("count"), data[:])
// Remove any reorged sections, caching the valids in the mean time
for c.storedSections > sections {
c.storedSections--
c.removeSectionHead(c.storedSections)
}
c.storedSections = sections // needed if new > old
}
// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...))
if len(hash) == len(common.Hash{}) {
return common.BytesToHash(hash)
}
return common.Hash{}
}
// setSectionHead writes the last block hash of a processed section to the index
// database.
func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes())
}
// removeSectionHead removes the reference to a processed section from the index
// database.
func (c *ChainIndexer) removeSectionHead(section uint64) {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
c.indexDb.Delete(append([]byte("shead"), data[:]...))
}

View File

@ -1,246 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"context"
"errors"
"fmt"
"math/big"
"math/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
)
// Runs multiple tests with randomized parameters.
func TestChainIndexerSingle(t *testing.T) {
for i := 0; i < 10; i++ {
testChainIndexer(t, 1)
}
}
// Runs multiple tests with randomized parameters and different number of
// chain backends.
func TestChainIndexerWithChildren(t *testing.T) {
for i := 2; i < 8; i++ {
testChainIndexer(t, i)
}
}
// testChainIndexer runs a test with either a single chain indexer or a chain of
// multiple backends. The section size and required confirmation count parameters
// are randomized.
func testChainIndexer(t *testing.T, count int) {
db := rawdb.NewMemoryDatabase()
defer db.Close()
// Create a chain of indexers and ensure they all report empty
backends := make([]*testChainIndexBackend, count)
for i := 0; i < count; i++ {
var (
sectionSize = uint64(rand.Intn(100) + 1)
confirmsReq = uint64(rand.Intn(10))
)
backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)}
backends[i].indexer = NewChainIndexer(db, rawdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i))
if sections, _, _ := backends[i].indexer.Sections(); sections != 0 {
t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0)
}
if i > 0 {
backends[i-1].indexer.AddChildIndexer(backends[i].indexer)
}
}
defer backends[0].indexer.Close() // parent indexer shuts down children
// notify pings the root indexer about a new head or reorg, then expect
// processed blocks if a section is processable
notify := func(headNum, failNum uint64, reorg bool) {
backends[0].indexer.newHead(headNum, reorg)
if reorg {
for _, backend := range backends {
headNum = backend.reorg(headNum)
backend.assertSections()
}
return
}
var cascade bool
for _, backend := range backends {
headNum, cascade = backend.assertBlocks(headNum, failNum)
if !cascade {
break
}
backend.assertSections()
}
}
// inject inserts a new random canonical header into the database directly
inject := func(number uint64) {
header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()}
if number > 0 {
header.ParentHash = rawdb.ReadCanonicalHash(db, number-1)
}
rawdb.WriteHeader(db, header)
rawdb.WriteCanonicalHash(db, header.Hash(), number)
}
// Start indexer with an already existing chain
for i := uint64(0); i <= 100; i++ {
inject(i)
}
notify(100, 100, false)
// Add new blocks one by one
for i := uint64(101); i <= 1000; i++ {
inject(i)
notify(i, i, false)
}
// Do a reorg
notify(500, 500, true)
// Create new fork
for i := uint64(501); i <= 1000; i++ {
inject(i)
notify(i, i, false)
}
for i := uint64(1001); i <= 1500; i++ {
inject(i)
}
// Failed processing scenario where less blocks are available than notified
notify(2000, 1500, false)
// Notify about a reorg (which could have caused the missing blocks if happened during processing)
notify(1500, 1500, true)
// Create new fork
for i := uint64(1501); i <= 2000; i++ {
inject(i)
notify(i, i, false)
}
}
// testChainIndexBackend implements ChainIndexerBackend
type testChainIndexBackend struct {
t *testing.T
indexer *ChainIndexer
section, headerCnt, stored uint64
processCh chan uint64
}
// assertSections verifies if a chain indexer has the correct number of section.
func (b *testChainIndexBackend) assertSections() {
// Keep trying for 3 seconds if it does not match
var sections uint64
for i := 0; i < 300; i++ {
sections, _, _ = b.indexer.Sections()
if sections == b.stored {
return
}
time.Sleep(10 * time.Millisecond)
}
b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored)
}
// assertBlocks expects processing calls after new blocks have arrived. If the
// failNum < headNum then we are simulating a scenario where a reorg has happened
// after the processing has started and the processing of a section fails.
func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) {
var sections uint64
if headNum >= b.indexer.confirmsReq {
sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize
if sections > b.stored {
// expect processed blocks
for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ {
if expectd > failNum {
// rolled back after processing started, no more process calls expected
// wait until updating is done to make sure that processing actually fails
var updating bool
for i := 0; i < 300; i++ {
b.indexer.lock.Lock()
updating = b.indexer.knownSections > b.indexer.storedSections
b.indexer.lock.Unlock()
if !updating {
break
}
time.Sleep(10 * time.Millisecond)
}
if updating {
b.t.Fatalf("update did not finish")
}
sections = expectd / b.indexer.sectionSize
break
}
select {
case <-time.After(10 * time.Second):
b.t.Fatalf("Expected processed block #%d, got nothing", expectd)
case processed := <-b.processCh:
if processed != expectd {
b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed)
}
}
}
b.stored = sections
}
}
if b.stored == 0 {
return 0, false
}
return b.stored*b.indexer.sectionSize - 1, true
}
func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
firstChanged := (headNum + 1) / b.indexer.sectionSize
if firstChanged < b.stored {
b.stored = firstChanged
}
return b.stored * b.indexer.sectionSize
}
func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}
func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error {
b.headerCnt++
if b.headerCnt > b.indexer.sectionSize {
b.t.Error("Processing too many headers")
}
//t.processCh <- header.Number.Uint64()
select {
case <-time.After(10 * time.Second):
b.t.Error("Unexpected call to Process")
// Can't use Fatal since this is not the test's goroutine.
// Returning error stops the chainIndexer's updateLoop
return errors.New("unexpected call to Process")
case b.processCh <- header.Number.Uint64():
}
return nil
}
func (b *testChainIndexBackend) Commit() error {
if b.headerCnt != b.indexer.sectionSize {
b.t.Error("Not enough headers processed")
}
return nil
}
func (b *testChainIndexBackend) Prune(threshold uint64) error {
return nil
}

View File

@ -17,7 +17,6 @@
package rawdb package rawdb
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"math/big" "math/big"
@ -147,41 +146,6 @@ func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig)
return nil, common.Hash{}, 0, 0 return nil, common.Hash{}, 0, 0
} }
// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given
// section and bit index from the.
func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) {
return db.Get(bloomBitsKey(bit, section, head))
}
// WriteBloomBits stores the compressed bloom bits vector belonging to the given
// section and bit index.
func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) {
if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil {
log.Crit("Failed to store bloom bits", "err", err)
}
}
// DeleteBloombits removes all compressed bloom bits vector belonging to the
// given section range and bit index.
func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{})
it := db.NewIterator(nil, start)
defer it.Release()
for it.Next() {
if bytes.Compare(it.Key(), end) >= 0 {
break
}
if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 {
continue
}
db.Delete(it.Key())
}
if it.Error() != nil {
log.Crit("Failed to delete bloom bits", "err", it.Error())
}
}
var emptyRow = []uint32{} var emptyRow = []uint32{}
// ReadFilterMapRow retrieves a filter map row at the given mapRowIndex // ReadFilterMapRow retrieves a filter map row at the given mapRowIndex

View File

@ -111,46 +111,3 @@ func TestLookupStorage(t *testing.T) {
}) })
} }
} }
func TestDeleteBloomBits(t *testing.T) {
// Prepare testing data
db := NewMemoryDatabase()
for i := uint(0); i < 2; i++ {
for s := uint64(0); s < 2; s++ {
WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02})
WriteBloomBits(db, i, s, params.SepoliaGenesisHash, []byte{0x01, 0x02})
}
}
check := func(bit uint, section uint64, head common.Hash, exist bool) {
bits, _ := ReadBloomBits(db, bit, section, head)
if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) {
t.Fatalf("Bloombits mismatch")
}
if !exist && len(bits) > 0 {
t.Fatalf("Bloombits should be removed")
}
}
// Check the existence of written data.
check(0, 0, params.MainnetGenesisHash, true)
check(0, 0, params.SepoliaGenesisHash, true)
// Check the existence of deleted data.
DeleteBloombits(db, 0, 0, 1)
check(0, 0, params.MainnetGenesisHash, false)
check(0, 0, params.SepoliaGenesisHash, false)
check(0, 1, params.MainnetGenesisHash, true)
check(0, 1, params.SepoliaGenesisHash, true)
// Check the existence of deleted data.
DeleteBloombits(db, 0, 0, 2)
check(0, 0, params.MainnetGenesisHash, false)
check(0, 0, params.SepoliaGenesisHash, false)
check(0, 1, params.MainnetGenesisHash, false)
check(0, 1, params.SepoliaGenesisHash, false)
// Bit1 shouldn't be affect.
check(1, 0, params.MainnetGenesisHash, true)
check(1, 0, params.SepoliaGenesisHash, true)
check(1, 1, params.MainnetGenesisHash, true)
check(1, 1, params.SepoliaGenesisHash, true)
}

View File

@ -375,7 +375,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
accountSnaps stat accountSnaps stat
storageSnaps stat storageSnaps stat
preimages stat preimages stat
bloomBits stat
beaconHeaders stat beaconHeaders stat
cliqueSnaps stat cliqueSnaps stat
@ -436,10 +435,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
metadata.Add(size) metadata.Add(size)
case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength): case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength):
metadata.Add(size) metadata.Add(size)
case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength):
bloomBits.Add(size)
case bytes.HasPrefix(key, BloomBitsIndexPrefix):
bloomBits.Add(size)
case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8): case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8):
beaconHeaders.Add(size) beaconHeaders.Add(size)
case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength: case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength:
@ -504,7 +499,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
{"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()}, {"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()},
{"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()}, {"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()},
{"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()}, {"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()},
{"Key-Value store", "Bloombit index", bloomBits.Size(), bloomBits.Count()},
{"Key-Value store", "Contract codes", codes.Size(), codes.Count()}, {"Key-Value store", "Contract codes", codes.Size(), codes.Count()},
{"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()}, {"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()},
{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},

View File

@ -230,16 +230,6 @@ func storageSnapshotsKey(accountHash common.Hash) []byte {
return append(SnapshotStoragePrefix, accountHash.Bytes()...) return append(SnapshotStoragePrefix, accountHash.Bytes()...)
} }
// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash
func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte {
key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...)
binary.BigEndian.PutUint16(key[1:], uint16(bit))
binary.BigEndian.PutUint64(key[3:], section)
return key
}
// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian) // skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian)
func skeletonHeaderKey(number uint64) []byte { func skeletonHeaderKey(number uint64) []byte {
return append(skeletonHeaderPrefix, encodeBlockNumber(number)...) return append(skeletonHeaderPrefix, encodeBlockNumber(number)...)

View File

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
@ -401,17 +400,6 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 {
return b.eth.config.RPCTxFeeCap return b.eth.config.RPCTxFeeCap
} }
func (b *EthAPIBackend) BloomStatus() (uint64, uint64) {
sections, _, _ := b.eth.bloomIndexer.Sections()
return params.BloomBitsBlocks, sections
}
func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
for i := 0; i < bloomFilterThreads; i++ {
go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests)
}
}
func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend { func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend() return b.eth.filterMaps.NewMatcherBackend()
} }

View File

@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/state/pruner"
@ -82,10 +81,6 @@ type Ethereum struct {
engine consensus.Engine engine consensus.Engine
accountManager *accounts.Manager accountManager *accounts.Manager
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct{}
filterMaps *filtermaps.FilterMaps filterMaps *filtermaps.FilterMaps
APIBackend *EthAPIBackend APIBackend *EthAPIBackend
@ -159,11 +154,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eventMux: stack.EventMux(), eventMux: stack.EventMux(),
accountManager: stack.AccountManager(), accountManager: stack.AccountManager(),
engine: engine, engine: engine,
closeBloomHandler: make(chan struct{}),
networkID: networkID, networkID: networkID,
gasPrice: config.Miner.GasPrice, gasPrice: config.Miner.GasPrice,
bloomRequests: make(chan chan *bloombits.Retrieval),
bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
p2pServer: stack.Server(), p2pServer: stack.Server(),
discmix: enode.NewFairMix(0), discmix: enode.NewFairMix(0),
shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
@ -224,7 +216,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
eth.bloomIndexer.Start(eth.blockchain)
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain) eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)
if config.BlobPool.Datadir != "" { if config.BlobPool.Datadir != "" {
@ -348,7 +339,6 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downlo
func (s *Ethereum) Synced() bool { return s.handler.synced.Load() } func (s *Ethereum) Synced() bool { return s.handler.synced.Load() }
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() } func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer }
// Protocols returns all the currently configured // Protocols returns all the currently configured
// network protocols to start. // network protocols to start.
@ -365,9 +355,6 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
func (s *Ethereum) Start() error { func (s *Ethereum) Start() error {
s.setupDiscovery() s.setupDiscovery()
// Start the bloom bits servicing goroutines
s.startBloomHandlers(params.BloomBitsBlocks)
// Regularly update shutdown marker // Regularly update shutdown marker
s.shutdownTracker.Start() s.shutdownTracker.Start()
@ -416,9 +403,7 @@ func (s *Ethereum) Stop() error {
s.handler.Stop() s.handler.Stop()
// Then stop everything else. // Then stop everything else.
s.bloomIndexer.Close()
s.filterMaps.Close() s.filterMaps.Close()
close(s.closeBloomHandler)
s.txPool.Close() s.txPool.Close()
s.blockchain.Stop() s.blockchain.Stop()
s.engine.Close() s.engine.Close()

View File

@ -1,74 +0,0 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"time"
"github.com/ethereum/go-ethereum/common/bitutil"
"github.com/ethereum/go-ethereum/core/rawdb"
)
const (
// bloomServiceThreads is the number of goroutines used globally by an Ethereum
// instance to service bloombits lookups for all running filters.
bloomServiceThreads = 16
// bloomFilterThreads is the number of goroutines used locally per filter to
// multiplex requests onto the global servicing goroutines.
bloomFilterThreads = 3
// bloomRetrievalBatch is the maximum number of bloom bit retrievals to service
// in a single batch.
bloomRetrievalBatch = 16
// bloomRetrievalWait is the maximum time to wait for enough bloom bit requests
// to accumulate request an entire batch (avoiding hysteresis).
bloomRetrievalWait = time.Duration(0)
)
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
// retrievals from possibly a range of filters and serving the data to satisfy.
func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
for i := 0; i < bloomServiceThreads; i++ {
go func() {
for {
select {
case <-eth.closeBloomHandler:
return
case request := <-eth.bloomRequests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1)
if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
task.Bitsets[i] = blob
} else {
task.Error = err
}
} else {
task.Error = err
}
}
request <- task
}
}
}()
}
}

View File

@ -25,7 +25,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -42,37 +41,13 @@ type Filter struct {
block *common.Hash // Block hash if filtering a single block block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks begin, end int64 // Range interval if filtering multiple blocks
bbMatchCount uint64 bbMatchCount uint64
matcher *bloombits.Matcher
} }
// NewRangeFilter creates a new filter which uses a bloom filter on blocks to // NewRangeFilter creates a new filter which uses a bloom filter on blocks to
// figure out whether a particular block is interesting or not. // figure out whether a particular block is interesting or not.
func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
// Flatten the address and topic filter clauses into a single bloombits filter
// system. Since the bloombits are not positional, nil topics are permitted,
// which get flattened into a nil byte slice.
var filters [][][]byte
if len(addresses) > 0 {
filter := make([][]byte, len(addresses))
for i, address := range addresses {
filter[i] = address.Bytes()
}
filters = append(filters, filter)
}
for _, topicList := range topics {
filter := make([][]byte, len(topicList))
for i, topic := range topicList {
filter[i] = topic.Bytes()
}
filters = append(filters, filter)
}
size, _ := sys.backend.BloomStatus()
// Create a generic filter and convert it into a range filter // Create a generic filter and convert it into a range filter
filter := newFilter(sys, addresses, topics) filter := newFilter(sys, addresses, topics)
filter.matcher = bloombits.NewMatcher(size, filters)
filter.begin = begin filter.begin = begin
filter.end = end filter.end = end
@ -197,23 +172,7 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan erro
close(logChan) close(logChan)
}() }()
// Gather all indexed logs, and finish with non indexed ones if err := f.unindexedLogs(ctx, uint64(f.end), logChan); err != nil {
var (
end = uint64(f.end)
size, sections = f.sys.backend.BloomStatus()
err error
)
if indexed := sections * size; indexed > uint64(f.begin) {
if indexed > end {
indexed = end + 1
}
if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil {
errChan <- err
return
}
}
if err := f.unindexedLogs(ctx, end, logChan); err != nil {
errChan <- err errChan <- err
return return
} }
@ -224,53 +183,6 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan erro
return logChan, errChan return logChan, errChan
} }
// indexedLogs returns the logs matching the filter criteria based on the bloom
// bits indexed available locally or via the network.
func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {
// Create a matcher session and request servicing from the backend
matches := make(chan uint64, 64)
session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
if err != nil {
return err
}
defer session.Close()
f.sys.backend.ServiceFilter(ctx, session)
for {
select {
case number, ok := <-matches:
f.bbMatchCount++
// Abort if all matches have been fulfilled
if !ok {
err := session.Error()
if err == nil {
f.begin = int64(end) + 1
}
return err
}
f.begin = int64(number) + 1
// Retrieve the suggested block and pull any truly matching logs
header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
if header == nil || err != nil {
return err
}
found, err := f.checkMatches(ctx, header)
if err != nil {
return err
}
for _, log := range found {
logChan <- log
}
case <-ctx.Done():
return ctx.Err()
}
}
}
// unindexedLogs returns the logs matching the filter criteria based on raw block // unindexedLogs returns the logs matching the filter criteria based on raw block
// iteration and bloom matching. // iteration and bloom matching.
func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error {

View File

@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
@ -70,9 +69,6 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
NewMatcherBackend() filtermaps.MatcherBackend NewMatcherBackend() filtermaps.MatcherBackend
} }

View File

@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
@ -137,37 +136,6 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch) return b.chainFeed.Subscribe(ch)
} }
func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}
func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
requests := make(chan chan *bloombits.Retrieval)
go session.Multiplex(16, 0, requests)
go func() {
for {
// Wait for a service request or a shutdown
select {
case <-ctx.Done():
return
case request := <-requests:
task := <-request
task.Bitsets = make([][]byte, len(task.Sections))
for i, section := range task.Sections {
if rand.Int()%4 != 0 { // Handle occasional missing deliveries
head := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head)
}
}
request <- task
}
}
}()
}
func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {
b.pendingBlock = block b.pendingBlock = block
b.pendingReceipts = receipts b.pendingReceipts = receipts

View File

@ -45,7 +45,6 @@ import (
"github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -620,11 +619,6 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
panic("implement me") panic("implement me")
} }
func (b testBackend) BloomStatus() (uint64, uint64) { panic("implement me") }
func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
panic("implement me")
}
func TestEstimateGas(t *testing.T) { func TestEstimateGas(t *testing.T) {
t.Parallel() t.Parallel()
// Initialize test accounts // Initialize test accounts

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -94,8 +93,6 @@ type Backend interface {
GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
NewMatcherBackend() filtermaps.MatcherBackend NewMatcherBackend() filtermaps.MatcherBackend
} }

View File

@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -394,8 +393,6 @@ func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transacti
return nil, nil return nil, nil
} }
func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil }
func (b *backendMock) BloomStatus() (uint64, uint64) { return 0, 0 }
func (b *backendMock) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {}
func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil }
func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
return nil return nil

View File

@ -20,14 +20,6 @@ package params
// aren't necessarily consensus related. // aren't necessarily consensus related.
const ( const (
// BloomBitsBlocks is the number of blocks a single bloom bit section vector
// contains on the server side.
BloomBitsBlocks uint64 = 4096
// BloomConfirms is the number of confirmation blocks before a bloom section is
// considered probably final and its rotated bits are calculated.
BloomConfirms = 256
// FullImmutabilityThreshold is the number of blocks after which a chain segment is // FullImmutabilityThreshold is the number of blocks after which a chain segment is
// considered immutable (i.e. soft finality). It is used by the downloader as a // considered immutable (i.e. soft finality). It is used by the downloader as a
// hard limit against deep ancestors, by the blockchain against deep reorgs, by // hard limit against deep ancestors, by the blockchain against deep reorgs, by