core/rawdb: introduce flush offset in freezer (#30392)
This is a follow-up PR to #29792 to get rid of the data file sync. **This is a non-backward compatible change, which increments the database version from 8 to 9**. We introduce a flushOffset for each freezer table, which tracks the position of the most recently fsync’d item in the index file. When this offset moves forward, it indicates that all index entries below it, along with their corresponding data items, have been properly persisted to disk. The offset can also be moved backward when truncating from either the head or tail of the file. Previously, the data file required an explicit fsync after every mutation, which was highly inefficient. With the introduction of the flush offset, the synchronization strategy becomes more flexible, allowing the freezer to sync every 30 seconds instead. The data items above the flush offset are regarded volatile and callers must ensure they are recoverable after the unclean shutdown, or explicitly sync the freezer before any proceeding operations. --------- Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
parent
e26dd774a9
commit
0ad0966cec
|
@ -113,23 +113,29 @@ const (
|
|||
// * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted
|
||||
// * the `Bloom` field of receipt is deleted
|
||||
// * the `BlockIndex` and `TxIndex` fields of txlookup are deleted
|
||||
//
|
||||
// - Version 5
|
||||
// The following incompatible database changes were added:
|
||||
// * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt
|
||||
// * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the
|
||||
// receipts' corresponding block
|
||||
//
|
||||
// - Version 6
|
||||
// The following incompatible database changes were added:
|
||||
// * Transaction lookup information stores the corresponding block number instead of block hash
|
||||
//
|
||||
// - Version 7
|
||||
// The following incompatible database changes were added:
|
||||
// * Use freezer as the ancient database to maintain all ancient data
|
||||
//
|
||||
// - Version 8
|
||||
// The following incompatible database changes were added:
|
||||
// * New scheme for contract code in order to separate the codes and trie nodes
|
||||
//
|
||||
// - Version 9
|
||||
// Total difficulty has been removed from both the key-value store and the
|
||||
// ancient store, the td freezer table has been deprecated since that.
|
||||
// The following incompatible database changes were added:
|
||||
// * Total difficulty has been removed from both the key-value store and the ancient store.
|
||||
// * The metadata structure of freezer is changed by adding 'flushOffset'
|
||||
BlockChainVersion uint64 = 9
|
||||
)
|
||||
|
||||
|
|
|
@ -849,6 +849,7 @@ func TestHeadersRLPStorage(t *testing.T) {
|
|||
t.Fatalf("failed to create database with ancient backend")
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
// Create blocks
|
||||
var chain []*types.Block
|
||||
var pHash common.Hash
|
||||
|
@ -864,7 +865,7 @@ func TestHeadersRLPStorage(t *testing.T) {
|
|||
chain = append(chain, block)
|
||||
pHash = block.Hash()
|
||||
}
|
||||
var receipts []types.Receipts = make([]types.Receipts, 100)
|
||||
receipts := make([]types.Receipts, 100)
|
||||
// Write first half to ancients
|
||||
WriteAncientBlocks(db, chain[:50], receipts[:50])
|
||||
// Write second half to db
|
||||
|
|
|
@ -58,6 +58,7 @@ const (
|
|||
stateHistoryStorageData = "storage.data"
|
||||
)
|
||||
|
||||
// stateFreezerNoSnappy configures whether compression is disabled for the state freezer.
|
||||
var stateFreezerNoSnappy = map[string]bool{
|
||||
stateHistoryMeta: true,
|
||||
stateHistoryAccountIndex: false,
|
||||
|
|
|
@ -19,6 +19,7 @@ package rawdb
|
|||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/golang/snappy"
|
||||
|
@ -188,9 +189,6 @@ func (batch *freezerTableBatch) commit() error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := batch.t.head.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
dataSize := int64(len(batch.dataBuffer))
|
||||
batch.dataBuffer = batch.dataBuffer[:0]
|
||||
|
||||
|
@ -208,6 +206,12 @@ func (batch *freezerTableBatch) commit() error {
|
|||
// Update metrics.
|
||||
batch.t.sizeGauge.Inc(dataSize + indexSize)
|
||||
batch.t.writeMeter.Mark(dataSize + indexSize)
|
||||
|
||||
// Periodically sync the table, todo (rjl493456442) make it configurable?
|
||||
if time.Since(batch.t.lastSync) > 30*time.Second {
|
||||
batch.t.lastSync = time.Now()
|
||||
return batch.t.Sync()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -17,93 +17,173 @@
|
|||
package rawdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"math"
|
||||
"os"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
const freezerVersion = 1 // The initial version tag of freezer table metadata
|
||||
const (
|
||||
freezerTableV1 = 1 // Initial version of metadata struct
|
||||
freezerTableV2 = 2 // Add field: 'flushOffset'
|
||||
freezerVersion = freezerTableV2 // The current used version
|
||||
)
|
||||
|
||||
// freezerTableMeta wraps all the metadata of the freezer table.
|
||||
// freezerTableMeta is a collection of additional properties that describe the
|
||||
// freezer table. These properties are designed with error resilience, allowing
|
||||
// them to be automatically corrected after an error occurs without significantly
|
||||
// impacting overall correctness.
|
||||
type freezerTableMeta struct {
|
||||
// Version is the versioning descriptor of the freezer table.
|
||||
Version uint16
|
||||
file *os.File // file handler of metadata
|
||||
version uint16 // version descriptor of the freezer table
|
||||
|
||||
// VirtualTail indicates how many items have been marked as deleted.
|
||||
// Its value is equal to the number of items removed from the table
|
||||
// plus the number of items hidden in the table, so it should never
|
||||
// be lower than the "actual tail".
|
||||
VirtualTail uint64
|
||||
// virtualTail represents the number of items marked as deleted. It is
|
||||
// calculated as the sum of items removed from the table and the items
|
||||
// hidden within the table, and should never be less than the "actual
|
||||
// tail".
|
||||
//
|
||||
// If lost due to a crash or other reasons, it will be reset to the number
|
||||
// of items deleted from the table, causing the previously hidden items
|
||||
// to become visible, which is an acceptable consequence.
|
||||
virtualTail uint64
|
||||
|
||||
// flushOffset represents the offset in the index file up to which the index
|
||||
// items along with the corresponding data items in data files has been flushed
|
||||
// (fsync’d) to disk. Beyond this offset, data integrity is not guaranteed,
|
||||
// the extra index items along with the associated data items should be removed
|
||||
// during the startup.
|
||||
//
|
||||
// The principle is that all data items above the flush offset are considered
|
||||
// volatile and should be recoverable if they are discarded after the unclean
|
||||
// shutdown. If data integrity is required, manually force a sync of the
|
||||
// freezer before proceeding with further operations (e.g. do freezer.Sync()
|
||||
// first and then write data to key value store in some circumstances).
|
||||
//
|
||||
// The offset could be moved forward by applying sync operation, or be moved
|
||||
// backward in cases of head/tail truncation, etc.
|
||||
flushOffset int64
|
||||
}
|
||||
|
||||
// newMetadata initializes the metadata object with the given virtual tail.
|
||||
func newMetadata(tail uint64) *freezerTableMeta {
|
||||
// decodeV1 attempts to decode the metadata structure in v1 format. If fails or
|
||||
// the result is incompatible, nil is returned.
|
||||
func decodeV1(file *os.File) *freezerTableMeta {
|
||||
_, err := file.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
type obj struct {
|
||||
Version uint16
|
||||
Tail uint64
|
||||
}
|
||||
var o obj
|
||||
if err := rlp.Decode(file, &o); err != nil {
|
||||
return nil
|
||||
}
|
||||
if o.Version != freezerTableV1 {
|
||||
return nil
|
||||
}
|
||||
return &freezerTableMeta{
|
||||
Version: freezerVersion,
|
||||
VirtualTail: tail,
|
||||
file: file,
|
||||
version: o.Version,
|
||||
virtualTail: o.Tail,
|
||||
}
|
||||
}
|
||||
|
||||
// readMetadata reads the metadata of the freezer table from the
|
||||
// given metadata file.
|
||||
func readMetadata(file *os.File) (*freezerTableMeta, error) {
|
||||
// decodeV2 attempts to decode the metadata structure in v2 format. If fails or
|
||||
// the result is incompatible, nil is returned.
|
||||
func decodeV2(file *os.File) *freezerTableMeta {
|
||||
_, err := file.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil
|
||||
}
|
||||
var meta freezerTableMeta
|
||||
if err := rlp.Decode(file, &meta); err != nil {
|
||||
return nil, err
|
||||
type obj struct {
|
||||
Version uint16
|
||||
Tail uint64
|
||||
Offset uint64
|
||||
}
|
||||
var o obj
|
||||
if err := rlp.Decode(file, &o); err != nil {
|
||||
return nil
|
||||
}
|
||||
if o.Version != freezerTableV2 {
|
||||
return nil
|
||||
}
|
||||
if o.Offset > math.MaxInt64 {
|
||||
log.Error("Invalid flushOffset %d in freezer metadata", o.Offset, "file", file.Name())
|
||||
return nil
|
||||
}
|
||||
return &freezerTableMeta{
|
||||
file: file,
|
||||
version: freezerTableV2,
|
||||
virtualTail: o.Tail,
|
||||
flushOffset: int64(o.Offset),
|
||||
}
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
// writeMetadata writes the metadata of the freezer table into the
|
||||
// given metadata file.
|
||||
func writeMetadata(file *os.File, meta *freezerTableMeta) error {
|
||||
_, err := file.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return rlp.Encode(file, meta)
|
||||
}
|
||||
|
||||
// loadMetadata loads the metadata from the given metadata file.
|
||||
// Initializes the metadata file with the given "actual tail" if
|
||||
// it's empty.
|
||||
func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) {
|
||||
// newMetadata initializes the metadata object, either by loading it from the file
|
||||
// or by constructing a new one from scratch.
|
||||
func newMetadata(file *os.File) (*freezerTableMeta, error) {
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Write the metadata with the given actual tail into metadata file
|
||||
// if it's non-existent. There are two possible scenarios here:
|
||||
// - the freezer table is empty
|
||||
// - the freezer table is legacy
|
||||
// In both cases, write the meta into the file with the actual tail
|
||||
// as the virtual tail.
|
||||
if stat.Size() == 0 {
|
||||
m := newMetadata(tail)
|
||||
if err := writeMetadata(file, m); err != nil {
|
||||
m := &freezerTableMeta{
|
||||
file: file,
|
||||
version: freezerTableV2,
|
||||
virtualTail: 0,
|
||||
flushOffset: 0,
|
||||
}
|
||||
if err := m.write(true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
m, err := readMetadata(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if m := decodeV2(file); m != nil {
|
||||
return m, nil
|
||||
}
|
||||
// Update the virtual tail with the given actual tail if it's even
|
||||
// lower than it. Theoretically it shouldn't happen at all, print
|
||||
// a warning here.
|
||||
if m.VirtualTail < tail {
|
||||
log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail)
|
||||
m.VirtualTail = tail
|
||||
if err := writeMetadata(file, m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m := decodeV1(file); m != nil {
|
||||
return m, nil // legacy metadata
|
||||
}
|
||||
return m, nil
|
||||
return nil, errors.New("failed to decode metadata")
|
||||
}
|
||||
|
||||
// setVirtualTail sets the virtual tail and flushes the metadata if sync is true.
|
||||
func (m *freezerTableMeta) setVirtualTail(tail uint64, sync bool) error {
|
||||
m.virtualTail = tail
|
||||
return m.write(sync)
|
||||
}
|
||||
|
||||
// setFlushOffset sets the flush offset and flushes the metadata if sync is true.
|
||||
func (m *freezerTableMeta) setFlushOffset(offset int64, sync bool) error {
|
||||
m.flushOffset = offset
|
||||
return m.write(sync)
|
||||
}
|
||||
|
||||
// write flushes the content of metadata into file and performs a fsync if required.
|
||||
func (m *freezerTableMeta) write(sync bool) error {
|
||||
type obj struct {
|
||||
Version uint16
|
||||
Tail uint64
|
||||
Offset uint64
|
||||
}
|
||||
var o obj
|
||||
o.Version = freezerVersion // forcibly use the current version
|
||||
o.Tail = m.virtualTail
|
||||
o.Offset = uint64(m.flushOffset)
|
||||
|
||||
_, err := m.file.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := rlp.Encode(m.file, &o); err != nil {
|
||||
return err
|
||||
}
|
||||
if !sync {
|
||||
return nil
|
||||
}
|
||||
return m.file.Sync()
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ package rawdb
|
|||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
func TestReadWriteFreezerTableMeta(t *testing.T) {
|
||||
|
@ -27,36 +29,98 @@ func TestReadWriteFreezerTableMeta(t *testing.T) {
|
|||
t.Fatalf("Failed to create file %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
err = writeMetadata(f, newMetadata(100))
|
||||
|
||||
meta, err := newMetadata(f)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to write metadata %v", err)
|
||||
t.Fatalf("Failed to new metadata %v", err)
|
||||
}
|
||||
meta, err := readMetadata(f)
|
||||
meta.setVirtualTail(100, false)
|
||||
|
||||
meta, err = newMetadata(f)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read metadata %v", err)
|
||||
t.Fatalf("Failed to reload metadata %v", err)
|
||||
}
|
||||
if meta.Version != freezerVersion {
|
||||
if meta.version != freezerTableV2 {
|
||||
t.Fatalf("Unexpected version field")
|
||||
}
|
||||
if meta.VirtualTail != uint64(100) {
|
||||
if meta.virtualTail != uint64(100) {
|
||||
t.Fatalf("Unexpected virtual tail field")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInitializeFreezerTableMeta(t *testing.T) {
|
||||
func TestUpgradeMetadata(t *testing.T) {
|
||||
f, err := os.CreateTemp(t.TempDir(), "*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
meta, err := loadMetadata(f, uint64(100))
|
||||
|
||||
// Write legacy metadata into file
|
||||
type obj struct {
|
||||
Version uint16
|
||||
Tail uint64
|
||||
}
|
||||
var o obj
|
||||
o.Version = freezerTableV1
|
||||
o.Tail = 100
|
||||
|
||||
if err := rlp.Encode(f, &o); err != nil {
|
||||
t.Fatalf("Failed to encode %v", err)
|
||||
}
|
||||
|
||||
// Reload the metadata, a silent upgrade is expected
|
||||
meta, err := newMetadata(f)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read metadata %v", err)
|
||||
}
|
||||
if meta.Version != freezerVersion {
|
||||
t.Fatalf("Unexpected version field")
|
||||
if meta.version != freezerTableV1 {
|
||||
t.Fatal("Unexpected version field")
|
||||
}
|
||||
if meta.VirtualTail != uint64(100) {
|
||||
t.Fatalf("Unexpected virtual tail field")
|
||||
if meta.virtualTail != uint64(100) {
|
||||
t.Fatal("Unexpected virtual tail field")
|
||||
}
|
||||
if meta.flushOffset != 0 {
|
||||
t.Fatal("Unexpected flush offset field")
|
||||
}
|
||||
|
||||
meta.setFlushOffset(100, true)
|
||||
|
||||
meta, err = newMetadata(f)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read metadata %v", err)
|
||||
}
|
||||
if meta.version != freezerTableV2 {
|
||||
t.Fatal("Unexpected version field")
|
||||
}
|
||||
if meta.virtualTail != uint64(100) {
|
||||
t.Fatal("Unexpected virtual tail field")
|
||||
}
|
||||
if meta.flushOffset != 100 {
|
||||
t.Fatal("Unexpected flush offset field")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvalidMetadata(t *testing.T) {
|
||||
f, err := os.CreateTemp(t.TempDir(), "*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create file %v", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Write invalid legacy metadata into file
|
||||
type obj struct {
|
||||
Version uint16
|
||||
Tail uint64
|
||||
}
|
||||
var o obj
|
||||
o.Version = freezerTableV2 // -> invalid version tag
|
||||
o.Tail = 100
|
||||
|
||||
if err := rlp.Encode(f, &o); err != nil {
|
||||
t.Fatalf("Failed to encode %v", err)
|
||||
}
|
||||
_, err = newMetadata(f)
|
||||
if err == nil {
|
||||
t.Fatal("Unexpected success")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -108,11 +108,13 @@ type freezerTable struct {
|
|||
|
||||
head *os.File // File descriptor for the data head of the table
|
||||
index *os.File // File descriptor for the indexEntry file of the table
|
||||
meta *os.File // File descriptor for metadata of the table
|
||||
files map[uint32]*os.File // open files
|
||||
headId uint32 // number of the currently active head file
|
||||
tailId uint32 // number of the earliest file
|
||||
|
||||
metadata *freezerTableMeta // metadata of the table
|
||||
lastSync time.Time // Timestamp when the last sync was performed
|
||||
|
||||
headBytes int64 // Number of bytes written to the head file
|
||||
readMeter *metrics.Meter // Meter for measuring the effective amount of data read
|
||||
writeMeter *metrics.Meter // Meter for measuring the effective amount of data written
|
||||
|
@ -166,10 +168,17 @@ func newTable(path string, name string, readMeter, writeMeter *metrics.Meter, si
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
// Load metadata from the file. The tag will be true if legacy metadata
|
||||
// is detected.
|
||||
metadata, err := newMetadata(meta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Create the table and repair any past inconsistency
|
||||
tab := &freezerTable{
|
||||
index: index,
|
||||
meta: meta,
|
||||
metadata: metadata,
|
||||
lastSync: time.Now(),
|
||||
files: make(map[uint32]*os.File),
|
||||
readMeter: readMeter,
|
||||
writeMeter: writeMeter,
|
||||
|
@ -221,13 +230,11 @@ func (t *freezerTable) repair() error {
|
|||
return err
|
||||
} // New file can't trigger this path
|
||||
}
|
||||
// Validate the index file as it might contain some garbage data after the
|
||||
// power failures.
|
||||
if err := t.repairIndex(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Retrieve the file sizes and prepare for truncation. Note the file size
|
||||
// might be changed after index validation.
|
||||
// might be changed after index repair.
|
||||
if stat, err = t.index.Stat(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -253,12 +260,14 @@ func (t *freezerTable) repair() error {
|
|||
t.tailId = firstIndex.filenum
|
||||
t.itemOffset.Store(uint64(firstIndex.offset))
|
||||
|
||||
// Load metadata from the file
|
||||
meta, err := loadMetadata(t.meta, t.itemOffset.Load())
|
||||
if err != nil {
|
||||
return err
|
||||
// Adjust the number of hidden items if it is less than the number of items
|
||||
// being removed.
|
||||
if t.itemOffset.Load() > t.metadata.virtualTail {
|
||||
if err := t.metadata.setVirtualTail(t.itemOffset.Load(), true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
t.itemHidden.Store(meta.VirtualTail)
|
||||
t.itemHidden.Store(t.metadata.virtualTail)
|
||||
|
||||
// Read the last index, use the default value in case the freezer is empty
|
||||
if offsetsSize == indexEntrySize {
|
||||
|
@ -267,12 +276,6 @@ func (t *freezerTable) repair() error {
|
|||
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
|
||||
lastIndex.unmarshalBinary(buffer)
|
||||
}
|
||||
// Print an error log if the index is corrupted due to an incorrect
|
||||
// last index item. While it is theoretically possible to have a zero offset
|
||||
// by storing all zero-size items, it is highly unlikely to occur in practice.
|
||||
if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 {
|
||||
log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize)
|
||||
}
|
||||
if t.readonly {
|
||||
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
|
||||
} else {
|
||||
|
@ -293,6 +296,7 @@ func (t *freezerTable) repair() error {
|
|||
return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum)
|
||||
}
|
||||
verbose = true
|
||||
|
||||
// Truncate the head file to the last offset pointer
|
||||
if contentExp < contentSize {
|
||||
t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize)
|
||||
|
@ -304,11 +308,23 @@ func (t *freezerTable) repair() error {
|
|||
// Truncate the index to point within the head file
|
||||
if contentExp > contentSize {
|
||||
t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize)
|
||||
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
|
||||
|
||||
newOffset := offsetsSize - indexEntrySize
|
||||
if err := truncateFreezerFile(t.index, newOffset); err != nil {
|
||||
return err
|
||||
}
|
||||
offsetsSize -= indexEntrySize
|
||||
|
||||
// If the index file is truncated beyond the flush offset, move the flush
|
||||
// offset back to the new end of the file. A crash may occur before the
|
||||
// offset is updated, leaving a dangling reference that points to a position
|
||||
// outside the file. If so, the offset will be reset to the new end of the
|
||||
// file during the next run.
|
||||
if t.metadata.flushOffset > newOffset {
|
||||
if err := t.metadata.setFlushOffset(newOffset, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Read the new head index, use the default value in case
|
||||
// the freezer is already empty.
|
||||
var newLastIndex indexEntry
|
||||
|
@ -345,7 +361,7 @@ func (t *freezerTable) repair() error {
|
|||
if err := t.head.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.meta.Sync(); err != nil {
|
||||
if err := t.metadata.file.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -372,7 +388,65 @@ func (t *freezerTable) repair() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// repairIndex validates the integrity of the index file. According to the design,
|
||||
func (t *freezerTable) repairIndex() error {
|
||||
stat, err := t.index.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := stat.Size()
|
||||
|
||||
// Validate the items in the index file to ensure the data integrity.
|
||||
// It's possible some garbage data is retained in the index file after
|
||||
// the power failures and should be truncated first.
|
||||
size, err = t.checkIndex(size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If legacy metadata is detected, attempt to recover the offset from the
|
||||
// index file to avoid clearing the entire table.
|
||||
if t.metadata.version == freezerTableV1 {
|
||||
t.logger.Info("Recovering freezer flushOffset for legacy table", "offset", size)
|
||||
return t.metadata.setFlushOffset(size, true)
|
||||
}
|
||||
|
||||
switch {
|
||||
case size == indexEntrySize && t.metadata.flushOffset == 0:
|
||||
// It's a new freezer table with no content.
|
||||
// Move the flush offset to the end of the file.
|
||||
return t.metadata.setFlushOffset(size, true)
|
||||
|
||||
case size == t.metadata.flushOffset:
|
||||
// flushOffset is aligned with the index file, all is well.
|
||||
return nil
|
||||
|
||||
case size > t.metadata.flushOffset:
|
||||
// Extra index items have been detected beyond the flush offset. Since these
|
||||
// entries correspond to data that has not been fully flushed to disk in the
|
||||
// last run (because of unclean shutdown), their integrity cannot be guaranteed.
|
||||
// To ensure consistency, these index items will be truncated, as there is no
|
||||
// reliable way to validate or recover their associated data.
|
||||
extraSize := size - t.metadata.flushOffset
|
||||
if t.readonly {
|
||||
return fmt.Errorf("index file(path: %s, name: %s) contains %d garbage data bytes", t.path, t.name, extraSize)
|
||||
}
|
||||
t.logger.Warn("Truncating freezer items after flushOffset", "size", extraSize)
|
||||
return truncateFreezerFile(t.index, t.metadata.flushOffset)
|
||||
|
||||
default: // size < flushOffset
|
||||
// Flush offset refers to a position larger than index file. The only
|
||||
// possible scenario for this is: a power failure or system crash has occurred after
|
||||
// truncating the segment in index file from head or tail, but without updating
|
||||
// the flush offset. In this case, automatically reset the flush offset with
|
||||
// the file size which implies the entire index file is complete.
|
||||
if t.readonly {
|
||||
return nil // do nothing in read only mode
|
||||
}
|
||||
t.logger.Warn("Rewinding freezer flushOffset", "old", t.metadata.flushOffset, "new", size)
|
||||
return t.metadata.setFlushOffset(size, true)
|
||||
}
|
||||
}
|
||||
|
||||
// checkIndex validates the integrity of the index file. According to the design,
|
||||
// the initial entry in the file denotes the earliest data file along with the
|
||||
// count of deleted items. Following this, all subsequent entries in the file must
|
||||
// be in order. This function identifies any corrupted entries and truncates items
|
||||
|
@ -392,18 +466,11 @@ func (t *freezerTable) repair() error {
|
|||
// leftover garbage or if all items in the table have zero size is impossible.
|
||||
// In such instances, the file will remain unchanged to prevent potential data
|
||||
// loss or misinterpretation.
|
||||
func (t *freezerTable) repairIndex() error {
|
||||
// Retrieve the file sizes and prepare for validation
|
||||
stat, err := t.index.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
size := stat.Size()
|
||||
|
||||
func (t *freezerTable) checkIndex(size int64) (int64, error) {
|
||||
// Move the read cursor to the beginning of the file
|
||||
_, err = t.index.Seek(0, io.SeekStart)
|
||||
_, err := t.index.Seek(0, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
fr := bufio.NewReader(t.index)
|
||||
|
||||
|
@ -425,21 +492,21 @@ func (t *freezerTable) repairIndex() error {
|
|||
entry.unmarshalBinary(buff)
|
||||
return entry, nil
|
||||
}
|
||||
truncate = func(offset int64) error {
|
||||
truncate = func(offset int64) (int64, error) {
|
||||
if t.readonly {
|
||||
return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
|
||||
return 0, fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
|
||||
}
|
||||
if err := truncateFreezerFile(t.index, offset); err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
|
||||
return nil
|
||||
return offset, nil
|
||||
}
|
||||
)
|
||||
for offset := int64(0); offset < size; offset += indexEntrySize {
|
||||
entry, err := read()
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
if offset == 0 {
|
||||
head = entry
|
||||
|
@ -468,10 +535,10 @@ func (t *freezerTable) repairIndex() error {
|
|||
// the seek operation anyway as a precaution.
|
||||
_, err = t.index.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
return 0, err
|
||||
}
|
||||
log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
return size, nil
|
||||
}
|
||||
|
||||
// checkIndexItems validates the correctness of two consecutive index items based
|
||||
|
@ -550,12 +617,23 @@ func (t *freezerTable) truncateHead(items uint64) error {
|
|||
// Truncate the index file first, the tail position is also considered
|
||||
// when calculating the new freezer table length.
|
||||
length := items - t.itemOffset.Load()
|
||||
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
|
||||
newOffset := (length + 1) * indexEntrySize
|
||||
if err := truncateFreezerFile(t.index, int64(newOffset)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := t.index.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
// If the index file is truncated beyond the flush offset, move the flush
|
||||
// offset back to the new end of the file. A crash may occur before the
|
||||
// offset is updated, leaving a dangling reference that points to a position
|
||||
// outside the file. If so, the offset will be reset to the new end of the
|
||||
// file during the next run.
|
||||
if t.metadata.flushOffset > int64(newOffset) {
|
||||
if err := t.metadata.setFlushOffset(int64(newOffset), true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Calculate the new expected size of the data file and truncate it
|
||||
var expected indexEntry
|
||||
if length == 0 {
|
||||
|
@ -652,7 +730,10 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
}
|
||||
// Update the virtual tail marker and hidden these entries in table.
|
||||
t.itemHidden.Store(items)
|
||||
if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
|
||||
|
||||
// Update the virtual tail without fsync, otherwise it will significantly
|
||||
// impact the overall performance.
|
||||
if err := t.metadata.setVirtualTail(items, false); err != nil {
|
||||
return err
|
||||
}
|
||||
// Hidden items still fall in the current tail file, no data file
|
||||
|
@ -664,6 +745,18 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
if t.tailId > newTailId {
|
||||
return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId)
|
||||
}
|
||||
// Sync the table before performing the index tail truncation. A crash may
|
||||
// occur after truncating the index file without updating the flush offset,
|
||||
// leaving a dangling offset that points to a position outside the file.
|
||||
// The offset will be rewound to the end of file during the next run
|
||||
// automatically and implicitly assumes all the items within the file are
|
||||
// complete.
|
||||
//
|
||||
// Therefore, forcibly flush everything above the offset to ensure this
|
||||
// assumption is satisfied!
|
||||
if err := t.doSync(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Count how many items can be deleted from the file.
|
||||
var (
|
||||
newDeleted = items
|
||||
|
@ -681,11 +774,6 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
}
|
||||
newDeleted = current
|
||||
}
|
||||
// Commit the changes of metadata file first before manipulating
|
||||
// the indexes file.
|
||||
if err := t.meta.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Close the index file before shorten it.
|
||||
if err := t.index.Close(); err != nil {
|
||||
return err
|
||||
|
@ -716,6 +804,21 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
t.itemOffset.Store(newDeleted)
|
||||
t.releaseFilesBefore(t.tailId, true)
|
||||
|
||||
// Move the index flush offset backward due to the deletion of an index segment.
|
||||
// A crash may occur before the offset is updated, leaving a dangling reference
|
||||
// that points to a position outside the file. If so, the offset will be reset
|
||||
// to the new end of the file during the next run.
|
||||
//
|
||||
// Note, both the index and head data file has been persisted before performing
|
||||
// tail truncation and all the items in these files are regarded as complete.
|
||||
shorten := indexEntrySize * int64(newDeleted-deleted)
|
||||
if t.metadata.flushOffset <= shorten {
|
||||
return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten)
|
||||
} else {
|
||||
if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Retrieve the new size and update the total size counter
|
||||
newSize, err := t.sizeNolock()
|
||||
if err != nil {
|
||||
|
@ -725,40 +828,30 @@ func (t *freezerTable) truncateTail(items uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Close closes all opened files.
|
||||
// Close closes all opened files and finalizes the freezer table for use.
|
||||
// This operation must be completed before shutdown to prevent the loss of
|
||||
// recent writes.
|
||||
func (t *freezerTable) Close() error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
if err := t.doSync(); err != nil {
|
||||
return err
|
||||
}
|
||||
var errs []error
|
||||
doClose := func(f *os.File, sync bool, close bool) {
|
||||
if sync && !t.readonly {
|
||||
if err := f.Sync(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
if close {
|
||||
if err := f.Close(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
doClose := func(f *os.File) {
|
||||
if err := f.Close(); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
// Trying to fsync a file opened in rdonly causes "Access denied"
|
||||
// error on Windows.
|
||||
doClose(t.index, true, true)
|
||||
doClose(t.meta, true, true)
|
||||
|
||||
// The preopened non-head data-files are all opened in readonly.
|
||||
// The head is opened in rw-mode, so we sync it here - but since it's also
|
||||
// part of t.files, it will be closed in the loop below.
|
||||
doClose(t.head, true, false) // sync but do not close
|
||||
|
||||
doClose(t.index)
|
||||
doClose(t.metadata.file)
|
||||
for _, f := range t.files {
|
||||
doClose(f, false, true) // close but do not sync
|
||||
doClose(f)
|
||||
}
|
||||
t.index = nil
|
||||
t.meta = nil
|
||||
t.head = nil
|
||||
t.metadata.file = nil
|
||||
|
||||
if errs != nil {
|
||||
return fmt.Errorf("%v", errs)
|
||||
|
@ -917,7 +1010,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
|
|||
defer t.lock.RUnlock()
|
||||
|
||||
// Ensure the table and the item are accessible
|
||||
if t.index == nil || t.head == nil || t.meta == nil {
|
||||
if t.index == nil || t.head == nil || t.metadata.file == nil {
|
||||
return nil, nil, errClosed
|
||||
}
|
||||
var (
|
||||
|
@ -1042,6 +1135,9 @@ func (t *freezerTable) advanceHead() error {
|
|||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
|
||||
if err := t.doSync(); err != nil {
|
||||
return err
|
||||
}
|
||||
// We open the next file in truncated mode -- if this file already
|
||||
// exists, we need to start over from scratch on it.
|
||||
nextID := t.headId + 1
|
||||
|
@ -1069,7 +1165,18 @@ func (t *freezerTable) advanceHead() error {
|
|||
func (t *freezerTable) Sync() error {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
if t.index == nil || t.head == nil || t.meta == nil {
|
||||
|
||||
return t.doSync()
|
||||
}
|
||||
|
||||
// doSync is the internal version of Sync which assumes the lock is already held.
|
||||
func (t *freezerTable) doSync() error {
|
||||
// Trying to fsync a file opened in rdonly causes "Access denied"
|
||||
// error on Windows.
|
||||
if t.readonly {
|
||||
return nil
|
||||
}
|
||||
if t.index == nil || t.head == nil || t.metadata.file == nil {
|
||||
return errClosed
|
||||
}
|
||||
var err error
|
||||
|
@ -1078,10 +1185,18 @@ func (t *freezerTable) Sync() error {
|
|||
err = e
|
||||
}
|
||||
}
|
||||
|
||||
trackError(t.index.Sync())
|
||||
trackError(t.meta.Sync())
|
||||
trackError(t.head.Sync())
|
||||
|
||||
// A crash may occur before the offset is updated, leaving the offset
|
||||
// points to a old position. If so, the extra items above the offset
|
||||
// will be truncated during the next run.
|
||||
stat, err := t.index.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
offset := stat.Size()
|
||||
trackError(t.metadata.setFlushOffset(offset, true))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1097,13 +1212,8 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string {
|
|||
}
|
||||
|
||||
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
|
||||
meta, err := readMetadata(t.meta)
|
||||
if err != nil {
|
||||
fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version,
|
||||
t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
|
||||
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n",
|
||||
t.metadata.version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
|
||||
|
||||
buf := make([]byte, indexEntrySize)
|
||||
|
||||
|
|
|
@ -262,18 +262,6 @@ func TestSnappyDetection(t *testing.T) {
|
|||
f.Close()
|
||||
}
|
||||
|
||||
// Open without snappy
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err = f.Retrieve(0); err == nil {
|
||||
f.Close()
|
||||
t.Fatalf("expected empty table")
|
||||
}
|
||||
}
|
||||
|
||||
// Open with snappy
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false)
|
||||
|
@ -286,6 +274,18 @@ func TestSnappyDetection(t *testing.T) {
|
|||
t.Fatalf("expected no error, got %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Open without snappy
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err = f.Retrieve(0); err == nil {
|
||||
f.Close()
|
||||
t.Fatalf("expected empty table")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func assertFileSize(f string, size int64) error {
|
||||
|
@ -521,93 +521,53 @@ func TestFreezerOffset(t *testing.T) {
|
|||
fname := fmt.Sprintf("offset-%d", rand.Uint64())
|
||||
|
||||
// Fill table
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Write 6 x 20 bytes, splitting out into three files
|
||||
batch := f.newBatch()
|
||||
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
|
||||
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
|
||||
|
||||
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
|
||||
require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
|
||||
|
||||
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
|
||||
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
|
||||
require.NoError(t, batch.commit())
|
||||
|
||||
t.Log(f.dumpIndexString(0, 100))
|
||||
f.Close()
|
||||
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Write 6 x 20 bytes, splitting out into three files
|
||||
batch := f.newBatch()
|
||||
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
|
||||
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
|
||||
|
||||
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
|
||||
require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
|
||||
|
||||
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
|
||||
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
|
||||
require.NoError(t, batch.commit())
|
||||
|
||||
t.Log(f.dumpIndexString(0, 100))
|
||||
|
||||
// Now crop it.
|
||||
{
|
||||
// delete files 0 and 1
|
||||
for i := 0; i < 2; i++ {
|
||||
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i))
|
||||
if err := os.Remove(p); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
// Read the index file
|
||||
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
|
||||
indexFile, err := os.OpenFile(p, os.O_RDWR, 0644)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
indexBuf := make([]byte, 7*indexEntrySize)
|
||||
indexFile.Read(indexBuf)
|
||||
|
||||
// Update the index file, so that we store
|
||||
// [ file = 2, offset = 4 ] at index zero
|
||||
|
||||
zeroIndex := indexEntry{
|
||||
filenum: uint32(2), // First file is 2
|
||||
offset: uint32(4), // We have removed four items
|
||||
}
|
||||
buf := zeroIndex.append(nil)
|
||||
|
||||
// Overwrite index zero
|
||||
copy(indexBuf, buf)
|
||||
|
||||
// Remove the four next indices by overwriting
|
||||
copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:])
|
||||
indexFile.WriteAt(indexBuf, 0)
|
||||
|
||||
// Need to truncate the moved index items
|
||||
indexFile.Truncate(indexEntrySize * (1 + 2))
|
||||
indexFile.Close()
|
||||
}
|
||||
f.truncateTail(4)
|
||||
f.Close()
|
||||
|
||||
// Now open again
|
||||
{
|
||||
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
t.Log(f.dumpIndexString(0, 100))
|
||||
|
||||
// It should allow writing item 6.
|
||||
batch := f.newBatch()
|
||||
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
|
||||
require.NoError(t, batch.commit())
|
||||
|
||||
checkRetrieveError(t, f, map[uint64]error{
|
||||
0: errOutOfBounds,
|
||||
1: errOutOfBounds,
|
||||
2: errOutOfBounds,
|
||||
3: errOutOfBounds,
|
||||
})
|
||||
checkRetrieve(t, f, map[uint64][]byte{
|
||||
4: getChunk(20, 0xbb),
|
||||
5: getChunk(20, 0xaa),
|
||||
6: getChunk(20, 0x99),
|
||||
})
|
||||
f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log(f.dumpIndexString(0, 100))
|
||||
|
||||
// It should allow writing item 6.
|
||||
batch = f.newBatch()
|
||||
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
|
||||
require.NoError(t, batch.commit())
|
||||
|
||||
checkRetrieveError(t, f, map[uint64]error{
|
||||
0: errOutOfBounds,
|
||||
1: errOutOfBounds,
|
||||
2: errOutOfBounds,
|
||||
3: errOutOfBounds,
|
||||
})
|
||||
checkRetrieve(t, f, map[uint64][]byte{
|
||||
4: getChunk(20, 0xbb),
|
||||
5: getChunk(20, 0xaa),
|
||||
6: getChunk(20, 0x99),
|
||||
})
|
||||
f.Close()
|
||||
|
||||
// Edit the index again, with a much larger initial offset of 1M.
|
||||
{
|
||||
|
@ -1369,45 +1329,63 @@ func TestRandom(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestIndexValidation(t *testing.T) {
|
||||
const (
|
||||
items = 30
|
||||
dataSize = 10
|
||||
)
|
||||
const dataSize = 10
|
||||
|
||||
garbage := indexEntry{
|
||||
filenum: 100,
|
||||
offset: 200,
|
||||
}
|
||||
var cases = []struct {
|
||||
offset int64
|
||||
data []byte
|
||||
expItems int
|
||||
write int
|
||||
offset int64
|
||||
data []byte
|
||||
expItems int
|
||||
hasCorruption bool
|
||||
}{
|
||||
// extend index file with zero bytes at the end
|
||||
{
|
||||
offset: (items + 1) * indexEntrySize,
|
||||
write: 5,
|
||||
offset: (5 + 1) * indexEntrySize,
|
||||
data: make([]byte, indexEntrySize),
|
||||
expItems: 30,
|
||||
expItems: 5,
|
||||
},
|
||||
// extend index file with unaligned zero bytes at the end
|
||||
{
|
||||
write: 5,
|
||||
offset: (5 + 1) * indexEntrySize,
|
||||
data: make([]byte, indexEntrySize*1.5),
|
||||
expItems: 5,
|
||||
},
|
||||
// write garbage in the first non-head item
|
||||
{
|
||||
write: 5,
|
||||
offset: indexEntrySize,
|
||||
data: garbage.append(nil),
|
||||
expItems: 0,
|
||||
},
|
||||
// write garbage in the first non-head item
|
||||
// write garbage in the middle
|
||||
{
|
||||
offset: (items/2 + 1) * indexEntrySize,
|
||||
write: 5,
|
||||
offset: 3 * indexEntrySize,
|
||||
data: garbage.append(nil),
|
||||
expItems: items / 2,
|
||||
expItems: 2,
|
||||
},
|
||||
// fulfill the first data file (but not yet advanced), the zero bytes
|
||||
// at tail should be truncated.
|
||||
{
|
||||
write: 10,
|
||||
offset: 11 * indexEntrySize,
|
||||
data: garbage.append(nil),
|
||||
expItems: 10,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
fn := fmt.Sprintf("t-%d", rand.Uint64())
|
||||
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
|
||||
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 10*dataSize, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
writeChunks(t, f, items, dataSize)
|
||||
writeChunks(t, f, c.write, dataSize)
|
||||
|
||||
// write corrupted data
|
||||
f.index.WriteAt(c.data, c.offset)
|
||||
|
@ -1421,10 +1399,10 @@ func TestIndexValidation(t *testing.T) {
|
|||
for i := 0; i < c.expItems; i++ {
|
||||
exp := getChunk(10, i)
|
||||
got, err := f.Retrieve(uint64(i))
|
||||
if err != nil {
|
||||
if err != nil && !c.hasCorruption {
|
||||
t.Fatalf("Failed to read from table, %v", err)
|
||||
}
|
||||
if !bytes.Equal(exp, got) {
|
||||
if !bytes.Equal(exp, got) && !c.hasCorruption {
|
||||
t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
|
||||
}
|
||||
}
|
||||
|
@ -1433,3 +1411,163 @@ func TestIndexValidation(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestFlushOffsetTracking tests the flush offset tracking. The offset moving
|
||||
// in the test is mostly triggered by the advanceHead (new data file) and
|
||||
// heda/tail truncation.
|
||||
func TestFlushOffsetTracking(t *testing.T) {
|
||||
const (
|
||||
items = 35
|
||||
dataSize = 10
|
||||
fileSize = 100
|
||||
)
|
||||
fn := fmt.Sprintf("t-%d", rand.Uint64())
|
||||
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Data files:
|
||||
// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
|
||||
writeChunks(t, f, items, dataSize)
|
||||
|
||||
var cases = []struct {
|
||||
op func(*freezerTable)
|
||||
offset int64
|
||||
}{
|
||||
{
|
||||
// Data files:
|
||||
// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
|
||||
func(f *freezerTable) {}, // no-op
|
||||
31 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Write more items to fulfill the newest data file, but the file advance
|
||||
// is not triggered.
|
||||
|
||||
// Data files:
|
||||
// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items, full)
|
||||
func(f *freezerTable) {
|
||||
batch := f.newBatch()
|
||||
for i := 0; i < 5; i++ {
|
||||
batch.AppendRaw(items+uint64(i), make([]byte, dataSize))
|
||||
}
|
||||
batch.commit()
|
||||
},
|
||||
31 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Write more items to trigger the data file advance
|
||||
|
||||
// Data files:
|
||||
// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(1 item)
|
||||
func(f *freezerTable) {
|
||||
batch := f.newBatch()
|
||||
batch.AppendRaw(items+5, make([]byte, dataSize))
|
||||
batch.commit()
|
||||
},
|
||||
41 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Head truncate
|
||||
|
||||
// Data files:
|
||||
// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item)
|
||||
func(f *freezerTable) {
|
||||
f.truncateHead(items + 5)
|
||||
},
|
||||
41 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Tail truncate
|
||||
|
||||
// Data files:
|
||||
// F1(1 hidden, 9 visible) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item)
|
||||
func(f *freezerTable) {
|
||||
f.truncateTail(1)
|
||||
},
|
||||
41 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Tail truncate
|
||||
|
||||
// Data files:
|
||||
// F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item)
|
||||
func(f *freezerTable) {
|
||||
f.truncateTail(10)
|
||||
},
|
||||
31 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Tail truncate
|
||||
|
||||
// Data files:
|
||||
// F4(10 items) -> F5(0 item)
|
||||
func(f *freezerTable) {
|
||||
f.truncateTail(30)
|
||||
},
|
||||
11 * indexEntrySize,
|
||||
},
|
||||
{
|
||||
// Head truncate
|
||||
|
||||
// Data files:
|
||||
// F4(9 items)
|
||||
func(f *freezerTable) {
|
||||
f.truncateHead(items + 4)
|
||||
},
|
||||
10 * indexEntrySize,
|
||||
},
|
||||
}
|
||||
for _, c := range cases {
|
||||
c.op(f)
|
||||
if f.metadata.flushOffset != c.offset {
|
||||
t.Fatalf("Unexpected index flush offset, want: %d, got: %d", c.offset, f.metadata.flushOffset)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTailTruncationCrash(t *testing.T) {
|
||||
const (
|
||||
items = 35
|
||||
dataSize = 10
|
||||
fileSize = 100
|
||||
)
|
||||
fn := fmt.Sprintf("t-%d", rand.Uint64())
|
||||
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Data files:
|
||||
// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
|
||||
writeChunks(t, f, items, dataSize)
|
||||
|
||||
// The latest 5 items are not persisted yet
|
||||
if f.metadata.flushOffset != 31*indexEntrySize {
|
||||
t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 31*indexEntrySize, f.metadata.flushOffset)
|
||||
}
|
||||
|
||||
f.truncateTail(5)
|
||||
if f.metadata.flushOffset != 31*indexEntrySize {
|
||||
t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 31*indexEntrySize, f.metadata.flushOffset)
|
||||
}
|
||||
|
||||
// Truncate the first 10 items which results in the first data file
|
||||
// being removed. The offset should be moved to 26*indexEntrySize.
|
||||
f.truncateTail(10)
|
||||
if f.metadata.flushOffset != 26*indexEntrySize {
|
||||
t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset)
|
||||
}
|
||||
|
||||
// Write the offset back to 31*indexEntrySize to simulate a crash
|
||||
// which occurs after truncating the index file without updating
|
||||
// the offset
|
||||
f.metadata.setFlushOffset(31*indexEntrySize, true)
|
||||
|
||||
f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if f.metadata.flushOffset != 26*indexEntrySize {
|
||||
t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue