From 5d891909889acb1cf1870523eb8c13fa6b20f851 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 15 May 2024 16:48:15 +0800 Subject: [PATCH] core/rawdb: introduce flush offset in freezer --- core/blockchain.go | 9 +- core/rawdb/accessors_chain_test.go | 3 +- core/rawdb/ancient_scheme.go | 1 + core/rawdb/freezer_batch.go | 10 +- core/rawdb/freezer_meta.go | 187 ++++++++++----- core/rawdb/freezer_meta_test.go | 88 ++++++- core/rawdb/freezer_table.go | 260 ++++++++++++++------- core/rawdb/freezer_table_test.go | 356 ++++++++++++++++++++--------- 8 files changed, 654 insertions(+), 260 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index c3da61b281..26fa7fc1b0 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -113,21 +113,28 @@ const ( // * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted // * the `Bloom` field of receipt is deleted // * the `BlockIndex` and `TxIndex` fields of txlookup are deleted + // // - Version 5 // The following incompatible database changes were added: // * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt // * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the // receipts' corresponding block + // // - Version 6 // The following incompatible database changes were added: // * Transaction lookup information stores the corresponding block number instead of block hash + // // - Version 7 // The following incompatible database changes were added: // * Use freezer as the ancient database to maintain all ancient data + // // - Version 8 // The following incompatible database changes were added: // * New scheme for contract code in order to separate the codes and trie nodes - BlockChainVersion uint64 = 8 + // + // - Version 9 + // * The metadata structure of freezer is changed by adding 'flushOffset' + BlockChainVersion uint64 = 9 ) // CacheConfig contains the configuration values for the trie database diff 
--git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 0b9dbe1335..9e8bdfdbfc 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -879,6 +879,7 @@ func TestHeadersRLPStorage(t *testing.T) { t.Fatalf("failed to create database with ancient backend") } defer db.Close() + // Create blocks var chain []*types.Block var pHash common.Hash @@ -894,7 +895,7 @@ func TestHeadersRLPStorage(t *testing.T) { chain = append(chain, block) pHash = block.Hash() } - var receipts []types.Receipts = make([]types.Receipts, 100) + receipts := make([]types.Receipts, 100) // Write first half to ancients WriteAncientBlocks(db, chain[:50], receipts[:50], big.NewInt(100)) // Write second half to db diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index 371fd384ad..3e16d5c044 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -62,6 +62,7 @@ const ( stateHistoryStorageData = "storage.data" ) +// stateFreezerNoSnappy configures whether compression is disabled for the state freezer. var stateFreezerNoSnappy = map[string]bool{ stateHistoryMeta: true, stateHistoryAccountIndex: false, diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 013d0b9d13..0b354cb6be 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -19,6 +19,7 @@ package rawdb import ( "fmt" "math" + "time" "github.com/ethereum/go-ethereum/rlp" "github.com/golang/snappy" @@ -188,9 +189,6 @@ func (batch *freezerTableBatch) commit() error { if err != nil { return err } - if err := batch.t.head.Sync(); err != nil { - return err - } dataSize := int64(len(batch.dataBuffer)) batch.dataBuffer = batch.dataBuffer[:0] @@ -208,6 +206,12 @@ func (batch *freezerTableBatch) commit() error { // Update metrics. batch.t.sizeGauge.Inc(dataSize + indexSize) batch.t.writeMeter.Mark(dataSize + indexSize) + + // Periodically sync the table, todo (rjl493456442) make it configurable? 
+ if time.Since(batch.t.lastSync) > 30*time.Second { + batch.t.lastSync = time.Now() + return batch.t.syncWithNoLock() + } return nil } diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index 9eef9df351..f60fce79ac 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -17,93 +17,166 @@ package rawdb import ( + "errors" "io" "os" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) -const freezerVersion = 1 // The initial version tag of freezer table metadata +const ( + freezerTableV1 = 1 // Initial version of metadata struct + freezerTableV2 = 2 // Add field: 'flushOffset' +) -// freezerTableMeta wraps all the metadata of the freezer table. +// freezerTableMeta is a collection of additional properties that describe the +// freezer table. These properties are designed with error resilience, allowing +// them to be automatically corrected after an error occurs without significantly +// impacting overall correctness. type freezerTableMeta struct { - // Version is the versioning descriptor of the freezer table. - Version uint16 + file *os.File // file handler of metadata + version uint16 // version descriptor of the freezer table - // VirtualTail indicates how many items have been marked as deleted. - // Its value is equal to the number of items removed from the table - // plus the number of items hidden in the table, so it should never - // be lower than the "actual tail". - VirtualTail uint64 + // virtualTail represents the number of items marked as deleted. It is + // calculated as the sum of items removed from the table and the items + // hidden within the table, and should never be less than the "actual + // tail". + // + // If lost due to a crash or other reasons, it will be reset to the number + // of items deleted from the table, causing the previously hidden items + // to become visible, which is an acceptable consequence. 
+ virtualTail uint64 + + // flushOffset represents the offset in the index file up to which the index + // items along with the corresponding data items in data files has been flushed + // (fsync’d) to disk. Beyond this offset, data integrity is not guaranteed, + // the extra index items along with the associated data items should be removed + // during the startup. + // + // The principle is that all data items above the flush offset are considered + // volatile and should be recoverable if they are discarded after the unclean + // shutdown. If data integrity is required, manually force a sync of the + // freezer before proceeding with further operations (e.g. do freezer.Sync() + // first and then write data to key value store in some circumstances). + // + // The offset could be moved forward by applying sync operation, or be moved + // backward in cases of head/tail truncation, etc. + flushOffset uint64 } -// newMetadata initializes the metadata object with the given virtual tail. -func newMetadata(tail uint64) *freezerTableMeta { +// decodeV1 attempts to decode the metadata structure in v1 format. If fails or +// the result is incompatible, nil is returned. +func decodeV1(file *os.File) *freezerTableMeta { + _, err := file.Seek(0, io.SeekStart) + if err != nil { + return nil + } + type obj struct { + Version uint16 + Tail uint64 + } + var o obj + if err := rlp.Decode(file, &o); err != nil { + return nil + } + if o.Version != freezerTableV1 { + return nil + } return &freezerTableMeta{ - Version: freezerVersion, - VirtualTail: tail, + file: file, + version: o.Version, + virtualTail: o.Tail, } } -// readMetadata reads the metadata of the freezer table from the -// given metadata file. -func readMetadata(file *os.File) (*freezerTableMeta, error) { +// decodeV2 attempts to decode the metadata structure in v2 format. If fails or +// the result is incompatible, nil is returned. 
+func decodeV2(file *os.File) *freezerTableMeta { _, err := file.Seek(0, io.SeekStart) if err != nil { - return nil, err + return nil } - var meta freezerTableMeta - if err := rlp.Decode(file, &meta); err != nil { - return nil, err + type obj struct { + Version uint16 + Tail uint64 + Offset uint64 + } + var o obj + if err := rlp.Decode(file, &o); err != nil { + return nil + } + if o.Version != freezerTableV2 { + return nil + } + return &freezerTableMeta{ + file: file, + version: freezerTableV2, + virtualTail: o.Tail, + flushOffset: o.Offset, } - return &meta, nil } -// writeMetadata writes the metadata of the freezer table into the -// given metadata file. -func writeMetadata(file *os.File, meta *freezerTableMeta) error { - _, err := file.Seek(0, io.SeekStart) - if err != nil { - return err - } - return rlp.Encode(file, meta) -} - -// loadMetadata loads the metadata from the given metadata file. -// Initializes the metadata file with the given "actual tail" if -// it's empty. -func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) { +// newMetadata initializes the metadata object, either by loading it from the file +// or by constructing a new one from scratch. +func newMetadata(file *os.File) (*freezerTableMeta, error) { stat, err := file.Stat() if err != nil { return nil, err } - // Write the metadata with the given actual tail into metadata file - // if it's non-existent. There are two possible scenarios here: - // - the freezer table is empty - // - the freezer table is legacy - // In both cases, write the meta into the file with the actual tail - // as the virtual tail. 
if stat.Size() == 0 { - m := newMetadata(tail) - if err := writeMetadata(file, m); err != nil { + m := &freezerTableMeta{ + file: file, + version: freezerTableV2, + virtualTail: 0, + flushOffset: 0, + } + if err := m.write(true); err != nil { return nil, err } return m, nil } - m, err := readMetadata(file) - if err != nil { - return nil, err + if m := decodeV2(file); m != nil { + return m, nil } - // Update the virtual tail with the given actual tail if it's even - // lower than it. Theoretically it shouldn't happen at all, print - // a warning here. - if m.VirtualTail < tail { - log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail) - m.VirtualTail = tail - if err := writeMetadata(file, m); err != nil { - return nil, err - } + if m := decodeV1(file); m != nil { + return m, nil // legacy metadata } - return m, nil + return nil, errors.New("failed to decode metadata") +} + +// setVirtualTail sets the virtual tail and flushes the metadata if sync is true. +func (m *freezerTableMeta) setVirtualTail(tail uint64, sync bool) error { + m.virtualTail = tail + return m.write(sync) +} + +// setFlushOffset sets the flush offset and flushes the metadata if sync is true. +func (m *freezerTableMeta) setFlushOffset(offset uint64, sync bool) error { + m.flushOffset = offset + return m.write(sync) +} + +// write flushes the content of metadata into file and performs a fsync if required. 
+func (m *freezerTableMeta) write(sync bool) error { + type obj struct { + Version uint16 + Tail uint64 + Offset uint64 + } + var o obj + o.Version = freezerTableV2 // forcibly set it to v2 + o.Tail = m.virtualTail + o.Offset = m.flushOffset + + _, err := m.file.Seek(0, io.SeekStart) + if err != nil { + return err + } + if err := rlp.Encode(m.file, &o); err != nil { + return err + } + if !sync { + return nil + } + return m.file.Sync() } diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go index 409e811026..b02c230901 100644 --- a/core/rawdb/freezer_meta_test.go +++ b/core/rawdb/freezer_meta_test.go @@ -19,6 +19,8 @@ package rawdb import ( "os" "testing" + + "github.com/ethereum/go-ethereum/rlp" ) func TestReadWriteFreezerTableMeta(t *testing.T) { @@ -27,36 +29,98 @@ func TestReadWriteFreezerTableMeta(t *testing.T) { t.Fatalf("Failed to create file %v", err) } defer f.Close() - err = writeMetadata(f, newMetadata(100)) + + meta, err := newMetadata(f) if err != nil { - t.Fatalf("Failed to write metadata %v", err) + t.Fatalf("Failed to new metadata %v", err) } - meta, err := readMetadata(f) + meta.setVirtualTail(100, false) + + meta, err = newMetadata(f) if err != nil { - t.Fatalf("Failed to read metadata %v", err) + t.Fatalf("Failed to reload metadata %v", err) } - if meta.Version != freezerVersion { + if meta.version != freezerTableV2 { t.Fatalf("Unexpected version field") } - if meta.VirtualTail != uint64(100) { + if meta.virtualTail != uint64(100) { t.Fatalf("Unexpected virtual tail field") } } -func TestInitializeFreezerTableMeta(t *testing.T) { +func TestUpgradeMetadata(t *testing.T) { f, err := os.CreateTemp(t.TempDir(), "*") if err != nil { t.Fatalf("Failed to create file %v", err) } defer f.Close() - meta, err := loadMetadata(f, uint64(100)) + + // Write legacy metadata into file + type obj struct { + Version uint16 + Tail uint64 + } + var o obj + o.Version = freezerTableV1 + o.Tail = 100 + + if err := rlp.Encode(f, &o); err != nil { 
+ t.Fatalf("Failed to encode %v", err) + } + + // Reload the metadata, a silent upgrade is expected + meta, err := newMetadata(f) if err != nil { t.Fatalf("Failed to read metadata %v", err) } - if meta.Version != freezerVersion { - t.Fatalf("Unexpected version field") + if meta.version != freezerTableV1 { + t.Fatal("Unexpected version field") } - if meta.VirtualTail != uint64(100) { - t.Fatalf("Unexpected virtual tail field") + if meta.virtualTail != uint64(100) { + t.Fatal("Unexpected virtual tail field") + } + if meta.flushOffset != uint64(0) { + t.Fatal("Unexpected flush offset field") + } + + meta.setFlushOffset(100, true) + + meta, err = newMetadata(f) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.version != freezerTableV2 { + t.Fatal("Unexpected version field") + } + if meta.virtualTail != uint64(100) { + t.Fatal("Unexpected virtual tail field") + } + if meta.flushOffset != uint64(100) { + t.Fatal("Unexpected flush offset field") + } +} + +func TestInvalidMetadata(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + defer f.Close() + + // Write invalid legacy metadata into file + type obj struct { + Version uint16 + Tail uint64 + } + var o obj + o.Version = freezerTableV2 // -> invalid version tag + o.Tail = 100 + + if err := rlp.Encode(f, &o); err != nil { + t.Fatalf("Failed to encode %v", err) + } + _, err = newMetadata(f) + if err == nil { + t.Fatal("Unexpected success") } } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index a8ed17b371..7cc7c64c1f 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -108,11 +108,13 @@ type freezerTable struct { head *os.File // File descriptor for the data head of the table index *os.File // File descriptor for the indexEntry file of the table - meta *os.File // File descriptor for metadata of the table files map[uint32]*os.File // open files headId uint32 // number 
of the currently active head file tailId uint32 // number of the earliest file + metadata *freezerTableMeta // metadata of the table + lastSync time.Time // Timestamp when the last sync was performed + headBytes int64 // Number of bytes written to the head file readMeter metrics.Meter // Meter for measuring the effective amount of data read writeMeter metrics.Meter // Meter for measuring the effective amount of data written @@ -166,10 +168,17 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr return nil, err } } + // Load metadata from the file. The tag will be true if legacy metadata + // is detected. + metadata, err := newMetadata(meta) + if err != nil { + return nil, err + } // Create the table and repair any past inconsistency tab := &freezerTable{ index: index, - meta: meta, + metadata: metadata, + lastSync: time.Now(), files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -221,13 +230,11 @@ func (t *freezerTable) repair() error { return err } // New file can't trigger this path } - // Validate the index file as it might contain some garbage data after the - // power failures. if err := t.repairIndex(); err != nil { return err } // Retrieve the file sizes and prepare for truncation. Note the file size - // might be changed after index validation. + // might be changed after index repair. if stat, err = t.index.Stat(); err != nil { return err } @@ -253,12 +260,14 @@ func (t *freezerTable) repair() error { t.tailId = firstIndex.filenum t.itemOffset.Store(uint64(firstIndex.offset)) - // Load metadata from the file - meta, err := loadMetadata(t.meta, t.itemOffset.Load()) - if err != nil { - return err + // Adjust the number of hidden items if it is less than the number of items + // being removed. 
+ if t.itemOffset.Load() > t.metadata.virtualTail { + if err := t.metadata.setVirtualTail(t.itemOffset.Load(), true); err != nil { + return err + } } - t.itemHidden.Store(meta.VirtualTail) + t.itemHidden.Store(t.metadata.virtualTail) // Read the last index, use the default value in case the freezer is empty if offsetsSize == indexEntrySize { @@ -267,12 +276,6 @@ func (t *freezerTable) repair() error { t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) } - // Print an error log if the index is corrupted due to an incorrect - // last index item. While it is theoretically possible to have a zero offset - // by storing all zero-size items, it is highly unlikely to occur in practice. - if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 { - log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize) - } if t.readonly { t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) } else { @@ -293,6 +296,7 @@ func (t *freezerTable) repair() error { return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum) } verbose = true + // Truncate the head file to the last offset pointer if contentExp < contentSize { t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize) @@ -304,11 +308,23 @@ func (t *freezerTable) repair() error { // Truncate the index to point within the head file if contentExp > contentSize { t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize) - if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { + + newOffset := offsetsSize - indexEntrySize + if err := truncateFreezerFile(t.index, newOffset); err != nil { return err } offsetsSize -= indexEntrySize + // If the index file is truncated beyond the flush offset, move the flush + // offset back to the new end of the file. 
A crash may occur before the + // offset is updated, leaving a dangling reference that points to a position + // outside the file. If so, the offset will be reset to the new end of the + // file during the next run. + if t.metadata.flushOffset < uint64(newOffset) { + if err := t.metadata.setFlushOffset(uint64(newOffset), true); err != nil { + return err + } + } // Read the new head index, use the default value in case // the freezer is already empty. var newLastIndex indexEntry @@ -345,7 +361,7 @@ func (t *freezerTable) repair() error { if err := t.head.Sync(); err != nil { return err } - if err := t.meta.Sync(); err != nil { + if err := t.metadata.file.Sync(); err != nil { return err } } @@ -372,7 +388,60 @@ func (t *freezerTable) repair() error { return nil } -// repairIndex validates the integrity of the index file. According to the design, +func (t *freezerTable) repairIndex() error { + stat, err := t.index.Stat() + if err != nil { + return err + } + offset := stat.Size() + + // Validate the items in the index file to ensure the data integrity. + // It's possible some garbage data is retained in the index file after + // the power failures and should be truncated first. + offset, err = t.checkIndex(offset) + if err != nil { + return err + } + // If legacy metadata is detected, attempt to recover the offset from the + // index file to avoid clearing the entire table. + if t.metadata.version == freezerTableV1 { + t.logger.Info("Recover the flush offset for legacy table", "offset", offset) + return t.metadata.setFlushOffset(uint64(offset), true) + } + // Move the flush offset to the end of the file for fresh new freezer table + if offset == indexEntrySize && t.metadata.flushOffset == 0 { + return t.metadata.setFlushOffset(uint64(offset), true) + } + // Short circuit if the offset is aligned with the index file + if offset == int64(t.metadata.flushOffset) { + return nil + } + // Extra index items have been detected beyond the flush offset. 
Since these + // entries correspond to data that has not been fully flushed to disk in the + // last run (because of unclean shutdown), their integrity cannot be guaranteed. + // To ensure consistency, these index items will be truncated, as there is no + // reliable way to validate or recover their associated data. + if offset > int64(t.metadata.flushOffset) { + size := offset - int64(t.metadata.flushOffset) + if t.readonly { + return fmt.Errorf("index file(path: %s, name: %s) contains garbage data %d", t.path, t.name, size) + } + t.logger.Info("Truncate excessive items", "size", size) + return truncateFreezerFile(t.index, int64(t.metadata.flushOffset)) + } + // Flush offset refers to the position which exceeds the index file. The only + // possible scenario for this is: power failure or system crash occurs after + // truncating the segment in index file from head or tail, but without updating + // the flush offset. In this case, automatically reset the flush offset with + // the file size which implies the entire index file is complete. + if t.readonly { + return nil // do nothing in read only mode + } + t.logger.Info("Rewind the flush offset", "old", t.metadata.flushOffset, "new", offset) + return t.metadata.setFlushOffset(uint64(offset), true) +} + +// checkIndex validates the integrity of the index file. According to the design, // the initial entry in the file denotes the earliest data file along with the // count of deleted items. Following this, all subsequent entries in the file must // be in order. This function identifies any corrupted entries and truncates items @@ -392,18 +461,11 @@ func (t *freezerTable) repair() error { // leftover garbage or if all items in the table have zero size is impossible. // In such instances, the file will remain unchanged to prevent potential data // loss or misinterpretation. 
-func (t *freezerTable) repairIndex() error { - // Retrieve the file sizes and prepare for validation - stat, err := t.index.Stat() - if err != nil { - return err - } - size := stat.Size() - +func (t *freezerTable) checkIndex(size int64) (int64, error) { // Move the read cursor to the beginning of the file - _, err = t.index.Seek(0, io.SeekStart) + _, err := t.index.Seek(0, io.SeekStart) if err != nil { - return err + return 0, err } fr := bufio.NewReader(t.index) @@ -425,21 +487,21 @@ func (t *freezerTable) repairIndex() error { entry.unmarshalBinary(buff) return entry, nil } - truncate = func(offset int64) error { + truncate = func(offset int64) (int64, error) { if t.readonly { - return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) + return 0, fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) } if err := truncateFreezerFile(t.index, offset); err != nil { - return err + return 0, err } log.Warn("Truncated index file", "offset", offset, "truncated", size-offset) - return nil + return offset, nil } ) for offset := int64(0); offset < size; offset += indexEntrySize { entry, err := read() if err != nil { - return err + return 0, err } if offset == 0 { head = entry @@ -468,10 +530,10 @@ func (t *freezerTable) repairIndex() error { // the seek operation anyway as a precaution. _, err = t.index.Seek(0, io.SeekEnd) if err != nil { - return err + return 0, err } log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start))) - return nil + return size, nil } // checkIndexItems validates the correctness of two consecutive index items based @@ -550,12 +612,23 @@ func (t *freezerTable) truncateHead(items uint64) error { // Truncate the index file first, the tail position is also considered // when calculating the new freezer table length. 
length := items - t.itemOffset.Load() - if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { + newOffset := (length + 1) * indexEntrySize + if err := truncateFreezerFile(t.index, int64(newOffset)); err != nil { return err } if err := t.index.Sync(); err != nil { return err } + // If the index file is truncated beyond the flush offset, move the flush + // offset back to the new end of the file. A crash may occur before the + // offset is updated, leaving a dangling reference that points to a position + // outside the file. If so, the offset will be reset to the new end of the + // file during the next run. + if t.metadata.flushOffset > newOffset { + if err := t.metadata.setFlushOffset(newOffset, true); err != nil { + return err + } + } // Calculate the new expected size of the data file and truncate it var expected indexEntry if length == 0 { @@ -652,7 +725,10 @@ func (t *freezerTable) truncateTail(items uint64) error { } // Update the virtual tail marker and hidden these entries in table. t.itemHidden.Store(items) - if err := writeMetadata(t.meta, newMetadata(items)); err != nil { + + // Update the virtual tail without fsync, otherwise it will significantly + // impact the overall performance. + if err := t.metadata.setVirtualTail(items, false); err != nil { return err } // Hidden items still fall in the current tail file, no data file @@ -664,6 +740,18 @@ func (t *freezerTable) truncateTail(items uint64) error { if t.tailId > newTailId { return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) } + // Sync the table before performing the index tail truncation. A crash may + // occur after truncating the index file without updating the flush offset, + // leaving a dangling offset that points to a position outside the file. + // The offset will be rewound to the end of file during the next run + // automatically and implicitly assumes all the items within the file are + // complete. 
+ // + // Therefore, forcibly flush everything above the offset to ensure this + // assumption is satisfied! + if err := t.syncWithNoLock(); err != nil { + return err + } // Count how many items can be deleted from the file. var ( newDeleted = items @@ -681,11 +769,6 @@ func (t *freezerTable) truncateTail(items uint64) error { } newDeleted = current } - // Commit the changes of metadata file first before manipulating - // the indexes file. - if err := t.meta.Sync(); err != nil { - return err - } // Close the index file before shorten it. if err := t.index.Close(); err != nil { return err @@ -716,6 +799,21 @@ func (t *freezerTable) truncateTail(items uint64) error { t.itemOffset.Store(newDeleted) t.releaseFilesBefore(t.tailId, true) + // Move the index flush offset backward due to the deletion of an index segment. + // A crash may occur before the offset is updated, leaving a dangling reference + // that points to a position outside the file. If so, the offset will be reset + // to the new end of the file during the next run. + // + // Note, both the index and head data file has been persisted before performing + // tail truncation and all the items in these files are regarded as complete. + shorten := indexEntrySize * (newDeleted - deleted) + if t.metadata.flushOffset <= shorten { + return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten) + } else { + if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil { + return err + } + } // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() if err != nil { @@ -725,40 +823,30 @@ func (t *freezerTable) truncateTail(items uint64) error { return nil } -// Close closes all opened files. +// Close closes all opened files and finalizes the freezer table for use. +// This operation must be completed before shutdown to prevent the loss of +// recent writes. 
func (t *freezerTable) Close() error { t.lock.Lock() defer t.lock.Unlock() + if err := t.syncWithNoLock(); err != nil { + return err + } var errs []error - doClose := func(f *os.File, sync bool, close bool) { - if sync && !t.readonly { - if err := f.Sync(); err != nil { - errs = append(errs, err) - } - } - if close { - if err := f.Close(); err != nil { - errs = append(errs, err) - } + doClose := func(f *os.File) { + if err := f.Close(); err != nil { + errs = append(errs, err) } } - // Trying to fsync a file opened in rdonly causes "Access denied" - // error on Windows. - doClose(t.index, true, true) - doClose(t.meta, true, true) - - // The preopened non-head data-files are all opened in readonly. - // The head is opened in rw-mode, so we sync it here - but since it's also - // part of t.files, it will be closed in the loop below. - doClose(t.head, true, false) // sync but do not close - + doClose(t.index) + doClose(t.metadata.file) for _, f := range t.files { - doClose(f, false, true) // close but do not sync + doClose(f) } t.index = nil - t.meta = nil t.head = nil + t.metadata.file = nil if errs != nil { return fmt.Errorf("%v", errs) @@ -917,7 +1005,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i defer t.lock.RUnlock() // Ensure the table and the item are accessible - if t.index == nil || t.head == nil || t.meta == nil { + if t.index == nil || t.head == nil || t.metadata.file == nil { return nil, nil, errClosed } var ( @@ -1042,6 +1130,9 @@ func (t *freezerTable) advanceHead() error { t.lock.Lock() defer t.lock.Unlock() + if err := t.syncWithNoLock(); err != nil { + return err + } // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it. 
 	nextID := t.headId + 1
@@ -1069,7 +1160,19 @@
 func (t *freezerTable) Sync() error {
 	t.lock.Lock()
 	defer t.lock.Unlock()
-	if t.index == nil || t.head == nil || t.meta == nil {
+
+	return t.syncWithNoLock()
+}
+
+// syncWithNoLock is the internal version of Sync which assumes the lock is
+// already held.
+func (t *freezerTable) syncWithNoLock() error {
+	// Trying to fsync a file opened in rdonly causes "Access denied"
+	// error on Windows.
+	if t.readonly {
+		return nil
+	}
+	if t.index == nil || t.head == nil || t.metadata.file == nil {
 		return errClosed
 	}
 	var err error
@@ -1078,10 +1181,18 @@
 			err = e
 		}
 	}
-	trackError(t.index.Sync())
-	trackError(t.meta.Sync())
 	trackError(t.head.Sync())
+
+	// A crash may occur before the offset is updated, leaving the offset
+	// pointing to an old position. If so, the extra items above the offset
+	// will be truncated during the next run.
+	stat, err := t.index.Stat()
+	if err != nil {
+		return err
+	}
+	offset := stat.Size()
+	trackError(t.metadata.setFlushOffset(uint64(offset), true))
 	return err
 }
 
@@ -1097,13 +1208,8 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string {
 }
 
 func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
-	meta, err := readMetadata(t.meta)
-	if err != nil {
-		fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
-		return
-	}
-	fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version,
-		t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
+	fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n",
+		t.metadata.version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
 
 	buf := make([]byte, indexEntrySize)
diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go
index fd6e3cf199..4bbb3aaf70 100644
--- a/core/rawdb/freezer_table_test.go
+++ b/core/rawdb/freezer_table_test.go
@@ -262,18 +262,6 @@ func TestSnappyDetection(t *testing.T) {
f.Close() } - // Open without snappy - { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false) - if err != nil { - t.Fatal(err) - } - if _, err = f.Retrieve(0); err == nil { - f.Close() - t.Fatalf("expected empty table") - } - } - // Open with snappy { f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) @@ -286,6 +274,18 @@ func TestSnappyDetection(t *testing.T) { t.Fatalf("expected no error, got %v", err) } } + + // Open without snappy + { + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false) + if err != nil { + t.Fatal(err) + } + if _, err = f.Retrieve(0); err == nil { + f.Close() + t.Fatalf("expected empty table") + } + } } func assertFileSize(f string, size int64) error { @@ -521,93 +521,53 @@ func TestFreezerOffset(t *testing.T) { fname := fmt.Sprintf("offset-%d", rand.Uint64()) // Fill table - { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) - if err != nil { - t.Fatal(err) - } - - // Write 6 x 20 bytes, splitting out into three files - batch := f.newBatch() - require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) - require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) - - require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) - require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) - - require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) - require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) - require.NoError(t, batch.commit()) - - t.Log(f.dumpIndexString(0, 100)) - f.Close() + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) } + // Write 6 x 20 bytes, splitting out into three files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) + + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) + + require.NoError(t, batch.AppendRaw(4, getChunk(20, 
0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.commit()) + + t.Log(f.dumpIndexString(0, 100)) + // Now crop it. - { - // delete files 0 and 1 - for i := 0; i < 2; i++ { - p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i)) - if err := os.Remove(p); err != nil { - t.Fatal(err) - } - } - // Read the index file - p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) - indexFile, err := os.OpenFile(p, os.O_RDWR, 0644) - if err != nil { - t.Fatal(err) - } - indexBuf := make([]byte, 7*indexEntrySize) - indexFile.Read(indexBuf) - - // Update the index file, so that we store - // [ file = 2, offset = 4 ] at index zero - - zeroIndex := indexEntry{ - filenum: uint32(2), // First file is 2 - offset: uint32(4), // We have removed four items - } - buf := zeroIndex.append(nil) - - // Overwrite index zero - copy(indexBuf, buf) - - // Remove the four next indices by overwriting - copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) - indexFile.WriteAt(indexBuf, 0) - - // Need to truncate the moved index items - indexFile.Truncate(indexEntrySize * (1 + 2)) - indexFile.Close() - } + f.truncateTail(4) + f.Close() // Now open again - { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) - if err != nil { - t.Fatal(err) - } - defer f.Close() - t.Log(f.dumpIndexString(0, 100)) - - // It should allow writing item 6. - batch := f.newBatch() - require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) - require.NoError(t, batch.commit()) - - checkRetrieveError(t, f, map[uint64]error{ - 0: errOutOfBounds, - 1: errOutOfBounds, - 2: errOutOfBounds, - 3: errOutOfBounds, - }) - checkRetrieve(t, f, map[uint64][]byte{ - 4: getChunk(20, 0xbb), - 5: getChunk(20, 0xaa), - 6: getChunk(20, 0x99), - }) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) } + t.Log(f.dumpIndexString(0, 100)) + + // It should allow writing item 6. 
+ batch = f.newBatch() + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) + require.NoError(t, batch.commit()) + + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x99), + }) + f.Close() // Edit the index again, with a much larger initial offset of 1M. { @@ -1369,45 +1329,63 @@ func TestRandom(t *testing.T) { } func TestIndexValidation(t *testing.T) { - const ( - items = 30 - dataSize = 10 - ) + const dataSize = 10 + garbage := indexEntry{ filenum: 100, offset: 200, } var cases = []struct { - offset int64 - data []byte - expItems int + write int + offset int64 + data []byte + expItems int + hasCorruption bool }{ // extend index file with zero bytes at the end { - offset: (items + 1) * indexEntrySize, + write: 5, + offset: (5 + 1) * indexEntrySize, data: make([]byte, indexEntrySize), - expItems: 30, + expItems: 5, + }, + // extend index file with unaligned zero bytes at the end + { + write: 5, + offset: (5 + 1) * indexEntrySize, + data: make([]byte, indexEntrySize*1.5), + expItems: 5, }, // write garbage in the first non-head item { + write: 5, offset: indexEntrySize, data: garbage.append(nil), expItems: 0, }, - // write garbage in the first non-head item + // write garbage in the middle { - offset: (items/2 + 1) * indexEntrySize, + write: 5, + offset: 3 * indexEntrySize, data: garbage.append(nil), - expItems: items / 2, + expItems: 2, + }, + // fulfill the first data file (but not yet advanced), the zero bytes + // at tail should be truncated. 
+		{
+			write:    10,
+			offset:   11 * indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: 10,
+		},
 	}
 	for _, c := range cases {
 		fn := fmt.Sprintf("t-%d", rand.Uint64())
-		f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 10*dataSize, true, false)
 		if err != nil {
 			t.Fatal(err)
 		}
-		writeChunks(t, f, items, dataSize)
+		writeChunks(t, f, c.write, dataSize)
 
 		// write corrupted data
 		f.index.WriteAt(c.data, c.offset)
@@ -1421,10 +1399,10 @@ func TestIndexValidation(t *testing.T) {
 		for i := 0; i < c.expItems; i++ {
 			exp := getChunk(10, i)
 			got, err := f.Retrieve(uint64(i))
-			if err != nil {
+			if err != nil && !c.hasCorruption {
 				t.Fatalf("Failed to read from table, %v", err)
 			}
-			if !bytes.Equal(exp, got) {
+			if !bytes.Equal(exp, got) && !c.hasCorruption {
 				t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
 			}
 		}
@@ -1433,3 +1411,163 @@ func TestIndexValidation(t *testing.T) {
 		}
 	}
 }
+
+// TestFlushOffsetTracking tests the flush offset tracking. The offset movement
+// in the test is mostly triggered by the advanceHead (new data file) and
+// head/tail truncation.
+func TestFlushOffsetTracking(t *testing.T) { + const ( + items = 35 + dataSize = 10 + fileSize = 100 + ) + fn := fmt.Sprintf("t-%d", rand.Uint64()) + f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false) + if err != nil { + t.Fatal(err) + } + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full) + writeChunks(t, f, items, dataSize) + + var cases = []struct { + op func(*freezerTable) + offset uint64 + }{ + { + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full) + func(f *freezerTable) {}, // no-op + 31 * indexEntrySize, + }, + { + // Write more items to fulfill the newest data file, but the file advance + // is not triggered. + + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items, full) + func(f *freezerTable) { + batch := f.newBatch() + for i := 0; i < 5; i++ { + batch.AppendRaw(items+uint64(i), make([]byte, dataSize)) + } + batch.commit() + }, + 31 * indexEntrySize, + }, + { + // Write more items to trigger the data file advance + + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(1 item) + func(f *freezerTable) { + batch := f.newBatch() + batch.AppendRaw(items+5, make([]byte, dataSize)) + batch.commit() + }, + 41 * indexEntrySize, + }, + { + // Head truncate + + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateHead(items + 5) + }, + 41 * indexEntrySize, + }, + { + // Tail truncate + + // Data files: + // F1(1 hidden, 9 visible) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateTail(1) + }, + 41 * indexEntrySize, + }, + { + // Tail truncate + + // Data files: + // F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateTail(10) + }, + 31 * indexEntrySize, + }, + { + // Tail truncate + + 
// Data files: + // F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateTail(30) + }, + 11 * indexEntrySize, + }, + { + // Head truncate + + // Data files: + // F4(9 items) + func(f *freezerTable) { + f.truncateHead(items + 4) + }, + 10 * indexEntrySize, + }, + } + for _, c := range cases { + c.op(f) + if f.metadata.flushOffset != c.offset { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", c.offset, f.metadata.flushOffset) + } + } +} + +func TestTailTruncationCrash(t *testing.T) { + const ( + items = 35 + dataSize = 10 + fileSize = 100 + ) + fn := fmt.Sprintf("t-%d", rand.Uint64()) + f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false) + if err != nil { + t.Fatal(err) + } + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full) + writeChunks(t, f, items, dataSize) + + // The latest 5 items are not persisted yet + if f.metadata.flushOffset != 31*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 31*indexEntrySize, f.metadata.flushOffset) + } + + f.truncateTail(5) + if f.metadata.flushOffset != 31*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 31*indexEntrySize, f.metadata.flushOffset) + } + + // Truncate the first 10 items which results in the first data file + // being removed. The offset should be moved to 26*indexEntrySize. 
+ f.truncateTail(10) + if f.metadata.flushOffset != 26*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset) + } + + // Write the offset back to 31*indexEntrySize to simulate a crash + // which occurs after truncating the index file without updating + // the offset + f.metadata.setFlushOffset(31*indexEntrySize, true) + + f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false) + if err != nil { + t.Fatal(err) + } + if f.metadata.flushOffset != 26*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset) + } +}