diff --git a/cmd/geth/retesteth.go b/cmd/geth/retesteth.go index 29590b63bf..f4ec832789 100644 --- a/cmd/geth/retesteth.go +++ b/cmd/geth/retesteth.go @@ -200,11 +200,11 @@ func (e *NoRewardEngine) Author(header *types.Header) (common.Address, error) { return e.inner.Author(header) } -func (e *NoRewardEngine) VerifyHeader(chain consensus.ChainReader, header *types.Header, seal bool) error { +func (e *NoRewardEngine) VerifyHeader(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error { return e.inner.VerifyHeader(chain, header, seal) } -func (e *NoRewardEngine) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { +func (e *NoRewardEngine) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { return e.inner.VerifyHeaders(chain, headers, seals) } @@ -212,11 +212,11 @@ func (e *NoRewardEngine) VerifyUncles(chain consensus.ChainReader, block *types. return e.inner.VerifyUncles(chain, block) } -func (e *NoRewardEngine) VerifySeal(chain consensus.ChainReader, header *types.Header) error { +func (e *NoRewardEngine) VerifySeal(chain consensus.ChainHeaderReader, header *types.Header) error { return e.inner.VerifySeal(chain, header) } -func (e *NoRewardEngine) Prepare(chain consensus.ChainReader, header *types.Header) error { +func (e *NoRewardEngine) Prepare(chain consensus.ChainHeaderReader, header *types.Header) error { return e.inner.Prepare(chain, header) } @@ -229,7 +229,7 @@ func (e *NoRewardEngine) accumulateRewards(config *params.ChainConfig, state *st state.AddBalance(header.Coinbase, reward) } -func (e *NoRewardEngine) Finalize(chain consensus.ChainReader, header *types.Header, statedb *state.StateDB, txs []*types.Transaction, +func (e *NoRewardEngine) Finalize(chain consensus.ChainHeaderReader, header *types.Header, statedb *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { if e.rewardsOn { e.inner.Finalize(chain, header, statedb, txs, uncles) @@ -239,7 +239,7 @@ func (e *NoRewardEngine) Finalize(chain consensus.ChainReader, header *types.Hea } } -func (e *NoRewardEngine) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Header, statedb *state.StateDB, txs []*types.Transaction, +func (e *NoRewardEngine) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, statedb *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { if e.rewardsOn { return e.inner.FinalizeAndAssemble(chain, header, statedb, txs, uncles, receipts) @@ -252,7 +252,7 @@ func (e *NoRewardEngine) FinalizeAndAssemble(chain consensus.ChainReader, header } } -func (e *NoRewardEngine) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (e *NoRewardEngine) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { return e.inner.Seal(chain, block, results, stop) } @@ -260,11 +260,11 @@ func (e *NoRewardEngine) SealHash(header *types.Header) common.Hash { return e.inner.SealHash(header) } -func (e *NoRewardEngine) CalcDifficulty(chain consensus.ChainReader, time uint64, parent *types.Header) *big.Int { +func (e *NoRewardEngine) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int { return e.inner.CalcDifficulty(chain, time, parent) } -func (e *NoRewardEngine) APIs(chain consensus.ChainReader) []rpc.API { +func (e *NoRewardEngine) APIs(chain consensus.ChainHeaderReader) []rpc.API { return e.inner.APIs(chain) } diff --git a/consensus/clique/api.go b/consensus/clique/api.go index 13d404d2c8..8e9a1e7deb 100644 --- a/consensus/clique/api.go +++ b/consensus/clique/api.go @@ -28,7 +28,7 @@ import ( // API is a user facing RPC API to allow controlling the signer and voting // mechanisms of the proof-of-authority scheme. type API struct { - chain consensus.ChainReader + chain consensus.ChainHeaderReader clique *Clique } diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 35542baf4e..377a3ebaeb 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -213,14 +213,14 @@ func (c *Clique) Author(header *types.Header) (common.Address, error) { } // VerifyHeader checks whether a header conforms to the consensus rules. -func (c *Clique) VerifyHeader(chain consensus.ChainReader, header *types.Header, seal bool) error { +func (c *Clique) VerifyHeader(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error { return c.verifyHeader(chain, header, nil) } // VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers. The // method returns a quit channel to abort the operations and a results channel to // retrieve the async verifications (the order is that of the input slice). -func (c *Clique) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { +func (c *Clique) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { abort := make(chan struct{}) results := make(chan error, len(headers)) @@ -242,7 +242,7 @@ func (c *Clique) VerifyHeaders(chain consensus.ChainReader, headers []*types.Hea // caller may optionally pass in a batch of parents (ascending order) to avoid // looking those up from the database. This is useful for concurrently verifying // a batch of new headers. -func (c *Clique) verifyHeader(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (c *Clique) verifyHeader(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error { if header.Number == nil { return errUnknownBlock } @@ -305,7 +305,7 @@ func (c *Clique) verifyHeader(chain consensus.ChainReader, header *types.Header, // rather depend on a batch of previous headers. The caller may optionally pass // in a batch of parents (ascending order) to avoid looking those up from the // database. This is useful for concurrently verifying a batch of new headers. -func (c *Clique) verifyCascadingFields(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (c *Clique) verifyCascadingFields(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error { // The genesis block is the always valid dead-end number := header.Number.Uint64() if number == 0 { @@ -345,7 +345,7 @@ func (c *Clique) verifyCascadingFields(chain consensus.ChainReader, header *type } // snapshot retrieves the authorization snapshot at a given point in time. -func (c *Clique) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) { +func (c *Clique) snapshot(chain consensus.ChainHeaderReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) { // Search for a snapshot in memory or on disk for checkpoints var ( headers []*types.Header @@ -436,7 +436,7 @@ func (c *Clique) VerifyUncles(chain consensus.ChainReader, block *types.Block) e // VerifySeal implements consensus.Engine, checking whether the signature contained // in the header satisfies the consensus protocol requirements. -func (c *Clique) VerifySeal(chain consensus.ChainReader, header *types.Header) error { +func (c *Clique) VerifySeal(chain consensus.ChainHeaderReader, header *types.Header) error { return c.verifySeal(chain, header, nil) } @@ -444,7 +444,7 @@ func (c *Clique) VerifySeal(chain consensus.ChainReader, header *types.Header) e // consensus protocol requirements. The method accepts an optional list of parent // headers that aren't yet part of the local blockchain to generate the snapshots // from. -func (c *Clique) verifySeal(chain consensus.ChainReader, header *types.Header, parents []*types.Header) error { +func (c *Clique) verifySeal(chain consensus.ChainHeaderReader, header *types.Header, parents []*types.Header) error { // Verifying the genesis block is not supported number := header.Number.Uint64() if number == 0 { @@ -487,7 +487,7 @@ func (c *Clique) verifySeal(chain consensus.ChainReader, header *types.Header, p // Prepare implements consensus.Engine, preparing all the consensus fields of the // header for running the transactions on top. -func (c *Clique) Prepare(chain consensus.ChainReader, header *types.Header) error { +func (c *Clique) Prepare(chain consensus.ChainHeaderReader, header *types.Header) error { // If the block isn't a checkpoint, cast a random vote (good enough for now) header.Coinbase = common.Address{} header.Nonce = types.BlockNonce{} @@ -552,7 +552,7 @@ func (c *Clique) Prepare(chain consensus.ChainReader, header *types.Header) erro // Finalize implements consensus.Engine, ensuring no uncles are set, nor block // rewards given. -func (c *Clique) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { +func (c *Clique) Finalize(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { // No block rewards in PoA, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.UncleHash = types.CalcUncleHash(nil) @@ -560,7 +560,7 @@ func (c *Clique) Finalize(chain consensus.ChainReader, header *types.Header, sta // FinalizeAndAssemble implements consensus.Engine, ensuring no uncles are set, // nor block rewards given, and returns the final block. -func (c *Clique) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { +func (c *Clique) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { // No block rewards in PoA, so the state remains as is and uncles are dropped header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.UncleHash = types.CalcUncleHash(nil) @@ -581,7 +581,7 @@ func (c *Clique) Authorize(signer common.Address, signFn SignerFn) { // Seal implements consensus.Engine, attempting to create a sealed block using // the local signing credentials. -func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (c *Clique) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { header := block.Header() // Sealing the genesis block is not supported @@ -654,7 +654,7 @@ func (c *Clique) Seal(chain consensus.ChainReader, block *types.Block, results c // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. -func (c *Clique) CalcDifficulty(chain consensus.ChainReader, time uint64, parent *types.Header) *big.Int { +func (c *Clique) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int { snap, err := c.snapshot(chain, parent.Number.Uint64(), parent.Hash(), nil) if err != nil { return nil @@ -684,7 +684,7 @@ func (c *Clique) Close() error { // APIs implements consensus.Engine, returning the user facing RPC API to allow // controlling the signer voting. -func (c *Clique) APIs(chain consensus.ChainReader) []rpc.API { +func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { return []rpc.API{{ Namespace: "clique", Version: "1.0", diff --git a/consensus/consensus.go b/consensus/consensus.go index f753af550c..f7a4d0ff0b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -27,9 +27,9 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -// ChainReader defines a small collection of methods needed to access the local -// blockchain during header and/or uncle verification. -type ChainReader interface { +// ChainHeaderReader defines a small collection of methods needed to access the local +// blockchain during header verification. +type ChainHeaderReader interface { // Config retrieves the blockchain's chain configuration. Config() *params.ChainConfig @@ -44,6 +44,12 @@ type ChainReader interface { // GetHeaderByHash retrieves a block header from the database by its hash. GetHeaderByHash(hash common.Hash) *types.Header +} + +// ChainReader defines a small collection of methods needed to access the local +// blockchain during header and/or uncle verification. +type ChainReader interface { + ChainHeaderReader // GetBlock retrieves a block from the database by hash and number. GetBlock(hash common.Hash, number uint64) *types.Block @@ -59,13 +65,13 @@ type Engine interface { // VerifyHeader checks whether a header conforms to the consensus rules of a // given engine. Verifying the seal may be done optionally here, or explicitly // via the VerifySeal method. - VerifyHeader(chain ChainReader, header *types.Header, seal bool) error + VerifyHeader(chain ChainHeaderReader, header *types.Header, seal bool) error // VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers // concurrently. The method returns a quit channel to abort the operations and // a results channel to retrieve the async verifications (the order is that of // the input slice). - VerifyHeaders(chain ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) + VerifyHeaders(chain ChainHeaderReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) // VerifyUncles verifies that the given block's uncles conform to the consensus // rules of a given engine. @@ -73,18 +79,18 @@ type Engine interface { // VerifySeal checks whether the crypto seal on a header is valid according to // the consensus rules of the given engine. - VerifySeal(chain ChainReader, header *types.Header) error + VerifySeal(chain ChainHeaderReader, header *types.Header) error // Prepare initializes the consensus fields of a block header according to the // rules of a particular engine. The changes are executed inline. - Prepare(chain ChainReader, header *types.Header) error + Prepare(chain ChainHeaderReader, header *types.Header) error // Finalize runs any post-transaction state modifications (e.g. block rewards) // but does not assemble the block. // // Note: The block header and state database might be updated to reflect any // consensus rules that happen at finalization (e.g. block rewards). - Finalize(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, + Finalize(chain ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) // FinalizeAndAssemble runs any post-transaction state modifications (e.g. block @@ -92,7 +98,7 @@ type Engine interface { // // Note: The block header and state database might be updated to reflect any // consensus rules that happen at finalization (e.g. block rewards). - FinalizeAndAssemble(chain ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, + FinalizeAndAssemble(chain ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) // Seal generates a new sealing request for the given input block and pushes @@ -100,17 +106,17 @@ type Engine interface { // // Note, the method returns immediately and will send the result async. More // than one result may also be returned depending on the consensus algorithm. - Seal(chain ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error + Seal(chain ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error // SealHash returns the hash of a block prior to it being sealed. SealHash(header *types.Header) common.Hash // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have. - CalcDifficulty(chain ChainReader, time uint64, parent *types.Header) *big.Int + CalcDifficulty(chain ChainHeaderReader, time uint64, parent *types.Header) *big.Int // APIs returns the RPC APIs this consensus engine provides. - APIs(chain ChainReader) []rpc.API + APIs(chain ChainHeaderReader) []rpc.API // Close terminates any background threads maintained by the consensus engine. Close() error diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index 8ae99b1994..bbc554951d 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -86,7 +86,7 @@ func (ethash *Ethash) Author(header *types.Header) (common.Address, error) { // VerifyHeader checks whether a header conforms to the consensus rules of the // stock Ethereum ethash engine. -func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.Header, seal bool) error { +func (ethash *Ethash) VerifyHeader(chain consensus.ChainHeaderReader, header *types.Header, seal bool) error { // If we're running a full engine faking, accept any input as valid if ethash.config.PowMode == ModeFullFake { return nil @@ -107,7 +107,7 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He // VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers // concurrently. The method returns a quit channel to abort the operations and // a results channel to retrieve the async verifications. -func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { +func (ethash *Ethash) VerifyHeaders(chain consensus.ChainHeaderReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) { // If we're running a full engine faking, accept any input as valid if ethash.config.PowMode == ModeFullFake || len(headers) == 0 { abort, results := make(chan struct{}), make(chan error, len(headers)) @@ -169,7 +169,7 @@ func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*type return abort, errorsOut } -func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error { +func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainHeaderReader, headers []*types.Header, seals []bool, index int) error { var parent *types.Header if index == 0 { parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1) @@ -243,7 +243,7 @@ func (ethash *Ethash) VerifyUncles(chain consensus.ChainReader, block *types.Blo // verifyHeader checks whether a header conforms to the consensus rules of the // stock Ethereum ethash engine. // See YP section 4.3.4. "Block Header Validity" -func (ethash *Ethash) verifyHeader(chain consensus.ChainReader, header, parent *types.Header, uncle bool, seal bool) error { +func (ethash *Ethash) verifyHeader(chain consensus.ChainHeaderReader, header, parent *types.Header, uncle bool, seal bool) error { // Ensure that the header's extra-data section is of a reasonable size if uint64(len(header.Extra)) > params.MaximumExtraDataSize { return fmt.Errorf("extra-data too long: %d > %d", len(header.Extra), params.MaximumExtraDataSize) @@ -306,7 +306,7 @@ func (ethash *Ethash) verifyHeader(chain consensus.ChainReader, header, parent * // CalcDifficulty is the difficulty adjustment algorithm. It returns // the difficulty that a new block should have when created at time // given the parent block's time and difficulty. -func (ethash *Ethash) CalcDifficulty(chain consensus.ChainReader, time uint64, parent *types.Header) *big.Int { +func (ethash *Ethash) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, parent *types.Header) *big.Int { return CalcDifficulty(chain.Config(), time, parent) } @@ -486,14 +486,14 @@ func calcDifficultyFrontier(time uint64, parent *types.Header) *big.Int { // VerifySeal implements consensus.Engine, checking whether the given block satisfies // the PoW difficulty requirements. -func (ethash *Ethash) VerifySeal(chain consensus.ChainReader, header *types.Header) error { +func (ethash *Ethash) VerifySeal(chain consensus.ChainHeaderReader, header *types.Header) error { return ethash.verifySeal(chain, header, false) } // verifySeal checks whether a block satisfies the PoW difficulty requirements, // either using the usual ethash cache for it, or alternatively using a full DAG // to make remote mining fast. -func (ethash *Ethash) verifySeal(chain consensus.ChainReader, header *types.Header, fulldag bool) error { +func (ethash *Ethash) verifySeal(chain consensus.ChainHeaderReader, header *types.Header, fulldag bool) error { // If we're running a fake PoW, accept any seal as valid if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake { time.Sleep(ethash.fakeDelay) @@ -558,7 +558,7 @@ func (ethash *Ethash) verifySeal(chain consensus.ChainReader, header *types.Head // Prepare implements consensus.Engine, initializing the difficulty field of a // header to conform to the ethash protocol. The changes are done inline. -func (ethash *Ethash) Prepare(chain consensus.ChainReader, header *types.Header) error { +func (ethash *Ethash) Prepare(chain consensus.ChainHeaderReader, header *types.Header) error { parent := chain.GetHeader(header.ParentHash, header.Number.Uint64()-1) if parent == nil { return consensus.ErrUnknownAncestor @@ -569,7 +569,7 @@ func (ethash *Ethash) Prepare(chain consensus.ChainReader, header *types.Header) // Finalize implements consensus.Engine, accumulating the block and uncle rewards, // setting the final state on the header -func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { +func (ethash *Ethash) Finalize(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header) { // Accumulate any block and uncle rewards and commit the final state root accumulateRewards(chain.Config(), state, header, uncles) header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) @@ -577,7 +577,7 @@ func (ethash *Ethash) Finalize(chain consensus.ChainReader, header *types.Header // FinalizeAndAssemble implements consensus.Engine, accumulating the block and // uncle rewards, setting the final state and assembling the block. -func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { +func (ethash *Ethash) FinalizeAndAssemble(chain consensus.ChainHeaderReader, header *types.Header, state *state.StateDB, txs []*types.Transaction, uncles []*types.Header, receipts []*types.Receipt) (*types.Block, error) { // Accumulate any block and uncle rewards and commit the final state root accumulateRewards(chain.Config(), state, header, uncles) header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) diff --git a/consensus/ethash/ethash.go b/consensus/ethash/ethash.go index 31bdceb4c3..aa3f002c0d 100644 --- a/consensus/ethash/ethash.go +++ b/consensus/ethash/ethash.go @@ -656,7 +656,7 @@ func (ethash *Ethash) Hashrate() float64 { } // APIs implements consensus.Engine, returning the user facing RPC APIs. -func (ethash *Ethash) APIs(chain consensus.ChainReader) []rpc.API { +func (ethash *Ethash) APIs(chain consensus.ChainHeaderReader) []rpc.API { // In order to ensure backward compatibility, we exposes ethash RPC APIs // to both eth and ethash namespaces. return []rpc.API{ diff --git a/consensus/ethash/sealer.go b/consensus/ethash/sealer.go index 52c4ed46dc..2053534028 100644 --- a/consensus/ethash/sealer.go +++ b/consensus/ethash/sealer.go @@ -48,7 +48,7 @@ var ( // Seal implements consensus.Engine, attempting to find a nonce that satisfies // the block's difficulty requirements. -func (ethash *Ethash) Seal(chain consensus.ChainReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { +func (ethash *Ethash) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error { // If we're running a fake PoW, simply return a 0 nonce immediately if ethash.config.PowMode == ModeFake || ethash.config.PowMode == ModeFullFake { header := block.Header() diff --git a/eth/fetcher/block_fetcher.go b/eth/fetcher/block_fetcher.go index b7aa47e5a1..2c2dabad96 100644 --- a/eth/fetcher/block_fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package fetcher contains the announcement based blocks or transaction synchronisation. +// Package fetcher contains the announcement based header, blocks or transaction synchronisation. package fetcher import ( @@ -31,6 +31,7 @@ import ( ) const ( + lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction @@ -39,7 +40,7 @@ const ( const ( maxUncleDist = 7 // Maximum allowed backward distance from the chain head maxQueueDist = 32 // Maximum allowed distance from the chain head to queue - hashLimit = 256 // Maximum number of unique blocks a peer may have announced + hashLimit = 256 // Maximum number of unique blocks or headers a peer may have announced blockLimit = 64 // Maximum number of unique blocks a peer may have delivered ) @@ -63,9 +64,10 @@ var ( bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil) ) -var ( - errTerminated = errors.New("terminated") -) +var errTerminated = errors.New("terminated") + +// HeaderRetrievalFn is a callback type for retrieving a header from the local chain. +type HeaderRetrievalFn func(common.Hash) *types.Header // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block @@ -85,6 +87,9 @@ type blockBroadcasterFn func(block *types.Block, propagate bool) // chainHeightFn is a callback type to retrieve the current chain height. type chainHeightFn func() uint64 +// headersInsertFn is a callback type to insert a batch of headers into the local chain. +type headersInsertFn func(headers []*types.Header) (int, error) + // chainInsertFn is a callback type to insert a batch of blocks into the local chain. type chainInsertFn func(types.Blocks) (int, error) @@ -121,18 +126,38 @@ type bodyFilterTask struct { time time.Time // Arrival time of the blocks' contents } -// blockInject represents a schedules import operation. -type blockInject struct { +// blockOrHeaderInject represents a schedules import operation. +type blockOrHeaderInject struct { origin string - block *types.Block + + header *types.Header // Used for light mode fetcher which only cares about header. + block *types.Block // Used for normal mode fetcher which imports full block. +} + +// number returns the block number of the injected object. +func (inject *blockOrHeaderInject) number() uint64 { + if inject.header != nil { + return inject.header.Number.Uint64() + } + return inject.block.NumberU64() +} + +// number returns the block hash of the injected object. +func (inject *blockOrHeaderInject) hash() common.Hash { + if inject.header != nil { + return inject.header.Hash() + } + return inject.block.Hash() } // BlockFetcher is responsible for accumulating block announcements from various peers // and scheduling them for retrieval. type BlockFetcher struct { + light bool // The indicator whether it's a light fetcher or normal one. + // Various event channels notify chan *blockAnnounce - inject chan *blockInject + inject chan *blockOrHeaderInject headerFilter chan chan *headerFilterTask bodyFilter chan chan *bodyFilterTask @@ -148,31 +173,34 @@ type BlockFetcher struct { completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*blockInject // Set of already queued blocks (to dedupe imports) + queue *prque.Prque // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports) // Callbacks + getHeader HeaderRetrievalFn // Retrieves a header from the local chain getBlock blockRetrievalFn // Retrieves a block from the local chain verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers chainHeight chainHeightFn // Retrieves the current chain's height + insertHeaders headersInsertFn // Injects a batch of headers into the chain insertChain chainInsertFn // Injects a batch of blocks into the chain dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks - announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list - queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue - fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch - completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) - importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62) + announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list + queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue + fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch + completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) + importedHook func(*types.Header, *types.Block) // Method to call upon successful header or block import (both eth/61 and eth/62) } // NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements. -func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { +func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertHeaders headersInsertFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { return &BlockFetcher{ + light: light, notify: make(chan *blockAnnounce), - inject: make(chan *blockInject), + inject: make(chan *blockOrHeaderInject), headerFilter: make(chan chan *headerFilterTask), bodyFilter: make(chan chan *bodyFilterTask), done: make(chan common.Hash), @@ -184,11 +212,13 @@ func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, b completing: make(map[common.Hash]*blockAnnounce), queue: prque.New(nil), queues: make(map[string]int), - queued: make(map[common.Hash]*blockInject), + queued: make(map[common.Hash]*blockOrHeaderInject), + getHeader: getHeader, getBlock: getBlock, verifyHeader: verifyHeader, broadcastBlock: broadcastBlock, chainHeight: chainHeight, + insertHeaders: insertHeaders, insertChain: insertChain, dropPeer: dropPeer, } @@ -228,7 +258,7 @@ func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time // Enqueue tries to fill gaps the fetcher's future import queue. func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error { - op := &blockInject{ + op := &blockOrHeaderInject{ origin: peer, block: block, } @@ -315,13 +345,13 @@ func (f *BlockFetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - op := f.queue.PopItem().(*blockInject) - hash := op.block.Hash() + op := f.queue.PopItem().(*blockOrHeaderInject) + hash := op.hash() if f.queueChangeHook != nil { f.queueChangeHook(hash, false) } // If too high up the chain or phase, continue later - number := op.block.NumberU64() + number := op.number() if number > height+1 { f.queue.Push(op, -int64(number)) if f.queueChangeHook != nil { @@ -330,11 +360,15 @@ func (f *BlockFetcher) loop() { break } // Otherwise if fresh and still unknown, try and import - if number+maxUncleDist < height || f.getBlock(hash) != nil { + if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) { f.forgetBlock(hash) continue } - f.insert(op.origin, op.block) + if f.light { + f.importHeaders(op.origin, op.header) + } else { + f.importBlocks(op.origin, op.block) + } } // Wait for an outside event to occur select { @@ -379,7 +413,13 @@ func (f *BlockFetcher) loop() { case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps blockBroadcastInMeter.Mark(1) - f.enqueue(op.origin, op.block) + + // Now only direct block injection is allowed, drop the header injection + // here silently if we receive. + if f.light { + continue + } + f.enqueue(op.origin, nil, op.block) case hash := <-f.done: // A pending import finished, remove all traces of the notification @@ -391,13 +431,19 @@ func (f *BlockFetcher) loop() { request := make(map[string][]common.Hash) for hash, announces := range f.announced { - if time.Since(announces[0].time) > arriveTimeout-gatherSlack { + // In current LES protocol(les2/les3), only header announce is + // available, no need to wait too much time for header broadcast. + timeout := arriveTimeout - gatherSlack + if f.light { + timeout = 0 + } + if time.Since(announces[0].time) > timeout { // Pick a random peer to retrieve from, reset all others announce := announces[rand.Intn(len(announces))] f.forgetHash(hash) // If the block still didn't arrive, queue for fetching - if f.getBlock(hash) == nil { + if (f.light && f.getHeader(hash) == nil) || (!f.light && f.getBlock(hash) == nil) { request[announce.origin] = append(request[announce.origin], hash) f.fetching[hash] = announce } @@ -465,7 +511,7 @@ func (f *BlockFetcher) loop() { // Split the batch of headers into unknown ones (to return to the caller), // known incomplete ones (requiring body retrievals) and completed blocks. - unknown, incomplete, complete := []*types.Header{}, []*blockAnnounce{}, []*types.Block{} + unknown, incomplete, complete, lightHeaders := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}, []*blockAnnounce{} for _, header := range task.headers { hash := header.Hash() @@ -478,6 +524,16 @@ func (f *BlockFetcher) loop() { f.forgetHash(hash) continue } + // Collect all headers only if we are running in light + // mode and the headers are not imported by other means. + if f.light { + if f.getHeader(hash) == nil { + announce.header = header + lightHeaders = append(lightHeaders, announce) + } + f.forgetHash(hash) + continue + } // Only keep if not imported by other means if f.getBlock(hash) == nil { announce.header = header @@ -522,10 +578,14 @@ func (f *BlockFetcher) loop() { f.rescheduleComplete(completeTimer) } } + // Schedule the header for light fetcher import + for _, announce := range lightHeaders { + f.enqueue(announce.origin, announce.header, nil) + } // Schedule the header-only blocks for import for _, block := range complete { if announce := f.completing[block.Hash()]; announce != nil { - f.enqueue(announce.origin, block) + f.enqueue(announce.origin, nil, block) } } @@ -592,7 +652,7 @@ func (f *BlockFetcher) loop() { // Schedule the retrieved blocks for ordered import for _, block := range blocks { if announce := f.completing[block.Hash()]; announce != nil { - f.enqueue(announce.origin, block) + f.enqueue(announce.origin, nil, block) } } } @@ -605,6 +665,12 @@ func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) { if len(f.announced) == 0 { return } + // Schedule announcement retrieval quickly for light mode + // since server won't send any headers to client. + if f.light { + fetch.Reset(lightTimeout) + return + } // Otherwise find the earliest expiring announcement earliest := time.Now() for _, announces := range f.announced { @@ -631,46 +697,88 @@ func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) { complete.Reset(gatherSlack - time.Since(earliest)) } -// enqueue schedules a new future import operation, if the block to be imported -// has not yet been seen. -func (f *BlockFetcher) enqueue(peer string, block *types.Block) { - hash := block.Hash() - +// enqueue schedules a new header or block import operation, if the component +// to be imported has not yet been seen. +func (f *BlockFetcher) enqueue(peer string, header *types.Header, block *types.Block) { + var ( + hash common.Hash + number uint64 + ) + if header != nil { + hash, number = header.Hash(), header.Number.Uint64() + } else { + hash, number = block.Hash(), block.NumberU64() + } // Ensure the peer isn't DOSing us count := f.queues[peer] + 1 if count > blockLimit { - log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) + log.Debug("Discarded delivered header or block, exceeded allowance", "peer", peer, "number", number, "hash", hash, "limit", blockLimit) blockBroadcastDOSMeter.Mark(1) f.forgetHash(hash) return } // Discard any past or too distant blocks - if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { - log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist) + if dist := int64(number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { + log.Debug("Discarded delivered header or block, too far away", "peer", peer, "number", number, "hash", hash, "distance", dist) blockBroadcastDropMeter.Mark(1) f.forgetHash(hash) return } // Schedule the block for future importing if _, ok := f.queued[hash]; !ok { - op := &blockInject{ - origin: peer, - block: block, + op := &blockOrHeaderInject{origin: peer} + if header != nil { + op.header = header + } else { + op.block = block } f.queues[peer] = count f.queued[hash] = op - f.queue.Push(op, -int64(block.NumberU64())) + f.queue.Push(op, -int64(number)) if f.queueChangeHook != nil { - f.queueChangeHook(op.block.Hash(), true) + f.queueChangeHook(hash, true) } - log.Debug("Queued propagated block", "peer", peer, "number", block.Number(), "hash", hash, "queued", f.queue.Size()) + log.Debug("Queued delivered header or block", "peer", peer, "number", number, "hash", hash, "queued", f.queue.Size()) } } -// insert spawns a new goroutine to run a block insertion into the chain. If the +// importHeaders spawns a new goroutine to run a header insertion into the chain. +// If the header's number is at the same height as the current import phase, it +// updates the phase states accordingly. +func (f *BlockFetcher) importHeaders(peer string, header *types.Header) { + hash := header.Hash() + log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash) + + go func() { + defer func() { f.done <- hash }() + // If the parent's unknown, abort insertion + parent := f.getHeader(header.ParentHash) + if parent == nil { + log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash) + return + } + // Validate the header and if something went wrong, drop the peer + if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock { + log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) + f.dropPeer(peer) + return + } + // Run the actual import and log any issues + if _, err := f.insertHeaders([]*types.Header{header}); err != nil { + log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err) + return + } + // Invoke the testing hook if needed + if f.importedHook != nil { + f.importedHook(header, nil) + } + }() +} + +// importBlocks spawns a new goroutine to run a block insertion into the chain. If the // block's number is at the same height as the current import phase, it updates // the phase states accordingly. -func (f *BlockFetcher) insert(peer string, block *types.Block) { +func (f *BlockFetcher) importBlocks(peer string, block *types.Block) { hash := block.Hash() // Run the import on a new thread @@ -711,7 +819,7 @@ func (f *BlockFetcher) insert(peer string, block *types.Block) { // Invoke the testing hook if needed if f.importedHook != nil { - f.importedHook(block) + f.importedHook(nil, block) } }() } diff --git a/eth/fetcher/block_fetcher_test.go b/eth/fetcher/block_fetcher_test.go index 038ead12e7..a6854ffcf8 100644 --- a/eth/fetcher/block_fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -78,26 +78,36 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common type fetcherTester struct { fetcher *BlockFetcher - hashes []common.Hash // Hash chain belonging to the tester - blocks map[common.Hash]*types.Block // Blocks belonging to the tester - drops map[string]bool // Map of peers dropped by the fetcher + hashes []common.Hash // Hash chain belonging to the tester + headers map[common.Hash]*types.Header // Headers belonging to the tester + blocks map[common.Hash]*types.Block // Blocks belonging to the tester + drops map[string]bool // Map of peers dropped by the fetcher lock sync.RWMutex } // newTester creates a new fetcher test mocker. -func newTester() *fetcherTester { +func newTester(light bool) *fetcherTester { tester := &fetcherTester{ - hashes: []common.Hash{genesis.Hash()}, - blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, - drops: make(map[string]bool), + hashes: []common.Hash{genesis.Hash()}, + headers: map[common.Hash]*types.Header{genesis.Hash(): genesis.Header()}, + blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, + drops: make(map[string]bool), } - tester.fetcher = NewBlockFetcher(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer) + tester.fetcher = NewBlockFetcher(light, tester.getHeader, tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertHeaders, tester.insertChain, tester.dropPeer) tester.fetcher.Start() return tester } +// getHeader retrieves a header from the tester's block chain. +func (f *fetcherTester) getHeader(hash common.Hash) *types.Header { + f.lock.RLock() + defer f.lock.RUnlock() + + return f.headers[hash] +} + // getBlock retrieves a block from the tester's block chain. func (f *fetcherTester) getBlock(hash common.Hash) *types.Block { f.lock.RLock() @@ -120,9 +130,33 @@ func (f *fetcherTester) chainHeight() uint64 { f.lock.RLock() defer f.lock.RUnlock() + if f.fetcher.light { + return f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() + } return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() } +// insertChain injects a new headers into the simulated chain. +func (f *fetcherTester) insertHeaders(headers []*types.Header) (int, error) { + f.lock.Lock() + defer f.lock.Unlock() + + for i, header := range headers { + // Make sure the parent in known + if _, ok := f.headers[header.ParentHash]; !ok { + return i, errors.New("unknown parent") + } + // Discard any new blocks if the same height already exists + if header.Number.Uint64() <= f.headers[f.hashes[len(f.hashes)-1]].Number.Uint64() { + return i, nil + } + // Otherwise build our current chain + f.hashes = append(f.hashes, header.Hash()) + f.headers[header.Hash()] = header + } + return 0, nil +} + // insertChain injects a new blocks into the simulated chain. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) { f.lock.Lock() @@ -233,7 +267,7 @@ func verifyCompletingEvent(t *testing.T, completing chan []common.Hash, arrive b } // verifyImportEvent verifies that one single event arrive on an import channel. -func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) { +func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) { if arrive { select { case <-imported: @@ -251,7 +285,7 @@ func verifyImportEvent(t *testing.T, imported chan *types.Block, arrive bool) { // verifyImportCount verifies that exactly count number of events arrive on an // import hook channel. -func verifyImportCount(t *testing.T, imported chan *types.Block, count int) { +func verifyImportCount(t *testing.T, imported chan interface{}, count int) { for i := 0; i < count; i++ { select { case <-imported: @@ -263,7 +297,7 @@ func verifyImportCount(t *testing.T, imported chan *types.Block, count int) { } // verifyImportDone verifies that no more events are arriving on an import channel. -func verifyImportDone(t *testing.T, imported chan *types.Block) { +func verifyImportDone(t *testing.T, imported chan interface{}) { select { case <-imported: t.Fatalf("extra block imported") @@ -271,45 +305,62 @@ func verifyImportDone(t *testing.T, imported chan *types.Block) { } } -// Tests that a fetcher accepts block announcements and initiates retrievals for -// them, successfully importing into the local chain. -func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) } -func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) } -func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) } +// verifyChainHeight verifies the chain height is as expected. +func verifyChainHeight(t *testing.T, fetcher *fetcherTester, height uint64) { + if fetcher.chainHeight() != height { + t.Fatalf("chain height mismatch, got %d, want %d", fetcher.chainHeight(), height) + } +} -func testSequentialAnnouncements(t *testing.T, protocol int) { +// Tests that a fetcher accepts block/header announcements and initiates retrievals +// for them, successfully importing into the local chain. +func TestFullSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, false) } +func TestLightSequentialAnnouncements(t *testing.T) { testSequentialAnnouncements(t, true) } + +func testSequentialAnnouncements(t *testing.T, light bool) { // Create a chain of blocks to import targetBlocks := 4 * hashLimit hashes, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() + tester := newTester(light) headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks until all are imported - imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + imported := make(chan interface{}) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { + if light { + if header == nil { + t.Fatalf("Fetcher try to import empty header") + } + imported <- header + } else { + if block == nil { + t.Fatalf("Fetcher try to import empty block") + } + imported <- block + } + } for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) verifyImportEvent(t, imported, true) } verifyImportDone(t, imported) + verifyChainHeight(t, tester, uint64(len(hashes)-1)) } // Tests that if blocks are announced by multiple peers (or even the same buggy // peer), they will only get downloaded at most once. -func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) } -func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) } -func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) } +func TestFullConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, false) } +func TestLightConcurrentAnnouncements(t *testing.T) { testConcurrentAnnouncements(t, true) } -func testConcurrentAnnouncements(t *testing.T, protocol int) { +func testConcurrentAnnouncements(t *testing.T, light bool) { // Create a chain of blocks to import targetBlocks := 4 * hashLimit hashes, blocks := makeChain(targetBlocks, 0, genesis) // Assemble a tester with a built in counter for the requests - tester := newTester() + tester := newTester(light) firstHeaderFetcher := tester.makeHeaderFetcher("first", blocks, -gatherSlack) firstBodyFetcher := tester.makeBodyFetcher("first", blocks, 0) secondHeaderFetcher := tester.makeHeaderFetcher("second", blocks, -gatherSlack) @@ -325,9 +376,20 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) { return secondHeaderFetcher(hash) } // Iteratively announce blocks until all are imported - imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + imported := make(chan interface{}) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { + if light { + if header == nil { + t.Fatalf("Fetcher try to import empty header") + } + imported <- header + } else { + if block == nil { + t.Fatalf("Fetcher try to import empty block") + } + imported <- block + } + } for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), firstHeaderWrapper, firstBodyFetcher) tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), secondHeaderWrapper, secondBodyFetcher) @@ -340,30 +402,42 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) { if int(counter) != targetBlocks { t.Fatalf("retrieval count mismatch: have %v, want %v", counter, targetBlocks) } + verifyChainHeight(t, tester, uint64(len(hashes)-1)) } // Tests that announcements arriving while a previous is being fetched still // results in a valid import. -func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) } -func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) } -func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) } +func TestFullOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, false) } +func TestLightOverlappingAnnouncements(t *testing.T) { testOverlappingAnnouncements(t, true) } -func testOverlappingAnnouncements(t *testing.T, protocol int) { +func testOverlappingAnnouncements(t *testing.T, light bool) { // Create a chain of blocks to import targetBlocks := 4 * hashLimit hashes, blocks := makeChain(targetBlocks, 0, genesis) - tester := newTester() + tester := newTester(light) headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks, but overlap them continuously overlap := 16 - imported := make(chan *types.Block, len(hashes)-1) + imported := make(chan interface{}, len(hashes)-1) for i := 0; i < overlap; i++ { imported <- nil } - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { + if light { + if header == nil { + t.Fatalf("Fetcher try to import empty header") + } + imported <- header + } else { + if block == nil { + t.Fatalf("Fetcher try to import empty block") + } + imported <- block + } + } for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -375,19 +449,19 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) { } // Wait for all the imports to complete and check count verifyImportCount(t, imported, overlap) + verifyChainHeight(t, tester, uint64(len(hashes)-1)) } // Tests that announces already being retrieved will not be duplicated. -func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) } -func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) } -func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) } +func TestFullPendingDeduplication(t *testing.T) { testPendingDeduplication(t, false) } +func TestLightPendingDeduplication(t *testing.T) { testPendingDeduplication(t, true) } -func testPendingDeduplication(t *testing.T, protocol int) { +func testPendingDeduplication(t *testing.T, light bool) { // Create a hash and corresponding block hashes, blocks := makeChain(1, 0, genesis) // Assemble a tester with a built in counter and delayed fetcher - tester := newTester() + tester := newTester(light) headerFetcher := tester.makeHeaderFetcher("repeater", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("repeater", blocks, 0) @@ -403,42 +477,58 @@ func testPendingDeduplication(t *testing.T, protocol int) { }() return nil } + checkNonExist := func() bool { + return tester.getBlock(hashes[0]) == nil + } + if light { + checkNonExist = func() bool { + return tester.getHeader(hashes[0]) == nil + } + } // Announce the same block many times until it's fetched (wait for any pending ops) - for tester.getBlock(hashes[0]) == nil { + for checkNonExist() { tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher) time.Sleep(time.Millisecond) } time.Sleep(delay) // Check that all blocks were imported and none fetched twice - if imported := len(tester.blocks); imported != 2 { - t.Fatalf("synchronised block mismatch: have %v, want %v", imported, 2) - } if int(counter) != 1 { t.Fatalf("retrieval count mismatch: have %v, want %v", counter, 1) } + verifyChainHeight(t, tester, 1) } // Tests that announcements retrieved in a random order are cached and eventually // imported when all the gaps are filled in. -func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) } -func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) } -func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) } +func TestFullRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, false) } +func TestLightRandomArrivalImport(t *testing.T) { testRandomArrivalImport(t, true) } -func testRandomArrivalImport(t *testing.T, protocol int) { +func testRandomArrivalImport(t *testing.T, light bool) { // Create a chain of blocks to import, and choose one to delay targetBlocks := maxQueueDist hashes, blocks := makeChain(targetBlocks, 0, genesis) skip := targetBlocks / 2 - tester := newTester() + tester := newTester(light) headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks, skipping one entry - imported := make(chan *types.Block, len(hashes)-1) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + imported := make(chan interface{}, len(hashes)-1) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { + if light { + if header == nil { + t.Fatalf("Fetcher try to import empty header") + } + imported <- header + } else { + if block == nil { + t.Fatalf("Fetcher try to import empty block") + } + imported <- block + } + } for i := len(hashes) - 1; i >= 0; i-- { if i != skip { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -448,27 +538,24 @@ func testRandomArrivalImport(t *testing.T, protocol int) { // Finally announce the skipped entry and check full import tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) verifyImportCount(t, imported, len(hashes)-1) + verifyChainHeight(t, tester, uint64(len(hashes)-1)) } // Tests that direct block enqueues (due to block propagation vs. hash announce) // are correctly schedule, filling and import queue gaps. -func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) } -func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) } -func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) } - -func testQueueGapFill(t *testing.T, protocol int) { +func TestQueueGapFill(t *testing.T) { // Create a chain of blocks to import, and choose one to not announce at all targetBlocks := maxQueueDist hashes, blocks := makeChain(targetBlocks, 0, genesis) skip := targetBlocks / 2 - tester := newTester() + tester := newTester(false) headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) // Iteratively announce blocks, skipping one entry - imported := make(chan *types.Block, len(hashes)-1) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + imported := make(chan interface{}, len(hashes)-1) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block } for i := len(hashes) - 1; i >= 0; i-- { if i != skip { @@ -479,20 +566,17 @@ func testQueueGapFill(t *testing.T, protocol int) { // Fill the missing block directly as if propagated tester.fetcher.Enqueue("valid", blocks[hashes[skip]]) verifyImportCount(t, imported, len(hashes)-1) + verifyChainHeight(t, tester, uint64(len(hashes)-1)) } // Tests that blocks arriving from various sources (multiple propagations, hash // announces, etc) do not get scheduled for import multiple times. -func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) } -func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) } -func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) } - -func testImportDeduplication(t *testing.T, protocol int) { +func TestImportDeduplication(t *testing.T) { // Create two blocks to import (one for duplication, the other for stalling) hashes, blocks := makeChain(2, 0, genesis) // Create the tester and wrap the importer with a counter - tester := newTester() + tester := newTester(false) headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) @@ -503,9 +587,9 @@ func testImportDeduplication(t *testing.T, protocol int) { } // Instrument the fetching and imported events fetching := make(chan []common.Hash) - imported := make(chan *types.Block, len(hashes)-1) + imported := make(chan interface{}, len(hashes)-1) tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block } // Announce the duplicating block, wait for retrieval, and also propagate directly tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -534,7 +618,7 @@ func TestDistantPropagationDiscarding(t *testing.T) { low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1 // Create a tester and simulate a head block being the middle of the above chain - tester := newTester() + tester := newTester(false) tester.lock.Lock() tester.hashes = []common.Hash{head} @@ -558,11 +642,10 @@ func TestDistantPropagationDiscarding(t *testing.T) { // Tests that announcements with numbers much lower or higher than out current // head get discarded to prevent wasting resources on useless blocks from faulty // peers. -func TestDistantAnnouncementDiscarding62(t *testing.T) { testDistantAnnouncementDiscarding(t, 62) } -func TestDistantAnnouncementDiscarding63(t *testing.T) { testDistantAnnouncementDiscarding(t, 63) } -func TestDistantAnnouncementDiscarding64(t *testing.T) { testDistantAnnouncementDiscarding(t, 64) } +func TestFullDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, false) } +func TestLightDistantAnnouncementDiscarding(t *testing.T) { testDistantAnnouncementDiscarding(t, true) } -func testDistantAnnouncementDiscarding(t *testing.T, protocol int) { +func testDistantAnnouncementDiscarding(t *testing.T, light bool) { // Create a long chain to import and define the discard boundaries hashes, blocks := makeChain(3*maxQueueDist, 0, genesis) head := hashes[len(hashes)/2] @@ -570,10 +653,11 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) { low, high := len(hashes)/2+maxUncleDist+1, len(hashes)/2-maxQueueDist-1 // Create a tester and simulate a head block being the middle of the above chain - tester := newTester() + tester := newTester(light) tester.lock.Lock() tester.hashes = []common.Hash{head} + tester.headers = map[common.Hash]*types.Header{head: blocks[head].Header()} tester.blocks = map[common.Hash]*types.Block{head: blocks[head]} tester.lock.Unlock() @@ -601,21 +685,31 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) { // Tests that peers announcing blocks with invalid numbers (i.e. not matching // the headers provided afterwards) get dropped as malicious. -func TestInvalidNumberAnnouncement62(t *testing.T) { testInvalidNumberAnnouncement(t, 62) } -func TestInvalidNumberAnnouncement63(t *testing.T) { testInvalidNumberAnnouncement(t, 63) } -func TestInvalidNumberAnnouncement64(t *testing.T) { testInvalidNumberAnnouncement(t, 64) } +func TestFullInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, false) } +func TestLightInvalidNumberAnnouncement(t *testing.T) { testInvalidNumberAnnouncement(t, true) } -func testInvalidNumberAnnouncement(t *testing.T, protocol int) { +func testInvalidNumberAnnouncement(t *testing.T, light bool) { // Create a single block to import and check numbers against hashes, blocks := makeChain(1, 0, genesis) - tester := newTester() + tester := newTester(light) badHeaderFetcher := tester.makeHeaderFetcher("bad", blocks, -gatherSlack) badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) - imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + imported := make(chan interface{}) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { + if light { + if header == nil { + t.Fatalf("Fetcher try to import empty header") + } + imported <- header + } else { + if block == nil { + t.Fatalf("Fetcher try to import empty block") + } + imported <- block + } + } // Announce a block with a bad number, check for immediate drop tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), badHeaderFetcher, badBodyFetcher) verifyImportEvent(t, imported, false) @@ -646,15 +740,11 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) { // Tests that if a block is empty (i.e. header only), no body request should be // made, and instead the header should be assembled into a whole block in itself. -func TestEmptyBlockShortCircuit62(t *testing.T) { testEmptyBlockShortCircuit(t, 62) } -func TestEmptyBlockShortCircuit63(t *testing.T) { testEmptyBlockShortCircuit(t, 63) } -func TestEmptyBlockShortCircuit64(t *testing.T) { testEmptyBlockShortCircuit(t, 64) } - -func testEmptyBlockShortCircuit(t *testing.T, protocol int) { +func TestEmptyBlockShortCircuit(t *testing.T) { // Create a chain of blocks to import hashes, blocks := makeChain(32, 0, genesis) - tester := newTester() + tester := newTester(false) headerFetcher := tester.makeHeaderFetcher("valid", blocks, -gatherSlack) bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) @@ -665,9 +755,13 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) { completing := make(chan []common.Hash) tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes } - imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } - + imported := make(chan interface{}) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { + if block == nil { + t.Fatalf("Fetcher try to import empty block") + } + imported <- block + } // Iteratively announce blocks until all are imported for i := len(hashes) - 2; i >= 0; i-- { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher) @@ -687,16 +781,12 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) { // Tests that a peer is unable to use unbounded memory with sending infinite // block announcements to a node, but that even in the face of such an attack, // the fetcher remains operational. -func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) } -func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) } -func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) } - -func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { +func TestHashMemoryExhaustionAttack(t *testing.T) { // Create a tester with instrumented import hooks - tester := newTester() + tester := newTester(false) - imported, announces := make(chan *types.Block), int32(0) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + imported, announces := make(chan interface{}), int32(0) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block } tester.fetcher.announceChangeHook = func(hash common.Hash, added bool) { if added { atomic.AddInt32(&announces, 1) @@ -740,10 +830,10 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { // system memory. func TestBlockMemoryExhaustionAttack(t *testing.T) { // Create a tester with instrumented import hooks - tester := newTester() + tester := newTester(false) - imported, enqueued := make(chan *types.Block), int32(0) - tester.fetcher.importedHook = func(block *types.Block) { imported <- block } + imported, enqueued := make(chan interface{}), int32(0) + tester.fetcher.importedHook = func(header *types.Header, block *types.Block) { imported <- block } tester.fetcher.queueChangeHook = func(hash common.Hash, added bool) { if added { atomic.AddInt32(&enqueued, 1) diff --git a/eth/handler.go b/eth/handler.go index 9180706ada..1a15765dda 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -188,7 +188,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh } return n, err } - manager.blockFetcher = fetcher.NewBlockFetcher(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + manager.blockFetcher = fetcher.NewBlockFetcher(false, nil, blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, nil, inserter, manager.removePeer) fetchTx := func(peer string, hashes []common.Hash) error { p := manager.peers.Peer(peer) diff --git a/les/client.go b/les/client.go index 49b21c9673..a3ae647517 100644 --- a/les/client.go +++ b/les/client.go @@ -269,7 +269,7 @@ func (s *LightEthereum) EventMux() *event.TypeMux { return s.eventMux // network protocols to start. func (s *LightEthereum) Protocols() []p2p.Protocol { return s.makeProtocols(ClientProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} { - if p := s.peers.peer(peerIdToString(id)); p != nil { + if p := s.peers.peer(id.String()); p != nil { return p.Info() } return nil @@ -285,6 +285,7 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error { // Start bloom request workers. s.wg.Add(bloomServiceThreads) s.startBloomHandlers(params.BloomBitsBlocksClient) + s.handler.start() s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.config.NetworkId) return nil diff --git a/les/client_handler.go b/les/client_handler.go index abe472e463..cfeec7a03c 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -64,16 +64,20 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T if checkpoint != nil { height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1 } - handler.fetcher = newLightFetcher(handler, backend.serverPool.getTimeout) + handler.fetcher = newLightFetcher(backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise) handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer) handler.backend.peers.subscribe((*downloaderPeerNotify)(handler)) return handler } +func (h *clientHandler) start() { + h.fetcher.start() +} + func (h *clientHandler) stop() { close(h.closeCh) h.downloader.Terminate() - h.fetcher.close() + h.fetcher.stop() h.wg.Wait() } @@ -121,7 +125,6 @@ func (h *clientHandler) handle(p *serverPeer) error { connectionTimer.Update(time.Duration(mclock.Now() - connectedAt)) serverConnectionGauge.Update(int64(h.backend.peers.len())) }() - h.fetcher.announce(p, &announceData{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td}) // Mark the peer starts to be served. @@ -185,6 +188,9 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { p.Log().Trace("Valid announcement signature") } p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth) + + // Update peer head information first and then notify the announcement + p.updateHead(req.Hash, req.Number, req.Td) h.fetcher.announce(p, &req) } case BlockHeadersMsg: @@ -196,12 +202,17 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { if err := msg.Decode(&resp); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } + headers := resp.Headers p.fcServer.ReceivedReply(resp.ReqID, resp.BV) p.answeredRequest(resp.ReqID) - if h.fetcher.requestedID(resp.ReqID) { - h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) - } else { - if err := h.downloader.DeliverHeaders(p.id, resp.Headers); err != nil { + + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + headers = h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) + } + if len(headers) != 0 || !filter { + if err := h.downloader.DeliverHeaders(p.id, headers); err != nil { log.Debug("Failed to deliver headers", "err", err) } } @@ -320,8 +331,7 @@ func (h *clientHandler) handleMsg(p *serverPeer) error { // Deliver the received response to retriever. if deliverMsg != nil { if err := h.backend.retriever.deliver(p, deliverMsg); err != nil { - p.errCount++ - if p.errCount > maxResponseErrors { + if val := p.errCount.Add(1, mclock.Now()); val > maxResponseErrors { return err } } diff --git a/les/clientpool.go b/les/clientpool.go index 05970f99ab..9c4060fc2c 100644 --- a/les/clientpool.go +++ b/les/clientpool.go @@ -212,7 +212,7 @@ func (f *clientPool) connect(peer clientPoolPeer, capacity uint64) bool { id, freeID := peer.ID(), peer.freeClientId() if _, ok := f.connectedMap[id]; ok { clientRejectedMeter.Mark(1) - log.Debug("Client already connected", "address", freeID, "id", peerIdToString(id)) + log.Debug("Client already connected", "address", freeID, "id", id.String()) return false } // Create a clientInfo but do not add it yet @@ -277,7 +277,7 @@ func (f *clientPool) connect(peer clientPoolPeer, capacity uint64) bool { f.connectedQueue.Push(c) } clientRejectedMeter.Mark(1) - log.Debug("Client rejected", "address", freeID, "id", peerIdToString(id)) + log.Debug("Client rejected", "address", freeID, "id", id.String()) return false } // accept new client, drop old ones @@ -322,7 +322,7 @@ func (f *clientPool) disconnect(p clientPoolPeer) { // Short circuit if the peer hasn't been registered. e := f.connectedMap[p.ID()] if e == nil { - log.Debug("Client not connected", "address", p.freeClientId(), "id", peerIdToString(p.ID())) + log.Debug("Client not connected", "address", p.freeClientId(), "id", p.ID().String()) return } f.dropClient(e, f.clock.Now(), false) diff --git a/les/fetcher.go b/les/fetcher.go index aaf74aaa14..fc4c5e386a 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -18,870 +18,547 @@ package les import ( "math/big" + "math/rand" "sync" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" ) const ( - blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others - maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer - serverStateAvailable = 100 // number of recent blocks where state availability is assumed + blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer + gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests + cachedAnnosThreshold = 64 // The maximum queued announcements ) -// lightFetcher implements retrieval of newly announced headers. It also provides a peerHasBlock function for the -// ODR system to ensure that we only request data related to a certain block from peers who have already processed -// and announced that block. +// announce represents an new block announcement from the les server. +type announce struct { + data *announceData + trust bool + peerid enode.ID +} + +// request represents a record when the header request is sent. +type request struct { + reqid uint64 + peerid enode.ID + sendAt time.Time + hash common.Hash +} + +// response represents a response packet from network as well as a channel +// to return all un-requested data. +type response struct { + reqid uint64 + headers []*types.Header + peerid enode.ID + remain chan []*types.Header +} + +// fetcherPeer holds the fetcher-specific information for each active peer +type fetcherPeer struct { + latest *announceData // The latest announcement sent from the peer + + // These following two fields can track the latest announces + // from the peer with limited size for caching. We hold the + // assumption that all enqueued announces are td-monotonic. + announces map[common.Hash]*announce // Announcement map + announcesList []common.Hash // FIFO announces list +} + +// addAnno enqueues an new trusted announcement. If the queued announces overflow, +// evict from the oldest. +func (fp *fetcherPeer) addAnno(anno *announce) { + // Short circuit if the anno already exists. In normal case it should + // never happen since only monotonic anno is accepted. But the adversary + // may feed us fake announces with higher td but same hash. In this case, + // ignore the anno anyway. + hash := anno.data.Hash + if _, exist := fp.announces[hash]; exist { + return + } + fp.announces[hash] = anno + fp.announcesList = append(fp.announcesList, hash) + + // Evict oldest if the announces are oversized. + if len(fp.announcesList)-cachedAnnosThreshold > 0 { + for i := 0; i < len(fp.announcesList)-cachedAnnosThreshold; i++ { + delete(fp.announces, fp.announcesList[i]) + } + copy(fp.announcesList, fp.announcesList[len(fp.announcesList)-cachedAnnosThreshold:]) + fp.announcesList = fp.announcesList[:cachedAnnosThreshold] + } +} + +// forwardAnno removes all announces from the map with a number lower than +// the provided threshold. +func (fp *fetcherPeer) forwardAnno(td *big.Int) []*announce { + var ( + cutset int + evicted []*announce + ) + for ; cutset < len(fp.announcesList); cutset++ { + anno := fp.announces[fp.announcesList[cutset]] + if anno == nil { + continue // In theory it should never ever happen + } + if anno.data.Td.Cmp(td) > 0 { + break + } + evicted = append(evicted, anno) + delete(fp.announces, anno.data.Hash) + } + if cutset > 0 { + copy(fp.announcesList, fp.announcesList[cutset:]) + fp.announcesList = fp.announcesList[:len(fp.announcesList)-cutset] + } + return evicted +} + +// lightFetcher implements retrieval of newly announced headers. It reuses +// the eth.BlockFetcher as the underlying fetcher but adding more additional +// rules: e.g. evict "timeout" peers. type lightFetcher struct { - handler *clientHandler - chain *light.LightChain - softRequestTimeout func() time.Duration + // Various handlers + ulc *ulc + chaindb ethdb.Database + reqDist *requestDistributor + peerset *serverPeerSet // The global peerset of light client which shared by all components + chain *light.LightChain // The local light chain which maintains the canonical header chain. + fetcher *fetcher.BlockFetcher // The underlying fetcher which takes care block header retrieval. - lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests - maxConfirmedTd *big.Int - peers map[*serverPeer]*fetcherPeerInfo - lastUpdateStats *updateStatsEntry - syncing bool - syncDone chan *serverPeer + // Peerset maintained by fetcher + plock sync.RWMutex + peers map[enode.ID]*fetcherPeer - reqMu sync.RWMutex // reqMu protects access to sent header fetch requests - requested map[uint64]fetchRequest - deliverChn chan fetchResponse - timeoutChn chan uint64 - requestTriggered bool - requestTrigger chan struct{} - lastTrustedHeader *types.Header + // Various channels + announceCh chan *announce + requestCh chan *request + deliverCh chan *response + syncDone chan *types.Header closeCh chan struct{} wg sync.WaitGroup + + // Callback + synchronise func(peer *serverPeer) + + // Test fields or hooks + noAnnounce bool + newHeadHook func(*types.Header) + newAnnounce func(*serverPeer, *announceData) } -// fetcherPeerInfo holds fetcher-specific information about each active peer -type fetcherPeerInfo struct { - root, lastAnnounced *fetcherTreeNode - nodeCnt int - confirmedTd *big.Int - bestConfirmed *fetcherTreeNode - nodeByHash map[common.Hash]*fetcherTreeNode - firstUpdateStats *updateStatsEntry -} - -// fetcherTreeNode is a node of a tree that holds information about blocks recently -// announced and confirmed by a certain peer. Each new announce message from a peer -// adds nodes to the tree, based on the previous announced head and the reorg depth. -// There are three possible states for a tree node: -// - announced: not downloaded (known) yet, but we know its head, number and td -// - intermediate: not known, hash and td are empty, they are filled out when it becomes known -// - known: both announced by this peer and downloaded (from any peer). -// This structure makes it possible to always know which peer has a certain block, -// which is necessary for selecting a suitable peer for ODR requests and also for -// canonizing new heads. It also helps to always download the minimum necessary -// amount of headers with a single request. -type fetcherTreeNode struct { - hash common.Hash - number uint64 - td *big.Int - known, requested bool - parent *fetcherTreeNode - children []*fetcherTreeNode -} - -// fetchRequest represents a header download request -type fetchRequest struct { - hash common.Hash - amount uint64 - peer *serverPeer - sent mclock.AbsTime - timeout bool -} - -// fetchResponse represents a header download response -type fetchResponse struct { - reqID uint64 - headers []*types.Header - peer *serverPeer -} - -// newLightFetcher creates a new light fetcher -func newLightFetcher(h *clientHandler, softRequestTimeout func() time.Duration) *lightFetcher { - f := &lightFetcher{ - handler: h, - chain: h.backend.blockchain, - peers: make(map[*serverPeer]*fetcherPeerInfo), - deliverChn: make(chan fetchResponse, 100), - requested: make(map[uint64]fetchRequest), - timeoutChn: make(chan uint64), - requestTrigger: make(chan struct{}, 1), - syncDone: make(chan *serverPeer), - closeCh: make(chan struct{}), - maxConfirmedTd: big.NewInt(0), - softRequestTimeout: softRequestTimeout, +// newLightFetcher creates a light fetcher instance. +func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher { + // Construct the fetcher by offering all necessary APIs + validator := func(header *types.Header) error { + // Disable seal verification explicitly if we are running in ulc mode. + return engine.VerifyHeader(chain, header, ulc == nil) } - h.backend.peers.subscribe(f) - - f.wg.Add(1) - go f.syncLoop() + heighter := func() uint64 { return chain.CurrentHeader().Number.Uint64() } + dropper := func(id string) { peers.unregister(id) } + inserter := func(headers []*types.Header) (int, error) { + // Disable PoW checking explicitly if we are running in ulc mode. + checkFreq := 1 + if ulc != nil { + checkFreq = 0 + } + return chain.InsertHeaderChain(headers, checkFreq) + } + f := &lightFetcher{ + ulc: ulc, + peerset: peers, + chaindb: chaindb, + chain: chain, + reqDist: reqDist, + fetcher: fetcher.NewBlockFetcher(true, chain.GetHeaderByHash, nil, validator, nil, heighter, inserter, nil, dropper), + peers: make(map[enode.ID]*fetcherPeer), + synchronise: syncFn, + announceCh: make(chan *announce), + requestCh: make(chan *request), + deliverCh: make(chan *response), + syncDone: make(chan *types.Header), + closeCh: make(chan struct{}), + } + peers.subscribe(f) return f } -func (f *lightFetcher) close() { +func (f *lightFetcher) start() { + f.wg.Add(1) + f.fetcher.Start() + go f.mainloop() +} + +func (f *lightFetcher) stop() { close(f.closeCh) + f.fetcher.Stop() f.wg.Wait() } -// syncLoop is the main event loop of the light fetcher -func (f *lightFetcher) syncLoop() { +// registerPeer adds an new peer to the fetcher's peer set +func (f *lightFetcher) registerPeer(p *serverPeer) { + f.plock.Lock() + defer f.plock.Unlock() + + f.peers[p.ID()] = &fetcherPeer{announces: make(map[common.Hash]*announce)} +} + +// unregisterPeer removes the specified peer from the fetcher's peer set +func (f *lightFetcher) unregisterPeer(p *serverPeer) { + f.plock.Lock() + defer f.plock.Unlock() + + delete(f.peers, p.ID()) +} + +// peer returns the peer from the fetcher peerset. +func (f *lightFetcher) peer(id enode.ID) *fetcherPeer { + f.plock.RLock() + defer f.plock.RUnlock() + + return f.peers[id] +} + +// forEachPeer iterates the fetcher peerset, abort the iteration if the +// callback returns false. +func (f *lightFetcher) forEachPeer(check func(id enode.ID, p *fetcherPeer) bool) { + f.plock.RLock() + defer f.plock.RUnlock() + + for id, peer := range f.peers { + if !check(id, peer) { + return + } + } +} + +// mainloop is the main event loop of the light fetcher, which is responsible for +// - announcement maintenance(ulc) +// If we are running in ultra light client mode, then all announcements from +// the trusted servers are maintained. If the same announcements from trusted +// servers reach the threshold, then the relevant header is requested for retrieval. +// +// - block header retrieval +// Whenever we receive announce with higher td compared with local chain, the +// request will be made for header retrieval. +// +// - re-sync trigger +// If the local chain lags too much, then the fetcher will enter "synnchronise" +// mode to retrieve missing headers in batch. +func (f *lightFetcher) mainloop() { defer f.wg.Done() + + var ( + syncInterval = uint64(1) // Interval used to trigger a light resync. + syncing bool // Indicator whether the client is syncing + + ulc = f.ulc != nil + headCh = make(chan core.ChainHeadEvent, 100) + fetching = make(map[uint64]*request) + requestTimer = time.NewTimer(0) + + // Local status + localHead = f.chain.CurrentHeader() + localTd = f.chain.GetTd(localHead.Hash(), localHead.Number.Uint64()) + ) + sub := f.chain.SubscribeChainHeadEvent(headCh) + defer sub.Unsubscribe() + + // reset updates the local status with given header. + reset := func(header *types.Header) { + localHead = header + localTd = f.chain.GetTd(header.Hash(), header.Number.Uint64()) + } + // trustedHeader returns an indicator whether the header is regarded as + // trusted. If we are running in the ulc mode, only when we receive enough + // same announcement from trusted server, the header will be trusted. + trustedHeader := func(hash common.Hash, number uint64) (bool, []enode.ID) { + var ( + agreed []enode.ID + trusted bool + ) + f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool { + if anno := p.announces[hash]; anno != nil && anno.trust && anno.data.Number == number { + agreed = append(agreed, id) + if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction { + trusted = true + return false // abort iteration + } + } + return true + }) + return trusted, agreed + } for { select { - case <-f.closeCh: - return - // request loop keeps running until no further requests are necessary or possible - case <-f.requestTrigger: - f.lock.Lock() - var ( - rq *distReq - reqID uint64 - syncing bool - ) - if !f.syncing { - rq, reqID, syncing = f.nextRequest() - } - f.requestTriggered = rq != nil - f.lock.Unlock() + case anno := <-f.announceCh: + peerid, data := anno.peerid, anno.data + log.Debug("Received new announce", "peer", peerid, "number", data.Number, "hash", data.Hash, "reorg", data.ReorgDepth) - if rq != nil { - if _, ok := <-f.handler.backend.reqDist.queue(rq); ok { - if syncing { - f.lock.Lock() - f.syncing = true - f.lock.Unlock() - } else { - go func() { - time.Sleep(f.softRequestTimeout()) - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - req.timeout = true - f.requested[reqID] = req - } - f.reqMu.Unlock() - // keep starting new requests while possible - f.requestTrigger <- struct{}{} - }() - } - } else { - f.requestTrigger <- struct{}{} - } - } - case reqID := <-f.timeoutChn: - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - delete(f.requested, reqID) - } - f.reqMu.Unlock() - if ok { - req.peer.Log().Debug("Fetching data timed out hard") - go f.handler.removePeer(req.peer.id) - } - case resp := <-f.deliverChn: - f.reqMu.Lock() - req, ok := f.requested[resp.reqID] - if ok && req.peer != resp.peer { - ok = false - } - if ok { - delete(f.requested, resp.reqID) - } - f.reqMu.Unlock() - f.lock.Lock() - if !ok || !(f.syncing || f.processResponse(req, resp)) { - resp.peer.Log().Debug("Failed processing response") - go f.handler.removePeer(resp.peer.id) - } - f.lock.Unlock() - case p := <-f.syncDone: - f.lock.Lock() - p.Log().Debug("Done synchronising with peer") - f.checkSyncedHeaders(p) - f.syncing = false - f.lock.Unlock() - f.requestTrigger <- struct{}{} // f.requestTriggered is always true here - } - } -} - -// registerPeer adds a new peer to the fetcher's peer set -func (f *lightFetcher) registerPeer(p *serverPeer) { - p.lock.Lock() - p.hasBlock = func(hash common.Hash, number uint64, hasState bool) bool { - return f.peerHasBlock(p, hash, number, hasState) - } - p.lock.Unlock() - - f.lock.Lock() - defer f.lock.Unlock() - f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} -} - -// unregisterPeer removes a new peer from the fetcher's peer set -func (f *lightFetcher) unregisterPeer(p *serverPeer) { - p.lock.Lock() - p.hasBlock = nil - p.lock.Unlock() - - f.lock.Lock() - defer f.lock.Unlock() - - // check for potential timed out block delay statistics - f.checkUpdateStats(p, nil) - delete(f.peers, p) -} - -// announce processes a new announcement message received from a peer, adding new -// nodes to the peer's block tree and removing old nodes if necessary -func (f *lightFetcher) announce(p *serverPeer, head *announceData) { - f.lock.Lock() - defer f.lock.Unlock() - p.Log().Debug("Received new announcement", "number", head.Number, "hash", head.Hash, "reorg", head.ReorgDepth) - - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Announcement from unknown peer") - return - } - - if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 { - // announced tds should be strictly monotonic - p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td) - go f.handler.removePeer(p.id) - return - } - - n := fp.lastAnnounced - for i := uint64(0); i < head.ReorgDepth; i++ { - if n == nil { - break - } - n = n.parent - } - // n is now the reorg common ancestor, add a new branch of nodes - if n != nil && (head.Number >= n.number+maxNodeCount || head.Number <= n.number) { - // if announced head block height is lower or same as n or too far from it to add - // intermediate nodes then discard previous announcement info and trigger a resync - n = nil - fp.nodeCnt = 0 - fp.nodeByHash = make(map[common.Hash]*fetcherTreeNode) - } - // check if the node count is too high to add new nodes, discard oldest ones if necessary - if n != nil { - // n is now the reorg common ancestor, add a new branch of nodes - // check if the node count is too high to add new nodes - locked := false - for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil { - if !locked { - f.chain.LockChain() - defer f.chain.UnlockChain() - locked = true - } - // if one of root's children is canonical, keep it, delete other branches and root itself - var newRoot *fetcherTreeNode - for i, nn := range fp.root.children { - if rawdb.ReadCanonicalHash(f.handler.backend.chainDb, nn.number) == nn.hash { - fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...) - nn.parent = nil - newRoot = nn - break - } - } - fp.deleteNode(fp.root) - if n == fp.root { - n = newRoot - } - fp.root = newRoot - if newRoot == nil || !f.checkKnownNode(p, newRoot) { - fp.bestConfirmed = nil - fp.confirmedTd = nil - } - - if n == nil { - break - } - } - if n != nil { - for n.number < head.Number { - nn := &fetcherTreeNode{number: n.number + 1, parent: n} - n.children = append(n.children, nn) - n = nn - fp.nodeCnt++ - } - n.hash = head.Hash - n.td = head.Td - fp.nodeByHash[n.hash] = n - } - } - - if n == nil { - // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed - if fp.root != nil { - fp.deleteNode(fp.root) - } - n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td} - fp.root = n - fp.nodeCnt++ - fp.nodeByHash[n.hash] = n - fp.bestConfirmed = nil - fp.confirmedTd = nil - } - - f.checkKnownNode(p, n) - p.lock.Lock() - p.headInfo = blockInfo{Number: head.Number, Hash: head.Hash, Td: head.Td} - fp.lastAnnounced = n - p.lock.Unlock() - f.checkUpdateStats(p, nil) - if !f.requestTriggered { - f.requestTriggered = true - f.requestTrigger <- struct{}{} - } -} - -// peerHasBlock returns true if we can assume the peer knows the given block -// based on its announcements -func (f *lightFetcher) peerHasBlock(p *serverPeer, hash common.Hash, number uint64, hasState bool) bool { - f.lock.Lock() - defer f.lock.Unlock() - - fp := f.peers[p] - if fp == nil || fp.root == nil { - return false - } - - if hasState { - if fp.lastAnnounced == nil || fp.lastAnnounced.number > number+serverStateAvailable { - return false - } - } - - if f.syncing { - // always return true when syncing - // false positives are acceptable, a more sophisticated condition can be implemented later - return true - } - - if number >= fp.root.number { - // it is recent enough that if it is known, is should be in the peer's block tree - return fp.nodeByHash[hash] != nil - } - f.chain.LockChain() - defer f.chain.UnlockChain() - // if it's older than the peer's block tree root but it's in the same canonical chain - // as the root, we can still be sure the peer knows it - // - // when syncing, just check if it is part of the known chain, there is nothing better we - // can do since we do not know the most recent block hash yet - return rawdb.ReadCanonicalHash(f.handler.backend.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.handler.backend.chainDb, number) == hash -} - -// requestAmount calculates the amount of headers to be downloaded starting -// from a certain head backwards -func (f *lightFetcher) requestAmount(p *serverPeer, n *fetcherTreeNode) uint64 { - amount := uint64(0) - nn := n - for nn != nil && !f.checkKnownNode(p, nn) { - nn = nn.parent - amount++ - } - if nn == nil { - amount = n.number - } - return amount -} - -// requestedID tells if a certain reqID has been requested by the fetcher -func (f *lightFetcher) requestedID(reqID uint64) bool { - f.reqMu.RLock() - _, ok := f.requested[reqID] - f.reqMu.RUnlock() - return ok -} - -// nextRequest selects the peer and announced head to be requested next, amount -// to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) { - var ( - bestHash common.Hash - bestAmount uint64 - bestTd *big.Int - bestSyncing bool - ) - bestHash, bestAmount, bestTd, bestSyncing = f.findBestRequest() - - if bestTd == f.maxConfirmedTd { - return nil, 0, false - } - - var rq *distReq - reqID := genReqID() - if bestSyncing { - rq = f.newFetcherDistReqForSync(bestHash) - } else { - rq = f.newFetcherDistReq(bestHash, reqID, bestAmount) - } - return rq, reqID, bestSyncing -} - -// findBestRequest finds the best head to request that has been announced by but not yet requested from a known peer. -// It also returns the announced Td (which should be verified after fetching the head), -// the necessary amount to request and whether a downloader sync is necessary instead of a normal header request. -func (f *lightFetcher) findBestRequest() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) { - bestTd = f.maxConfirmedTd - bestSyncing = false - - for p, fp := range f.peers { - for hash, n := range fp.nodeByHash { - if f.checkKnownNode(p, n) || n.requested { + peer := f.peer(peerid) + if peer == nil { + log.Debug("Receive announce from unknown peer", "peer", peerid) continue } - // if ulc mode is disabled, isTrustedHash returns true - amount := f.requestAmount(p, n) - if (bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount) && (f.isTrustedHash(hash) || f.maxConfirmedTd.Int64() == 0) { - bestHash = hash - bestTd = n.td - bestAmount = amount - bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) + // Announced tds should be strictly monotonic, drop the peer if + // the announce is out-of-order. + if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 { + f.peerset.unregister(peerid.String()) + log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td) + continue } - } - } - return -} + peer.latest = data -// isTrustedHash checks if the block can be trusted by the minimum trusted fraction. -func (f *lightFetcher) isTrustedHash(hash common.Hash) bool { - // If ultra light cliet mode is disabled, trust all hashes - if f.handler.ulc == nil { - return true - } - // Ultra light enabled, only trust after enough confirmations - var agreed int - for peer, info := range f.peers { - if peer.trusted && info.nodeByHash[hash] != nil { - agreed++ - } - } - return 100*agreed/len(f.handler.ulc.keys) >= f.handler.ulc.fraction -} - -func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq { - return &distReq{ - getCost: func(dp distPeer) uint64 { - return 0 - }, - canSend: func(dp distPeer) bool { - p := dp.(*serverPeer) - f.lock.Lock() - defer f.lock.Unlock() - - if p.onlyAnnounce { - return false + // Filter out any stale announce, the local chain is ahead of announce + if localTd != nil && data.Td.Cmp(localTd) <= 0 { + continue } - fp := f.peers[p] - return fp != nil && fp.nodeByHash[bestHash] != nil - }, - request: func(dp distPeer) func() { - if f.handler.ulc != nil { - // Keep last trusted header before sync - f.setLastTrustedHeader(f.chain.CurrentHeader()) - } - go func() { - p := dp.(*serverPeer) - p.Log().Debug("Synchronisation started") - f.handler.synchronise(p) - f.syncDone <- p - }() - return nil - }, - } -} + peer.addAnno(anno) -// newFetcherDistReq creates a new request for the distributor. -func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq { - return &distReq{ - getCost: func(dp distPeer) uint64 { - p := dp.(*serverPeer) - return p.getRequestCost(GetBlockHeadersMsg, int(bestAmount)) - }, - canSend: func(dp distPeer) bool { - p := dp.(*serverPeer) - f.lock.Lock() - defer f.lock.Unlock() - - if p.onlyAnnounce { - return false + // If we are not syncing, try to trigger a single retrieval or re-sync + if !ulc && !syncing { + // Two scenarios lead to re-sync: + // - reorg happens + // - local chain lags + // We can't retrieve the parent of the announce by single retrieval + // in both cases, so resync is necessary. + if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 { + syncing = true + go f.startSync(peerid) + log.Debug("Trigger light sync", "peer", peerid, "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash) + continue + } + f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid), nil) + log.Debug("Trigger header retrieval", "peer", peerid, "number", data.Number, "hash", data.Hash) } - fp := f.peers[p] - if fp == nil { - return false - } - n := fp.nodeByHash[bestHash] - return n != nil && !n.requested - }, - request: func(dp distPeer) func() { - p := dp.(*serverPeer) - f.lock.Lock() - fp := f.peers[p] - if fp != nil { - n := fp.nodeByHash[bestHash] - if n != nil { - n.requested = true + // Keep collecting announces from trusted server even we are syncing. + if ulc && anno.trust { + // Notify underlying fetcher to retrieve header or trigger a resync if + // we have receive enough announcements from trusted server. + trusted, agreed := trustedHeader(data.Hash, data.Number) + if trusted && !syncing { + if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 { + syncing = true + go f.startSync(peerid) + log.Debug("Trigger trusted light sync", "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash) + continue + } + p := agreed[rand.Intn(len(agreed))] + f.fetcher.Notify(p.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(p), nil) + log.Debug("Trigger trusted header retrieval", "number", data.Number, "hash", data.Hash) } } - f.lock.Unlock() - cost := p.getRequestCost(GetBlockHeadersMsg, int(bestAmount)) - p.fcServer.QueuedRequest(reqID, cost) - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} - f.reqMu.Unlock() - go func() { - time.Sleep(hardRequestTimeout) - f.timeoutChn <- reqID - }() - return func() { p.requestHeadersByHash(reqID, bestHash, int(bestAmount), 0, true) } - }, + case req := <-f.requestCh: + fetching[req.reqid] = req // Tracking all in-flight requests for response latency statistic. + if len(fetching) == 1 { + f.rescheduleTimer(fetching, requestTimer) + } + + case <-requestTimer.C: + for reqid, request := range fetching { + if time.Since(request.sendAt) > blockDelayTimeout-gatherSlack { + delete(fetching, reqid) + f.peerset.unregister(request.peerid.String()) + log.Debug("Request timeout", "peer", request.peerid, "reqid", reqid) + } + } + f.rescheduleTimer(fetching, requestTimer) + + case resp := <-f.deliverCh: + if req := fetching[resp.reqid]; req != nil { + delete(fetching, resp.reqid) + f.rescheduleTimer(fetching, requestTimer) + + // The underlying fetcher does not check the consistency of request and response. + // The adversary can send the fake announces with invalid hash and number but always + // delivery some mismatched header. So it can't be punished by the underlying fetcher. + // We have to add two more rules here to detect. + if len(resp.headers) != 1 { + f.peerset.unregister(req.peerid.String()) + log.Debug("Deliver more than requested", "peer", req.peerid, "reqid", req.reqid) + continue + } + if resp.headers[0].Hash() != req.hash { + f.peerset.unregister(req.peerid.String()) + log.Debug("Deliver invalid header", "peer", req.peerid, "reqid", req.reqid) + continue + } + resp.remain <- f.fetcher.FilterHeaders(resp.peerid.String(), resp.headers, time.Now()) + } else { + // Discard the entire packet no matter it's a timeout response or unexpected one. + resp.remain <- resp.headers + } + + case ev := <-headCh: + // Short circuit if we are still syncing. + if syncing { + continue + } + reset(ev.Block.Header()) + + // Clean stale announcements from les-servers. + var droplist []enode.ID + f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool { + removed := p.forwardAnno(localTd) + for _, anno := range removed { + if header := f.chain.GetHeaderByHash(anno.data.Hash); header != nil { + if header.Number.Uint64() != anno.data.Number { + droplist = append(droplist, id) + break + } + // In theory td should exists. + td := f.chain.GetTd(anno.data.Hash, anno.data.Number) + if td != nil && td.Cmp(anno.data.Td) != 0 { + droplist = append(droplist, id) + break + } + } + } + return true + }) + for _, id := range droplist { + f.peerset.unregister(id.String()) + log.Debug("Kicked out peer for invalid announcement") + } + if f.newHeadHook != nil { + f.newHeadHook(localHead) + } + + case origin := <-f.syncDone: + syncing = false // Reset the status + + // Rewind all untrusted headers for ulc mode. + if ulc { + head := f.chain.CurrentHeader() + ancestor := rawdb.FindCommonAncestor(f.chaindb, origin, head) + var untrusted []common.Hash + for head.Number.Cmp(ancestor.Number) > 0 { + hash, number := head.Hash(), head.Number.Uint64() + if trusted, _ := trustedHeader(hash, number); trusted { + break + } + untrusted = append(untrusted, hash) + head = f.chain.GetHeader(head.ParentHash, number-1) + } + if len(untrusted) > 0 { + for i, j := 0, len(untrusted)-1; i < j; i, j = i+1, j-1 { + untrusted[i], untrusted[j] = untrusted[j], untrusted[i] + } + f.chain.Rollback(untrusted) + } + } + // Reset local status. + reset(f.chain.CurrentHeader()) + if f.newHeadHook != nil { + f.newHeadHook(localHead) + } + log.Debug("light sync finished", "number", localHead.Number, "hash", localHead.Hash()) + + case <-f.closeCh: + return + } } } +// announce processes a new announcement message received from a peer. +func (f *lightFetcher) announce(p *serverPeer, head *announceData) { + if f.newAnnounce != nil { + f.newAnnounce(p, head) + } + if f.noAnnounce { + return + } + select { + case f.announceCh <- &announce{peerid: p.ID(), trust: p.trusted, data: head}: + case <-f.closeCh: + return + } +} + +// trackRequest sends a reqID to main loop for in-flight request tracking. +func (f *lightFetcher) trackRequest(peerid enode.ID, reqid uint64, hash common.Hash) { + select { + case f.requestCh <- &request{reqid: reqid, peerid: peerid, sendAt: time.Now(), hash: hash}: + case <-f.closeCh: + } +} + +// requestHeaderByHash constructs a header retrieval request and sends it to +// local request distributor. +// +// Note, we rely on the underlying eth/fetcher to retrieve and validate the +// response, so that we have to obey the rule of eth/fetcher which only accepts +// the response from given peer. +func (f *lightFetcher) requestHeaderByHash(peerid enode.ID) func(common.Hash) error { + return func(hash common.Hash) error { + req := &distReq{ + getCost: func(dp distPeer) uint64 { return dp.(*serverPeer).getRequestCost(GetBlockHeadersMsg, 1) }, + canSend: func(dp distPeer) bool { return dp.(*serverPeer).ID() == peerid }, + request: func(dp distPeer) func() { + peer, id := dp.(*serverPeer), genReqID() + cost := peer.getRequestCost(GetBlockHeadersMsg, 1) + peer.fcServer.QueuedRequest(id, cost) + + return func() { + f.trackRequest(peer.ID(), id, hash) + peer.requestHeadersByHash(id, hash, 1, 0, false) + } + }, + } + f.reqDist.queue(req) + return nil + } +} + +// requestResync invokes synchronisation callback to start syncing. +func (f *lightFetcher) startSync(id enode.ID) { + defer func(header *types.Header) { + f.syncDone <- header + }(f.chain.CurrentHeader()) + + peer := f.peerset.peer(id.String()) + if peer == nil || peer.onlyAnnounce { + return + } + f.synchronise(peer) +} + // deliverHeaders delivers header download request responses for processing -func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqID uint64, headers []*types.Header) { - f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} +func (f *lightFetcher) deliverHeaders(peer *serverPeer, reqid uint64, headers []*types.Header) []*types.Header { + remain := make(chan []*types.Header, 1) + select { + case f.deliverCh <- &response{reqid: reqid, headers: headers, peerid: peer.ID(), remain: remain}: + case <-f.closeCh: + return nil + } + return <-remain } -// processResponse processes header download request responses, returns true if successful -func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { - if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { - req.peer.Log().Debug("Response content mismatch", "requested", len(resp.headers), "reqfrom", resp.headers[0], "delivered", req.amount, "delfrom", req.hash) - return false - } - headers := make([]*types.Header, req.amount) - for i, header := range resp.headers { - headers[int(req.amount)-1-i] = header - } - - if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { - if err == consensus.ErrFutureBlock { - return true - } - log.Debug("Failed to insert header chain", "err", err) - return false - } - tds := make([]*big.Int, len(headers)) - for i, header := range headers { - td := f.chain.GetTd(header.Hash(), header.Number.Uint64()) - if td == nil { - log.Debug("Total difficulty not found for header", "index", i+1, "number", header.Number, "hash", header.Hash()) - return false - } - tds[i] = td - } - f.newHeaders(headers, tds) - return true -} - -// newHeaders updates the block trees of all active peers according to a newly -// downloaded and validated batch or headers -func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { - var maxTd *big.Int - - for p, fp := range f.peers { - if !f.checkAnnouncedHeaders(fp, headers, tds) { - p.Log().Debug("Inconsistent announcement") - go f.handler.removePeer(p.id) - } - if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) { - maxTd = fp.confirmedTd - } - } - - if maxTd != nil { - f.updateMaxConfirmedTd(maxTd) - } -} - -// checkAnnouncedHeaders updates peer's block tree if necessary after validating -// a batch of headers. It searches for the latest header in the batch that has a -// matching tree node (if any), and if it has not been marked as known already, -// sets it and its parents to known (even those which are older than the currently -// validated ones). Return value shows if all hashes, numbers and Tds matched -// correctly to the announced values (otherwise the peer should be dropped). -func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool { - var ( - n *fetcherTreeNode - header *types.Header - td *big.Int - ) - - for i := len(headers) - 1; ; i-- { - if i < 0 { - if n == nil { - // no more headers and nothing to match - return true - } - // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching - hash, number := header.ParentHash, header.Number.Uint64()-1 - td = f.chain.GetTd(hash, number) - header = f.chain.GetHeader(hash, number) - if header == nil || td == nil { - log.Error("Missing parent of validated header", "hash", hash, "number", number) - return false - } - } else { - header = headers[i] - td = tds[i] - } - hash := header.Hash() - number := header.Number.Uint64() - if n == nil { - n = fp.nodeByHash[hash] - } - if n != nil { - if n.td == nil { - // node was unannounced - if nn := fp.nodeByHash[hash]; nn != nil { - // if there was already a node with the same hash, continue there and drop this one - nn.children = append(nn.children, n.children...) - n.children = nil - fp.deleteNode(n) - n = nn - } else { - n.hash = hash - n.td = td - fp.nodeByHash[hash] = n - } - } - // check if it matches the header - if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 { - // peer has previously made an invalid announcement - return false - } - if n.known { - // we reached a known node that matched our expectations, return with success - return true - } - n.known = true - if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 { - fp.confirmedTd = td - fp.bestConfirmed = n - } - n = n.parent - if n == nil { - return true - } - } - } -} - -// checkSyncedHeaders updates peer's block tree after synchronisation by marking -// downloaded headers as known. If none of the announced headers are found after -// syncing, the peer is dropped. -func (f *lightFetcher) checkSyncedHeaders(p *serverPeer) { - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Unknown peer to check sync headers") +// rescheduleTimer resets the specified timeout timer to the next request timeout. +func (f *lightFetcher) rescheduleTimer(requests map[uint64]*request, timer *time.Timer) { + // Short circuit if no inflight requests + if len(requests) == 0 { + timer.Stop() return } - var ( - node = fp.lastAnnounced - td *big.Int - ) - if f.handler.ulc != nil { - // Roll back untrusted blocks - h, unapproved := f.lastTrustedTreeNode(p) - f.chain.Rollback(unapproved) - node = fp.nodeByHash[h.Hash()] - } - // Find last valid block - for node != nil { - if td = f.chain.GetTd(node.hash, node.number); td != nil { - break - } - node = node.parent - } - // Now node is the latest downloaded/approved header after syncing - if node == nil { - p.Log().Debug("Synchronisation failed") - go f.handler.removePeer(p.id) - return - } - header := f.chain.GetHeader(node.hash, node.number) - f.newHeaders([]*types.Header{header}, []*big.Int{td}) -} - -// lastTrustedTreeNode return last approved treeNode and a list of unapproved hashes -func (f *lightFetcher) lastTrustedTreeNode(p *serverPeer) (*types.Header, []common.Hash) { - unapprovedHashes := make([]common.Hash, 0) - current := f.chain.CurrentHeader() - - if f.lastTrustedHeader == nil { - return current, unapprovedHashes - } - - canonical := f.chain.CurrentHeader() - if canonical.Number.Uint64() > f.lastTrustedHeader.Number.Uint64() { - canonical = f.chain.GetHeaderByNumber(f.lastTrustedHeader.Number.Uint64()) - } - commonAncestor := rawdb.FindCommonAncestor(f.handler.backend.chainDb, canonical, f.lastTrustedHeader) - if commonAncestor == nil { - log.Error("Common ancestor of last trusted header and canonical header is nil", "canonical hash", canonical.Hash(), "trusted hash", f.lastTrustedHeader.Hash()) - return current, unapprovedHashes - } - - for current.Hash() == commonAncestor.Hash() { - if f.isTrustedHash(current.Hash()) { - break - } - unapprovedHashes = append(unapprovedHashes, current.Hash()) - current = f.chain.GetHeader(current.ParentHash, current.Number.Uint64()-1) - } - return current, unapprovedHashes -} - -func (f *lightFetcher) setLastTrustedHeader(h *types.Header) { - f.lock.Lock() - defer f.lock.Unlock() - f.lastTrustedHeader = h -} - -// checkKnownNode checks if a block tree node is known (downloaded and validated) -// If it was not known previously but found in the database, sets its known flag -func (f *lightFetcher) checkKnownNode(p *serverPeer, n *fetcherTreeNode) bool { - if n.known { - return true - } - td := f.chain.GetTd(n.hash, n.number) - if td == nil { - return false - } - header := f.chain.GetHeader(n.hash, n.number) - // check the availability of both header and td because reads are not protected by chain db mutex - // Note: returning false is always safe here - if header == nil { - return false - } - - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Unknown peer to check known nodes") - return false - } - if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) { - p.Log().Debug("Inconsistent announcement") - go f.handler.removePeer(p.id) - } - if fp.confirmedTd != nil { - f.updateMaxConfirmedTd(fp.confirmedTd) - } - return n.known -} - -// deleteNode deletes a node and its child subtrees from a peer's block tree -func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) { - if n.parent != nil { - for i, nn := range n.parent.children { - if nn == n { - n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...) - break - } - } - } - for { - if n.td != nil { - delete(fp.nodeByHash, n.hash) - } - fp.nodeCnt-- - if len(n.children) == 0 { - return - } - for i, nn := range n.children { - if i == 0 { - n = nn - } else { - fp.deleteNode(nn) - } - } - } -} - -// updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td -// than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values -// and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated -// both globally for all peers and also for each individual peer (meaning that the given peer has announced the head -// and it has also been downloaded from any peer, either before or after the given announcement). -// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer, -// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed -// the current global head). -type updateStatsEntry struct { - time mclock.AbsTime - td *big.Int - next *updateStatsEntry -} - -// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed, -// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have -// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics. -// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a -// positive block delay value. -func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { - if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 { - f.maxConfirmedTd = td - newEntry := &updateStatsEntry{ - time: mclock.Now(), - td: td, - } - if f.lastUpdateStats != nil { - f.lastUpdateStats.next = newEntry - } - - f.lastUpdateStats = newEntry - for p := range f.peers { - f.checkUpdateStats(p, newEntry) - } - } -} - -// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it -// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the -// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed, -// the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry -// items are removed from the head of the linked list. -// If a new entry has been added to the global tail, it is passed as a parameter here even though this function -// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil), -// it can set the new head to newEntry. -func (f *lightFetcher) checkUpdateStats(p *serverPeer, newEntry *updateStatsEntry) { - now := mclock.Now() - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Unknown peer to check update stats") - return - } - - if newEntry != nil && fp.firstUpdateStats == nil { - fp.firstUpdateStats = newEntry - } - for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) { - fp.firstUpdateStats = fp.firstUpdateStats.next - } - if fp.confirmedTd != nil { - for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 { - fp.firstUpdateStats = fp.firstUpdateStats.next + // Otherwise find the earliest expiring request + earliest := time.Now() + for _, req := range requests { + if earliest.After(req.sendAt) { + earliest = req.sendAt } } + timer.Reset(blockDelayTimeout - time.Since(earliest)) } diff --git a/les/fetcher_test.go b/les/fetcher_test.go new file mode 100644 index 0000000000..418f9ee09a --- /dev/null +++ b/les/fetcher_test.go @@ -0,0 +1,268 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package les + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p/enode" +) + +// verifyImportEvent verifies that one single event arrive on an import channel. +func verifyImportEvent(t *testing.T, imported chan interface{}, arrive bool) { + if arrive { + select { + case <-imported: + case <-time.After(time.Second): + t.Fatalf("import timeout") + } + } else { + select { + case <-imported: + t.Fatalf("import invoked") + case <-time.After(20 * time.Millisecond): + } + } +} + +// verifyImportDone verifies that no more events are arriving on an import channel. +func verifyImportDone(t *testing.T, imported chan interface{}) { + select { + case <-imported: + t.Fatalf("extra block imported") + case <-time.After(50 * time.Millisecond): + } +} + +// verifyChainHeight verifies the chain height is as expected. +func verifyChainHeight(t *testing.T, fetcher *lightFetcher, height uint64) { + local := fetcher.chain.CurrentHeader().Number.Uint64() + if local != height { + t.Fatalf("chain height mismatch, got %d, want %d", local, height) + } +} + +func TestSequentialAnnouncementsLes2(t *testing.T) { testSequentialAnnouncements(t, 2) } +func TestSequentialAnnouncementsLes3(t *testing.T) { testSequentialAnnouncements(t, 3) } + +func testSequentialAnnouncements(t *testing.T, protocol int) { + s, c, teardown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, false) + defer teardown() + + // Create connected peer pair. + c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. + p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler) + if err != nil { + t.Fatalf("Failed to create peer pair %v", err) + } + c.handler.fetcher.noAnnounce = false + + importCh := make(chan interface{}) + c.handler.fetcher.newHeadHook = func(header *types.Header) { + importCh <- header + } + for i := uint64(1); i <= s.backend.Blockchain().CurrentHeader().Number.Uint64(); i++ { + header := s.backend.Blockchain().GetHeaderByNumber(i) + hash, number := header.Hash(), header.Number.Uint64() + td := rawdb.ReadTd(s.db, hash, number) + + announce := announceData{hash, number, td, 0, nil} + if p1.cpeer.announceType == announceTypeSigned { + announce.sign(s.handler.server.privateKey) + } + p1.cpeer.sendAnnounce(announce) + verifyImportEvent(t, importCh, true) + } + verifyImportDone(t, importCh) + verifyChainHeight(t, c.handler.fetcher, 4) +} + +func TestGappedAnnouncementsLes2(t *testing.T) { testGappedAnnouncements(t, 2) } +func TestGappedAnnouncementsLes3(t *testing.T) { testGappedAnnouncements(t, 3) } + +func testGappedAnnouncements(t *testing.T, protocol int) { + s, c, teardown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, false) + defer teardown() + + // Create connected peer pair. + c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. + peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler) + if err != nil { + t.Fatalf("Failed to create peer pair %v", err) + } + c.handler.fetcher.noAnnounce = false + + done := make(chan *types.Header, 1) + c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header } + + // Prepare announcement by latest header. + latest := s.backend.Blockchain().CurrentHeader() + hash, number := latest.Hash(), latest.Number.Uint64() + td := rawdb.ReadTd(s.db, hash, number) + + // Sign the announcement if necessary. + announce := announceData{hash, number, td, 0, nil} + if peer.cpeer.announceType == announceTypeSigned { + announce.sign(s.handler.server.privateKey) + } + peer.cpeer.sendAnnounce(announce) + + <-done // Wait syncing + verifyChainHeight(t, c.handler.fetcher, 4) + + // Send a reorged announcement + var newAnno = make(chan struct{}, 1) + c.handler.fetcher.noAnnounce = true + c.handler.fetcher.newAnnounce = func(*serverPeer, *announceData) { + newAnno <- struct{}{} + } + blocks, _ := core.GenerateChain(rawdb.ReadChainConfig(s.db, s.backend.Blockchain().Genesis().Hash()), s.backend.Blockchain().GetBlockByNumber(3), + ethash.NewFaker(), s.db, 2, func(i int, gen *core.BlockGen) { + gen.OffsetTime(-9) // higher block difficulty + }) + s.backend.Blockchain().InsertChain(blocks) + <-newAnno + c.handler.fetcher.noAnnounce = false + c.handler.fetcher.newAnnounce = nil + + latest = blocks[len(blocks)-1].Header() + hash, number = latest.Hash(), latest.Number.Uint64() + td = rawdb.ReadTd(s.db, hash, number) + + announce = announceData{hash, number, td, 1, nil} + if peer.cpeer.announceType == announceTypeSigned { + announce.sign(s.handler.server.privateKey) + } + peer.cpeer.sendAnnounce(announce) + + <-done // Wait syncing + verifyChainHeight(t, c.handler.fetcher, 5) +} + +func TestTrustedAnnouncementsLes2(t *testing.T) { testTrustedAnnouncement(t, 2) } +func TestTrustedAnnouncementsLes3(t *testing.T) { testTrustedAnnouncement(t, 3) } + +func testTrustedAnnouncement(t *testing.T, protocol int) { + var ( + servers []*testServer + teardowns []func() + nodes []*enode.Node + ids []string + cpeers []*clientPeer + speers []*serverPeer + ) + for i := 0; i < 10; i++ { + s, n, teardown := newTestServerPeer(t, 10, protocol) + + servers = append(servers, s) + nodes = append(nodes, n) + teardowns = append(teardowns, teardown) + + // A half of them are trusted servers. + if i < 5 { + ids = append(ids, n.String()) + } + } + _, c, teardown := newClientServerEnv(t, 0, protocol, nil, ids, 60, false, false) + defer teardown() + defer func() { + for i := 0; i < len(teardowns); i++ { + teardowns[i]() + } + }() + + c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. + + // Connect all server instances. + for i := 0; i < len(servers); i++ { + sp, cp, err := connect(servers[i].handler, nodes[i].ID(), c.handler, protocol) + if err != nil { + t.Fatalf("connect server and client failed, err %s", err) + } + cpeers = append(cpeers, cp) + speers = append(speers, sp) + } + c.handler.fetcher.noAnnounce = false + + newHead := make(chan *types.Header, 1) + c.handler.fetcher.newHeadHook = func(header *types.Header) { newHead <- header } + + check := func(height []uint64, expected uint64, callback func()) { + for i := 0; i < len(height); i++ { + for j := 0; j < len(servers); j++ { + h := servers[j].backend.Blockchain().GetHeaderByNumber(height[i]) + hash, number := h.Hash(), h.Number.Uint64() + td := rawdb.ReadTd(servers[j].db, hash, number) + + // Sign the announcement if necessary. + announce := announceData{hash, number, td, 0, nil} + p := cpeers[j] + if p.announceType == announceTypeSigned { + announce.sign(servers[j].handler.server.privateKey) + } + p.sendAnnounce(announce) + } + } + if callback != nil { + callback() + } + verifyChainHeight(t, c.handler.fetcher, expected) + } + check([]uint64{1}, 1, func() { <-newHead }) // Sequential announcements + check([]uint64{4}, 4, func() { <-newHead }) // ULC-style light syncing, rollback untrusted headers + check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain. +} + +func TestInvalidAnnounces(t *testing.T) { + s, c, teardown := newClientServerEnv(t, 4, lpv3, nil, nil, 0, false, false) + defer teardown() + + // Create connected peer pair. + c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync. + peer, _, err := newTestPeerPair("peer", lpv3, s.handler, c.handler) + if err != nil { + t.Fatalf("Failed to create peer pair %v", err) + } + c.handler.fetcher.noAnnounce = false + + done := make(chan *types.Header, 1) + c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header } + + // Prepare announcement by latest header. + headerOne := s.backend.Blockchain().GetHeaderByNumber(1) + hash, number := headerOne.Hash(), headerOne.Number.Uint64() + td := big.NewInt(200) // bad td + + // Sign the announcement if necessary. + announce := announceData{hash, number, td, 0, nil} + if peer.cpeer.announceType == announceTypeSigned { + announce.sign(s.handler.server.privateKey) + } + peer.cpeer.sendAnnounce(announce) + <-done // Wait syncing + + // Ensure the bad peer is evicited + if c.handler.backend.peers.len() != 0 { + t.Fatalf("Failed to evict invalid peer") + } +} diff --git a/les/odr_test.go b/les/odr_test.go index d30642c4f7..ccd220d692 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -222,13 +222,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od // expect retrievals to fail (except genesis block) without a les peer client.handler.backend.peers.lock.Lock() - client.peer.speer.hasBlock = func(common.Hash, uint64, bool) bool { return false } + client.peer.speer.hasBlockHook = func(common.Hash, uint64, bool) bool { return false } client.handler.backend.peers.lock.Unlock() test(expFail) // expect all retrievals to pass client.handler.backend.peers.lock.Lock() - client.peer.speer.hasBlock = func(common.Hash, uint64, bool) bool { return true } + client.peer.speer.hasBlockHook = func(common.Hash, uint64, bool) bool { return true } client.handler.backend.peers.lock.Unlock() test(5) diff --git a/les/peer.go b/les/peer.go index bda77b97cf..72156814dd 100644 --- a/les/peer.go +++ b/les/peer.go @@ -36,7 +36,6 @@ import ( "github.com/ethereum/go-ethereum/les/utils" "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -115,11 +114,6 @@ func (m keyValueMap) get(key string, val interface{}) error { return rlp.DecodeBytes(enc, val) } -// peerIdToString converts enode.ID to a string form -func peerIdToString(id enode.ID) string { - return fmt.Sprintf("%x", id.Bytes()) -} - // peerCommons contains fields needed by both server peer and client peer. type peerCommons struct { *p2p.Peer @@ -343,12 +337,12 @@ type serverPeer struct { sentReqs map[uint64]sentReqEntry // Statistics - errCount int // Counter the invalid responses server has replied + errCount utils.LinearExpiredValue // Counter the invalid responses server has replied updateCount uint64 updateTime mclock.AbsTime - // Callbacks - hasBlock func(common.Hash, uint64, bool) bool // Used to determine whether the server has the specified block. + // Test callback hooks + hasBlockHook func(common.Hash, uint64, bool) bool // Used to determine whether the server has the specified block. } func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2p.MsgReadWriter) *serverPeer { @@ -356,13 +350,14 @@ func newServerPeer(version int, network uint64, trusted bool, p *p2p.Peer, rw p2 peerCommons: peerCommons{ Peer: p, rw: rw, - id: peerIdToString(p.ID()), + id: p.ID().String(), version: version, network: network, sendQueue: utils.NewExecQueue(100), closeCh: make(chan struct{}), }, - trusted: trusted, + trusted: trusted, + errCount: utils.LinearExpiredValue{Rate: mclock.AbsTime(time.Hour)}, } } @@ -524,7 +519,11 @@ func (p *serverPeer) getTxRelayCost(amount, size int) uint64 { // HasBlock checks if the peer has a given block func (p *serverPeer) HasBlock(hash common.Hash, number uint64, hasState bool) bool { p.lock.RLock() + defer p.lock.RUnlock() + if p.hasBlockHook != nil { + return p.hasBlockHook(hash, number, hasState) + } head := p.headInfo.Number var since, recent uint64 if hasState { @@ -534,10 +533,7 @@ func (p *serverPeer) HasBlock(hash common.Hash, number uint64, hasState bool) bo since = p.chainSince recent = p.chainRecent } - hasBlock := p.hasBlock - p.lock.RUnlock() - - return head >= number && number >= since && (recent == 0 || number+recent+4 > head) && hasBlock != nil && hasBlock(hash, number, hasState) + return head >= number && number >= since && (recent == 0 || number+recent+4 > head) } // updateFlowControl updates the flow control parameters belonging to the server @@ -562,6 +558,15 @@ func (p *serverPeer) updateFlowControl(update keyValueMap) { } } +// updateHead updates the head information based on the announcement from +// the peer. +func (p *serverPeer) updateHead(hash common.Hash, number uint64, td *big.Int) { + p.lock.Lock() + defer p.lock.Unlock() + + p.headInfo = blockInfo{Hash: hash, Number: number, Td: td} +} + // Handshake executes the les protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. func (p *serverPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error { @@ -712,11 +717,15 @@ type clientPeer struct { // responseLock ensures that responses are queued in the same order as // RequestProcessed is called responseLock sync.Mutex - server bool - invalidCount uint32 // Counter the invalid request the client peer has made. responseCount uint64 // Counter to generate an unique id for request processing. - errCh chan error - fcClient *flowcontrol.ClientNode // Server side mirror token bucket. + + // invalidLock is used for protecting invalidCount. + invalidLock sync.RWMutex + invalidCount utils.LinearExpiredValue // Counter the invalid request the client peer has made. + + server bool + errCh chan error + fcClient *flowcontrol.ClientNode // Server side mirror token bucket. } func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *clientPeer { @@ -724,13 +733,14 @@ func newClientPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWrite peerCommons: peerCommons{ Peer: p, rw: rw, - id: peerIdToString(p.ID()), + id: p.ID().String(), version: version, network: network, sendQueue: utils.NewExecQueue(100), closeCh: make(chan struct{}), }, - errCh: make(chan error, 1), + invalidCount: utils.LinearExpiredValue{Rate: mclock.AbsTime(time.Hour)}, + errCh: make(chan error, 1), } } @@ -970,6 +980,18 @@ func (p *clientPeer) Handshake(td *big.Int, head common.Hash, headNum uint64, ge }) } +func (p *clientPeer) bumpInvalid() { + p.invalidLock.Lock() + p.invalidCount.Add(1, mclock.Now()) + p.invalidLock.Unlock() +} + +func (p *clientPeer) getInvalid() uint64 { + p.invalidLock.RLock() + defer p.invalidLock.RUnlock() + return p.invalidCount.Value(mclock.Now()) +} + // serverPeerSubscriber is an interface to notify services about added or // removed server peers type serverPeerSubscriber interface { diff --git a/les/server.go b/les/server.go index a154571b4f..609a24fd2b 100644 --- a/les/server.go +++ b/les/server.go @@ -116,7 +116,7 @@ func NewLesServer(e *eth.Ethereum, config *eth.Config) (*LesServer, error) { srv.maxCapacity = totalRecharge } srv.fcManager.SetCapacityLimits(srv.freeCapacity, srv.maxCapacity, srv.freeCapacity*2) - srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.unregister(peerIdToString(id)) }) + srv.clientPool = newClientPool(srv.chainDb, srv.freeCapacity, mclock.System{}, func(id enode.ID) { go srv.peers.unregister(id.String()) }) srv.clientPool.setDefaultFactors(priceFactors{0, 1, 1}, priceFactors{0, 1, 1}) checkpoint := srv.latestLocalCheckpoint() @@ -153,7 +153,7 @@ func (s *LesServer) APIs() []rpc.API { func (s *LesServer) Protocols() []p2p.Protocol { ps := s.makeProtocols(ServerProtocolVersions, s.handler.runPeer, func(id enode.ID) interface{} { - if p := s.peers.peer(peerIdToString(id)); p != nil { + if p := s.peers.peer(id.String()); p != nil { return p.Info() } return nil diff --git a/les/server_handler.go b/les/server_handler.go index dd1c37f66d..70c6a310af 100644 --- a/les/server_handler.go +++ b/les/server_handler.go @@ -322,7 +322,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { origin = h.blockchain.GetHeaderByNumber(query.Origin.Number) } if origin == nil { - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() break } headers = append(headers, origin) @@ -419,7 +419,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { } body := h.blockchain.GetBodyRLP(hash) if body == nil { - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } bodies = append(bodies, body) @@ -467,7 +467,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { header := h.blockchain.GetHeaderByHash(request.BHash) if header == nil { p.Log().Warn("Failed to retrieve associate header for code", "hash", request.BHash) - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } // Refuse to search stale state data in the database since looking for @@ -475,7 +475,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { local := h.blockchain.CurrentHeader().Number.Uint64() if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local { p.Log().Debug("Reject stale code request", "number", header.Number.Uint64(), "head", local) - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } triedb := h.blockchain.StateCache().TrieDB() @@ -483,7 +483,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { account, err := h.getAccount(triedb, header.Root, common.BytesToHash(request.AccKey)) if err != nil { p.Log().Warn("Failed to retrieve account for code", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } code, err := triedb.Node(common.BytesToHash(account.CodeHash)) @@ -542,7 +542,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { results := h.blockchain.GetReceiptsByHash(hash) if results == nil { if header := h.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } } @@ -605,7 +605,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { if header = h.blockchain.GetHeaderByHash(request.BHash); header == nil { p.Log().Warn("Failed to retrieve header for proof", "hash", request.BHash) - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } // Refuse to search stale state data in the database since looking for @@ -613,14 +613,14 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { local := h.blockchain.CurrentHeader().Number.Uint64() if !h.server.archiveMode && header.Number.Uint64()+core.TriesInMemory <= local { p.Log().Debug("Reject stale trie request", "number", header.Number.Uint64(), "head", local) - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } root = header.Root } // If a header lookup failed (non existent), ignore subsequent requests for the same header if root == (common.Hash{}) { - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } // Open the account or storage trie for the request @@ -639,7 +639,7 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { account, err := h.getAccount(statedb.TrieDB(), root, common.BytesToHash(request.AccKey)) if err != nil { p.Log().Warn("Failed to retrieve account for proof", "block", header.Number, "hash", header.Hash(), "account", common.BytesToHash(request.AccKey), "err", err) - atomic.AddUint32(&p.invalidCount, 1) + p.bumpInvalid() continue } trie, err = statedb.OpenStorageTrie(common.BytesToHash(request.AccKey), account.Root) @@ -833,9 +833,9 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error { clientErrorMeter.Mark(1) return errResp(ErrInvalidMsgCode, "%v", msg.Code) } - // If the client has made too much invalid request(e.g. request a non-exist data), + // If the client has made too much invalid request(e.g. request a non-existent data), // reject them to prevent SPAM attack. - if atomic.LoadUint32(&p.invalidCount) > maxRequestErrors { + if p.getInvalid() > maxRequestErrors { clientErrorMeter.Mark(1) return errTooManyInvalidRequest } diff --git a/les/test_helper.go b/les/test_helper.go index 28906f1f10..4ce1d03c2b 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -223,6 +223,7 @@ func newTestClientHandler(backend *backends.SimulatedBackend, odr *LesOdr, index if client.oracle != nil { client.oracle.Start(backend) } + client.handler.start() return client.handler } diff --git a/les/utils/expiredvalue.go b/les/utils/expiredvalue.go index a58587368f..980156d21c 100644 --- a/les/utils/expiredvalue.go +++ b/les/utils/expiredvalue.go @@ -124,6 +124,50 @@ func (e *ExpiredValue) SubExp(a ExpiredValue) { } } +// LinearExpiredValue is very similar with the expiredValue which the value +// will continuously expired. But the different part is it's expired linearly. +type LinearExpiredValue struct { + Offset uint64 // The latest time offset + Val uint64 // The remaining value, can never be negative + Rate mclock.AbsTime `rlp:"-"` // Expiration rate(by nanosecond), will ignored by RLP +} + +// value calculates the value at the given moment. This function always has the +// assumption that the given timestamp shouldn't less than the recorded one. +func (e LinearExpiredValue) Value(now mclock.AbsTime) uint64 { + offset := uint64(now / e.Rate) + if e.Offset < offset { + diff := offset - e.Offset + if e.Val >= diff { + e.Val -= diff + } else { + e.Val = 0 + } + } + return e.Val +} + +// add adds a signed value at the given moment. This function always has the +// assumption that the given timestamp shouldn't less than the recorded one. +func (e *LinearExpiredValue) Add(amount int64, now mclock.AbsTime) uint64 { + offset := uint64(now / e.Rate) + if e.Offset < offset { + diff := offset - e.Offset + if e.Val >= diff { + e.Val -= diff + } else { + e.Val = 0 + } + e.Offset = offset + } + if amount < 0 && uint64(-amount) > e.Val { + e.Val = 0 + } else { + e.Val = uint64(int64(e.Val) + amount) + } + return e.Val +} + // Expirer changes logOffset with a linear rate which can be changed during operation. // It is not thread safe, if access by multiple goroutines is needed then it should be // encapsulated into a locked structure. diff --git a/les/utils/expiredvalue_test.go b/les/utils/expiredvalue_test.go index fa22d58274..1c751d8cc6 100644 --- a/les/utils/expiredvalue_test.go +++ b/les/utils/expiredvalue_test.go @@ -18,6 +18,8 @@ package utils import ( "testing" + + "github.com/ethereum/go-ethereum/common/mclock" ) func TestValueExpiration(t *testing.T) { @@ -116,3 +118,78 @@ func TestExpiredValueSubtraction(t *testing.T) { } } } + +func TestLinearExpiredValue(t *testing.T) { + var cases = []struct { + value LinearExpiredValue + now mclock.AbsTime + expect uint64 + }{ + {LinearExpiredValue{ + Offset: 0, + Val: 0, + Rate: mclock.AbsTime(1), + }, 0, 0}, + + {LinearExpiredValue{ + Offset: 1, + Val: 1, + Rate: mclock.AbsTime(1), + }, 0, 1}, + + {LinearExpiredValue{ + Offset: 1, + Val: 1, + Rate: mclock.AbsTime(1), + }, mclock.AbsTime(2), 0}, + + {LinearExpiredValue{ + Offset: 1, + Val: 1, + Rate: mclock.AbsTime(1), + }, mclock.AbsTime(3), 0}, + } + for _, c := range cases { + if value := c.value.Value(c.now); value != c.expect { + t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, value) + } + } +} + +func TestLinearExpiredAddition(t *testing.T) { + var cases = []struct { + value LinearExpiredValue + amount int64 + now mclock.AbsTime + expect uint64 + }{ + {LinearExpiredValue{ + Offset: 0, + Val: 0, + Rate: mclock.AbsTime(1), + }, -1, 0, 0}, + + {LinearExpiredValue{ + Offset: 1, + Val: 1, + Rate: mclock.AbsTime(1), + }, -1, 0, 0}, + + {LinearExpiredValue{ + Offset: 1, + Val: 2, + Rate: mclock.AbsTime(1), + }, -1, mclock.AbsTime(2), 0}, + + {LinearExpiredValue{ + Offset: 1, + Val: 2, + Rate: mclock.AbsTime(1), + }, -2, mclock.AbsTime(2), 0}, + } + for _, c := range cases { + if value := c.value.Add(c.amount, c.now); value != c.expect { + t.Fatalf("Value mismatch, want=%d, got=%d", c.expect, value) + } + } +}