cmd/geth, cmd/utils: make chain importing interruptible
Interrupting import with Ctrl-C could cause database corruption because the signal wasn't handled. utils.ImportChain now checks for a queued interrupt on every batch.
This commit is contained in:
parent
705beb4c25
commit
67effb94b6
|
@ -54,10 +54,11 @@ func importChain(ctx *cli.Context) {
|
||||||
}
|
}
|
||||||
chain, blockDB, stateDB, extraDB := utils.MakeChain(ctx)
|
chain, blockDB, stateDB, extraDB := utils.MakeChain(ctx)
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
if err := utils.ImportChain(chain, ctx.Args().First()); err != nil {
|
err := utils.ImportChain(chain, ctx.Args().First())
|
||||||
utils.Fatalf("Import error: %v\n", err)
|
|
||||||
}
|
|
||||||
flushAll(blockDB, stateDB, extraDB)
|
flushAll(blockDB, stateDB, extraDB)
|
||||||
|
if err != nil {
|
||||||
|
utils.Fatalf("Import error: %v", err)
|
||||||
|
}
|
||||||
fmt.Printf("Import done in %v", time.Since(start))
|
fmt.Printf("Import done in %v", time.Since(start))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,7 +107,7 @@ func upgradeDB(ctx *cli.Context) {
|
||||||
filename := fmt.Sprintf("blockchain_%d_%s.chain", bcVersion, time.Now().Format("20060102_150405"))
|
filename := fmt.Sprintf("blockchain_%d_%s.chain", bcVersion, time.Now().Format("20060102_150405"))
|
||||||
exportFile := filepath.Join(ctx.GlobalString(utils.DataDirFlag.Name), filename)
|
exportFile := filepath.Join(ctx.GlobalString(utils.DataDirFlag.Name), filename)
|
||||||
if err := utils.ExportChain(chain, exportFile); err != nil {
|
if err := utils.ExportChain(chain, exportFile); err != nil {
|
||||||
utils.Fatalf("Unable to export chain for reimport %s\n", err)
|
utils.Fatalf("Unable to export chain for reimport %s", err)
|
||||||
}
|
}
|
||||||
flushAll(blockDB, stateDB, extraDB)
|
flushAll(blockDB, stateDB, extraDB)
|
||||||
os.RemoveAll(filepath.Join(ctx.GlobalString(utils.DataDirFlag.Name), "blockchain"))
|
os.RemoveAll(filepath.Join(ctx.GlobalString(utils.DataDirFlag.Name), "blockchain"))
|
||||||
|
@ -118,7 +119,7 @@ func upgradeDB(ctx *cli.Context) {
|
||||||
err := utils.ImportChain(chain, exportFile)
|
err := utils.ImportChain(chain, exportFile)
|
||||||
flushAll(blockDB, stateDB, extraDB)
|
flushAll(blockDB, stateDB, extraDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utils.Fatalf("Import error %v (a backup is made in %s, use the import command to import it)\n", err, exportFile)
|
utils.Fatalf("Import error %v (a backup is made in %s, use the import command to import it)", err, exportFile)
|
||||||
} else {
|
} else {
|
||||||
os.Remove(exportFile)
|
os.Remove(exportFile)
|
||||||
glog.Infoln("Import finished")
|
glog.Infoln("Import finished")
|
||||||
|
|
|
@ -173,22 +173,47 @@ func FormatTransactionData(data string) []byte {
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
func ImportChain(chainmgr *core.ChainManager, fn string) error {
|
func ImportChain(chain *core.ChainManager, fn string) error {
|
||||||
|
// Watch for Ctrl-C while the import is running.
|
||||||
|
// If a signal is received, the import will stop at the next batch.
|
||||||
|
interrupt := make(chan os.Signal, 1)
|
||||||
|
stop := make(chan struct{})
|
||||||
|
signal.Notify(interrupt, os.Interrupt)
|
||||||
|
defer signal.Stop(interrupt)
|
||||||
|
defer close(interrupt)
|
||||||
|
go func() {
|
||||||
|
if _, ok := <-interrupt; ok {
|
||||||
|
glog.Info("caught interrupt during import, will stop at next batch")
|
||||||
|
}
|
||||||
|
close(stop)
|
||||||
|
}()
|
||||||
|
checkInterrupt := func() bool {
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
glog.Infoln("Importing blockchain", fn)
|
glog.Infoln("Importing blockchain", fn)
|
||||||
fh, err := os.OpenFile(fn, os.O_RDONLY, os.ModePerm)
|
fh, err := os.Open(fn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer fh.Close()
|
defer fh.Close()
|
||||||
|
|
||||||
chainmgr.Reset()
|
|
||||||
stream := rlp.NewStream(fh, 0)
|
stream := rlp.NewStream(fh, 0)
|
||||||
|
|
||||||
|
// Remove all existing blocks and start the import.
|
||||||
|
chain.Reset()
|
||||||
batchSize := 2500
|
batchSize := 2500
|
||||||
blocks := make(types.Blocks, batchSize)
|
blocks := make(types.Blocks, batchSize)
|
||||||
n := 0
|
n := 0
|
||||||
for {
|
for {
|
||||||
// Load a batch of RLP blocks.
|
// Load a batch of RLP blocks.
|
||||||
|
if checkInterrupt() {
|
||||||
|
return fmt.Errorf("interrupted")
|
||||||
|
}
|
||||||
i := 0
|
i := 0
|
||||||
for ; i < batchSize; i++ {
|
for ; i < batchSize; i++ {
|
||||||
var b types.Block
|
var b types.Block
|
||||||
|
@ -204,7 +229,10 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Import the batch.
|
// Import the batch.
|
||||||
if _, err := chainmgr.InsertChain(blocks[:i]); err != nil {
|
if checkInterrupt() {
|
||||||
|
return fmt.Errorf("interrupted")
|
||||||
|
}
|
||||||
|
if _, err := chain.InsertChain(blocks[:i]); err != nil {
|
||||||
return fmt.Errorf("invalid block %d: %v", n, err)
|
return fmt.Errorf("invalid block %d: %v", n, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue