Merge pull request #24197 from rjl493456442/periodically-flush-batch

core: periodically flush the transaction indexes
This commit is contained in:
Péter Szilágyi 2022-01-11 11:32:43 +02:00 committed by GitHub
commit acd7b36999
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 12 deletions

View File

@ -979,32 +979,31 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// range. In this case, all tx indices of newly imported blocks should be // range. In this case, all tx indices of newly imported blocks should be
// generated. // generated.
var batch = bc.db.NewBatch() var batch = bc.db.NewBatch()
for _, block := range blockChain { for i, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit { if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block) rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil { } else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block) rawdb.WriteTxLookupEntriesByBlock(batch, block)
} }
stats.processed++ stats.processed++
}
// Flush all tx-lookup index data. if batch.ValueSize() > ethdb.IdealBatchSize || i == len(blockChain)-1 {
size += int64(batch.ValueSize()) size += int64(batch.ValueSize())
if err := batch.Write(); err != nil { if err = batch.Write(); err != nil {
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64() fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil { if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err) log.Error("Can't truncate ancient store after failed insert", "err", err)
} }
return 0, err return 0, err
} }
batch.Reset()
}
}
// Sync the ancient store explicitly to ensure all data has been flushed to disk. // Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil { if err := bc.db.Sync(); err != nil {
return 0, err return 0, err
} }
// Update the current fast block because all block data is now present in DB. // Update the current fast block because all block data is now present in DB.
previousFastBlock := bc.CurrentFastBlock().NumberU64() previousFastBlock := bc.CurrentFastBlock().NumberU64()
if !updateHead(blockChain[len(blockChain)-1]) { if !updateHead(blockChain[len(blockChain)-1]) {