swarm/storage: fix HashExplore concurrency bug ethersphere#1211 (#19028)

* swarm/storage: fix HashExplore concurrency bug ethersphere#1211

* swarm/storage: lock as value not pointer

* swarm/storage: wait for splitting to complete

* swarm/storage: fix linter problems

* swarm/storage: append to nil slice

(cherry picked from commit 3d22a46c94)
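
The bug itself is a plain data race: the pyramid chunker calls the putter's Put from several goroutines at once, so unsynchronized appends to the shared references slice can lose entries or corrupt the slice, and the old code also collected the references before splitting had finished. The following is a minimal standalone Go sketch of the pattern the fix adopts, not the swarm code itself; refCollector and its string references are illustrative stand-ins for hashExplorer and its chunk references.

package main

import (
	"fmt"
	"sync"
)

// refCollector is a hypothetical stand-in for hashExplorer: it only records
// the references handed to Put, which may be called concurrently.
type refCollector struct {
	refs []string
	lock sync.Mutex // held by value, mirroring the patch
}

// Put records a reference under the lock so concurrent callers do not race.
func (c *refCollector) Put(ref string) {
	c.lock.Lock()
	c.refs = append(c.refs, ref) // appending to the nil zero-value slice is fine in Go
	c.lock.Unlock()
}

func main() {
	c := &refCollector{}
	var wg sync.WaitGroup
	// Simulate the splitter storing chunks from several goroutines at once.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			c.Put(fmt.Sprintf("chunk-%d", i))
		}(i)
	}
	wg.Wait() // analogous to calling wait(ctx) before reading the results
	fmt.Println(len(c.refs), "references collected")
}

Running the same sketch without the mutex under go run -race reports the race. Holding the mutex by value keeps the struct's zero value ready to use, which is why the patch declares lock sync.Mutex directly in the struct instead of a pointer.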
holisticode authored on 2019-02-12 18:17:44 -05:00, committed by Rafael Matias
parent 799fe99537
commit 7a333e4104
1 changed file with 17 additions and 9 deletions


@@ -20,6 +20,7 @@ import (
 	"context"
 	"io"
 	"sort"
+	"sync"
 )
 
 /*
@@ -101,38 +102,45 @@ func (f *FileStore) HashSize() int {
 // GetAllReferences is a public API. This endpoint returns all chunk hashes (only) for a given file
 func (f *FileStore) GetAllReferences(ctx context.Context, data io.Reader, toEncrypt bool) (addrs AddressCollection, err error) {
 	// create a special kind of putter, which only will store the references
-	putter := &HashExplorer{
+	putter := &hashExplorer{
 		hasherStore: NewHasherStore(f.ChunkStore, f.hashFunc, toEncrypt),
-		References:  make([]Reference, 0),
 	}
 	// do the actual splitting anyway, no way around it
-	_, _, err = PyramidSplit(ctx, data, putter, putter)
+	_, wait, err := PyramidSplit(ctx, data, putter, putter)
+	if err != nil {
+		return nil, err
+	}
+	// wait for splitting to be complete and all chunks processed
+	err = wait(ctx)
 	if err != nil {
 		return nil, err
 	}
 	// collect all references
 	addrs = NewAddressCollection(0)
-	for _, ref := range putter.References {
+	for _, ref := range putter.references {
 		addrs = append(addrs, Address(ref))
 	}
 	sort.Sort(addrs)
 	return addrs, nil
 }
 
-// HashExplorer is a special kind of putter which will only store chunk references
-type HashExplorer struct {
+// hashExplorer is a special kind of putter which will only store chunk references
+type hashExplorer struct {
 	*hasherStore
-	References []Reference
+	references []Reference
+	lock       sync.Mutex
 }
 
 // HashExplorer's Put will add just the chunk hashes to its `References`
-func (he *HashExplorer) Put(ctx context.Context, chunkData ChunkData) (Reference, error) {
+func (he *hashExplorer) Put(ctx context.Context, chunkData ChunkData) (Reference, error) {
 	// Need to do the actual Put, which returns the references
 	ref, err := he.hasherStore.Put(ctx, chunkData)
 	if err != nil {
 		return nil, err
 	}
 	// internally store the reference
-	he.References = append(he.References, ref)
+	he.lock.Lock()
+	he.references = append(he.references, ref)
+	he.lock.Unlock()
 	return ref, nil
 }
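
For context, a caller-side sketch of the patched API follows. It assumes a *storage.FileStore has already been constructed and wired to a chunk store elsewhere (that setup is omitted and hypothetical); only GetAllReferences and its signature come from the diff above. Because GetAllReferences now waits for splitting to complete before collecting, the returned address list is complete and sorted when the call returns.

package main

import (
	"bytes"
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

// listReferences prints every chunk reference of the given payload.
// toEncrypt=false asks for plain (unencrypted) chunk hashes.
func listReferences(fs *storage.FileStore, payload []byte) error {
	addrs, err := fs.GetAllReferences(context.Background(), bytes.NewReader(payload), false)
	if err != nil {
		return err
	}
	for _, addr := range addrs {
		fmt.Printf("%x\n", addr)
	}
	return nil
}

func main() {
	// Hypothetical wiring: a real caller would obtain a FileStore backed by
	// the node's chunk store; none is constructed in this sketch.
	var fs *storage.FileStore
	if fs == nil {
		log.Println("no FileStore wired up; sketch only")
		return
	}
	if err := listReferences(fs, []byte("hello swarm")); err != nil {
		log.Fatal(err)
	}
}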