swarm/storage: cleanup task - remove bigger chunks (#17424)

This commit is contained in:
Anton Evangelatov 2018-08-20 14:10:30 +02:00 committed by Péter Szilágyi
parent c4078fc805
commit a8aa89accb
1 changed files with 45 additions and 22 deletions

View File

@ -36,6 +36,7 @@ import (
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/log" "github.com/ethereum/go-ethereum/swarm/log"
"github.com/ethereum/go-ethereum/swarm/storage/mock" "github.com/ethereum/go-ethereum/swarm/storage/mock"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
@ -384,14 +385,13 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
} }
func (s *LDBStore) Cleanup() { func (s *LDBStore) Cleanup() {
//Iterates over the database and checks that there are no faulty chunks //Iterates over the database and checks that there are no chunks bigger than 4kb
var errorsFound, removed, total int
it := s.db.NewIterator() it := s.db.NewIterator()
startPosition := []byte{keyIndex} defer it.Release()
it.Seek(startPosition) for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() {
var key []byte key := it.Key()
var errorsFound, total int
for it.Valid() {
key = it.Key()
if (key == nil) || (key[0] != keyIndex) { if (key == nil) || (key[0] != keyIndex) {
break break
} }
@ -399,27 +399,50 @@ func (s *LDBStore) Cleanup() {
var index dpaDBIndex var index dpaDBIndex
err := decodeIndex(it.Value(), &index) err := decodeIndex(it.Value(), &index)
if err != nil { if err != nil {
it.Next() log.Warn("Cannot decode")
errorsFound++
continue continue
} }
data, err := s.db.Get(getDataKey(index.Idx, s.po(Address(key[1:])))) hash := key[1:]
po := s.po(hash)
datakey := getDataKey(index.Idx, po)
data, err := s.db.Get(datakey)
if err != nil { if err != nil {
log.Warn(fmt.Sprintf("Chunk %x found but could not be accessed: %v", key[:], err)) found := false
s.delete(index.Idx, getIndexKey(key[1:]), s.po(Address(key[1:])))
errorsFound++ // highest possible proximity is 255
} else { for po = 1; po <= 255; po++ {
hasher := s.hashfunc() datakey = getDataKey(index.Idx, po)
hasher.Write(data[32:]) data, err = s.db.Get(datakey)
hash := hasher.Sum(nil) if err == nil {
if !bytes.Equal(hash, key[1:]) { found = true
log.Warn(fmt.Sprintf("Found invalid chunk. Hash mismatch. hash=%x, key=%x", hash, key[:])) break
s.delete(index.Idx, getIndexKey(key[1:]), s.po(Address(key[1:]))) }
}
if !found {
log.Warn(fmt.Sprintf("Chunk %x found but count not be accessed with any po", key[:]))
errorsFound++
continue
} }
} }
it.Next()
c := &Chunk{}
ck := data[:32]
decodeData(data, c)
cs := int64(binary.LittleEndian.Uint64(c.SData[:8]))
log.Trace("chunk", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
if len(c.SData) > chunk.DefaultSize+8 {
log.Warn("chunk for cleanup", "key", fmt.Sprintf("%x", key[:]), "ck", fmt.Sprintf("%x", ck), "dkey", fmt.Sprintf("%x", datakey), "dataidx", index.Idx, "po", po, "len data", len(data), "len sdata", len(c.SData), "size", cs)
s.delete(index.Idx, getIndexKey(key[1:]), po)
removed++
errorsFound++
}
} }
it.Release()
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total)) log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
} }
func (s *LDBStore) ReIndex() { func (s *LDBStore) ReIndex() {