freezer: enforce readonly in newFreezer

This commit is contained in:
Sina Mahmoodi 2021-12-16 11:23:05 +03:30
parent 169e10092c
commit 41f42bfff5
2 changed files with 73 additions and 2 deletions

View File

@ -144,8 +144,15 @@ func newFreezer(datadir string, namespace string, readonly bool, maxTableSize ui
freezer.tables[name] = table freezer.tables[name] = table
} }
// Truncate all tables to common length. if freezer.readonly {
if err := freezer.repair(); err != nil { // In readonly mode only validate, don't truncate.
// validate also sets `freezer.frozen`.
err = freezer.validate()
} else {
// Truncate all tables to common length.
err = freezer.repair()
}
if err != nil {
for _, table := range freezer.tables { for _, table := range freezer.tables {
table.Close() table.Close()
} }
@ -308,6 +315,32 @@ func (f *freezer) Sync() error {
return nil return nil
} }
// validate checks that every table has the same length.
// Used instead of `repair` in readonly mode.
func (f *freezer) validate() error {
if len(f.tables) == 0 {
return nil
}
var length uint64
var name string
// Hack to get length of any table
for k, table := range f.tables {
length = atomic.LoadUint64(&table.items)
name = k
break
}
log.Info("validate", "name", name, "length", length)
// Now check every table against that length
for k, table := range f.tables {
items := atomic.LoadUint64(&table.items)
if length != items {
return fmt.Errorf("freezer tables %s and %s have differing lengths: %d != %d", k, name, items, length)
}
}
atomic.StoreUint64(&f.frozen, length)
return nil
}
// repair truncates all data tables to the same length. // repair truncates all data tables to the same length.
func (f *freezer) repair() error { func (f *freezer) repair() error {
min := uint64(math.MaxUint64) min := uint64(math.MaxUint64)

View File

@ -253,6 +253,44 @@ func TestFreezerConcurrentModifyTruncate(t *testing.T) {
} }
} }
func TestFreezerReadonlyValidate(t *testing.T) {
tables := map[string]bool{"a": true, "b": true}
dir, err := ioutil.TempDir("", "freezer")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(dir)
// Open non-readonly freezer and fill individual tables
// with different amount of data.
f, err := newFreezer(dir, "", false, 2049, tables)
if err != nil {
t.Fatal("can't open freezer", err)
}
var item = make([]byte, 1024)
aBatch := f.tables["a"].newBatch()
require.NoError(t, aBatch.AppendRaw(0, item))
require.NoError(t, aBatch.AppendRaw(1, item))
require.NoError(t, aBatch.AppendRaw(2, item))
require.NoError(t, aBatch.commit())
bBatch := f.tables["b"].newBatch()
require.NoError(t, bBatch.AppendRaw(0, item))
require.NoError(t, bBatch.commit())
if f.tables["a"].items != 3 {
t.Fatalf("unexpected number of items in table")
}
if f.tables["b"].items != 1 {
t.Fatalf("unexpected number of items in table")
}
require.NoError(t, f.Close())
// Re-openening as readonly should fail when validating
// table lengths.
f, err = newFreezer(dir, "", true, 2049, tables)
if err == nil {
t.Fatal("readonly freezer should fail with differing table lengths")
}
}
func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, string) { func newFreezerForTesting(t *testing.T, tables map[string]bool) (*freezer, string) {
t.Helper() t.Helper()