From 66ab6aab4ae09afb836dbeed4b6c5a69c2a48acf Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Fri, 27 Sep 2024 03:49:41 +0200 Subject: [PATCH] core/bloombits, eth/filters: removed bloombits --- core/blockchain.go | 2 +- core/bloom_indexer.go | 92 ---- core/bloombits/doc.go | 18 - core/bloombits/generator.go | 98 ---- core/bloombits/generator_test.go | 100 ---- core/bloombits/matcher.go | 649 ----------------------- core/bloombits/matcher_test.go | 292 ---------- core/bloombits/scheduler.go | 181 ------- core/bloombits/scheduler_test.go | 103 ---- core/chain_indexer.go | 522 ------------------ core/chain_indexer_test.go | 246 --------- core/rawdb/accessors_indexes.go | 36 -- core/rawdb/accessors_indexes_test.go | 43 -- core/rawdb/database.go | 6 - core/rawdb/schema.go | 10 - eth/api_backend.go | 12 - eth/backend.go | 43 +- eth/bloombits.go | 74 --- eth/filters/filter.go | 90 +--- eth/filters/filter_system.go | 4 - eth/filters/filter_system_test.go | 32 -- internal/ethapi/api_test.go | 6 - internal/ethapi/backend.go | 3 - internal/ethapi/transaction_args_test.go | 7 +- params/network_params.go | 8 - 25 files changed, 18 insertions(+), 2659 deletions(-) delete mode 100644 core/bloom_indexer.go delete mode 100644 core/bloombits/doc.go delete mode 100644 core/bloombits/generator.go delete mode 100644 core/bloombits/generator_test.go delete mode 100644 core/bloombits/matcher.go delete mode 100644 core/bloombits/matcher_test.go delete mode 100644 core/bloombits/scheduler.go delete mode 100644 core/bloombits/scheduler_test.go delete mode 100644 core/chain_indexer.go delete mode 100644 core/chain_indexer_test.go delete mode 100644 eth/bloombits.go diff --git a/core/blockchain.go b/core/blockchain.go index 0fe4812626..6c72432dcc 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -894,7 +894,7 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha rawdb.DeleteBody(db, hash, num) rawdb.DeleteReceipts(db, hash, num) } - // Todo(rjl493456442) txlookup, bloombits, etc + // Todo(rjl493456442) txlookup, log index, etc } // If SetHead was only called as a chain reparation method, try to skip // touching the header chain altogether, unless the freezer is broken diff --git a/core/bloom_indexer.go b/core/bloom_indexer.go deleted file mode 100644 index 68a35d811e..0000000000 --- a/core/bloom_indexer.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright 2021 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package core - -import ( - "context" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/bitutil" - "github.com/ethereum/go-ethereum/core/bloombits" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" -) - -const ( - // bloomThrottling is the time to wait between processing two consecutive index - // sections. It's useful during chain upgrades to prevent disk overload. - bloomThrottling = 100 * time.Millisecond -) - -// BloomIndexer implements a core.ChainIndexer, building up a rotated bloom bits index -// for the Ethereum header bloom filters, permitting blazing fast filtering. -type BloomIndexer struct { - size uint64 // section size to generate bloombits for - db ethdb.Database // database instance to write index data and metadata into - gen *bloombits.Generator // generator to rotate the bloom bits crating the bloom index - section uint64 // Section is the section number being processed currently - head common.Hash // Head is the hash of the last header processed -} - -// NewBloomIndexer returns a chain indexer that generates bloom bits data for the -// canonical chain for fast logs filtering. -func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *ChainIndexer { - backend := &BloomIndexer{ - db: db, - size: size, - } - table := rawdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix)) - - return NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits") -} - -// Reset implements core.ChainIndexerBackend, starting a new bloombits index -// section. -func (b *BloomIndexer) Reset(ctx context.Context, section uint64, lastSectionHead common.Hash) error { - gen, err := bloombits.NewGenerator(uint(b.size)) - b.gen, b.section, b.head = gen, section, common.Hash{} - return err -} - -// Process implements core.ChainIndexerBackend, adding a new header's bloom into -// the index. -func (b *BloomIndexer) Process(ctx context.Context, header *types.Header) error { - b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size), header.Bloom) - b.head = header.Hash() - return nil -} - -// Commit implements core.ChainIndexerBackend, finalizing the bloom section and -// writing it out into the database. -func (b *BloomIndexer) Commit() error { - batch := b.db.NewBatchWithSize((int(b.size) / 8) * types.BloomBitLength) - for i := 0; i < types.BloomBitLength; i++ { - bits, err := b.gen.Bitset(uint(i)) - if err != nil { - return err - } - rawdb.WriteBloomBits(batch, uint(i), b.section, b.head, bitutil.CompressBytes(bits)) - } - return batch.Write() -} - -// Prune returns an empty error since we don't support pruning here. -func (b *BloomIndexer) Prune(threshold uint64) error { - return nil -} diff --git a/core/bloombits/doc.go b/core/bloombits/doc.go deleted file mode 100644 index 3d159e74f7..0000000000 --- a/core/bloombits/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -// Package bloombits implements bloom filtering on batches of data. -package bloombits diff --git a/core/bloombits/generator.go b/core/bloombits/generator.go deleted file mode 100644 index 646151db0b..0000000000 --- a/core/bloombits/generator.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package bloombits - -import ( - "errors" - - "github.com/ethereum/go-ethereum/core/types" -) - -var ( - // errSectionOutOfBounds is returned if the user tried to add more bloom filters - // to the batch than available space, or if tries to retrieve above the capacity. - errSectionOutOfBounds = errors.New("section out of bounds") - - // errBloomBitOutOfBounds is returned if the user tried to retrieve specified - // bit bloom above the capacity. - errBloomBitOutOfBounds = errors.New("bloom bit out of bounds") -) - -// Generator takes a number of bloom filters and generates the rotated bloom bits -// to be used for batched filtering. -type Generator struct { - blooms [types.BloomBitLength][]byte // Rotated blooms for per-bit matching - sections uint // Number of sections to batch together - nextSec uint // Next section to set when adding a bloom -} - -// NewGenerator creates a rotated bloom generator that can iteratively fill a -// batched bloom filter's bits. -func NewGenerator(sections uint) (*Generator, error) { - if sections%8 != 0 { - return nil, errors.New("section count not multiple of 8") - } - b := &Generator{sections: sections} - for i := 0; i < types.BloomBitLength; i++ { - b.blooms[i] = make([]byte, sections/8) - } - return b, nil -} - -// AddBloom takes a single bloom filter and sets the corresponding bit column -// in memory accordingly. -func (b *Generator) AddBloom(index uint, bloom types.Bloom) error { - // Make sure we're not adding more bloom filters than our capacity - if b.nextSec >= b.sections { - return errSectionOutOfBounds - } - if b.nextSec != index { - return errors.New("bloom filter with unexpected index") - } - // Rotate the bloom and insert into our collection - byteIndex := b.nextSec / 8 - bitIndex := byte(7 - b.nextSec%8) - for byt := 0; byt < types.BloomByteLength; byt++ { - bloomByte := bloom[types.BloomByteLength-1-byt] - if bloomByte == 0 { - continue - } - base := 8 * byt - b.blooms[base+7][byteIndex] |= ((bloomByte >> 7) & 1) << bitIndex - b.blooms[base+6][byteIndex] |= ((bloomByte >> 6) & 1) << bitIndex - b.blooms[base+5][byteIndex] |= ((bloomByte >> 5) & 1) << bitIndex - b.blooms[base+4][byteIndex] |= ((bloomByte >> 4) & 1) << bitIndex - b.blooms[base+3][byteIndex] |= ((bloomByte >> 3) & 1) << bitIndex - b.blooms[base+2][byteIndex] |= ((bloomByte >> 2) & 1) << bitIndex - b.blooms[base+1][byteIndex] |= ((bloomByte >> 1) & 1) << bitIndex - b.blooms[base][byteIndex] |= (bloomByte & 1) << bitIndex - } - b.nextSec++ - return nil -} - -// Bitset returns the bit vector belonging to the given bit index after all -// blooms have been added. -func (b *Generator) Bitset(idx uint) ([]byte, error) { - if b.nextSec != b.sections { - return nil, errors.New("bloom not fully generated yet") - } - if idx >= types.BloomBitLength { - return nil, errBloomBitOutOfBounds - } - return b.blooms[idx], nil -} diff --git a/core/bloombits/generator_test.go b/core/bloombits/generator_test.go deleted file mode 100644 index ac1aee0b25..0000000000 --- a/core/bloombits/generator_test.go +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package bloombits - -import ( - "bytes" - crand "crypto/rand" - "math/rand" - "testing" - - "github.com/ethereum/go-ethereum/core/types" -) - -// Tests that batched bloom bits are correctly rotated from the input bloom -// filters. -func TestGenerator(t *testing.T) { - // Generate the input and the rotated output - var input, output [types.BloomBitLength][types.BloomByteLength]byte - - for i := 0; i < types.BloomBitLength; i++ { - for j := 0; j < types.BloomBitLength; j++ { - bit := byte(rand.Int() % 2) - - input[i][j/8] |= bit << byte(7-j%8) - output[types.BloomBitLength-1-j][i/8] |= bit << byte(7-i%8) - } - } - // Crunch the input through the generator and verify the result - gen, err := NewGenerator(types.BloomBitLength) - if err != nil { - t.Fatalf("failed to create bloombit generator: %v", err) - } - for i, bloom := range input { - if err := gen.AddBloom(uint(i), bloom); err != nil { - t.Fatalf("bloom %d: failed to add: %v", i, err) - } - } - for i, want := range output { - have, err := gen.Bitset(uint(i)) - if err != nil { - t.Fatalf("output %d: failed to retrieve bits: %v", i, err) - } - if !bytes.Equal(have, want[:]) { - t.Errorf("output %d: bit vector mismatch have %x, want %x", i, have, want) - } - } -} - -func BenchmarkGenerator(b *testing.B) { - var input [types.BloomBitLength][types.BloomByteLength]byte - b.Run("empty", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - // Crunch the input through the generator and verify the result - gen, err := NewGenerator(types.BloomBitLength) - if err != nil { - b.Fatalf("failed to create bloombit generator: %v", err) - } - for j, bloom := range &input { - if err := gen.AddBloom(uint(j), bloom); err != nil { - b.Fatalf("bloom %d: failed to add: %v", i, err) - } - } - } - }) - for i := 0; i < types.BloomBitLength; i++ { - crand.Read(input[i][:]) - } - b.Run("random", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - // Crunch the input through the generator and verify the result - gen, err := NewGenerator(types.BloomBitLength) - if err != nil { - b.Fatalf("failed to create bloombit generator: %v", err) - } - for j, bloom := range &input { - if err := gen.AddBloom(uint(j), bloom); err != nil { - b.Fatalf("bloom %d: failed to add: %v", i, err) - } - } - } - }) -} diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go deleted file mode 100644 index 486581fe23..0000000000 --- a/core/bloombits/matcher.go +++ /dev/null @@ -1,649 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package bloombits - -import ( - "bytes" - "context" - "errors" - "math" - "sort" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common/bitutil" - "github.com/ethereum/go-ethereum/crypto" -) - -// bloomIndexes represents the bit indexes inside the bloom filter that belong -// to some key. -type bloomIndexes [3]uint - -// calcBloomIndexes returns the bloom filter bit indexes belonging to the given key. -func calcBloomIndexes(b []byte) bloomIndexes { - b = crypto.Keccak256(b) - - var idxs bloomIndexes - for i := 0; i < len(idxs); i++ { - idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1]) - } - return idxs -} - -// partialMatches with a non-nil vector represents a section in which some sub- -// matchers have already found potential matches. Subsequent sub-matchers will -// binary AND their matches with this vector. If vector is nil, it represents a -// section to be processed by the first sub-matcher. -type partialMatches struct { - section uint64 - bitset []byte -} - -// Retrieval represents a request for retrieval task assignments for a given -// bit with the given number of fetch elements, or a response for such a request. -// It can also have the actual results set to be used as a delivery data struct. -// -// The context and error fields are used by the light client to terminate matching -// early if an error is encountered on some path of the pipeline. -type Retrieval struct { - Bit uint - Sections []uint64 - Bitsets [][]byte - - Context context.Context - Error error -} - -// Matcher is a pipelined system of schedulers and logic matchers which perform -// binary AND/OR operations on the bit-streams, creating a stream of potential -// blocks to inspect for data content. -type Matcher struct { - sectionSize uint64 // Size of the data batches to filter on - - filters [][]bloomIndexes // Filter the system is matching for - schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits - - retrievers chan chan uint // Retriever processes waiting for bit allocations - counters chan chan uint // Retriever processes waiting for task count reports - retrievals chan chan *Retrieval // Retriever processes waiting for task allocations - deliveries chan *Retrieval // Retriever processes waiting for task response deliveries - - running atomic.Bool // Atomic flag whether a session is live or not -} - -// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing -// address and topic filtering on them. Setting a filter component to `nil` is -// allowed and will result in that filter rule being skipped (OR 0x11...1). -func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher { - // Create the matcher instance - m := &Matcher{ - sectionSize: sectionSize, - schedulers: make(map[uint]*scheduler), - retrievers: make(chan chan uint), - counters: make(chan chan uint), - retrievals: make(chan chan *Retrieval), - deliveries: make(chan *Retrieval), - } - // Calculate the bloom bit indexes for the groups we're interested in - m.filters = nil - - for _, filter := range filters { - // Gather the bit indexes of the filter rule, special casing the nil filter - if len(filter) == 0 { - continue - } - bloomBits := make([]bloomIndexes, len(filter)) - for i, clause := range filter { - if clause == nil { - bloomBits = nil - break - } - bloomBits[i] = calcBloomIndexes(clause) - } - // Accumulate the filter rules if no nil rule was within - if bloomBits != nil { - m.filters = append(m.filters, bloomBits) - } - } - // For every bit, create a scheduler to load/download the bit vectors - for _, bloomIndexLists := range m.filters { - for _, bloomIndexList := range bloomIndexLists { - for _, bloomIndex := range bloomIndexList { - m.addScheduler(bloomIndex) - } - } - } - return m -} - -// addScheduler adds a bit stream retrieval scheduler for the given bit index if -// it has not existed before. If the bit is already selected for filtering, the -// existing scheduler can be used. -func (m *Matcher) addScheduler(idx uint) { - if _, ok := m.schedulers[idx]; ok { - return - } - m.schedulers[idx] = newScheduler(idx) -} - -// Start starts the matching process and returns a stream of bloom matches in -// a given range of blocks. If there are no more matches in the range, the result -// channel is closed. -func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { - // Make sure we're not creating concurrent sessions - if m.running.Swap(true) { - return nil, errors.New("matcher already running") - } - defer m.running.Store(false) - - // Initiate a new matching round - session := &MatcherSession{ - matcher: m, - quit: make(chan struct{}), - ctx: ctx, - } - for _, scheduler := range m.schedulers { - scheduler.reset() - } - sink := m.run(begin, end, cap(results), session) - - // Read the output from the result sink and deliver to the user - session.pend.Add(1) - go func() { - defer session.pend.Done() - defer close(results) - - for { - select { - case <-session.quit: - return - - case res, ok := <-sink: - // New match result found - if !ok { - return - } - // Calculate the first and last blocks of the section - sectionStart := res.section * m.sectionSize - - first := sectionStart - if begin > first { - first = begin - } - last := sectionStart + m.sectionSize - 1 - if end < last { - last = end - } - // Iterate over all the blocks in the section and return the matching ones - for i := first; i <= last; i++ { - // Skip the entire byte if no matches are found inside (and we're processing an entire byte!) - next := res.bitset[(i-sectionStart)/8] - if next == 0 { - if i%8 == 0 { - i += 7 - } - continue - } - // Some bit it set, do the actual submatching - if bit := 7 - i%8; next&(1<= req.section }) - requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...) - - // If it's a new bit and we have waiting fetchers, allocate to them - if len(queue) == 0 { - assign(req.bit) - } - - case fetcher := <-retrievers: - // New retriever arrived, find the lowest section-ed bit to assign - bit, best := uint(0), uint64(math.MaxUint64) - for idx := range unallocs { - if requests[idx][0] < best { - bit, best = idx, requests[idx][0] - } - } - // Stop tracking this bit (and alloc notifications if no more work is available) - delete(unallocs, bit) - if len(unallocs) == 0 { - retrievers = nil - } - allocs++ - fetcher <- bit - - case fetcher := <-m.counters: - // New task count request arrives, return number of items - fetcher <- uint(len(requests[<-fetcher])) - - case fetcher := <-m.retrievals: - // New fetcher waiting for tasks to retrieve, assign - task := <-fetcher - if want := len(task.Sections); want >= len(requests[task.Bit]) { - task.Sections = requests[task.Bit] - delete(requests, task.Bit) - } else { - task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...) - requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...) - } - fetcher <- task - - // If anything was left unallocated, try to assign to someone else - if len(requests[task.Bit]) > 0 { - assign(task.Bit) - } - - case result := <-m.deliveries: - // New retrieval task response from fetcher, split out missing sections and - // deliver complete ones - var ( - sections = make([]uint64, 0, len(result.Sections)) - bitsets = make([][]byte, 0, len(result.Bitsets)) - missing = make([]uint64, 0, len(result.Sections)) - ) - for i, bitset := range result.Bitsets { - if len(bitset) == 0 { - missing = append(missing, result.Sections[i]) - continue - } - sections = append(sections, result.Sections[i]) - bitsets = append(bitsets, bitset) - } - m.schedulers[result.Bit].deliver(sections, bitsets) - allocs-- - - // Reschedule missing sections and allocate bit if newly available - if len(missing) > 0 { - queue := requests[result.Bit] - for _, section := range missing { - index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section }) - queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...) - } - requests[result.Bit] = queue - - if len(queue) == len(missing) { - assign(result.Bit) - } - } - - // End the session when all pending deliveries have arrived. - if shutdown == nil && allocs == 0 { - return - } - } - } -} - -// MatcherSession is returned by a started matcher to be used as a terminator -// for the actively running matching operation. -type MatcherSession struct { - matcher *Matcher - - closer sync.Once // Sync object to ensure we only ever close once - quit chan struct{} // Quit channel to request pipeline termination - - ctx context.Context // Context used by the light client to abort filtering - err error // Global error to track retrieval failures deep in the chain - errLock sync.Mutex - - pend sync.WaitGroup -} - -// Close stops the matching process and waits for all subprocesses to terminate -// before returning. The timeout may be used for graceful shutdown, allowing the -// currently running retrievals to complete before this time. -func (s *MatcherSession) Close() { - s.closer.Do(func() { - // Signal termination and wait for all goroutines to tear down - close(s.quit) - s.pend.Wait() - }) -} - -// Error returns any failure encountered during the matching session. -func (s *MatcherSession) Error() error { - s.errLock.Lock() - defer s.errLock.Unlock() - - return s.err -} - -// allocateRetrieval assigns a bloom bit index to a client process that can either -// immediately request and fetch the section contents assigned to this bit or wait -// a little while for more sections to be requested. -func (s *MatcherSession) allocateRetrieval() (uint, bool) { - fetcher := make(chan uint) - - select { - case <-s.quit: - return 0, false - case s.matcher.retrievers <- fetcher: - bit, ok := <-fetcher - return bit, ok - } -} - -// pendingSections returns the number of pending section retrievals belonging to -// the given bloom bit index. -func (s *MatcherSession) pendingSections(bit uint) int { - fetcher := make(chan uint) - - select { - case <-s.quit: - return 0 - case s.matcher.counters <- fetcher: - fetcher <- bit - return int(<-fetcher) - } -} - -// allocateSections assigns all or part of an already allocated bit-task queue -// to the requesting process. -func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 { - fetcher := make(chan *Retrieval) - - select { - case <-s.quit: - return nil - case s.matcher.retrievals <- fetcher: - task := &Retrieval{ - Bit: bit, - Sections: make([]uint64, count), - } - fetcher <- task - return (<-fetcher).Sections - } -} - -// deliverSections delivers a batch of section bit-vectors for a specific bloom -// bit index to be injected into the processing pipeline. -func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) { - s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets} -} - -// Multiplex polls the matcher session for retrieval tasks and multiplexes it into -// the requested retrieval queue to be serviced together with other sessions. -// -// This method will block for the lifetime of the session. Even after termination -// of the session, any request in-flight need to be responded to! Empty responses -// are fine though in that case. -func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) { - waitTimer := time.NewTimer(wait) - defer waitTimer.Stop() - - for { - // Allocate a new bloom bit index to retrieve data for, stopping when done - bit, ok := s.allocateRetrieval() - if !ok { - return - } - // Bit allocated, throttle a bit if we're below our batch limit - if s.pendingSections(bit) < batch { - waitTimer.Reset(wait) - select { - case <-s.quit: - // Session terminating, we can't meaningfully service, abort - s.allocateSections(bit, 0) - s.deliverSections(bit, []uint64{}, [][]byte{}) - return - - case <-waitTimer.C: - // Throttling up, fetch whatever is available - } - } - // Allocate as much as we can handle and request servicing - sections := s.allocateSections(bit, batch) - request := make(chan *Retrieval) - - select { - case <-s.quit: - // Session terminating, we can't meaningfully service, abort - s.deliverSections(bit, sections, make([][]byte, len(sections))) - return - - case mux <- request: - // Retrieval accepted, something must arrive before we're aborting - request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx} - - result := <-request - - // Deliver a result before s.Close() to avoid a deadlock - s.deliverSections(result.Bit, result.Sections, result.Bitsets) - - if result.Error != nil { - s.errLock.Lock() - s.err = result.Error - s.errLock.Unlock() - s.Close() - } - } - } -} diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go deleted file mode 100644 index 7f3d5f279c..0000000000 --- a/core/bloombits/matcher_test.go +++ /dev/null @@ -1,292 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package bloombits - -import ( - "context" - "math/rand" - "sync/atomic" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" -) - -const testSectionSize = 4096 - -// Tests that wildcard filter rules (nil) can be specified and are handled well. -func TestMatcherWildcards(t *testing.T) { - t.Parallel() - matcher := NewMatcher(testSectionSize, [][][]byte{ - {common.Address{}.Bytes(), common.Address{0x01}.Bytes()}, // Default address is not a wildcard - {common.Hash{}.Bytes(), common.Hash{0x01}.Bytes()}, // Default hash is not a wildcard - {common.Hash{0x01}.Bytes()}, // Plain rule, sanity check - {common.Hash{0x01}.Bytes(), nil}, // Wildcard suffix, drop rule - {nil, common.Hash{0x01}.Bytes()}, // Wildcard prefix, drop rule - {nil, nil}, // Wildcard combo, drop rule - {}, // Inited wildcard rule, drop rule - nil, // Proper wildcard rule, drop rule - }) - if len(matcher.filters) != 3 { - t.Fatalf("filter system size mismatch: have %d, want %d", len(matcher.filters), 3) - } - if len(matcher.filters[0]) != 2 { - t.Fatalf("address clause size mismatch: have %d, want %d", len(matcher.filters[0]), 2) - } - if len(matcher.filters[1]) != 2 { - t.Fatalf("combo topic clause size mismatch: have %d, want %d", len(matcher.filters[1]), 2) - } - if len(matcher.filters[2]) != 1 { - t.Fatalf("singletone topic clause size mismatch: have %d, want %d", len(matcher.filters[2]), 1) - } -} - -// Tests the matcher pipeline on a single continuous workflow without interrupts. -func TestMatcherContinuous(t *testing.T) { - t.Parallel() - testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75) - testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81) - testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36) -} - -// Tests the matcher pipeline on a constantly interrupted and resumed work pattern -// with the aim of ensuring data items are requested only once. -func TestMatcherIntermittent(t *testing.T) { - t.Parallel() - testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75) - testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81) - testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36) -} - -// Tests the matcher pipeline on random input to hopefully catch anomalies. -func TestMatcherRandom(t *testing.T) { - t.Parallel() - for i := 0; i < 10; i++ { - testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0) - testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0) - testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0) - testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0) - testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0) - } -} - -// Tests that the matcher can properly find matches if the starting block is -// shifted from a multiple of 8. This is needed to cover an optimisation with -// bitset matching https://github.com/ethereum/go-ethereum/issues/15309. -func TestMatcherShifted(t *testing.T) { - t.Parallel() - // Block 0 always matches in the tests, skip ahead of first 8 blocks with the - // start to get a potential zero byte in the matcher bitset. - - // To keep the second bitset byte zero, the filter must only match for the first - // time in block 16, so doing an all-16 bit filter should suffice. - - // To keep the starting block non divisible by 8, block number 9 is the first - // that would introduce a shift and not match block 0. - testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0) -} - -// Tests that matching on everything doesn't crash (special case internally). -func TestWildcardMatcher(t *testing.T) { - t.Parallel() - testMatcherBothModes(t, nil, 0, 10000, 0) -} - -// makeRandomIndexes generates a random filter system, composed of multiple filter -// criteria, each having one bloom list component for the address and arbitrarily -// many topic bloom list components. -func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes { - res := make([][]bloomIndexes, len(lengths)) - for i, topics := range lengths { - res[i] = make([]bloomIndexes, topics) - for j := 0; j < topics; j++ { - for k := 0; k < len(res[i][j]); k++ { - res[i][j][k] = uint(rand.Intn(max-1) + 2) - } - } - } - return res -} - -// testMatcherDiffBatches runs the given matches test in single-delivery and also -// in batches delivery mode, verifying that all kinds of deliveries are handled -// correctly within. -func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) { - singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1) - batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16) - - if singleton != batched { - t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in singleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched) - } -} - -// testMatcherBothModes runs the given matcher test in both continuous as well as -// in intermittent mode, verifying that the request counts match each other. -func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) { - continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16) - intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16) - - if continuous != intermittent { - t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent) - } -} - -// testMatcher is a generic tester to run the given matcher test and return the -// number of requests made for cross validation between different modes. -func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 { - // Create a new matcher an simulate our explicit random bitsets - matcher := NewMatcher(testSectionSize, nil) - matcher.filters = filter - - for _, rule := range filter { - for _, topic := range rule { - for _, bit := range topic { - matcher.addScheduler(bit) - } - } - } - // Track the number of retrieval requests made - var requested atomic.Uint32 - - // Start the matching session for the filter and the retriever goroutines - quit := make(chan struct{}) - matches := make(chan uint64, 16) - - session, err := matcher.Start(context.Background(), start, blocks-1, matches) - if err != nil { - t.Fatalf("failed to stat matcher session: %v", err) - } - startRetrievers(session, quit, &requested, maxReqCount) - - // Iterate over all the blocks and verify that the pipeline produces the correct matches - for i := start; i < blocks; i++ { - if expMatch3(filter, i) { - match, ok := <-matches - if !ok { - t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, results channel closed", filter, blocks, intermittent, i) - return 0 - } - if match != i { - t.Errorf("filter = %v blocks = %v intermittent = %v: expected #%v, got #%v", filter, blocks, intermittent, i, match) - } - // If we're testing intermittent mode, abort and restart the pipeline - if intermittent { - session.Close() - close(quit) - - quit = make(chan struct{}) - matches = make(chan uint64, 16) - - session, err = matcher.Start(context.Background(), i+1, blocks-1, matches) - if err != nil { - t.Fatalf("failed to stat matcher session: %v", err) - } - startRetrievers(session, quit, &requested, maxReqCount) - } - } - } - // Ensure the result channel is torn down after the last block - match, ok := <-matches - if ok { - t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match) - } - // Clean up the session and ensure we match the expected retrieval count - session.Close() - close(quit) - - if retrievals != 0 && requested.Load() != retrievals { - t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals) - } - return requested.Load() -} - -// startRetrievers starts a batch of goroutines listening for section requests -// and serving them. -func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) { - requests := make(chan chan *Retrieval) - - for i := 0; i < 10; i++ { - // Start a multiplexer to test multiple threaded execution - go session.Multiplex(batch, 100*time.Microsecond, requests) - - // Start a services to match the above multiplexer - go func() { - for { - // Wait for a service request or a shutdown - select { - case <-quit: - return - - case request := <-requests: - task := <-request - - task.Bitsets = make([][]byte, len(task.Sections)) - for i, section := range task.Sections { - if rand.Int()%4 != 0 { // Handle occasional missing deliveries - task.Bitsets[i] = generateBitset(task.Bit, section) - retrievals.Add(1) - } - } - request <- task - } - } - }() - } -} - -// generateBitset generates the rotated bitset for the given bloom bit and section -// numbers. -func generateBitset(bit uint, section uint64) []byte { - bitset := make([]byte, testSectionSize/8) - for i := 0; i < len(bitset); i++ { - for b := 0; b < 8; b++ { - blockIdx := section*testSectionSize + uint64(i*8+b) - bitset[i] += bitset[i] - if (blockIdx % uint64(bit)) == 0 { - bitset[i]++ - } - } - } - return bitset -} - -func expMatch1(filter bloomIndexes, i uint64) bool { - for _, ii := range filter { - if (i % uint64(ii)) != 0 { - return false - } - } - return true -} - -func expMatch2(filter []bloomIndexes, i uint64) bool { - for _, ii := range filter { - if expMatch1(ii, i) { - return true - } - } - return false -} - -func expMatch3(filter [][]bloomIndexes, i uint64) bool { - for _, ii := range filter { - if !expMatch2(ii, i) { - return false - } - } - return true -} diff --git a/core/bloombits/scheduler.go b/core/bloombits/scheduler.go deleted file mode 100644 index a523bc55ab..0000000000 --- a/core/bloombits/scheduler.go +++ /dev/null @@ -1,181 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package bloombits - -import ( - "sync" -) - -// request represents a bloom retrieval task to prioritize and pull from the local -// database or remotely from the network. -type request struct { - section uint64 // Section index to retrieve the bit-vector from - bit uint // Bit index within the section to retrieve the vector of -} - -// response represents the state of a requested bit-vector through a scheduler. -type response struct { - cached []byte // Cached bits to dedup multiple requests - done chan struct{} // Channel to allow waiting for completion -} - -// scheduler handles the scheduling of bloom-filter retrieval operations for -// entire section-batches belonging to a single bloom bit. Beside scheduling the -// retrieval operations, this struct also deduplicates the requests and caches -// the results to minimize network/database overhead even in complex filtering -// scenarios. -type scheduler struct { - bit uint // Index of the bit in the bloom filter this scheduler is responsible for - responses map[uint64]*response // Currently pending retrieval requests or already cached responses - lock sync.Mutex // Lock protecting the responses from concurrent access -} - -// newScheduler creates a new bloom-filter retrieval scheduler for a specific -// bit index. -func newScheduler(idx uint) *scheduler { - return &scheduler{ - bit: idx, - responses: make(map[uint64]*response), - } -} - -// run creates a retrieval pipeline, receiving section indexes from sections and -// returning the results in the same order through the done channel. Concurrent -// runs of the same scheduler are allowed, leading to retrieval task deduplication. -func (s *scheduler) run(sections chan uint64, dist chan *request, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) { - // Create a forwarder channel between requests and responses of the same size as - // the distribution channel (since that will block the pipeline anyway). - pend := make(chan uint64, cap(dist)) - - // Start the pipeline schedulers to forward between user -> distributor -> user - wg.Add(2) - go s.scheduleRequests(sections, dist, pend, quit, wg) - go s.scheduleDeliveries(pend, done, quit, wg) -} - -// reset cleans up any leftovers from previous runs. This is required before a -// restart to ensure the no previously requested but never delivered state will -// cause a lockup. -func (s *scheduler) reset() { - s.lock.Lock() - defer s.lock.Unlock() - - for section, res := range s.responses { - if res.cached == nil { - delete(s.responses, section) - } - } -} - -// scheduleRequests reads section retrieval requests from the input channel, -// deduplicates the stream and pushes unique retrieval tasks into the distribution -// channel for a database or network layer to honour. -func (s *scheduler) scheduleRequests(reqs chan uint64, dist chan *request, pend chan uint64, quit chan struct{}, wg *sync.WaitGroup) { - // Clean up the goroutine and pipeline when done - defer wg.Done() - defer close(pend) - - // Keep reading and scheduling section requests - for { - select { - case <-quit: - return - - case section, ok := <-reqs: - // New section retrieval requested - if !ok { - return - } - // Deduplicate retrieval requests - unique := false - - s.lock.Lock() - if s.responses[section] == nil { - s.responses[section] = &response{ - done: make(chan struct{}), - } - unique = true - } - s.lock.Unlock() - - // Schedule the section for retrieval and notify the deliverer to expect this section - if unique { - select { - case <-quit: - return - case dist <- &request{bit: s.bit, section: section}: - } - } - select { - case <-quit: - return - case pend <- section: - } - } - } -} - -// scheduleDeliveries reads section acceptance notifications and waits for them -// to be delivered, pushing them into the output data buffer. -func (s *scheduler) scheduleDeliveries(pend chan uint64, done chan []byte, quit chan struct{}, wg *sync.WaitGroup) { - // Clean up the goroutine and pipeline when done - defer wg.Done() - defer close(done) - - // Keep reading notifications and scheduling deliveries - for { - select { - case <-quit: - return - - case idx, ok := <-pend: - // New section retrieval pending - if !ok { - return - } - // Wait until the request is honoured - s.lock.Lock() - res := s.responses[idx] - s.lock.Unlock() - - select { - case <-quit: - return - case <-res.done: - } - // Deliver the result - select { - case <-quit: - return - case done <- res.cached: - } - } - } -} - -// deliver is called by the request distributor when a reply to a request arrives. -func (s *scheduler) deliver(sections []uint64, data [][]byte) { - s.lock.Lock() - defer s.lock.Unlock() - - for i, section := range sections { - if res := s.responses[section]; res != nil && res.cached == nil { // Avoid non-requests and double deliveries - res.cached = data[i] - close(res.done) - } - } -} diff --git a/core/bloombits/scheduler_test.go b/core/bloombits/scheduler_test.go deleted file mode 100644 index dcaaa91525..0000000000 --- a/core/bloombits/scheduler_test.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package bloombits - -import ( - "bytes" - "math/big" - "sync" - "sync/atomic" - "testing" -) - -// Tests that the scheduler can deduplicate and forward retrieval requests to -// underlying fetchers and serve responses back, irrelevant of the concurrency -// of the requesting clients or serving data fetchers. -func TestSchedulerSingleClientSingleFetcher(t *testing.T) { testScheduler(t, 1, 1, 5000) } -func TestSchedulerSingleClientMultiFetcher(t *testing.T) { testScheduler(t, 1, 10, 5000) } -func TestSchedulerMultiClientSingleFetcher(t *testing.T) { testScheduler(t, 10, 1, 5000) } -func TestSchedulerMultiClientMultiFetcher(t *testing.T) { testScheduler(t, 10, 10, 5000) } - -func testScheduler(t *testing.T, clients int, fetchers int, requests int) { - t.Parallel() - f := newScheduler(0) - - // Create a batch of handler goroutines that respond to bloom bit requests and - // deliver them to the scheduler. - var fetchPend sync.WaitGroup - fetchPend.Add(fetchers) - defer fetchPend.Wait() - - fetch := make(chan *request, 16) - defer close(fetch) - - var delivered atomic.Uint32 - for i := 0; i < fetchers; i++ { - go func() { - defer fetchPend.Done() - - for req := range fetch { - delivered.Add(1) - - f.deliver([]uint64{ - req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds) - req.section, // Requested data - req.section, // Duplicated data (ensure it doesn't double close anything) - }, [][]byte{ - {}, - new(big.Int).SetUint64(req.section).Bytes(), - new(big.Int).SetUint64(req.section).Bytes(), - }) - } - }() - } - // Start a batch of goroutines to concurrently run scheduling tasks - quit := make(chan struct{}) - - var pend sync.WaitGroup - pend.Add(clients) - - for i := 0; i < clients; i++ { - go func() { - defer pend.Done() - - in := make(chan uint64, 16) - out := make(chan []byte, 16) - - f.run(in, fetch, out, quit, &pend) - - go func() { - for j := 0; j < requests; j++ { - in <- uint64(j) - } - close(in) - }() - b := new(big.Int) - for j := 0; j < requests; j++ { - bits := <-out - if want := b.SetUint64(uint64(j)).Bytes(); !bytes.Equal(bits, want) { - t.Errorf("vector %d: delivered content mismatch: have %x, want %x", j, bits, want) - } - } - }() - } - pend.Wait() - - if have := delivered.Load(); int(have) != requests { - t.Errorf("request count mismatch: have %v, want %v", have, requests) - } -} diff --git a/core/chain_indexer.go b/core/chain_indexer.go deleted file mode 100644 index 2865daa1ff..0000000000 --- a/core/chain_indexer.go +++ /dev/null @@ -1,522 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package core - -import ( - "context" - "encoding/binary" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" -) - -// ChainIndexerBackend defines the methods needed to process chain segments in -// the background and write the segment results into the database. These can be -// used to create filter blooms or CHTs. -type ChainIndexerBackend interface { - // Reset initiates the processing of a new chain segment, potentially terminating - // any partially completed operations (in case of a reorg). - Reset(ctx context.Context, section uint64, prevHead common.Hash) error - - // Process crunches through the next header in the chain segment. The caller - // will ensure a sequential order of headers. - Process(ctx context.Context, header *types.Header) error - - // Commit finalizes the section metadata and stores it into the database. - Commit() error - - // Prune deletes the chain index older than the given threshold. - Prune(threshold uint64) error -} - -// ChainIndexerChain interface is used for connecting the indexer to a blockchain -type ChainIndexerChain interface { - // CurrentHeader retrieves the latest locally known header. - CurrentHeader() *types.Header - - // SubscribeChainHeadEvent subscribes to new head header notifications. - SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription -} - -// ChainIndexer does a post-processing job for equally sized sections of the -// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is -// connected to the blockchain through the event system by starting a -// ChainHeadEventLoop in a goroutine. -// -// Further child ChainIndexers can be added which use the output of the parent -// section indexer. These child indexers receive new head notifications only -// after an entire section has been finished or in case of rollbacks that might -// affect already finished sections. -type ChainIndexer struct { - chainDb ethdb.Database // Chain database to index the data from - indexDb ethdb.Database // Prefixed table-view of the db to write index metadata into - backend ChainIndexerBackend // Background processor generating the index data content - children []*ChainIndexer // Child indexers to cascade chain updates to - - active atomic.Bool // Flag whether the event loop was started - update chan struct{} // Notification channel that headers should be processed - quit chan chan error // Quit channel to tear down running goroutines - ctx context.Context - ctxCancel func() - - sectionSize uint64 // Number of blocks in a single chain segment to process - confirmsReq uint64 // Number of confirmations before processing a completed segment - - storedSections uint64 // Number of sections successfully indexed into the database - knownSections uint64 // Number of sections known to be complete (block wise) - cascadedHead uint64 // Block number of the last completed section cascaded to subindexers - - checkpointSections uint64 // Number of sections covered by the checkpoint - checkpointHead common.Hash // Section head belonging to the checkpoint - - throttling time.Duration // Disk throttling to prevent a heavy upgrade from hogging resources - - log log.Logger - lock sync.Mutex -} - -// NewChainIndexer creates a new chain indexer to do background processing on -// chain segments of a given size after certain number of confirmations passed. -// The throttling parameter might be used to prevent database thrashing. -func NewChainIndexer(chainDb ethdb.Database, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { - c := &ChainIndexer{ - chainDb: chainDb, - indexDb: indexDb, - backend: backend, - update: make(chan struct{}, 1), - quit: make(chan chan error), - sectionSize: section, - confirmsReq: confirm, - throttling: throttling, - log: log.New("type", kind), - } - // Initialize database dependent fields and start the updater - c.loadValidSections() - c.ctx, c.ctxCancel = context.WithCancel(context.Background()) - - go c.updateLoop() - - return c -} - -// AddCheckpoint adds a checkpoint. Sections are never processed and the chain -// is not expected to be available before this point. The indexer assumes that -// the backend has sufficient information available to process subsequent sections. -// -// Note: knownSections == 0 and storedSections == checkpointSections until -// syncing reaches the checkpoint -func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) { - c.lock.Lock() - defer c.lock.Unlock() - - // Short circuit if the given checkpoint is below than local's. - if c.checkpointSections >= section+1 || section < c.storedSections { - return - } - c.checkpointSections = section + 1 - c.checkpointHead = shead - - c.setSectionHead(section, shead) - c.setValidSections(section + 1) -} - -// Start creates a goroutine to feed chain head events into the indexer for -// cascading background processing. Children do not need to be started, they -// are notified about new events by their parents. -func (c *ChainIndexer) Start(chain ChainIndexerChain) { - events := make(chan ChainHeadEvent, 10) - sub := chain.SubscribeChainHeadEvent(events) - - go c.eventLoop(chain.CurrentHeader(), events, sub) -} - -// Close tears down all goroutines belonging to the indexer and returns any error -// that might have occurred internally. -func (c *ChainIndexer) Close() error { - var errs []error - - c.ctxCancel() - - // Tear down the primary update loop - errc := make(chan error) - c.quit <- errc - if err := <-errc; err != nil { - errs = append(errs, err) - } - // If needed, tear down the secondary event loop - if c.active.Load() { - c.quit <- errc - if err := <-errc; err != nil { - errs = append(errs, err) - } - } - // Close all children - for _, child := range c.children { - if err := child.Close(); err != nil { - errs = append(errs, err) - } - } - // Return any failures - switch { - case len(errs) == 0: - return nil - - case len(errs) == 1: - return errs[0] - - default: - return fmt.Errorf("%v", errs) - } -} - -// eventLoop is a secondary - optional - event loop of the indexer which is only -// started for the outermost indexer to push chain head events into a processing -// queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) { - // Mark the chain indexer as active, requiring an additional teardown - c.active.Store(true) - - defer sub.Unsubscribe() - - // Fire the initial new head event to start any outstanding processing - c.newHead(currentHeader.Number.Uint64(), false) - - var ( - prevHeader = currentHeader - prevHash = currentHeader.Hash() - ) - for { - select { - case errc := <-c.quit: - // Chain indexer terminating, report no failure and abort - errc <- nil - return - - case ev, ok := <-events: - // Received a new event, ensure it's not nil (closing) and update - if !ok { - errc := <-c.quit - errc <- nil - return - } - if ev.Header.ParentHash != prevHash { - // Reorg to the common ancestor if needed (might not exist in light sync mode, skip reorg then) - // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly? - - if rawdb.ReadCanonicalHash(c.chainDb, prevHeader.Number.Uint64()) != prevHash { - if h := rawdb.FindCommonAncestor(c.chainDb, prevHeader, ev.Header); h != nil { - c.newHead(h.Number.Uint64(), true) - } - } - } - c.newHead(ev.Header.Number.Uint64(), false) - - prevHeader, prevHash = ev.Header, ev.Header.Hash() - } - } -} - -// newHead notifies the indexer about new chain heads and/or reorgs. -func (c *ChainIndexer) newHead(head uint64, reorg bool) { - c.lock.Lock() - defer c.lock.Unlock() - - // If a reorg happened, invalidate all sections until that point - if reorg { - // Revert the known section number to the reorg point - known := (head + 1) / c.sectionSize - stored := known - if known < c.checkpointSections { - known = 0 - } - if stored < c.checkpointSections { - stored = c.checkpointSections - } - if known < c.knownSections { - c.knownSections = known - } - // Revert the stored sections from the database to the reorg point - if stored < c.storedSections { - c.setValidSections(stored) - } - // Update the new head number to the finalized section end and notify children - head = known * c.sectionSize - - if head < c.cascadedHead { - c.cascadedHead = head - for _, child := range c.children { - child.newHead(c.cascadedHead, true) - } - } - return - } - // No reorg, calculate the number of newly known sections and update if high enough - var sections uint64 - if head >= c.confirmsReq { - sections = (head + 1 - c.confirmsReq) / c.sectionSize - if sections < c.checkpointSections { - sections = 0 - } - if sections > c.knownSections { - if c.knownSections < c.checkpointSections { - // syncing reached the checkpoint, verify section head - syncedHead := rawdb.ReadCanonicalHash(c.chainDb, c.checkpointSections*c.sectionSize-1) - if syncedHead != c.checkpointHead { - c.log.Error("Synced chain does not match checkpoint", "number", c.checkpointSections*c.sectionSize-1, "expected", c.checkpointHead, "synced", syncedHead) - return - } - } - c.knownSections = sections - - select { - case c.update <- struct{}{}: - default: - } - } - } -} - -// updateLoop is the main event loop of the indexer which pushes chain segments -// down into the processing backend. -func (c *ChainIndexer) updateLoop() { - var ( - updating bool - updated time.Time - ) - - for { - select { - case errc := <-c.quit: - // Chain indexer terminating, report no failure and abort - errc <- nil - return - - case <-c.update: - // Section headers completed (or rolled back), update the index - c.lock.Lock() - if c.knownSections > c.storedSections { - // Periodically print an upgrade log message to the user - if time.Since(updated) > 8*time.Second { - if c.knownSections > c.storedSections+1 { - updating = true - c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) - } - updated = time.Now() - } - // Cache the current section count and head to allow unlocking the mutex - c.verifyLastHead() - section := c.storedSections - var oldHead common.Hash - if section > 0 { - oldHead = c.SectionHead(section - 1) - } - // Process the newly defined section in the background - c.lock.Unlock() - newHead, err := c.processSection(section, oldHead) - if err != nil { - select { - case <-c.ctx.Done(): - <-c.quit <- nil - return - default: - } - c.log.Error("Section processing failed", "error", err) - } - c.lock.Lock() - - // If processing succeeded and no reorgs occurred, mark the section completed - if err == nil && (section == 0 || oldHead == c.SectionHead(section-1)) { - c.setSectionHead(section, newHead) - c.setValidSections(section + 1) - if c.storedSections == c.knownSections && updating { - updating = false - c.log.Info("Finished upgrading chain index") - } - c.cascadedHead = c.storedSections*c.sectionSize - 1 - for _, child := range c.children { - c.log.Trace("Cascading chain index update", "head", c.cascadedHead) - child.newHead(c.cascadedHead, false) - } - } else { - // If processing failed, don't retry until further notification - c.log.Debug("Chain index processing failed", "section", section, "err", err) - c.verifyLastHead() - c.knownSections = c.storedSections - } - } - // If there are still further sections to process, reschedule - if c.knownSections > c.storedSections { - time.AfterFunc(c.throttling, func() { - select { - case c.update <- struct{}{}: - default: - } - }) - } - c.lock.Unlock() - } - } -} - -// processSection processes an entire section by calling backend functions while -// ensuring the continuity of the passed headers. Since the chain mutex is not -// held while processing, the continuity can be broken by a long reorg, in which -// case the function returns with an error. -func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { - c.log.Trace("Processing new chain section", "section", section) - - // Reset and partial processing - if err := c.backend.Reset(c.ctx, section, lastHead); err != nil { - c.setValidSections(0) - return common.Hash{}, err - } - - for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { - hash := rawdb.ReadCanonicalHash(c.chainDb, number) - if hash == (common.Hash{}) { - return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) - } - header := rawdb.ReadHeader(c.chainDb, hash, number) - if header == nil { - return common.Hash{}, fmt.Errorf("block #%d [%x..] not found", number, hash[:4]) - } else if header.ParentHash != lastHead { - return common.Hash{}, errors.New("chain reorged during section processing") - } - if err := c.backend.Process(c.ctx, header); err != nil { - return common.Hash{}, err - } - lastHead = header.Hash() - } - if err := c.backend.Commit(); err != nil { - return common.Hash{}, err - } - return lastHead, nil -} - -// verifyLastHead compares last stored section head with the corresponding block hash in the -// actual canonical chain and rolls back reorged sections if necessary to ensure that stored -// sections are all valid -func (c *ChainIndexer) verifyLastHead() { - for c.storedSections > 0 && c.storedSections > c.checkpointSections { - if c.SectionHead(c.storedSections-1) == rawdb.ReadCanonicalHash(c.chainDb, c.storedSections*c.sectionSize-1) { - return - } - c.setValidSections(c.storedSections - 1) - } -} - -// Sections returns the number of processed sections maintained by the indexer -// and also the information about the last header indexed for potential canonical -// verifications. -func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { - c.lock.Lock() - defer c.lock.Unlock() - - c.verifyLastHead() - return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) -} - -// AddChildIndexer adds a child ChainIndexer that can use the output of this one -func (c *ChainIndexer) AddChildIndexer(indexer *ChainIndexer) { - if indexer == c { - panic("can't add indexer as a child of itself") - } - c.lock.Lock() - defer c.lock.Unlock() - - c.children = append(c.children, indexer) - - // Cascade any pending updates to new children too - sections := c.storedSections - if c.knownSections < sections { - // if a section is "stored" but not "known" then it is a checkpoint without - // available chain data so we should not cascade it yet - sections = c.knownSections - } - if sections > 0 { - indexer.newHead(sections*c.sectionSize-1, false) - } -} - -// Prune deletes all chain data older than given threshold. -func (c *ChainIndexer) Prune(threshold uint64) error { - return c.backend.Prune(threshold) -} - -// loadValidSections reads the number of valid sections from the index database -// and caches is into the local state. -func (c *ChainIndexer) loadValidSections() { - data, _ := c.indexDb.Get([]byte("count")) - if len(data) == 8 { - c.storedSections = binary.BigEndian.Uint64(data) - } -} - -// setValidSections writes the number of valid sections to the index database -func (c *ChainIndexer) setValidSections(sections uint64) { - // Set the current number of valid sections in the database - var data [8]byte - binary.BigEndian.PutUint64(data[:], sections) - c.indexDb.Put([]byte("count"), data[:]) - - // Remove any reorged sections, caching the valids in the mean time - for c.storedSections > sections { - c.storedSections-- - c.removeSectionHead(c.storedSections) - } - c.storedSections = sections // needed if new > old -} - -// SectionHead retrieves the last block hash of a processed section from the -// index database. -func (c *ChainIndexer) SectionHead(section uint64) common.Hash { - var data [8]byte - binary.BigEndian.PutUint64(data[:], section) - - hash, _ := c.indexDb.Get(append([]byte("shead"), data[:]...)) - if len(hash) == len(common.Hash{}) { - return common.BytesToHash(hash) - } - return common.Hash{} -} - -// setSectionHead writes the last block hash of a processed section to the index -// database. -func (c *ChainIndexer) setSectionHead(section uint64, hash common.Hash) { - var data [8]byte - binary.BigEndian.PutUint64(data[:], section) - - c.indexDb.Put(append([]byte("shead"), data[:]...), hash.Bytes()) -} - -// removeSectionHead removes the reference to a processed section from the index -// database. -func (c *ChainIndexer) removeSectionHead(section uint64) { - var data [8]byte - binary.BigEndian.PutUint64(data[:], section) - - c.indexDb.Delete(append([]byte("shead"), data[:]...)) -} diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go deleted file mode 100644 index bf3bde756c..0000000000 --- a/core/chain_indexer_test.go +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package core - -import ( - "context" - "errors" - "fmt" - "math/big" - "math/rand" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" -) - -// Runs multiple tests with randomized parameters. -func TestChainIndexerSingle(t *testing.T) { - for i := 0; i < 10; i++ { - testChainIndexer(t, 1) - } -} - -// Runs multiple tests with randomized parameters and different number of -// chain backends. -func TestChainIndexerWithChildren(t *testing.T) { - for i := 2; i < 8; i++ { - testChainIndexer(t, i) - } -} - -// testChainIndexer runs a test with either a single chain indexer or a chain of -// multiple backends. The section size and required confirmation count parameters -// are randomized. -func testChainIndexer(t *testing.T, count int) { - db := rawdb.NewMemoryDatabase() - defer db.Close() - - // Create a chain of indexers and ensure they all report empty - backends := make([]*testChainIndexBackend, count) - for i := 0; i < count; i++ { - var ( - sectionSize = uint64(rand.Intn(100) + 1) - confirmsReq = uint64(rand.Intn(10)) - ) - backends[i] = &testChainIndexBackend{t: t, processCh: make(chan uint64)} - backends[i].indexer = NewChainIndexer(db, rawdb.NewTable(db, string([]byte{byte(i)})), backends[i], sectionSize, confirmsReq, 0, fmt.Sprintf("indexer-%d", i)) - - if sections, _, _ := backends[i].indexer.Sections(); sections != 0 { - t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, 0) - } - if i > 0 { - backends[i-1].indexer.AddChildIndexer(backends[i].indexer) - } - } - defer backends[0].indexer.Close() // parent indexer shuts down children - // notify pings the root indexer about a new head or reorg, then expect - // processed blocks if a section is processable - notify := func(headNum, failNum uint64, reorg bool) { - backends[0].indexer.newHead(headNum, reorg) - if reorg { - for _, backend := range backends { - headNum = backend.reorg(headNum) - backend.assertSections() - } - return - } - var cascade bool - for _, backend := range backends { - headNum, cascade = backend.assertBlocks(headNum, failNum) - if !cascade { - break - } - backend.assertSections() - } - } - // inject inserts a new random canonical header into the database directly - inject := func(number uint64) { - header := &types.Header{Number: big.NewInt(int64(number)), Extra: big.NewInt(rand.Int63()).Bytes()} - if number > 0 { - header.ParentHash = rawdb.ReadCanonicalHash(db, number-1) - } - rawdb.WriteHeader(db, header) - rawdb.WriteCanonicalHash(db, header.Hash(), number) - } - // Start indexer with an already existing chain - for i := uint64(0); i <= 100; i++ { - inject(i) - } - notify(100, 100, false) - - // Add new blocks one by one - for i := uint64(101); i <= 1000; i++ { - inject(i) - notify(i, i, false) - } - // Do a reorg - notify(500, 500, true) - - // Create new fork - for i := uint64(501); i <= 1000; i++ { - inject(i) - notify(i, i, false) - } - for i := uint64(1001); i <= 1500; i++ { - inject(i) - } - // Failed processing scenario where less blocks are available than notified - notify(2000, 1500, false) - - // Notify about a reorg (which could have caused the missing blocks if happened during processing) - notify(1500, 1500, true) - - // Create new fork - for i := uint64(1501); i <= 2000; i++ { - inject(i) - notify(i, i, false) - } -} - -// testChainIndexBackend implements ChainIndexerBackend -type testChainIndexBackend struct { - t *testing.T - indexer *ChainIndexer - section, headerCnt, stored uint64 - processCh chan uint64 -} - -// assertSections verifies if a chain indexer has the correct number of section. -func (b *testChainIndexBackend) assertSections() { - // Keep trying for 3 seconds if it does not match - var sections uint64 - for i := 0; i < 300; i++ { - sections, _, _ = b.indexer.Sections() - if sections == b.stored { - return - } - time.Sleep(10 * time.Millisecond) - } - b.t.Fatalf("Canonical section count mismatch: have %v, want %v", sections, b.stored) -} - -// assertBlocks expects processing calls after new blocks have arrived. If the -// failNum < headNum then we are simulating a scenario where a reorg has happened -// after the processing has started and the processing of a section fails. -func (b *testChainIndexBackend) assertBlocks(headNum, failNum uint64) (uint64, bool) { - var sections uint64 - if headNum >= b.indexer.confirmsReq { - sections = (headNum + 1 - b.indexer.confirmsReq) / b.indexer.sectionSize - if sections > b.stored { - // expect processed blocks - for expectd := b.stored * b.indexer.sectionSize; expectd < sections*b.indexer.sectionSize; expectd++ { - if expectd > failNum { - // rolled back after processing started, no more process calls expected - // wait until updating is done to make sure that processing actually fails - var updating bool - for i := 0; i < 300; i++ { - b.indexer.lock.Lock() - updating = b.indexer.knownSections > b.indexer.storedSections - b.indexer.lock.Unlock() - if !updating { - break - } - time.Sleep(10 * time.Millisecond) - } - if updating { - b.t.Fatalf("update did not finish") - } - sections = expectd / b.indexer.sectionSize - break - } - select { - case <-time.After(10 * time.Second): - b.t.Fatalf("Expected processed block #%d, got nothing", expectd) - case processed := <-b.processCh: - if processed != expectd { - b.t.Errorf("Expected processed block #%d, got #%d", expectd, processed) - } - } - } - b.stored = sections - } - } - if b.stored == 0 { - return 0, false - } - return b.stored*b.indexer.sectionSize - 1, true -} - -func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { - firstChanged := (headNum + 1) / b.indexer.sectionSize - if firstChanged < b.stored { - b.stored = firstChanged - } - return b.stored * b.indexer.sectionSize -} - -func (b *testChainIndexBackend) Reset(ctx context.Context, section uint64, prevHead common.Hash) error { - b.section = section - b.headerCnt = 0 - return nil -} - -func (b *testChainIndexBackend) Process(ctx context.Context, header *types.Header) error { - b.headerCnt++ - if b.headerCnt > b.indexer.sectionSize { - b.t.Error("Processing too many headers") - } - //t.processCh <- header.Number.Uint64() - select { - case <-time.After(10 * time.Second): - b.t.Error("Unexpected call to Process") - // Can't use Fatal since this is not the test's goroutine. - // Returning error stops the chainIndexer's updateLoop - return errors.New("unexpected call to Process") - case b.processCh <- header.Number.Uint64(): - } - return nil -} - -func (b *testChainIndexBackend) Commit() error { - if b.headerCnt != b.indexer.sectionSize { - b.t.Error("Not enough headers processed") - } - return nil -} - -func (b *testChainIndexBackend) Prune(threshold uint64) error { - return nil -} diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index d1b0cf5053..68c3454e43 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -17,7 +17,6 @@ package rawdb import ( - "bytes" "encoding/binary" "errors" "math/big" @@ -147,41 +146,6 @@ func ReadReceipt(db ethdb.Reader, hash common.Hash, config *params.ChainConfig) return nil, common.Hash{}, 0, 0 } -// ReadBloomBits retrieves the compressed bloom bit vector belonging to the given -// section and bit index from the. -func ReadBloomBits(db ethdb.KeyValueReader, bit uint, section uint64, head common.Hash) ([]byte, error) { - return db.Get(bloomBitsKey(bit, section, head)) -} - -// WriteBloomBits stores the compressed bloom bits vector belonging to the given -// section and bit index. -func WriteBloomBits(db ethdb.KeyValueWriter, bit uint, section uint64, head common.Hash, bits []byte) { - if err := db.Put(bloomBitsKey(bit, section, head), bits); err != nil { - log.Crit("Failed to store bloom bits", "err", err) - } -} - -// DeleteBloombits removes all compressed bloom bits vector belonging to the -// given section range and bit index. -func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) { - start, end := bloomBitsKey(bit, from, common.Hash{}), bloomBitsKey(bit, to, common.Hash{}) - it := db.NewIterator(nil, start) - defer it.Release() - - for it.Next() { - if bytes.Compare(it.Key(), end) >= 0 { - break - } - if len(it.Key()) != len(bloomBitsPrefix)+2+8+32 { - continue - } - db.Delete(it.Key()) - } - if it.Error() != nil { - log.Crit("Failed to delete bloom bits", "err", it.Error()) - } -} - var emptyRow = []uint32{} // ReadFilterMapRow retrieves a filter map row at the given mapRowIndex diff --git a/core/rawdb/accessors_indexes_test.go b/core/rawdb/accessors_indexes_test.go index 1bee455503..fc93b4ed19 100644 --- a/core/rawdb/accessors_indexes_test.go +++ b/core/rawdb/accessors_indexes_test.go @@ -111,46 +111,3 @@ func TestLookupStorage(t *testing.T) { }) } } - -func TestDeleteBloomBits(t *testing.T) { - // Prepare testing data - db := NewMemoryDatabase() - for i := uint(0); i < 2; i++ { - for s := uint64(0); s < 2; s++ { - WriteBloomBits(db, i, s, params.MainnetGenesisHash, []byte{0x01, 0x02}) - WriteBloomBits(db, i, s, params.SepoliaGenesisHash, []byte{0x01, 0x02}) - } - } - check := func(bit uint, section uint64, head common.Hash, exist bool) { - bits, _ := ReadBloomBits(db, bit, section, head) - if exist && !bytes.Equal(bits, []byte{0x01, 0x02}) { - t.Fatalf("Bloombits mismatch") - } - if !exist && len(bits) > 0 { - t.Fatalf("Bloombits should be removed") - } - } - // Check the existence of written data. - check(0, 0, params.MainnetGenesisHash, true) - check(0, 0, params.SepoliaGenesisHash, true) - - // Check the existence of deleted data. - DeleteBloombits(db, 0, 0, 1) - check(0, 0, params.MainnetGenesisHash, false) - check(0, 0, params.SepoliaGenesisHash, false) - check(0, 1, params.MainnetGenesisHash, true) - check(0, 1, params.SepoliaGenesisHash, true) - - // Check the existence of deleted data. - DeleteBloombits(db, 0, 0, 2) - check(0, 0, params.MainnetGenesisHash, false) - check(0, 0, params.SepoliaGenesisHash, false) - check(0, 1, params.MainnetGenesisHash, false) - check(0, 1, params.SepoliaGenesisHash, false) - - // Bit1 shouldn't be affect. - check(1, 0, params.MainnetGenesisHash, true) - check(1, 0, params.SepoliaGenesisHash, true) - check(1, 1, params.MainnetGenesisHash, true) - check(1, 1, params.SepoliaGenesisHash, true) -} diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 53418d1646..1bf5ca8d3f 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -375,7 +375,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { accountSnaps stat storageSnaps stat preimages stat - bloomBits stat beaconHeaders stat cliqueSnaps stat @@ -436,10 +435,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { metadata.Add(size) case bytes.HasPrefix(key, genesisPrefix) && len(key) == (len(genesisPrefix)+common.HashLength): metadata.Add(size) - case bytes.HasPrefix(key, bloomBitsPrefix) && len(key) == (len(bloomBitsPrefix)+10+common.HashLength): - bloomBits.Add(size) - case bytes.HasPrefix(key, BloomBitsIndexPrefix): - bloomBits.Add(size) case bytes.HasPrefix(key, skeletonHeaderPrefix) && len(key) == (len(skeletonHeaderPrefix)+8): beaconHeaders.Add(size) case bytes.HasPrefix(key, CliqueSnapshotPrefix) && len(key) == 7+common.HashLength: @@ -504,7 +499,6 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Block number->hash", numHashPairings.Size(), numHashPairings.Count()}, {"Key-Value store", "Block hash->number", hashNumPairings.Size(), hashNumPairings.Count()}, {"Key-Value store", "Transaction index", txLookups.Size(), txLookups.Count()}, - {"Key-Value store", "Bloombit index", bloomBits.Size(), bloomBits.Count()}, {"Key-Value store", "Contract codes", codes.Size(), codes.Count()}, {"Key-Value store", "Hash trie nodes", legacyTries.Size(), legacyTries.Count()}, {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 25c4d88a54..60d6a58ade 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -230,16 +230,6 @@ func storageSnapshotsKey(accountHash common.Hash) []byte { return append(SnapshotStoragePrefix, accountHash.Bytes()...) } -// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte { - key := append(append(bloomBitsPrefix, make([]byte, 10)...), hash.Bytes()...) - - binary.BigEndian.PutUint16(key[1:], uint16(bit)) - binary.BigEndian.PutUint64(key[3:], section) - - return key -} - // skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian) func skeletonHeaderKey(number uint64) []byte { return append(skeletonHeaderPrefix, encodeBlockNumber(number)...) diff --git a/eth/api_backend.go b/eth/api_backend.go index b7dfaad4c4..44e66e78d0 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -28,7 +28,6 @@ import ( "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/consensus/misc/eip4844" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" @@ -401,17 +400,6 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 { return b.eth.config.RPCTxFeeCap } -func (b *EthAPIBackend) BloomStatus() (uint64, uint64) { - sections, _, _ := b.eth.bloomIndexer.Sections() - return params.BloomBitsBlocks, sections -} - -func (b *EthAPIBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - for i := 0; i < bloomFilterThreads; i++ { - go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests) - } -} - func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend { return b.eth.filterMaps.NewMatcherBackend() } diff --git a/eth/backend.go b/eth/backend.go index 06255add21..2acded6554 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/pruner" @@ -82,10 +81,6 @@ type Ethereum struct { engine consensus.Engine accountManager *accounts.Manager - bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests - bloomIndexer *core.ChainIndexer // Bloom indexer operating during block imports - closeBloomHandler chan struct{} - filterMaps *filtermaps.FilterMaps APIBackend *EthAPIBackend @@ -154,19 +149,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { networkID = chainConfig.ChainID.Uint64() } eth := &Ethereum{ - config: config, - chainDb: chainDb, - eventMux: stack.EventMux(), - accountManager: stack.AccountManager(), - engine: engine, - closeBloomHandler: make(chan struct{}), - networkID: networkID, - gasPrice: config.Miner.GasPrice, - bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms), - p2pServer: stack.Server(), - discmix: enode.NewFairMix(0), - shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), + config: config, + chainDb: chainDb, + eventMux: stack.EventMux(), + accountManager: stack.AccountManager(), + engine: engine, + networkID: networkID, + gasPrice: config.Miner.GasPrice, + p2pServer: stack.Server(), + discmix: enode.NewFairMix(0), + shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) var dbVer = "" @@ -224,7 +216,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if err != nil { return nil, err } - eth.bloomIndexer.Start(eth.blockchain) eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain) if config.BlobPool.Datadir != "" { @@ -261,10 +252,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.APIBackend = &EthAPIBackend{ - extRPCEnabled: stack.Config().ExtRPCEnabled(), - allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs, - eth: eth, - gpo: nil, + extRPCEnabled: stack.Config().ExtRPCEnabled(), + allowUnprotectedTxs: stack.Config().AllowUnprotectedTxs, + eth: eth, + gpo: nil, } if eth.APIBackend.allowUnprotectedTxs { log.Info("Unprotected transactions allowed") @@ -348,7 +339,6 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downlo func (s *Ethereum) Synced() bool { return s.handler.synced.Load() } func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } -func (s *Ethereum) BloomIndexer() *core.ChainIndexer { return s.bloomIndexer } // Protocols returns all the currently configured // network protocols to start. @@ -365,9 +355,6 @@ func (s *Ethereum) Protocols() []p2p.Protocol { func (s *Ethereum) Start() error { s.setupDiscovery() - // Start the bloom bits servicing goroutines - s.startBloomHandlers(params.BloomBitsBlocks) - // Regularly update shutdown marker s.shutdownTracker.Start() @@ -416,9 +403,7 @@ func (s *Ethereum) Stop() error { s.handler.Stop() // Then stop everything else. - s.bloomIndexer.Close() s.filterMaps.Close() - close(s.closeBloomHandler) s.txPool.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/bloombits.go b/eth/bloombits.go deleted file mode 100644 index 0cb7050d23..0000000000 --- a/eth/bloombits.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package eth - -import ( - "time" - - "github.com/ethereum/go-ethereum/common/bitutil" - "github.com/ethereum/go-ethereum/core/rawdb" -) - -const ( - // bloomServiceThreads is the number of goroutines used globally by an Ethereum - // instance to service bloombits lookups for all running filters. - bloomServiceThreads = 16 - - // bloomFilterThreads is the number of goroutines used locally per filter to - // multiplex requests onto the global servicing goroutines. - bloomFilterThreads = 3 - - // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service - // in a single batch. - bloomRetrievalBatch = 16 - - // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests - // to accumulate request an entire batch (avoiding hysteresis). - bloomRetrievalWait = time.Duration(0) -) - -// startBloomHandlers starts a batch of goroutines to accept bloom bit database -// retrievals from possibly a range of filters and serving the data to satisfy. -func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { - for i := 0; i < bloomServiceThreads; i++ { - go func() { - for { - select { - case <-eth.closeBloomHandler: - return - - case request := <-eth.bloomRequests: - task := <-request - task.Bitsets = make([][]byte, len(task.Sections)) - for i, section := range task.Sections { - head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1) - if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil { - if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil { - task.Bitsets[i] = blob - } else { - task.Error = err - } - } else { - task.Error = err - } - } - request <- task - } - } - }() - } -} diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 7b0be8d024..2fcf0945ba 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -25,7 +25,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -42,37 +41,13 @@ type Filter struct { block *common.Hash // Block hash if filtering a single block begin, end int64 // Range interval if filtering multiple blocks bbMatchCount uint64 - - matcher *bloombits.Matcher } // NewRangeFilter creates a new filter which uses a bloom filter on blocks to // figure out whether a particular block is interesting or not. func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter { - // Flatten the address and topic filter clauses into a single bloombits filter - // system. Since the bloombits are not positional, nil topics are permitted, - // which get flattened into a nil byte slice. - var filters [][][]byte - if len(addresses) > 0 { - filter := make([][]byte, len(addresses)) - for i, address := range addresses { - filter[i] = address.Bytes() - } - filters = append(filters, filter) - } - for _, topicList := range topics { - filter := make([][]byte, len(topicList)) - for i, topic := range topicList { - filter[i] = topic.Bytes() - } - filters = append(filters, filter) - } - size, _ := sys.backend.BloomStatus() - // Create a generic filter and convert it into a range filter filter := newFilter(sys, addresses, topics) - - filter.matcher = bloombits.NewMatcher(size, filters) filter.begin = begin filter.end = end @@ -197,23 +172,7 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan erro close(logChan) }() - // Gather all indexed logs, and finish with non indexed ones - var ( - end = uint64(f.end) - size, sections = f.sys.backend.BloomStatus() - err error - ) - if indexed := sections * size; indexed > uint64(f.begin) { - if indexed > end { - indexed = end + 1 - } - if err = f.indexedLogs(ctx, indexed-1, logChan); err != nil { - errChan <- err - return - } - } - - if err := f.unindexedLogs(ctx, end, logChan); err != nil { + if err := f.unindexedLogs(ctx, uint64(f.end), logChan); err != nil { errChan <- err return } @@ -224,53 +183,6 @@ func (f *Filter) rangeLogsAsync(ctx context.Context) (chan *types.Log, chan erro return logChan, errChan } -// indexedLogs returns the logs matching the filter criteria based on the bloom -// bits indexed available locally or via the network. -func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { - // Create a matcher session and request servicing from the backend - matches := make(chan uint64, 64) - - session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) - if err != nil { - return err - } - defer session.Close() - - f.sys.backend.ServiceFilter(ctx, session) - - for { - select { - case number, ok := <-matches: - f.bbMatchCount++ - // Abort if all matches have been fulfilled - if !ok { - err := session.Error() - if err == nil { - f.begin = int64(end) + 1 - } - return err - } - f.begin = int64(number) + 1 - - // Retrieve the suggested block and pull any truly matching logs - header, err := f.sys.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) - if header == nil || err != nil { - return err - } - found, err := f.checkMatches(ctx, header) - if err != nil { - return err - } - for _, log := range found { - logChan <- log - } - - case <-ctx.Done(): - return ctx.Err() - } - } -} - // unindexedLogs returns the logs matching the filter criteria based on raw block // iteration and bloom matching. func (f *Filter) unindexedLogs(ctx context.Context, end uint64, logChan chan *types.Log) error { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 6c3d8be86b..7531a1ecfc 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -70,9 +69,6 @@ type Backend interface { SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - BloomStatus() (uint64, uint64) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) - NewMatcherBackend() filtermaps.MatcherBackend } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index aec5ee4166..b6119448d9 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" @@ -137,37 +136,6 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } -func (b *testBackend) BloomStatus() (uint64, uint64) { - return params.BloomBitsBlocks, b.sections -} - -func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - requests := make(chan chan *bloombits.Retrieval) - - go session.Multiplex(16, 0, requests) - go func() { - for { - // Wait for a service request or a shutdown - select { - case <-ctx.Done(): - return - - case request := <-requests: - task := <-request - - task.Bitsets = make([][]byte, len(task.Sections)) - for i, section := range task.Sections { - if rand.Int()%4 != 0 { // Handle occasional missing deliveries - head := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1) - task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head) - } - } - request <- task - } - } - }() -} - func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { b.pendingBlock = block b.pendingReceipts = receipts diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index ae2ec9f0f0..2b5e535b13 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -45,7 +45,6 @@ import ( "github.com/ethereum/go-ethereum/consensus/beacon" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -620,11 +619,6 @@ func (b testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) func (b testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { panic("implement me") } -func (b testBackend) BloomStatus() (uint64, uint64) { panic("implement me") } -func (b testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - panic("implement me") -} - func TestEstimateGas(t *testing.T) { t.Parallel() // Initialize test accounts diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index f9ce2ef474..9e2ea2c876 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -94,8 +93,6 @@ type Backend interface { GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - BloomStatus() (uint64, uint64) - ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) NewMatcherBackend() filtermaps.MatcherBackend } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 7172fc883f..2855bc1803 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/bloombits" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -393,10 +392,8 @@ func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction, func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { return nil, nil } -func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } -func (b *backendMock) BloomStatus() (uint64, uint64) { return 0, 0 } -func (b *backendMock) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {} -func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } +func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } +func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return nil } diff --git a/params/network_params.go b/params/network_params.go index 61bd6b2f42..c016e7fcf3 100644 --- a/params/network_params.go +++ b/params/network_params.go @@ -20,14 +20,6 @@ package params // aren't necessarily consensus related. const ( - // BloomBitsBlocks is the number of blocks a single bloom bit section vector - // contains on the server side. - BloomBitsBlocks uint64 = 4096 - - // BloomConfirms is the number of confirmation blocks before a bloom section is - // considered probably final and its rotated bits are calculated. - BloomConfirms = 256 - // FullImmutabilityThreshold is the number of blocks after which a chain segment is // considered immutable (i.e. soft finality). It is used by the downloader as a // hard limit against deep ancestors, by the blockchain against deep reorgs, by