This commit is contained in:
rjl493456442 2024-11-25 09:58:41 +07:00 committed by GitHub
commit 716fa3c79d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 654 additions and 260 deletions

View File

@ -113,21 +113,28 @@ const (
// * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted
// * the `Bloom` field of receipt is deleted
// * the `BlockIndex` and `TxIndex` fields of txlookup are deleted
//
// - Version 5
// The following incompatible database changes were added:
// * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt
// * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the
// receipts' corresponding block
//
// - Version 6
// The following incompatible database changes were added:
// * Transaction lookup information stores the corresponding block number instead of block hash
//
// - Version 7
// The following incompatible database changes were added:
// * Use freezer as the ancient database to maintain all ancient data
//
// - Version 8
// The following incompatible database changes were added:
// * New scheme for contract code in order to separate the codes and trie nodes
BlockChainVersion uint64 = 8
//
// - Version 9
// * The metadata structure of freezer is changed by adding 'flushOffset'
BlockChainVersion uint64 = 9
)
// CacheConfig contains the configuration values for the trie database

View File

@ -883,6 +883,7 @@ func TestHeadersRLPStorage(t *testing.T) {
t.Fatalf("failed to create database with ancient backend")
}
defer db.Close()
// Create blocks
var chain []*types.Block
var pHash common.Hash
@ -898,7 +899,7 @@ func TestHeadersRLPStorage(t *testing.T) {
chain = append(chain, block)
pHash = block.Hash()
}
var receipts []types.Receipts = make([]types.Receipts, 100)
receipts := make([]types.Receipts, 100)
// Write first half to ancients
WriteAncientBlocks(db, chain[:50], receipts[:50], big.NewInt(100))
// Write second half to db

View File

@ -62,6 +62,7 @@ const (
stateHistoryStorageData = "storage.data"
)
// stateFreezerNoSnappy configures whether compression is disabled for the state freezer.
var stateFreezerNoSnappy = map[string]bool{
stateHistoryMeta: true,
stateHistoryAccountIndex: false,

View File

@ -19,6 +19,7 @@ package rawdb
import (
"fmt"
"math"
"time"
"github.com/ethereum/go-ethereum/rlp"
"github.com/golang/snappy"
@ -188,9 +189,6 @@ func (batch *freezerTableBatch) commit() error {
if err != nil {
return err
}
if err := batch.t.head.Sync(); err != nil {
return err
}
dataSize := int64(len(batch.dataBuffer))
batch.dataBuffer = batch.dataBuffer[:0]
@ -208,6 +206,12 @@ func (batch *freezerTableBatch) commit() error {
// Update metrics.
batch.t.sizeGauge.Inc(dataSize + indexSize)
batch.t.writeMeter.Mark(dataSize + indexSize)
// Periodically sync the table, todo (rjl493456442) make it configurable?
if time.Since(batch.t.lastSync) > 30*time.Second {
batch.t.lastSync = time.Now()
return batch.t.syncWithNoLock()
}
return nil
}

View File

@ -17,93 +17,166 @@
package rawdb
import (
"errors"
"io"
"os"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
const freezerVersion = 1 // The initial version tag of freezer table metadata
const (
freezerTableV1 = 1 // Initial version of metadata struct
freezerTableV2 = 2 // Add field: 'flushOffset'
)
// freezerTableMeta wraps all the metadata of the freezer table.
// freezerTableMeta is a collection of additional properties that describe the
// freezer table. These properties are designed with error resilience, allowing
// them to be automatically corrected after an error occurs without significantly
// impacting overall correctness.
type freezerTableMeta struct {
// Version is the versioning descriptor of the freezer table.
Version uint16
file *os.File // file handler of metadata
version uint16 // version descriptor of the freezer table
// VirtualTail indicates how many items have been marked as deleted.
// Its value is equal to the number of items removed from the table
// plus the number of items hidden in the table, so it should never
// be lower than the "actual tail".
VirtualTail uint64
// virtualTail represents the number of items marked as deleted. It is
// calculated as the sum of items removed from the table and the items
// hidden within the table, and should never be less than the "actual
// tail".
//
// If lost due to a crash or other reasons, it will be reset to the number
// of items deleted from the table, causing the previously hidden items
// to become visible, which is an acceptable consequence.
virtualTail uint64
// flushOffset represents the offset in the index file up to which the index
// items along with the corresponding data items in data files have been flushed
// (fsync'd) to disk. Beyond this offset, data integrity is not guaranteed,
// the extra index items along with the associated data items should be removed
// during the startup.
//
// The principle is that all data items above the flush offset are considered
// volatile and should be recoverable if they are discarded after the unclean
// shutdown. If data integrity is required, manually force a sync of the
// freezer before proceeding with further operations (e.g. do freezer.Sync()
// first and then write data to key value store in some circumstances).
//
// The offset could be moved forward by applying sync operation, or be moved
// backward in cases of head/tail truncation, etc.
flushOffset uint64
}
// newMetadata initializes the metadata object with the given virtual tail.
func newMetadata(tail uint64) *freezerTableMeta {
// decodeV1 attempts to decode the metadata structure in v1 format. If decoding
// fails or the result is incompatible, nil is returned.
func decodeV1(file *os.File) *freezerTableMeta {
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return nil
}
type obj struct {
Version uint16
Tail uint64
}
var o obj
if err := rlp.Decode(file, &o); err != nil {
return nil
}
if o.Version != freezerTableV1 {
return nil
}
return &freezerTableMeta{
Version: freezerVersion,
VirtualTail: tail,
file: file,
version: o.Version,
virtualTail: o.Tail,
}
}
// readMetadata reads the metadata of the freezer table from the
// given metadata file.
func readMetadata(file *os.File) (*freezerTableMeta, error) {
// decodeV2 attempts to decode the metadata structure in v2 format. If decoding
// fails or the result is incompatible, nil is returned.
func decodeV2(file *os.File) *freezerTableMeta {
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return nil, err
return nil
}
var meta freezerTableMeta
if err := rlp.Decode(file, &meta); err != nil {
return nil, err
type obj struct {
Version uint16
Tail uint64
Offset uint64
}
var o obj
if err := rlp.Decode(file, &o); err != nil {
return nil
}
if o.Version != freezerTableV2 {
return nil
}
return &freezerTableMeta{
file: file,
version: freezerTableV2,
virtualTail: o.Tail,
flushOffset: o.Offset,
}
return &meta, nil
}
// writeMetadata writes the metadata of the freezer table into the
// given metadata file.
func writeMetadata(file *os.File, meta *freezerTableMeta) error {
_, err := file.Seek(0, io.SeekStart)
if err != nil {
return err
}
return rlp.Encode(file, meta)
}
// loadMetadata loads the metadata from the given metadata file.
// Initializes the metadata file with the given "actual tail" if
// it's empty.
func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) {
// newMetadata initializes the metadata object, either by loading it from the file
// or by constructing a new one from scratch.
func newMetadata(file *os.File) (*freezerTableMeta, error) {
stat, err := file.Stat()
if err != nil {
return nil, err
}
// Write the metadata with the given actual tail into metadata file
// if it's non-existent. There are two possible scenarios here:
// - the freezer table is empty
// - the freezer table is legacy
// In both cases, write the meta into the file with the actual tail
// as the virtual tail.
if stat.Size() == 0 {
m := newMetadata(tail)
if err := writeMetadata(file, m); err != nil {
m := &freezerTableMeta{
file: file,
version: freezerTableV2,
virtualTail: 0,
flushOffset: 0,
}
if err := m.write(true); err != nil {
return nil, err
}
return m, nil
}
m, err := readMetadata(file)
if err != nil {
return nil, err
if m := decodeV2(file); m != nil {
return m, nil
}
// Update the virtual tail with the given actual tail if it's even
// lower than it. Theoretically it shouldn't happen at all, print
// a warning here.
if m.VirtualTail < tail {
log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail)
m.VirtualTail = tail
if err := writeMetadata(file, m); err != nil {
return nil, err
}
if m := decodeV1(file); m != nil {
return m, nil // legacy metadata
}
return m, nil
return nil, errors.New("failed to decode metadata")
}
// setVirtualTail records the given tail as the in-memory virtual tail and then
// rewrites the metadata file, fsyncing it only if sync is true. The in-memory
// value is assigned before the write, so it is retained even if the write
// returns an error.
func (m *freezerTableMeta) setVirtualTail(tail uint64, sync bool) error {
m.virtualTail = tail
return m.write(sync)
}
// setFlushOffset records the given offset as the in-memory flush offset and
// then rewrites the metadata file, fsyncing it only if sync is true. The
// in-memory value is assigned before the write, so it is retained even if the
// write returns an error.
func (m *freezerTableMeta) setFlushOffset(offset uint64, sync bool) error {
m.flushOffset = offset
return m.write(sync)
}
// write serialises the metadata into the backing file, starting at offset
// zero, and fsyncs the file afterwards when sync is true.
//
// The on-disk record is always tagged as freezerTableV2, regardless of the
// version held in memory, so a legacy file is rewritten in the v2 layout the
// first time it is written.
func (m *freezerTableMeta) write(sync bool) error {
	// On-disk representation: version tag, virtual tail, flush offset.
	record := struct {
		Version uint16
		Tail    uint64
		Offset  uint64
	}{
		Version: freezerTableV2, // forcibly set it to v2
		Tail:    m.virtualTail,
		Offset:  m.flushOffset,
	}
	// Overwrite the previous record from the beginning of the file.
	if _, err := m.file.Seek(0, io.SeekStart); err != nil {
		return err
	}
	if err := rlp.Encode(m.file, &record); err != nil {
		return err
	}
	if sync {
		return m.file.Sync()
	}
	return nil
}

View File

@ -19,6 +19,8 @@ package rawdb
import (
"os"
"testing"
"github.com/ethereum/go-ethereum/rlp"
)
func TestReadWriteFreezerTableMeta(t *testing.T) {
@ -27,36 +29,98 @@ func TestReadWriteFreezerTableMeta(t *testing.T) {
t.Fatalf("Failed to create file %v", err)
}
defer f.Close()
err = writeMetadata(f, newMetadata(100))
meta, err := newMetadata(f)
if err != nil {
t.Fatalf("Failed to write metadata %v", err)
t.Fatalf("Failed to new metadata %v", err)
}
meta, err := readMetadata(f)
meta.setVirtualTail(100, false)
meta, err = newMetadata(f)
if err != nil {
t.Fatalf("Failed to read metadata %v", err)
t.Fatalf("Failed to reload metadata %v", err)
}
if meta.Version != freezerVersion {
if meta.version != freezerTableV2 {
t.Fatalf("Unexpected version field")
}
if meta.VirtualTail != uint64(100) {
if meta.virtualTail != uint64(100) {
t.Fatalf("Unexpected virtual tail field")
}
}
func TestInitializeFreezerTableMeta(t *testing.T) {
func TestUpgradeMetadata(t *testing.T) {
f, err := os.CreateTemp(t.TempDir(), "*")
if err != nil {
t.Fatalf("Failed to create file %v", err)
}
defer f.Close()
meta, err := loadMetadata(f, uint64(100))
// Write legacy metadata into file
type obj struct {
Version uint16
Tail uint64
}
var o obj
o.Version = freezerTableV1
o.Tail = 100
if err := rlp.Encode(f, &o); err != nil {
t.Fatalf("Failed to encode %v", err)
}
// Reload the metadata, a silent upgrade is expected
meta, err := newMetadata(f)
if err != nil {
t.Fatalf("Failed to read metadata %v", err)
}
if meta.Version != freezerVersion {
t.Fatalf("Unexpected version field")
if meta.version != freezerTableV1 {
t.Fatal("Unexpected version field")
}
if meta.VirtualTail != uint64(100) {
t.Fatalf("Unexpected virtual tail field")
if meta.virtualTail != uint64(100) {
t.Fatal("Unexpected virtual tail field")
}
if meta.flushOffset != uint64(0) {
t.Fatal("Unexpected flush offset field")
}
meta.setFlushOffset(100, true)
meta, err = newMetadata(f)
if err != nil {
t.Fatalf("Failed to read metadata %v", err)
}
if meta.version != freezerTableV2 {
t.Fatal("Unexpected version field")
}
if meta.virtualTail != uint64(100) {
t.Fatal("Unexpected virtual tail field")
}
if meta.flushOffset != uint64(100) {
t.Fatal("Unexpected flush offset field")
}
}
// TestInvalidMetadata checks that newMetadata refuses a metadata file whose
// version tag does not match its layout: the payload has the two-field legacy
// shape but is tagged as v2.
func TestInvalidMetadata(t *testing.T) {
	f, err := os.CreateTemp(t.TempDir(), "*")
	if err != nil {
		t.Fatalf("Failed to create file %v", err)
	}
	defer f.Close()

	// Write invalid legacy metadata into file
	legacy := struct {
		Version uint16
		Tail    uint64
	}{
		Version: freezerTableV2, // -> invalid version tag
		Tail:    100,
	}
	if err := rlp.Encode(f, &legacy); err != nil {
		t.Fatalf("Failed to encode %v", err)
	}
	if _, err := newMetadata(f); err == nil {
		t.Fatal("Unexpected success")
	}
}

View File

@ -108,11 +108,13 @@ type freezerTable struct {
head *os.File // File descriptor for the data head of the table
index *os.File // File descriptor for the indexEntry file of the table
meta *os.File // File descriptor for metadata of the table
files map[uint32]*os.File // open files
headId uint32 // number of the currently active head file
tailId uint32 // number of the earliest file
metadata *freezerTableMeta // metadata of the table
lastSync time.Time // Timestamp when the last sync was performed
headBytes int64 // Number of bytes written to the head file
readMeter metrics.Meter // Meter for measuring the effective amount of data read
writeMeter metrics.Meter // Meter for measuring the effective amount of data written
@ -166,10 +168,17 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
return nil, err
}
}
// Load metadata from the file. Legacy (v1) metadata is identified by its
// version field and is upgraded later, during the index repair.
metadata, err := newMetadata(meta)
if err != nil {
return nil, err
}
// Create the table and repair any past inconsistency
tab := &freezerTable{
index: index,
meta: meta,
metadata: metadata,
lastSync: time.Now(),
files: make(map[uint32]*os.File),
readMeter: readMeter,
writeMeter: writeMeter,
@ -221,13 +230,11 @@ func (t *freezerTable) repair() error {
return err
} // New file can't trigger this path
}
// Validate the index file as it might contain some garbage data after the
// power failures.
if err := t.repairIndex(); err != nil {
return err
}
// Retrieve the file sizes and prepare for truncation. Note the file size
// might be changed after index validation.
// might be changed after index repair.
if stat, err = t.index.Stat(); err != nil {
return err
}
@ -253,12 +260,14 @@ func (t *freezerTable) repair() error {
t.tailId = firstIndex.filenum
t.itemOffset.Store(uint64(firstIndex.offset))
// Load metadata from the file
meta, err := loadMetadata(t.meta, t.itemOffset.Load())
if err != nil {
return err
// Adjust the number of hidden items if it is less than the number of items
// being removed.
if t.itemOffset.Load() > t.metadata.virtualTail {
if err := t.metadata.setVirtualTail(t.itemOffset.Load(), true); err != nil {
return err
}
}
t.itemHidden.Store(meta.VirtualTail)
t.itemHidden.Store(t.metadata.virtualTail)
// Read the last index, use the default value in case the freezer is empty
if offsetsSize == indexEntrySize {
@ -267,12 +276,6 @@ func (t *freezerTable) repair() error {
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
}
// Print an error log if the index is corrupted due to an incorrect
// last index item. While it is theoretically possible to have a zero offset
// by storing all zero-size items, it is highly unlikely to occur in practice.
if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 {
log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize)
}
if t.readonly {
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly)
} else {
@ -293,6 +296,7 @@ func (t *freezerTable) repair() error {
return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum)
}
verbose = true
// Truncate the head file to the last offset pointer
if contentExp < contentSize {
t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize)
@ -304,11 +308,23 @@ func (t *freezerTable) repair() error {
// Truncate the index to point within the head file
if contentExp > contentSize {
t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize)
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
newOffset := offsetsSize - indexEntrySize
if err := truncateFreezerFile(t.index, newOffset); err != nil {
return err
}
offsetsSize -= indexEntrySize
// If the index file is truncated beyond the flush offset, move the flush
// offset back to the new end of the file. A crash may occur before the
// offset is updated, leaving a dangling reference that points to a position
// outside the file. If so, the offset will be reset to the new end of the
// file during the next run.
if t.metadata.flushOffset < uint64(newOffset) {
if err := t.metadata.setFlushOffset(uint64(newOffset), true); err != nil {
return err
}
}
// Read the new head index, use the default value in case
// the freezer is already empty.
var newLastIndex indexEntry
@ -345,7 +361,7 @@ func (t *freezerTable) repair() error {
if err := t.head.Sync(); err != nil {
return err
}
if err := t.meta.Sync(); err != nil {
if err := t.metadata.file.Sync(); err != nil {
return err
}
}
@ -372,7 +388,60 @@ func (t *freezerTable) repair() error {
return nil
}
// repairIndex validates the index file and reconciles it with the flush offset
// recorded in the table metadata.
//
// The index is first passed through checkIndex, which may truncate garbage
// entries and reports the resulting size. That size is then compared with the
// flush offset:
//
//   - legacy (v1) metadata carries no flush offset, so it is initialised to the
//     size of the validated index;
//   - a table holding a single index entry and a zero flush offset has its
//     flush offset moved to the end of the index;
//   - index entries beyond the flush offset are truncated, since the data they
//     refer to was not known to be flushed;
//   - a flush offset beyond the end of the index is rewound to the index size.
//
// In read-only mode the legacy and rewind cases leave everything untouched and
// excessive index entries are reported as an error instead of being truncated.
func (t *freezerTable) repairIndex() error {
	stat, err := t.index.Stat()
	if err != nil {
		return err
	}
	// Validate the items in the index file to ensure the data integrity.
	// It's possible some garbage data is retained in the index file after
	// the power failures and should be truncated first.
	offset, err := t.checkIndex(stat.Size())
	if err != nil {
		return err
	}
	// If legacy metadata is detected, attempt to recover the offset from the
	// index file to avoid clearing the entire table.
	if t.metadata.version == freezerTableV1 {
		// Nothing may be written or fsynced in read-only mode; the index as
		// validated above is used as is.
		if t.readonly {
			return nil
		}
		t.logger.Info("Recover the flush offset for legacy table", "offset", offset)
		return t.metadata.setFlushOffset(uint64(offset), true)
	}
	flushed := int64(t.metadata.flushOffset)
	switch {
	case offset == indexEntrySize && t.metadata.flushOffset == 0:
		// Move the flush offset to the end of the file for fresh new freezer table.
		// NOTE(review): this branch still writes metadata in read-only mode;
		// confirm whether a read-only open of a pristine table is reachable.
		return t.metadata.setFlushOffset(uint64(offset), true)

	case offset == flushed:
		// The flush offset is aligned with the index file, nothing to do.
		return nil

	case offset > flushed:
		// Extra index items have been detected beyond the flush offset. Since these
		// entries correspond to data that has not been fully flushed to disk in the
		// last run (because of unclean shutdown), their integrity cannot be guaranteed.
		// To ensure consistency, these index items will be truncated, as there is no
		// reliable way to validate or recover their associated data.
		size := offset - flushed
		if t.readonly {
			return fmt.Errorf("index file(path: %s, name: %s) contains garbage data %d", t.path, t.name, size)
		}
		t.logger.Info("Truncate excessive items", "size", size)
		return truncateFreezerFile(t.index, flushed)

	default:
		// Flush offset refers to the position which exceeds the index file. The only
		// possible scenario for this is: power failure or system crash occurs after
		// truncating the segment in index file from head or tail, but without updating
		// the flush offset. In this case, automatically reset the flush offset with
		// the file size which implies the entire index file is complete.
		if t.readonly {
			return nil // do nothing in read only mode
		}
		t.logger.Info("Rewind the flush offset", "old", t.metadata.flushOffset, "new", offset)
		return t.metadata.setFlushOffset(uint64(offset), true)
	}
}
// checkIndex validates the integrity of the index file. According to the design,
// the initial entry in the file denotes the earliest data file along with the
// count of deleted items. Following this, all subsequent entries in the file must
// be in order. This function identifies any corrupted entries and truncates items
@ -392,18 +461,11 @@ func (t *freezerTable) repair() error {
// leftover garbage or if all items in the table have zero size is impossible.
// In such instances, the file will remain unchanged to prevent potential data
// loss or misinterpretation.
func (t *freezerTable) repairIndex() error {
// Retrieve the file sizes and prepare for validation
stat, err := t.index.Stat()
if err != nil {
return err
}
size := stat.Size()
func (t *freezerTable) checkIndex(size int64) (int64, error) {
// Move the read cursor to the beginning of the file
_, err = t.index.Seek(0, io.SeekStart)
_, err := t.index.Seek(0, io.SeekStart)
if err != nil {
return err
return 0, err
}
fr := bufio.NewReader(t.index)
@ -425,21 +487,21 @@ func (t *freezerTable) repairIndex() error {
entry.unmarshalBinary(buff)
return entry, nil
}
truncate = func(offset int64) error {
truncate = func(offset int64) (int64, error) {
if t.readonly {
return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
return 0, fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
}
if err := truncateFreezerFile(t.index, offset); err != nil {
return err
return 0, err
}
log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
return nil
return offset, nil
}
)
for offset := int64(0); offset < size; offset += indexEntrySize {
entry, err := read()
if err != nil {
return err
return 0, err
}
if offset == 0 {
head = entry
@ -468,10 +530,10 @@ func (t *freezerTable) repairIndex() error {
// the seek operation anyway as a precaution.
_, err = t.index.Seek(0, io.SeekEnd)
if err != nil {
return err
return 0, err
}
log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
return nil
return size, nil
}
// checkIndexItems validates the correctness of two consecutive index items based
@ -550,12 +612,23 @@ func (t *freezerTable) truncateHead(items uint64) error {
// Truncate the index file first, the tail position is also considered
// when calculating the new freezer table length.
length := items - t.itemOffset.Load()
if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil {
newOffset := (length + 1) * indexEntrySize
if err := truncateFreezerFile(t.index, int64(newOffset)); err != nil {
return err
}
if err := t.index.Sync(); err != nil {
return err
}
// If the index file is truncated beyond the flush offset, move the flush
// offset back to the new end of the file. A crash may occur before the
// offset is updated, leaving a dangling reference that points to a position
// outside the file. If so, the offset will be reset to the new end of the
// file during the next run.
if t.metadata.flushOffset > newOffset {
if err := t.metadata.setFlushOffset(newOffset, true); err != nil {
return err
}
}
// Calculate the new expected size of the data file and truncate it
var expected indexEntry
if length == 0 {
@ -652,7 +725,10 @@ func (t *freezerTable) truncateTail(items uint64) error {
}
// Update the virtual tail marker and hide these entries in the table.
t.itemHidden.Store(items)
if err := writeMetadata(t.meta, newMetadata(items)); err != nil {
// Update the virtual tail without fsync, otherwise it will significantly
// impact the overall performance.
if err := t.metadata.setVirtualTail(items, false); err != nil {
return err
}
// Hidden items still fall in the current tail file, no data file
@ -664,6 +740,18 @@ func (t *freezerTable) truncateTail(items uint64) error {
if t.tailId > newTailId {
return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId)
}
// Sync the table before performing the index tail truncation. A crash may
// occur after truncating the index file without updating the flush offset,
// leaving a dangling offset that points to a position outside the file.
// The offset will be rewound to the end of file during the next run
// automatically and implicitly assumes all the items within the file are
// complete.
//
// Therefore, forcibly flush everything above the offset to ensure this
// assumption is satisfied!
if err := t.syncWithNoLock(); err != nil {
return err
}
// Count how many items can be deleted from the file.
var (
newDeleted = items
@ -681,11 +769,6 @@ func (t *freezerTable) truncateTail(items uint64) error {
}
newDeleted = current
}
// Commit the changes of metadata file first before manipulating
// the indexes file.
if err := t.meta.Sync(); err != nil {
return err
}
// Close the index file before shorten it.
if err := t.index.Close(); err != nil {
return err
@ -716,6 +799,21 @@ func (t *freezerTable) truncateTail(items uint64) error {
t.itemOffset.Store(newDeleted)
t.releaseFilesBefore(t.tailId, true)
// Move the index flush offset backward due to the deletion of an index segment.
// A crash may occur before the offset is updated, leaving a dangling reference
// that points to a position outside the file. If so, the offset will be reset
// to the new end of the file during the next run.
//
// Note, both the index and head data file have been persisted before performing
// tail truncation and all the items in these files are regarded as complete.
shorten := indexEntrySize * (newDeleted - deleted)
if t.metadata.flushOffset <= shorten {
return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten)
} else {
if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil {
return err
}
}
// Retrieve the new size and update the total size counter
newSize, err := t.sizeNolock()
if err != nil {
@ -725,40 +823,30 @@ func (t *freezerTable) truncateTail(items uint64) error {
return nil
}
// Close closes all opened files.
// Close closes all opened files and finalizes the freezer table for use.
// This operation must be completed before shutdown to prevent the loss of
// recent writes.
func (t *freezerTable) Close() error {
t.lock.Lock()
defer t.lock.Unlock()
if err := t.syncWithNoLock(); err != nil {
return err
}
var errs []error
doClose := func(f *os.File, sync bool, close bool) {
if sync && !t.readonly {
if err := f.Sync(); err != nil {
errs = append(errs, err)
}
}
if close {
if err := f.Close(); err != nil {
errs = append(errs, err)
}
doClose := func(f *os.File) {
if err := f.Close(); err != nil {
errs = append(errs, err)
}
}
// Trying to fsync a file opened in rdonly causes "Access denied"
// error on Windows.
doClose(t.index, true, true)
doClose(t.meta, true, true)
// The preopened non-head data-files are all opened in readonly.
// The head is opened in rw-mode, so we sync it here - but since it's also
// part of t.files, it will be closed in the loop below.
doClose(t.head, true, false) // sync but do not close
doClose(t.index)
doClose(t.metadata.file)
for _, f := range t.files {
doClose(f, false, true) // close but do not sync
doClose(f)
}
t.index = nil
t.meta = nil
t.head = nil
t.metadata.file = nil
if errs != nil {
return fmt.Errorf("%v", errs)
@ -917,7 +1005,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i
defer t.lock.RUnlock()
// Ensure the table and the item are accessible
if t.index == nil || t.head == nil || t.meta == nil {
if t.index == nil || t.head == nil || t.metadata.file == nil {
return nil, nil, errClosed
}
var (
@ -1042,6 +1130,9 @@ func (t *freezerTable) advanceHead() error {
t.lock.Lock()
defer t.lock.Unlock()
if err := t.syncWithNoLock(); err != nil {
return err
}
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it.
nextID := t.headId + 1
@ -1069,7 +1160,19 @@ func (t *freezerTable) advanceHead() error {
func (t *freezerTable) Sync() error {
t.lock.Lock()
defer t.lock.Unlock()
if t.index == nil || t.head == nil || t.meta == nil {
return t.syncWithNoLock()
}
// syncWithNoLock is the internal version of Sync which assumes the lock is
// already held.
func (t *freezerTable) syncWithNoLock() error {
// Trying to fsync a file opened in rdonly causes "Access denied"
// error on Windows.
if t.readonly {
return nil
}
if t.index == nil || t.head == nil || t.metadata.file == nil {
return errClosed
}
var err error
@ -1078,10 +1181,18 @@ func (t *freezerTable) Sync() error {
err = e
}
}
trackError(t.index.Sync())
trackError(t.meta.Sync())
trackError(t.head.Sync())
// A crash may occur before the offset is updated, leaving the offset
// pointing to an old position. If so, the extra items above the offset
// will be truncated during the next run.
stat, err := t.index.Stat()
if err != nil {
return err
}
offset := stat.Size()
trackError(t.metadata.setFlushOffset(uint64(offset), true))
return err
}
@ -1097,13 +1208,8 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string {
}
func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) {
meta, err := readMetadata(t.meta)
if err != nil {
fmt.Fprintf(w, "Failed to decode freezer table %v\n", err)
return
}
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version,
t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n",
t.metadata.version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load())
buf := make([]byte, indexEntrySize)

View File

@ -262,18 +262,6 @@ func TestSnappyDetection(t *testing.T) {
f.Close()
}
// Open without snappy
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false)
if err != nil {
t.Fatal(err)
}
if _, err = f.Retrieve(0); err == nil {
f.Close()
t.Fatalf("expected empty table")
}
}
// Open with snappy
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false)
@ -286,6 +274,18 @@ func TestSnappyDetection(t *testing.T) {
t.Fatalf("expected no error, got %v", err)
}
}
// Open without snappy
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false)
if err != nil {
t.Fatal(err)
}
if _, err = f.Retrieve(0); err == nil {
f.Close()
t.Fatalf("expected empty table")
}
}
}
func assertFileSize(f string, size int64) error {
@ -521,93 +521,53 @@ func TestFreezerOffset(t *testing.T) {
fname := fmt.Sprintf("offset-%d", rand.Uint64())
// Fill table
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
if err != nil {
t.Fatal(err)
}
// Write 6 x 20 bytes, splitting out into three files
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
require.NoError(t, batch.commit())
t.Log(f.dumpIndexString(0, 100))
f.Close()
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
if err != nil {
t.Fatal(err)
}
// Write 6 x 20 bytes, splitting out into three files
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF)))
require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE)))
require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd)))
require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc)))
require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb)))
require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa)))
require.NoError(t, batch.commit())
t.Log(f.dumpIndexString(0, 100))
// Now crop it.
{
// delete files 0 and 1
for i := 0; i < 2; i++ {
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i))
if err := os.Remove(p); err != nil {
t.Fatal(err)
}
}
// Read the index file
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
indexFile, err := os.OpenFile(p, os.O_RDWR, 0644)
if err != nil {
t.Fatal(err)
}
indexBuf := make([]byte, 7*indexEntrySize)
indexFile.Read(indexBuf)
// Update the index file, so that we store
// [ file = 2, offset = 4 ] at index zero
zeroIndex := indexEntry{
filenum: uint32(2), // First file is 2
offset: uint32(4), // We have removed four items
}
buf := zeroIndex.append(nil)
// Overwrite index zero
copy(indexBuf, buf)
// Remove the four next indices by overwriting
copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:])
indexFile.WriteAt(indexBuf, 0)
// Need to truncate the moved index items
indexFile.Truncate(indexEntrySize * (1 + 2))
indexFile.Close()
}
f.truncateTail(4)
f.Close()
// Now open again
{
f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
if err != nil {
t.Fatal(err)
}
defer f.Close()
t.Log(f.dumpIndexString(0, 100))
// It should allow writing item 6.
batch := f.newBatch()
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())
checkRetrieveError(t, f, map[uint64]error{
0: errOutOfBounds,
1: errOutOfBounds,
2: errOutOfBounds,
3: errOutOfBounds,
})
checkRetrieve(t, f, map[uint64][]byte{
4: getChunk(20, 0xbb),
5: getChunk(20, 0xaa),
6: getChunk(20, 0x99),
})
f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false)
if err != nil {
t.Fatal(err)
}
t.Log(f.dumpIndexString(0, 100))
// It should allow writing item 6.
batch = f.newBatch()
require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99)))
require.NoError(t, batch.commit())
checkRetrieveError(t, f, map[uint64]error{
0: errOutOfBounds,
1: errOutOfBounds,
2: errOutOfBounds,
3: errOutOfBounds,
})
checkRetrieve(t, f, map[uint64][]byte{
4: getChunk(20, 0xbb),
5: getChunk(20, 0xaa),
6: getChunk(20, 0x99),
})
f.Close()
// Edit the index again, with a much larger initial offset of 1M.
{
@ -1369,45 +1329,63 @@ func TestRandom(t *testing.T) {
}
func TestIndexValidation(t *testing.T) {
const (
items = 30
dataSize = 10
)
const dataSize = 10
garbage := indexEntry{
filenum: 100,
offset: 200,
}
var cases = []struct {
offset int64
data []byte
expItems int
write int
offset int64
data []byte
expItems int
hasCorruption bool
}{
// extend index file with zero bytes at the end
{
offset: (items + 1) * indexEntrySize,
write: 5,
offset: (5 + 1) * indexEntrySize,
data: make([]byte, indexEntrySize),
expItems: 30,
expItems: 5,
},
// extend index file with unaligned zero bytes at the end
{
write: 5,
offset: (5 + 1) * indexEntrySize,
data: make([]byte, indexEntrySize*1.5),
expItems: 5,
},
// write garbage in the first non-head item
{
write: 5,
offset: indexEntrySize,
data: garbage.append(nil),
expItems: 0,
},
// write garbage in the first non-head item
// write garbage in the middle
{
offset: (items/2 + 1) * indexEntrySize,
write: 5,
offset: 3 * indexEntrySize,
data: garbage.append(nil),
expItems: items / 2,
expItems: 2,
},
// fulfill the first data file (but not yet advanced), the zero bytes
// at tail should be truncated.
{
write: 10,
offset: 11 * indexEntrySize,
data: garbage.append(nil),
expItems: 10,
},
}
for _, c := range cases {
fn := fmt.Sprintf("t-%d", rand.Uint64())
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 10*dataSize, true, false)
if err != nil {
t.Fatal(err)
}
writeChunks(t, f, items, dataSize)
writeChunks(t, f, c.write, dataSize)
// write corrupted data
f.index.WriteAt(c.data, c.offset)
@ -1421,10 +1399,10 @@ func TestIndexValidation(t *testing.T) {
for i := 0; i < c.expItems; i++ {
exp := getChunk(10, i)
got, err := f.Retrieve(uint64(i))
if err != nil {
if err != nil && !c.hasCorruption {
t.Fatalf("Failed to read from table, %v", err)
}
if !bytes.Equal(exp, got) {
if !bytes.Equal(exp, got) && !c.hasCorruption {
t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
}
}
@ -1433,3 +1411,163 @@ func TestIndexValidation(t *testing.T) {
}
}
}
// TestFlushOffsetTracking tests the flush offset tracking. The offset moving
// in the test is mostly triggered by the advanceHead (new data file) and
// head/tail truncation.
//
// Every step applies one operation to the same table and then compares
// f.metadata.flushOffset with the expected value. The steps build on each
// other and therefore must run in the listed order.
func TestFlushOffsetTracking(t *testing.T) {
	const (
		items    = 35
		dataSize = 10
		fileSize = 100
	)
	fn := fmt.Sprintf("t-%d", rand.Uint64())
	f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
	if err != nil {
		t.Fatal(err)
	}
	// Release the file handles once all steps have been verified.
	defer f.Close()

	// Data files:
	// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
	writeChunks(t, f, items, dataSize)

	// appendItems writes count zero-filled items of dataSize bytes, starting
	// at item number start, and commits them as a single batch. The first
	// error encountered is returned so that the step fails loudly instead of
	// surfacing later as an unexplained offset mismatch.
	appendItems := func(f *freezerTable, start uint64, count int) error {
		batch := f.newBatch()
		for i := 0; i < count; i++ {
			if err := batch.AppendRaw(start+uint64(i), make([]byte, dataSize)); err != nil {
				return err
			}
		}
		return batch.commit()
	}

	// NOTE(review): the expected offsets look like "number of index entries
	// covered by the flush offset" times indexEntrySize, with one leading
	// entry in addition to one entry per item - confirm against the table
	// implementation.
	var cases = []struct {
		op     func(*freezerTable) error // operation applied before the check
		offset uint64                    // expected f.metadata.flushOffset afterwards
	}{
		{
			// Data files:
			// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
			op:     func(f *freezerTable) error { return nil }, // no-op
			offset: 31 * indexEntrySize,
		},
		{
			// Write more items to fill up the newest data file, but the file
			// advance is not triggered.
			// Data files:
			// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items, full)
			op: func(f *freezerTable) error {
				return appendItems(f, items, 5)
			},
			offset: 31 * indexEntrySize,
		},
		{
			// Write more items to trigger the data file advance
			// Data files:
			// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(1 item)
			op: func(f *freezerTable) error {
				return appendItems(f, items+5, 1)
			},
			offset: 41 * indexEntrySize,
		},
		{
			// Head truncate
			// Data files:
			// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item)
			op: func(f *freezerTable) error {
				return f.truncateHead(items + 5)
			},
			offset: 41 * indexEntrySize,
		},
		{
			// Tail truncate
			// Data files:
			// F1(1 hidden, 9 visible) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item)
			op: func(f *freezerTable) error {
				return f.truncateTail(1)
			},
			offset: 41 * indexEntrySize,
		},
		{
			// Tail truncate
			// Data files:
			// F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item)
			op: func(f *freezerTable) error {
				return f.truncateTail(10)
			},
			offset: 31 * indexEntrySize,
		},
		{
			// Tail truncate
			// Data files:
			// F4(10 items) -> F5(0 item)
			op: func(f *freezerTable) error {
				return f.truncateTail(30)
			},
			offset: 11 * indexEntrySize,
		},
		{
			// Head truncate
			// Data files:
			// F4(9 items)
			op: func(f *freezerTable) error {
				return f.truncateHead(items + 4)
			},
			offset: 10 * indexEntrySize,
		},
	}
	for i, c := range cases {
		if err := c.op(f); err != nil {
			t.Fatalf("Failed to apply operation of step %d: %v", i, err)
		}
		if f.metadata.flushOffset != c.offset {
			t.Fatalf("Unexpected index flush offset, want: %d, got: %d", c.offset, f.metadata.flushOffset)
		}
	}
}
// TestTailTruncationCrash simulates a crash that happens after the index file
// has been shortened by a tail truncation but before the new flush offset was
// recorded in the metadata. It then reopens the table and expects the stale
// offset to be corrected to match the truncated index.
func TestTailTruncationCrash(t *testing.T) {
	const (
		items    = 35
		dataSize = 10
		fileSize = 100
	)
	fn := fmt.Sprintf("t-%d", rand.Uint64())
	f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
	if err != nil {
		t.Fatal(err)
	}
	// requireOffset aborts the test unless the flush offset tracked by the
	// given table equals want.
	requireOffset := func(table *freezerTable, want uint64) {
		t.Helper()
		if table.metadata.flushOffset != want {
			t.Fatalf("Unexpected index flush offset, want: %d, got: %d", want, table.metadata.flushOffset)
		}
	}

	// Data files:
	// F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full)
	writeChunks(t, f, items, dataSize)

	// The latest 5 items are not persisted yet
	requireOffset(f, 31*indexEntrySize)

	// Truncating the first 5 items is expected to leave the flush offset
	// untouched.
	if err := f.truncateTail(5); err != nil {
		t.Fatalf("Failed to truncate tail to 5: %v", err)
	}
	requireOffset(f, 31*indexEntrySize)

	// Truncate the first 10 items which results in the first data file
	// being removed. The offset should be moved to 26*indexEntrySize.
	if err := f.truncateTail(10); err != nil {
		t.Fatalf("Failed to truncate tail to 10: %v", err)
	}
	requireOffset(f, 26*indexEntrySize)

	// Write the offset back to 31*indexEntrySize to simulate a crash
	// which occurs after truncating the index file without updating
	// the offset
	if err := f.metadata.setFlushOffset(31*indexEntrySize, true); err != nil {
		t.Fatalf("Failed to overwrite the flush offset: %v", err)
	}

	// NOTE(review): f is intentionally not closed before reopening, matching
	// the original test; a clean Close would presumably re-sync the metadata
	// and undo the simulated stale offset - confirm against Close.
	reopened, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false)
	if err != nil {
		t.Fatal(err)
	}
	// Release the handles of the reopened table once the check is done.
	defer reopened.Close()

	// Reopening must bring the stale offset back in line with the index.
	requireOffset(reopened, 26*indexEntrySize)
}