core: parallelise nonce checking when processing blocks
ChainManager now uses a parallel approach to block processing where all nonces are checked seperatly from the block processing process. This speeds up the process by about 3 times on my i7
This commit is contained in:
parent
443d024843
commit
c67424ecc8
|
@ -336,8 +336,8 @@ func GetChain(ctx *cli.Context) (*core.ChainManager, common.Database, common.Dat
|
|||
}
|
||||
|
||||
eventMux := new(event.TypeMux)
|
||||
chainManager := core.NewChainManager(blockDb, stateDb, eventMux)
|
||||
pow := ethash.New()
|
||||
chainManager := core.NewChainManager(blockDb, stateDb, pow, eventMux)
|
||||
txPool := core.NewTxPool(eventMux, chainManager.State, chainManager.GasLimit)
|
||||
blockProcessor := core.NewBlockProcessor(stateDb, extraDb, pow, txPool, chainManager, eventMux)
|
||||
chainManager.SetProcessor(blockProcessor)
|
||||
|
|
|
@ -189,7 +189,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
|
|||
state := state.New(parent.Root(), sm.db)
|
||||
|
||||
// Block validation
|
||||
if err = sm.ValidateHeader(block.Header(), parent.Header()); err != nil {
|
||||
if err = sm.ValidateHeader(block.Header(), parent.Header(), false); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -269,7 +269,7 @@ func (sm *BlockProcessor) processWithParent(block, parent *types.Block) (logs st
|
|||
// Validates the current block. Returns an error if the block was invalid,
|
||||
// an uncle or anything that isn't on the current block chain.
|
||||
// Validation validates easy over difficult (dagger takes longer time = difficult)
|
||||
func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
|
||||
func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header, checkPow bool) error {
|
||||
if big.NewInt(int64(len(block.Extra))).Cmp(params.MaximumExtraDataSize) == 1 {
|
||||
return fmt.Errorf("Block extra data too long (%d)", len(block.Extra))
|
||||
}
|
||||
|
@ -300,10 +300,12 @@ func (sm *BlockProcessor) ValidateHeader(block, parent *types.Header) error {
|
|||
return BlockEqualTSErr //ValidationError("Block timestamp equal or less than previous block (%v - %v)", block.Time, parent.Time)
|
||||
}
|
||||
|
||||
if checkPow {
|
||||
// Verify the nonce of the block. Return an error if it's not valid
|
||||
if !sm.Pow.Verify(types.NewBlockWithHeader(block)) {
|
||||
return ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -358,7 +360,7 @@ func (sm *BlockProcessor) VerifyUncles(statedb *state.StateDB, block, parent *ty
|
|||
return UncleError("uncle[%d](%x)'s parent unknown (%x)", i, hash[:4], uncle.ParentHash[0:4])
|
||||
}
|
||||
|
||||
if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash]); err != nil {
|
||||
if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash], true); err != nil {
|
||||
return ValidationError(fmt.Sprintf("uncle[%d](%x) header invalid: %v", i, hash[:4], err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"math/big"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -15,6 +16,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/logger"
|
||||
"github.com/ethereum/go-ethereum/logger/glog"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/pow"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
|
@ -100,9 +102,11 @@ type ChainManager struct {
|
|||
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
pow pow.PoW
|
||||
}
|
||||
|
||||
func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager {
|
||||
func NewChainManager(blockDb, stateDb common.Database, pow pow.PoW, mux *event.TypeMux) *ChainManager {
|
||||
bc := &ChainManager{
|
||||
blockDb: blockDb,
|
||||
stateDb: stateDb,
|
||||
|
@ -110,6 +114,7 @@ func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *Chai
|
|||
eventMux: mux,
|
||||
quit: make(chan struct{}),
|
||||
cache: NewBlockCache(blockCacheLimit),
|
||||
pow: pow,
|
||||
}
|
||||
bc.setLastState()
|
||||
|
||||
|
@ -529,10 +534,19 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
|
|||
stats struct{ queued, processed, ignored int }
|
||||
tstart = time.Now()
|
||||
)
|
||||
|
||||
// check the nonce in parallel to the block processing
|
||||
// this speeds catching up significantly
|
||||
nonceErrCh := make(chan error)
|
||||
go func() {
|
||||
nonceErrCh <- verifyNonces(self.pow, chain)
|
||||
}()
|
||||
|
||||
for i, block := range chain {
|
||||
if block == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Setting block.Td regardless of error (known for example) prevents errors down the line
|
||||
// in the protocol handler
|
||||
block.Td = new(big.Int).Set(CalcTD(block, self.GetBlock(block.ParentHash())))
|
||||
|
@ -562,11 +576,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
|
|||
continue
|
||||
}
|
||||
|
||||
h := block.Header()
|
||||
|
||||
glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
|
||||
glog.V(logger.Error).Infoln(err)
|
||||
glog.V(logger.Debug).Infoln(block)
|
||||
blockErr(block, err)
|
||||
|
||||
return i, err
|
||||
}
|
||||
|
@ -620,6 +630,13 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) {
|
|||
|
||||
}
|
||||
|
||||
// check and wait for the nonce error channel and
|
||||
// make sure no nonce error was thrown in the process
|
||||
err := <-nonceErrCh
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) {
|
||||
tend := time.Since(tstart)
|
||||
start, end := chain[0], chain[len(chain)-1]
|
||||
|
@ -718,3 +735,63 @@ out:
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func blockErr(block *types.Block, err error) {
|
||||
h := block.Header()
|
||||
glog.V(logger.Error).Infof("INVALID block #%v (%x)\n", h.Number, h.Hash().Bytes())
|
||||
glog.V(logger.Error).Infoln(err)
|
||||
glog.V(logger.Debug).Infoln(block)
|
||||
}
|
||||
|
||||
// verifyNonces verifies nonces of the given blocks in parallel and returns
|
||||
// an error if one of the blocks nonce verifications failed.
|
||||
func verifyNonces(pow pow.PoW, blocks []*types.Block) error {
|
||||
// Spawn a few workers. They listen for blocks on the in channel
|
||||
// and send results on done. The workers will exit in the
|
||||
// background when in is closed.
|
||||
var (
|
||||
in = make(chan *types.Block)
|
||||
done = make(chan error, runtime.GOMAXPROCS(0))
|
||||
)
|
||||
defer close(in)
|
||||
for i := 0; i < cap(done); i++ {
|
||||
go verifyNonce(pow, in, done)
|
||||
}
|
||||
// Feed blocks to the workers, aborting at the first invalid nonce.
|
||||
var (
|
||||
running, i int
|
||||
block *types.Block
|
||||
sendin = in
|
||||
)
|
||||
for i < len(blocks) || running > 0 {
|
||||
if i == len(blocks) {
|
||||
// Disable sending to in.
|
||||
sendin = nil
|
||||
} else {
|
||||
block = blocks[i]
|
||||
i++
|
||||
}
|
||||
select {
|
||||
case sendin <- block:
|
||||
running++
|
||||
case err := <-done:
|
||||
running--
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// verifyNonce is a worker for the verifyNonces method. It will run until
|
||||
// in is closed.
|
||||
func verifyNonce(pow pow.PoW, in <-chan *types.Block, done chan<- error) {
|
||||
for block := range in {
|
||||
if !pow.Verify(block) {
|
||||
done <- ValidationError("Block's nonce is invalid (= %x)", block.Nonce)
|
||||
} else {
|
||||
done <- nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -266,9 +266,9 @@ func New(config *Config) (*Ethereum, error) {
|
|||
MinerThreads: config.MinerThreads,
|
||||
}
|
||||
|
||||
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
|
||||
eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
|
||||
eth.pow = ethash.New()
|
||||
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.pow, eth.EventMux())
|
||||
eth.downloader = downloader.New(eth.EventMux(), eth.chainManager.HasBlock, eth.chainManager.GetBlock)
|
||||
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State, eth.chainManager.GasLimit)
|
||||
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
|
||||
eth.chainManager.SetProcessor(eth.blockProcessor)
|
||||
|
|
Loading…
Reference in New Issue