core/rawdb: swap tailId and itemOffset for deleted items in freezer (#21220)
* fix(freezer): tailId filenum offset were misplaced * core/rawdb: assume first item in freezer always start from zero
This commit is contained in:
parent
eb9d7d15ec
commit
bcf19bc4be
|
@ -232,8 +232,8 @@ func (t *freezerTable) repair() error {
|
||||||
t.index.ReadAt(buffer, 0)
|
t.index.ReadAt(buffer, 0)
|
||||||
firstIndex.unmarshalBinary(buffer)
|
firstIndex.unmarshalBinary(buffer)
|
||||||
|
|
||||||
t.tailId = firstIndex.offset
|
t.tailId = firstIndex.filenum
|
||||||
t.itemOffset = firstIndex.filenum
|
t.itemOffset = firstIndex.offset
|
||||||
|
|
||||||
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
|
t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
|
||||||
lastIndex.unmarshalBinary(buffer)
|
lastIndex.unmarshalBinary(buffer)
|
||||||
|
@ -519,16 +519,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
|
||||||
// getBounds returns the indexes for the item
|
// getBounds returns the indexes for the item
|
||||||
// returns start, end, filenumber and error
|
// returns start, end, filenumber and error
|
||||||
func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
|
func (t *freezerTable) getBounds(item uint64) (uint32, uint32, uint32, error) {
|
||||||
var startIdx, endIdx indexEntry
|
|
||||||
buffer := make([]byte, indexEntrySize)
|
buffer := make([]byte, indexEntrySize)
|
||||||
if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
|
var startIdx, endIdx indexEntry
|
||||||
return 0, 0, 0, err
|
// Read second index
|
||||||
}
|
|
||||||
startIdx.unmarshalBinary(buffer)
|
|
||||||
if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
|
if _, err := t.index.ReadAt(buffer, int64((item+1)*indexEntrySize)); err != nil {
|
||||||
return 0, 0, 0, err
|
return 0, 0, 0, err
|
||||||
}
|
}
|
||||||
endIdx.unmarshalBinary(buffer)
|
endIdx.unmarshalBinary(buffer)
|
||||||
|
// Read first index (unless it's the very first item)
|
||||||
|
if item != 0 {
|
||||||
|
if _, err := t.index.ReadAt(buffer, int64(item*indexEntrySize)); err != nil {
|
||||||
|
return 0, 0, 0, err
|
||||||
|
}
|
||||||
|
startIdx.unmarshalBinary(buffer)
|
||||||
|
} else {
|
||||||
|
// Special case if we're reading the first item in the freezer. We assume that
|
||||||
|
// the first item always start from zero(regarding the deletion, we
|
||||||
|
// only support deletion by files, so that the assumption is held).
|
||||||
|
// This means we can use the first item metadata to carry information about
|
||||||
|
// the 'global' offset, for the deletion-case
|
||||||
|
return 0, endIdx.offset, endIdx.filenum, nil
|
||||||
|
}
|
||||||
if startIdx.filenum != endIdx.filenum {
|
if startIdx.filenum != endIdx.filenum {
|
||||||
// If a piece of data 'crosses' a data-file,
|
// If a piece of data 'crosses' a data-file,
|
||||||
// it's actually in one piece on the second data-file.
|
// it's actually in one piece on the second data-file.
|
||||||
|
|
|
@ -552,8 +552,8 @@ func TestOffset(t *testing.T) {
|
||||||
tailId := uint32(2) // First file is 2
|
tailId := uint32(2) // First file is 2
|
||||||
itemOffset := uint32(4) // We have removed four items
|
itemOffset := uint32(4) // We have removed four items
|
||||||
zeroIndex := indexEntry{
|
zeroIndex := indexEntry{
|
||||||
offset: tailId,
|
filenum: tailId,
|
||||||
filenum: itemOffset,
|
offset: itemOffset,
|
||||||
}
|
}
|
||||||
buf := zeroIndex.marshallBinary()
|
buf := zeroIndex.marshallBinary()
|
||||||
// Overwrite index zero
|
// Overwrite index zero
|
||||||
|
@ -567,39 +567,67 @@ func TestOffset(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
// Now open again
|
// Now open again
|
||||||
{
|
checkPresent := func(numDeleted uint64) {
|
||||||
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
|
f, err := newCustomTable(os.TempDir(), fname, rm, wm, sg, 40, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
f.printIndex()
|
f.printIndex()
|
||||||
// It should allow writing item 6
|
// It should allow writing item 6
|
||||||
f.Append(6, getChunk(20, 0x99))
|
f.Append(numDeleted+2, getChunk(20, 0x99))
|
||||||
|
|
||||||
// It should be fine to fetch 4,5,6
|
// It should be fine to fetch 4,5,6
|
||||||
if got, err := f.Retrieve(4); err != nil {
|
if got, err := f.Retrieve(numDeleted); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) {
|
} else if exp := getChunk(20, 0xbb); !bytes.Equal(got, exp) {
|
||||||
t.Fatalf("expected %x got %x", exp, got)
|
t.Fatalf("expected %x got %x", exp, got)
|
||||||
}
|
}
|
||||||
if got, err := f.Retrieve(5); err != nil {
|
if got, err := f.Retrieve(numDeleted + 1); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) {
|
} else if exp := getChunk(20, 0xaa); !bytes.Equal(got, exp) {
|
||||||
t.Fatalf("expected %x got %x", exp, got)
|
t.Fatalf("expected %x got %x", exp, got)
|
||||||
}
|
}
|
||||||
if got, err := f.Retrieve(6); err != nil {
|
if got, err := f.Retrieve(numDeleted + 2); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
} else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) {
|
} else if exp := getChunk(20, 0x99); !bytes.Equal(got, exp) {
|
||||||
t.Fatalf("expected %x got %x", exp, got)
|
t.Fatalf("expected %x got %x", exp, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
// It should error at 0, 1,2,3
|
// It should error at 0, 1,2,3
|
||||||
for i := 0; i < 4; i++ {
|
for i := numDeleted - 1; i > numDeleted-10; i-- {
|
||||||
if _, err := f.Retrieve(uint64(i)); err == nil {
|
if _, err := f.Retrieve(i); err == nil {
|
||||||
t.Fatal("expected err")
|
t.Fatal("expected err")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
checkPresent(4)
|
||||||
|
// Now, let's pretend we have deleted 1M items
|
||||||
|
{
|
||||||
|
// Read the index file
|
||||||
|
p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname))
|
||||||
|
indexFile, err := os.OpenFile(p, os.O_RDWR, 0644)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
indexBuf := make([]byte, 3*indexEntrySize)
|
||||||
|
indexFile.Read(indexBuf)
|
||||||
|
|
||||||
|
// Update the index file, so that we store
|
||||||
|
// [ file = 2, offset = 1M ] at index zero
|
||||||
|
|
||||||
|
tailId := uint32(2) // First file is 2
|
||||||
|
itemOffset := uint32(1000000) // We have removed 1M items
|
||||||
|
zeroIndex := indexEntry{
|
||||||
|
offset: itemOffset,
|
||||||
|
filenum: tailId,
|
||||||
|
}
|
||||||
|
buf := zeroIndex.marshallBinary()
|
||||||
|
// Overwrite index zero
|
||||||
|
copy(indexBuf, buf)
|
||||||
|
indexFile.WriteAt(indexBuf, 0)
|
||||||
|
indexFile.Close()
|
||||||
|
}
|
||||||
|
checkPresent(1000000)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO (?)
|
// TODO (?)
|
||||||
|
|
Loading…
Reference in New Issue