From a6942b9f25bd6c67a5ab9e80ab95aae25a08da6d Mon Sep 17 00:00:00 2001 From: lash Date: Thu, 15 Nov 2018 14:57:03 +0100 Subject: [PATCH] swarm/storage: Batched database migration (#18113) --- swarm/storage/ldbstore.go | 139 ++++++++++++++++++++++++--------- swarm/storage/ldbstore_test.go | 32 ++++++++ 2 files changed, 135 insertions(+), 36 deletions(-) diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index fbae59facf..bd4f6b9162 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -284,7 +284,7 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte { return val } -func parseGCIdxKey(key []byte) (byte, []byte) { +func parseIdxKey(key []byte) (byte, []byte) { return key[0], key[1:] } @@ -589,7 +589,7 @@ func (s *LDBStore) CleanGCIndex() error { it.Seek([]byte{keyGCIdx}) var gcDeletes int for it.Valid() { - rowType, _ := parseGCIdxKey(it.Key()) + rowType, _ := parseIdxKey(it.Key()) if rowType != keyGCIdx { break } @@ -601,47 +601,113 @@ func (s *LDBStore) CleanGCIndex() error { if err := s.db.Write(&batch); err != nil { return err } - - it.Seek([]byte{keyIndex}) - var idx dpaDBIndex - var poPtrs [256]uint64 - for it.Valid() { - rowType, chunkHash := parseGCIdxKey(it.Key()) - if rowType != keyIndex { - break - } - err := decodeIndex(it.Value(), &idx) - if err != nil { - return fmt.Errorf("corrupt index: %v", err) - } - po := s.po(chunkHash) - - // if we don't find the data key, remove the entry - dataKey := getDataKey(idx.Idx, po) - _, err = s.db.Get(dataKey) - if err != nil { - log.Warn("deleting inconsistent index (missing data)", "key", chunkHash) - batch.Delete(it.Key()) - } else { - gcIdxKey := getGCIdxKey(&idx) - gcIdxData := getGCIdxValue(&idx, po, chunkHash) - batch.Put(gcIdxKey, gcIdxData) - log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData) - okEntryCount++ - if idx.Idx > poPtrs[po] { - poPtrs[po] = idx.Idx - } - } - totalEntryCount++ - it.Next() - } + batch.Reset() it.Release() + + // corrected po index pointer values + var poPtrs [256]uint64 + + // set to true if chunk count not on 4096 iteration boundary + var doneIterating bool + + // last key index in previous iteration + lastIdxKey := []byte{keyIndex} + + // counter for debug output + var cleanBatchCount int + + // go through all key index entries + for !doneIterating { + cleanBatchCount++ + var idxs []dpaDBIndex + var chunkHashes [][]byte + var pos []uint8 + it := s.db.NewIterator() + + it.Seek(lastIdxKey) + + // 4096 is just a nice number, don't look for any hidden meaning here... + var i int + for i = 0; i < 4096; i++ { + + // this really shouldn't happen unless database is empty + // but let's keep it to be safe + if !it.Valid() { + doneIterating = true + break + } + + // if it's not keyindex anymore we're done iterating + rowType, chunkHash := parseIdxKey(it.Key()) + if rowType != keyIndex { + doneIterating = true + break + } + + // decode the retrieved index + var idx dpaDBIndex + err := decodeIndex(it.Value(), &idx) + if err != nil { + return fmt.Errorf("corrupt index: %v", err) + } + po := s.po(chunkHash) + lastIdxKey = it.Key() + + // if we don't find the data key, remove the entry + // if we find it, add to the array of new gc indices to create + dataKey := getDataKey(idx.Idx, po) + _, err = s.db.Get(dataKey) + if err != nil { + log.Warn("deleting inconsistent index (missing data)", "key", chunkHash) + batch.Delete(it.Key()) + } else { + idxs = append(idxs, idx) + chunkHashes = append(chunkHashes, chunkHash) + pos = append(pos, po) + okEntryCount++ + if idx.Idx > poPtrs[po] { + poPtrs[po] = idx.Idx + } + } + totalEntryCount++ + it.Next() + } + it.Release() + + // flush the key index corrections + err := s.db.Write(&batch) + if err != nil { + return err + } + batch.Reset() + + // add correct gc indices + for i, okIdx := range idxs { + gcIdxKey := getGCIdxKey(&okIdx) + gcIdxData := getGCIdxValue(&okIdx, pos[i], chunkHashes[i]) + batch.Put(gcIdxKey, gcIdxData) + log.Trace("clean ok", "key", chunkHashes[i], "gcKey", gcIdxKey, "gcData", gcIdxData) + } + + // flush them + err = s.db.Write(&batch) + if err != nil { + return err + } + batch.Reset() + + log.Debug("clean gc index pass", "batch", cleanBatchCount, "checked", i, "kept", len(idxs)) + } + log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len()) + // lastly add updated entry count var entryCount [8]byte binary.BigEndian.PutUint64(entryCount[:], okEntryCount) batch.Put(keyEntryCnt, entryCount[:]) + + // and add the new po index pointers var poKey [2]byte poKey[0] = keyDistanceCnt for i, poPtr := range poPtrs { @@ -655,6 +721,7 @@ func (s *LDBStore) CleanGCIndex() error { } } + // if you made it this far your harddisk has survived. Congratulations return s.db.Write(&batch) } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 07557980c6..9c10b36290 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -761,6 +761,38 @@ func TestCleanIndex(t *testing.T) { t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal) } } + + // check that the iterator quits properly + chunks, err = mputRandomChunks(ldb, 4100, 4096) + if err != nil { + t.Fatal(err) + } + + po = ldb.po(chunks[4099].Address()[:]) + dataKey = make([]byte, 10) + dataKey[0] = keyData + dataKey[1] = byte(po) + binary.BigEndian.PutUint64(dataKey[2:], 4099+3) + if _, err := ldb.db.Get(dataKey); err != nil { + t.Fatal(err) + } + if err := ldb.db.Delete(dataKey); err != nil { + t.Fatal(err) + } + + if err := ldb.CleanGCIndex(); err != nil { + t.Fatal(err) + } + + // entrycount should now be one less of added chunks + c, err = ldb.db.Get(keyEntryCnt) + if err != nil { + t.Fatalf("expected gc 2 idx to be present: %v", idxKey) + } + entryCount = binary.BigEndian.Uint64(c) + if entryCount != 4099+2 { + t.Fatalf("expected entrycnt to be 2, was %d", c) + } } func waitGc(ctx context.Context, ldb *LDBStore) {